K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / internet /
server ip : 104.21.89.46

your ip : 108.162.242.40

H O M E


Filename/usr/lib/python2.7/dist-packages/twisted/internet/_win32serialport.py
Size4.52 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified30-Oct-2011 01:28
Last accessed07-Jul-2025 07:14
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Serial port support for Windows.

Requires PySerial and pywin32.
"""

# system imports
import serial
from serial import PARITY_NONE, PARITY_EVEN, PARITY_ODD
from serial import STOPBITS_ONE, STOPBITS_TWO
from serial import FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS
import win32file, win32event

# twisted imports
from twisted.internet import abstract

# sibling imports
from serialport import BaseSerialPort


class SerialPort(BaseSerialPort, abstract.FileDescriptor):
    """
    A serial device, acting as a transport, that uses a win32 event.

    Reads and writes are issued as overlapped operations on the PySerial
    port handle (C{self._serial.hComPort}).  The event handle of each
    C{OVERLAPPED} structure is registered with the reactor together with
    the name of the method that handles it: C{'serialReadEvent'} for reads
    and C{'serialWriteEvent'} for writes.

    @ivar outQueue: data handed to L{write} while an earlier write was still
        in progress, oldest first.
    @ivar writeInProgress: set to 1 when a write is issued and back to 0 when
        L{serialWriteEvent} finds C{outQueue} empty.
    @ivar read_buf: the buffer belonging to the outstanding one-byte
        overlapped read.
    """

    connected = 1

    def __init__(self, protocol, deviceNameOrPortNumber, reactor,
                 baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE,
                 stopbits=STOPBITS_ONE, xonxoff=0, rtscts=0):
        """
        Open the port, register its events and connect the protocol.

        @param protocol: object whose C{makeConnection}, C{dataReceived}
            and C{connectionLost} methods are called by this transport.
        @param deviceNameOrPortNumber: passed as the first argument to
            C{self._serialFactory} to select the port.
        @param reactor: object providing C{addEvent} and C{removeEvent}.
        @param baudrate: forwarded to C{self._serialFactory}.
        @param bytesize: forwarded to C{self._serialFactory}.
        @param parity: forwarded to C{self._serialFactory}.
        @param stopbits: forwarded to C{self._serialFactory}.
        @param xonxoff: forwarded to C{self._serialFactory}.
        @param rtscts: forwarded to C{self._serialFactory}.
        """
        # _serialFactory, flushInput and flushOutput are not defined in this
        # class; presumably they are inherited from BaseSerialPort.
        self._serial = self._serialFactory(
            deviceNameOrPortNumber, baudrate=baudrate, bytesize=bytesize,
            parity=parity, stopbits=stopbits, timeout=None,
            xonxoff=xonxoff, rtscts=rtscts)
        self.flushInput()
        self.flushOutput()
        self.reactor = reactor
        self.protocol = protocol
        self.outQueue = []
        self.closed = 0
        self.closedNotifies = 0
        self.writeInProgress = 0

        # The two events differ only in CreateEvent's second argument
        # (bManualReset in the Win32 signature): 1 for reads, 0 for writes.
        # The read event is the one reset explicitly in serialReadEvent.
        self._overlappedRead = win32file.OVERLAPPED()
        self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
        self._overlappedWrite = win32file.OVERLAPPED()
        self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)

        self.reactor.addEvent(
            self._overlappedRead.hEvent, self, 'serialReadEvent')
        self.reactor.addEvent(
            self._overlappedWrite.hEvent, self, 'serialWriteEvent')

        # The protocol is connected before the first read is issued.
        self.protocol.makeConnection(self)
        self._finishPortSetup()

    def _finishPortSetup(self):
        """
        Finish setting up the serial port.

        This is a separate method to facilitate testing.
        """
        # Called for its effect on the port only; the returned error flags
        # and status structure are not used here.
        win32file.ClearCommError(self._serial.hComPort)
        self._startSingleByteRead()

    def _startSingleByteRead(self):
        """
        Issue an overlapped one-byte read and keep its buffer in C{read_buf}.
        """
        # ReadFile returns a (result code, buffer) pair; only the buffer is
        # needed, the completion is picked up in serialReadEvent.
        self.read_buf = win32file.ReadFile(
            self._serial.hComPort,
            win32file.AllocateReadBuffer(1),
            self._overlappedRead)[1]

    def serialReadEvent(self):
        """
        Handle completion of the outstanding one-byte read.

        If the read produced data, whatever else is already waiting in the
        input queue is read as well and everything is passed to the
        protocol's C{dataReceived} in a single call.  A new one-byte read is
        then issued whether or not any data was delivered.
        """
        handle = self._serial.hComPort

        # Get that character we set up (third argument 0: do not wait).
        n = win32file.GetOverlappedResult(handle, self._overlappedRead, 0)
        if n:
            first = str(self.read_buf[:n])
            # Now we should get everything that is already in the buffer.
            comstat = win32file.ClearCommError(handle)[1]
            if comstat.cbInQue:
                win32event.ResetEvent(self._overlappedRead.hEvent)
                buf = win32file.ReadFile(
                    handle,
                    win32file.AllocateReadBuffer(comstat.cbInQue),
                    self._overlappedRead)[1]
                # Third argument 1: wait for this second read to complete.
                n = win32file.GetOverlappedResult(
                    handle, self._overlappedRead, 1)
                self.protocol.dataReceived(first + str(buf[:n]))
            else:
                self.protocol.dataReceived(first)

        # Set up the next one.  The handle is looked up afresh by the helper
        # rather than reusing the local captured before dataReceived ran.
        win32event.ResetEvent(self._overlappedRead.hEvent)
        self._startSingleByteRead()

    def write(self, data):
        """
        Write C{data} to the port, or queue it behind a write in progress.

        Empty C{data} is ignored.

        @param data: the data to send.
        """
        if not data:
            return
        if self.writeInProgress:
            # Sent later, in order, by serialWriteEvent.
            self.outQueue.append(data)
            return
        self.writeInProgress = 1
        win32file.WriteFile(self._serial.hComPort, data, self._overlappedWrite)

    def serialWriteEvent(self):
        """
        Handle the write event: send the next queued chunk, if any.

        When C{outQueue} is empty C{writeInProgress} is cleared, so the next
        call to L{write} issues its data directly.
        """
        try:
            dataToWrite = self.outQueue.pop(0)
        except IndexError:
            self.writeInProgress = 0
            return
        win32file.WriteFile(
            self._serial.hComPort, dataToWrite, self._overlappedWrite)

    def connectionLost(self, reason):
        """
        Called when the serial port disconnects.

        Will call C{connectionLost} on the protocol that is handling the
        serial data.

        @param reason: passed through unchanged to
            C{abstract.FileDescriptor.connectionLost} and to the protocol's
            C{connectionLost}.
        """
        # Both events are removed from the reactor before the port is closed.
        self.reactor.removeEvent(self._overlappedRead.hEvent)
        self.reactor.removeEvent(self._overlappedWrite.hEvent)
        abstract.FileDescriptor.connectionLost(self, reason)
        self._serial.close()
        self.protocol.connectionLost(reason)