2
#Python Serial Port Extension for Win32, Linux, BSD, Jython
3
#serial driver for win32
6
#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
7
# this is distributed under a free software license, see license.txt
9
import win32file # The base COM port and file IO functions.
10
import win32event # We use events and the WaitFor[Single|Multiple]Objects functions.
11
import win32con # constants.
12
from serialutil import *
14
VERSION = "$Revision: 1.31 $".split()[1] #extract CVS version
16
#from winbase.h. these should realy be in win32con
23
"""Turn a port number into a device name"""
24
#the "//./COMx" format is required for devices >= 9
25
#not all versions of windows seem to support this propperly
26
#so that the first few ports are used with the DOS device name
28
return 'COM%d' % (portnum+1) #numbers are transformed to a string
30
return r'\\.\COM%d' % (portnum+1)
32
class Serial(SerialBase):
33
"""Serial port implemenation for Win32. This implemenatation requires a
34
win32all installation."""
36
BAUDRATES = (50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,
37
19200,38400,57600,115200)
40
"""Open port with current settings. This may throw a SerialException
41
if the port cannot be opened."""
42
if self._port is None:
43
raise SerialException("Port must be configured before it can be used.")
46
self.hComPort = win32file.CreateFile(self.portstr,
47
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
50
win32con.OPEN_EXISTING,
51
win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
53
except Exception, msg:
54
self.hComPort = None #'cause __del__ is called anyway
55
raise SerialException("could not open port: %s" % msg)
57
win32file.SetupComm(self.hComPort, 4096, 4096)
59
#Save original timeout values:
60
self._orgTimeouts = win32file.GetCommTimeouts(self.hComPort)
62
self._rtsState = win32file.RTS_CONTROL_ENABLE
63
self._dtrState = win32file.RTS_CONTROL_ENABLE
65
self._reconfigurePort()
68
# Remove anything that was there
69
win32file.PurgeComm(self.hComPort,
70
win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT |
71
win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
73
self._overlappedRead = win32file.OVERLAPPED()
74
self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
75
self._overlappedWrite = win32file.OVERLAPPED()
76
#~ self._overlappedWrite.hEvent = win32event.CreateEvent(None, 1, 0, None)
77
self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)
80
def _reconfigurePort(self):
81
"""Set commuication parameters on opened port."""
83
raise SerialException("Can only operate on a valid port handle")
85
#Set Windows timeout values
86
#timeouts is a tuple with the following items:
87
#(ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
88
# ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
89
# WriteTotalTimeoutConstant)
90
if self._timeout is None:
91
timeouts = (0, 0, 0, 0, 0)
92
elif self._timeout == 0:
93
timeouts = (win32con.MAXDWORD, 0, 0, 0, 0)
95
timeouts = (0, 0, int(self._timeout*1000), 0, 0)
96
if self._writeTimeout is None:
98
elif self._writeTimeout == 0:
99
timeouts = timeouts[:-2] + (0, win32con.MAXDWORD)
101
timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
102
win32file.SetCommTimeouts(self.hComPort, timeouts)
104
win32file.SetCommMask(self.hComPort, win32file.EV_ERR)
106
# Setup the connection info.
107
# Get state and modify it:
108
comDCB = win32file.GetCommState(self.hComPort)
109
comDCB.BaudRate = self._baudrate
111
if self._bytesize == FIVEBITS:
113
elif self._bytesize == SIXBITS:
115
elif self._bytesize == SEVENBITS:
117
elif self._bytesize == EIGHTBITS:
120
raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
122
if self._parity == PARITY_NONE:
123
comDCB.Parity = win32file.NOPARITY
124
comDCB.fParity = 0 # Dis/Enable Parity Check
125
elif self._parity == PARITY_EVEN:
126
comDCB.Parity = win32file.EVENPARITY
127
comDCB.fParity = 1 # Dis/Enable Parity Check
128
elif self._parity == PARITY_ODD:
129
comDCB.Parity = win32file.ODDPARITY
130
comDCB.fParity = 1 # Dis/Enable Parity Check
132
raise ValueError("Unsupported parity mode: %r" % self._parity)
134
if self._stopbits == STOPBITS_ONE:
135
comDCB.StopBits = win32file.ONESTOPBIT
136
elif self._stopbits == STOPBITS_TWO:
137
comDCB.StopBits = win32file.TWOSTOPBITS
139
raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
141
comDCB.fBinary = 1 # Enable Binary Transmission
142
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
144
comDCB.fRtsControl = win32file.RTS_CONTROL_HANDSHAKE
146
comDCB.fRtsControl = self._rtsState
148
comDCB.fDtrControl = win32file.DTR_CONTROL_HANDSHAKE
150
comDCB.fDtrControl = self._dtrState
151
comDCB.fOutxCtsFlow = self._rtscts
152
comDCB.fOutxDsrFlow = self._dsrdtr
153
comDCB.fOutX = self._xonxoff
154
comDCB.fInX = self._xonxoff
156
comDCB.fErrorChar = 0
157
comDCB.fAbortOnError = 0
159
comDCB.XoffChar = XOFF
162
win32file.SetCommState(self.hComPort, comDCB)
163
except win32file.error, e:
164
raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % e)
166
#~ def __del__(self):
173
#Restore original timeout values:
174
win32file.SetCommTimeouts(self.hComPort, self._orgTimeouts)
176
win32file.CloseHandle(self.hComPort)
180
def makeDeviceName(self, port):
183
# - - - - - - - - - - - - - - - - - - - - - - - -
186
"""Return the number of characters currently in the input buffer."""
187
flags, comstat = win32file.ClearCommError(self.hComPort)
188
return comstat.cbInQue
190
def read(self, size=1):
191
"""Read size bytes from the serial port. If a timeout is set it may
192
return less characters as requested. With no timeout it will block
193
until the requested number of bytes is read."""
194
if not self.hComPort: raise portNotOpenError
196
win32event.ResetEvent(self._overlappedRead.hEvent)
197
flags, comstat = win32file.ClearCommError(self.hComPort)
198
if self.timeout == 0:
199
n = min(comstat.cbInQue, size)
201
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(n), self._overlappedRead)
202
win32event.WaitForSingleObject(self._overlappedRead.hEvent, win32event.INFINITE)
207
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(size), self._overlappedRead)
208
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedRead, 1)
215
"""Output the given string over the serial port."""
216
if not self.hComPort: raise portNotOpenError
219
#~ win32event.ResetEvent(self._overlappedWrite.hEvent)
220
err, n = win32file.WriteFile(self.hComPort, s, self._overlappedWrite)
221
if err: #will be ERROR_IO_PENDING:
222
# Wait for the write to complete.
223
#~ win32event.WaitForSingleObject(self._overlappedWrite.hEvent, win32event.INFINITE)
224
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedWrite, 1)
226
raise writeTimeoutError
229
def flushInput(self):
230
"""Clear input buffer, discarding all that is in the buffer."""
231
if not self.hComPort: raise portNotOpenError
232
win32file.PurgeComm(self.hComPort, win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
234
def flushOutput(self):
235
"""Clear output buffer, aborting the current output and
236
discarding all that is in the buffer."""
237
if not self.hComPort: raise portNotOpenError
238
win32file.PurgeComm(self.hComPort, win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT)
241
"""Send break condition."""
242
if not self.hComPort: raise portNotOpenError
244
win32file.SetCommBreak(self.hComPort)
245
#TODO: how to set the correct duration??
247
win32file.ClearCommBreak(self.hComPort)
249
def setRTS(self,level=1):
250
"""Set terminal status line: Request To Send"""
251
if not self.hComPort: raise portNotOpenError
253
self._rtsState = win32file.RTS_CONTROL_ENABLE
254
win32file.EscapeCommFunction(self.hComPort, win32file.SETRTS)
256
self._rtsState = win32file.RTS_CONTROL_DISABLE
257
win32file.EscapeCommFunction(self.hComPort, win32file.CLRRTS)
259
def setDTR(self,level=1):
260
"""Set terminal status line: Data Terminal Ready"""
261
if not self.hComPort: raise portNotOpenError
263
self._dtrState = win32file.DTR_CONTROL_ENABLE
264
win32file.EscapeCommFunction(self.hComPort, win32file.SETDTR)
266
self._dtrState = win32file.DTR_CONTROL_DISABLE
267
win32file.EscapeCommFunction(self.hComPort, win32file.CLRDTR)
270
"""Read terminal status line: Clear To Send"""
271
if not self.hComPort: raise portNotOpenError
272
return MS_CTS_ON & win32file.GetCommModemStatus(self.hComPort) != 0
275
"""Read terminal status line: Data Set Ready"""
276
if not self.hComPort: raise portNotOpenError
277
return MS_DSR_ON & win32file.GetCommModemStatus(self.hComPort) != 0
280
"""Read terminal status line: Ring Indicator"""
281
if not self.hComPort: raise portNotOpenError
282
return MS_RING_ON & win32file.GetCommModemStatus(self.hComPort) != 0
285
"""Read terminal status line: Carrier Detect"""
286
if not self.hComPort: raise portNotOpenError
287
return MS_RLSD_ON & win32file.GetCommModemStatus(self.hComPort) != 0
289
# - - platform specific - - - -
291
def setXON(self, level=True):
292
"""Platform specific - set flow state."""
293
if not self.hComPort: raise portNotOpenError
295
win32file.EscapeCommFunction(self.hComPort, win32file.SETXON)
297
win32file.EscapeCommFunction(self.hComPort, win32file.SETXOFF)
300
if __name__ == '__main__':
2
#Python Serial Port Extension for Win32, Linux, BSD, Jython
3
#serial driver for win32
6
#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
7
# this is distributed under a free software license, see license.txt
9
import win32file # The base COM port and file IO functions.
10
import win32event # We use events and the WaitFor[Single|Multiple]Objects functions.
11
import win32con # constants.
12
from serialutil import *
14
VERSION = "$Revision: 1.40 $".split()[1] #extract CVS version
16
#from winbase.h. these should realy be in win32con
23
"""Turn a port number into a device name"""
24
return 'COM%d' % (portnum+1) #numbers are transformed to a string
26
class Serial(SerialBase):
27
"""Serial port implemenation for Win32. This implemenatation requires a
28
win32all installation."""
30
BAUDRATES = (50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,
31
19200,38400,57600,115200)
34
"""Open port with current settings. This may throw a SerialException
35
if the port cannot be opened."""
36
if self._port is None:
37
raise SerialException("Port must be configured before it can be used.")
40
self.hComPort = win32file.CreateFile(self.makeDeviceName(self.portstr),
41
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
44
win32con.OPEN_EXISTING,
45
win32con.FILE_FLAG_OVERLAPPED,
47
except Exception, msg:
48
self.hComPort = None #'cause __del__ is called anyway
49
raise SerialException("could not open port %s: %s" % (self.portstr, msg))
51
win32file.SetupComm(self.hComPort, 4096, 4096)
53
#Save original timeout values:
54
self._orgTimeouts = win32file.GetCommTimeouts(self.hComPort)
56
self._rtsState = win32file.RTS_CONTROL_ENABLE
57
self._dtrState = win32file.DTR_CONTROL_ENABLE
59
self._reconfigurePort()
62
# Remove anything that was there
63
win32file.PurgeComm(self.hComPort,
64
win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT |
65
win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
67
self._overlappedRead = win32file.OVERLAPPED()
68
self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
69
self._overlappedWrite = win32file.OVERLAPPED()
70
#~ self._overlappedWrite.hEvent = win32event.CreateEvent(None, 1, 0, None)
71
self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)
74
def _reconfigurePort(self):
75
"""Set commuication parameters on opened port."""
77
raise SerialException("Can only operate on a valid port handle")
79
#Set Windows timeout values
80
#timeouts is a tuple with the following items:
81
#(ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
82
# ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
83
# WriteTotalTimeoutConstant)
84
if self._timeout is None:
85
timeouts = (0, 0, 0, 0, 0)
86
elif self._timeout == 0:
87
timeouts = (win32con.MAXDWORD, 0, 0, 0, 0)
89
timeouts = (0, 0, int(self._timeout*1000), 0, 0)
90
if self._writeTimeout is None:
92
elif self._writeTimeout == 0:
93
timeouts = timeouts[:-2] + (0, win32con.MAXDWORD)
95
timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
96
win32file.SetCommTimeouts(self.hComPort, timeouts)
98
win32file.SetCommMask(self.hComPort, win32file.EV_ERR)
100
# Setup the connection info.
101
# Get state and modify it:
102
comDCB = win32file.GetCommState(self.hComPort)
103
comDCB.BaudRate = self._baudrate
105
if self._bytesize == FIVEBITS:
107
elif self._bytesize == SIXBITS:
109
elif self._bytesize == SEVENBITS:
111
elif self._bytesize == EIGHTBITS:
114
raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
116
if self._parity == PARITY_NONE:
117
comDCB.Parity = win32file.NOPARITY
118
comDCB.fParity = 0 # Dis/Enable Parity Check
119
elif self._parity == PARITY_EVEN:
120
comDCB.Parity = win32file.EVENPARITY
121
comDCB.fParity = 1 # Dis/Enable Parity Check
122
elif self._parity == PARITY_ODD:
123
comDCB.Parity = win32file.ODDPARITY
124
comDCB.fParity = 1 # Dis/Enable Parity Check
126
raise ValueError("Unsupported parity mode: %r" % self._parity)
128
if self._stopbits == STOPBITS_ONE:
129
comDCB.StopBits = win32file.ONESTOPBIT
130
elif self._stopbits == STOPBITS_TWO:
131
comDCB.StopBits = win32file.TWOSTOPBITS
133
raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
135
comDCB.fBinary = 1 # Enable Binary Transmission
136
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
138
comDCB.fRtsControl = win32file.RTS_CONTROL_HANDSHAKE
140
comDCB.fRtsControl = self._rtsState
142
comDCB.fDtrControl = win32file.DTR_CONTROL_HANDSHAKE
144
comDCB.fDtrControl = self._dtrState
145
comDCB.fOutxCtsFlow = self._rtscts
146
comDCB.fOutxDsrFlow = self._dsrdtr
147
comDCB.fOutX = self._xonxoff
148
comDCB.fInX = self._xonxoff
150
comDCB.fErrorChar = 0
151
comDCB.fAbortOnError = 0
153
comDCB.XoffChar = XOFF
156
win32file.SetCommState(self.hComPort, comDCB)
157
except win32file.error, e:
158
raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % e)
160
#~ def __del__(self):
168
# Restore original timeout values:
169
win32file.SetCommTimeouts(self.hComPort, self._orgTimeouts)
170
except win32file.error:
171
# ignore errors. can happen for unplugged USB serial devices
174
win32file.CloseHandle(self.hComPort)
175
win32file.CloseHandle(self._overlappedRead.hEvent)
176
win32file.CloseHandle(self._overlappedWrite.hEvent)
180
def makeDeviceName(self, port):
181
# the "\\.\COMx" format is required for devices other than COM1-COM8
182
# not all versions of windows seem to support this properly
183
# so that the first few ports are used with the DOS device name
184
if port.upper().startswith('COM') and int(port[3:]) <= 8:
186
return '\\\\.\\' + port
188
# - - - - - - - - - - - - - - - - - - - - - - - -
191
"""Return the number of characters currently in the input buffer."""
192
flags, comstat = win32file.ClearCommError(self.hComPort)
193
return comstat.cbInQue
195
def read(self, size=1):
196
"""Read size bytes from the serial port. If a timeout is set it may
197
return less characters as requested. With no timeout it will block
198
until the requested number of bytes is read."""
199
if not self.hComPort: raise portNotOpenError
201
win32event.ResetEvent(self._overlappedRead.hEvent)
202
flags, comstat = win32file.ClearCommError(self.hComPort)
203
if self.timeout == 0:
204
n = min(comstat.cbInQue, size)
206
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(n), self._overlappedRead)
207
win32event.WaitForSingleObject(self._overlappedRead.hEvent, win32event.INFINITE)
212
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(size), self._overlappedRead)
213
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedRead, 1)
219
def write(self, data):
220
"""Output the given string over the serial port."""
221
if not self.hComPort: raise portNotOpenError
222
if not isinstance(data, str):
223
raise TypeError('expected str, got %s' % type(data))
226
#~ win32event.ResetEvent(self._overlappedWrite.hEvent)
227
err, n = win32file.WriteFile(self.hComPort, data, self._overlappedWrite)
228
if err: #will be ERROR_IO_PENDING:
229
# Wait for the write to complete.
230
#~ win32event.WaitForSingleObject(self._overlappedWrite.hEvent, win32event.INFINITE)
231
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedWrite, 1)
233
raise writeTimeoutError
236
def flushInput(self):
237
"""Clear input buffer, discarding all that is in the buffer."""
238
if not self.hComPort: raise portNotOpenError
239
win32file.PurgeComm(self.hComPort, win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
241
def flushOutput(self):
242
"""Clear output buffer, aborting the current output and
243
discarding all that is in the buffer."""
244
if not self.hComPort: raise portNotOpenError
245
win32file.PurgeComm(self.hComPort, win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT)
247
def sendBreak(self, duration=0.25):
248
"""Send break condition."""
249
if not self.hComPort: raise portNotOpenError
251
win32file.SetCommBreak(self.hComPort)
253
win32file.ClearCommBreak(self.hComPort)
255
def setRTS(self, level=1):
256
"""Set terminal status line: Request To Send"""
257
if not self.hComPort: raise portNotOpenError
259
self._rtsState = win32file.RTS_CONTROL_ENABLE
260
win32file.EscapeCommFunction(self.hComPort, win32file.SETRTS)
262
self._rtsState = win32file.RTS_CONTROL_DISABLE
263
win32file.EscapeCommFunction(self.hComPort, win32file.CLRRTS)
265
def setDTR(self, level=1):
266
"""Set terminal status line: Data Terminal Ready"""
267
if not self.hComPort: raise portNotOpenError
269
self._dtrState = win32file.DTR_CONTROL_ENABLE
270
win32file.EscapeCommFunction(self.hComPort, win32file.SETDTR)
272
self._dtrState = win32file.DTR_CONTROL_DISABLE
273
win32file.EscapeCommFunction(self.hComPort, win32file.CLRDTR)
276
"""Read terminal status line: Clear To Send"""
277
if not self.hComPort: raise portNotOpenError
278
return MS_CTS_ON & win32file.GetCommModemStatus(self.hComPort) != 0
281
"""Read terminal status line: Data Set Ready"""
282
if not self.hComPort: raise portNotOpenError
283
return MS_DSR_ON & win32file.GetCommModemStatus(self.hComPort) != 0
286
"""Read terminal status line: Ring Indicator"""
287
if not self.hComPort: raise portNotOpenError
288
return MS_RING_ON & win32file.GetCommModemStatus(self.hComPort) != 0
291
"""Read terminal status line: Carrier Detect"""
292
if not self.hComPort: raise portNotOpenError
293
return MS_RLSD_ON & win32file.GetCommModemStatus(self.hComPort) != 0
295
# - - platform specific - - - -
297
def setXON(self, level=True):
298
"""Platform specific - set flow state."""
299
if not self.hComPort: raise portNotOpenError
301
win32file.EscapeCommFunction(self.hComPort, win32file.SETXON)
303
win32file.EscapeCommFunction(self.hComPort, win32file.SETXOFF)
306
if __name__ == '__main__':