Cleanup
This commit is contained in:
15
.gitignore
vendored
15
.gitignore
vendored
@@ -6,17 +6,4 @@
|
||||
*.elf
|
||||
*.hex
|
||||
*.pyc
|
||||
ibhsim5
|
||||
isotest4
|
||||
testAS511
|
||||
testIBH
|
||||
testISO_TCP
|
||||
testISO_TCPload
|
||||
testMPI
|
||||
testMPI_IBHload
|
||||
testMPIload
|
||||
testNLpro
|
||||
testPPI
|
||||
testPPI_IBH
|
||||
testPPI_IBHload
|
||||
testPPIload
|
||||
*.cfg
|
||||
|
||||
12
ioserv.cfg.tpl
Normal file
12
ioserv.cfg.tpl
Normal file
@@ -0,0 +1,12 @@
|
||||
[PARAM]
|
||||
id = I001
|
||||
|
||||
[SOAP]
|
||||
host = localhost
|
||||
|
||||
[REDIS]
|
||||
host = localhost
|
||||
|
||||
[DEBUG]
|
||||
loglevel = 6
|
||||
logfile = /tmp/log_ioserv.txt
|
||||
318
ioserv.py
Executable file
318
ioserv.py
Executable file
@@ -0,0 +1,318 @@
|
||||
#!/usr/bin/env python2.6
|
||||
#-*- coding: utf-8 -*-
|
||||
|
||||
__author__ = "Michael Rest"
|
||||
__date__ = "1 August 2010"
|
||||
__email__ = "michi@rosstein.de"
|
||||
__version__ = "$Revision: 1.0 $"[11:-2]
|
||||
|
||||
|
||||
import os
|
||||
from log import log
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
from select import select
|
||||
from time import time, strptime
|
||||
from ConfigParser import *
|
||||
from SOAPpy import SOAPProxy, SOAPServer
|
||||
from redis import Redis
|
||||
from pydave import pydave
|
||||
from davedefs import *
|
||||
|
||||
|
||||
pid_file= '/tmp/ipserv.lock'
|
||||
|
||||
|
||||
class IOSERV (object):
|
||||
def __init__ (self, loglevel = 6, logfile = "/tmp/log_ioserv.txt"):
|
||||
self.server = None
|
||||
self.rproxy = None
|
||||
self.loglevel = int (loglevel)
|
||||
self.logfile = logfile
|
||||
self.conns = {}
|
||||
|
||||
def __del__ (self):
|
||||
for cname in conns:
|
||||
conns [cname].DisconnectPLC ()
|
||||
|
||||
def log (self, msg, level):
|
||||
"""
|
||||
Print message if in verbose mode.
|
||||
"""
|
||||
if level >= self.loglevel:
|
||||
file = open (self.logfile, "a")
|
||||
dt = datetime.now ()
|
||||
file.write ("%4d-%02d-%02d %02d:%02d:%02d.%06d: " %(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)),
|
||||
file.write (msg + "\n")
|
||||
file.close ()
|
||||
|
||||
|
||||
def SOAPserver (self, host = 'localhost', port = 8081):
|
||||
self.server = SOAPServer ((host, port))
|
||||
self.server.registerFunction (self.setQuit)
|
||||
self.server.registerFunction (self.setMaintenance)
|
||||
self.server.registerFunction (self.setDone)
|
||||
self.log ("SOAP Server created", 6)
|
||||
|
||||
def RedisProxy (self, rserv):
|
||||
self.rproxy = Redis (rserv)
|
||||
self.log ("Redis Client created", 6)
|
||||
|
||||
def adddaveconn (self, name, proto, addr, port, mpi, rack, slot, rsid1, rsid2, rrack, rslot, rmpi):
|
||||
#routingSubnetFirst, routingSubnetSecond, routingRack, routingSlot
|
||||
if not proto in ["ISO_TCP", "IBH_Link"]:
|
||||
self.log ("Bad Protocol for DaveConn", 1)
|
||||
sys.exit (1)
|
||||
if name in self.conns:
|
||||
self.log ("Connection already exits", 1)
|
||||
self.conns [name] = pydave ()
|
||||
self.conns [name].OpenSocket (proto, addr, port, mpi, rack, slot, rsid1, rsid2, rrack, rslot, rmpi)
|
||||
|
||||
def logtext (self, txt = "test"):
|
||||
dt = datetime.now ()
|
||||
timestamp = "%4d-%02d-%02d %02d:%02d:%02d.%06d: " %(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
|
||||
self.builder.get_object ("textbuffer1").set_text (timestamp + txt)
|
||||
|
||||
|
||||
def simulation (self):
|
||||
Dummyvar = 0
|
||||
|
||||
def setQuit (self, cname, msgnr):
|
||||
"""
|
||||
SOAP RPC Funciton - Set Errormsg Quit-Bit
|
||||
"""
|
||||
if msgnr not in range (2000, 2500):
|
||||
self.log ("Bad MsgNr for Quit", 1)
|
||||
return -1
|
||||
byteaddr = 2000 + (msgnr - 2000) / 8
|
||||
bitaddr = (msgnr - 2000) % 8
|
||||
self.log ("Set Quit for MsgNr %s Adr %s.%s" % (msgnr, byteaddr, bitaddr), 6)
|
||||
self.conns [cname].SetBit (daveAreas["DB"], 2, byteaddr, bitaddr)
|
||||
self.rcvErrorstates ('ABP-Buero', msgnr)
|
||||
|
||||
|
||||
def setMaintenance (self, cname, msgnr):
|
||||
"""
|
||||
SOAP RPC Funciton - Set Errormsg on Maintenance
|
||||
"""
|
||||
if msgnr not in range (2000, 2500):
|
||||
self.log ("Bad MsgNr for Maintenance", 1)
|
||||
return -1
|
||||
byteaddr = 2064 + (msgnr - 2000) / 8
|
||||
bitaddr = (msgnr - 2000) % 8
|
||||
self.log ("Set Maintenance for MsgNr %s Adr %s.%s" % (msgnr, byteaddr, bitaddr), 6)
|
||||
self.rcvErrorstates ('ABP-Buero', msgnr)
|
||||
self.conns [cname].SetBit (daveAreas["DB"], 2, byteaddr, bitaddr)
|
||||
self.rcvErrorstates ('ABP-Buero', msgnr)
|
||||
|
||||
|
||||
def setDone (self, cname, msgnr):
|
||||
"""
|
||||
SOAP RPC Funciton - Set Errormsg Done
|
||||
"""
|
||||
if msgnr not in range (2000, 2500):
|
||||
self.log ("Bad MsgNr for Done", 1)
|
||||
return -1
|
||||
byteaddr = 2128 + (msgnr - 2000) / 8
|
||||
bitaddr = (msgnr - 2000) % 8
|
||||
self.log ("Set Maintenance for MsgNr %s Adr %s.%s" % (msgnr, byteaddr, bitaddr), 6)
|
||||
self.conns [cname].SetBit (daveAreas["DB"], 2, byteaddr, bitaddr)
|
||||
self.rcvErrorstates ('ABP-Buero', msgnr)
|
||||
|
||||
|
||||
def rcvErrorstates (self, cname, msgnr = 0):
|
||||
if msgnr in range (2000, 2500):
|
||||
nr = msgnr - 1999
|
||||
limit = msgnr - 1999
|
||||
offset = (msgnr - 2000) * 4
|
||||
blocksize = 4
|
||||
else:
|
||||
nr = 1
|
||||
limit = 299
|
||||
offset = 0
|
||||
blocksize = 220
|
||||
|
||||
ttl = 60
|
||||
r_pipe = self.rproxy.pipeline ()
|
||||
while nr <= limit:
|
||||
val = self.conns [cname].ReadBytes (daveAreas["DB"], 2, offset, blocksize)
|
||||
for i in range (len (val) / 4):
|
||||
counter = (val [i * 4 + 0] << 8) + val [i * 4 + 1]
|
||||
self.log ("%s Counter = %d" % (nr, counter), 6)
|
||||
self.log ("%s State = %d" % (nr, val [i * 4 + 2]), 6)
|
||||
self.log ("%s Action = %d" % (nr, val [i * 4 + 3]), 6)
|
||||
r_pipe.setex('Counter%03d' % nr, counter, ttl)
|
||||
r_pipe.setex('State%03d' % nr, val [i * 4 + 2], ttl)
|
||||
r_pipe.setex('Action%03d' % nr, val [i * 4 + 3], ttl)
|
||||
nr += 1
|
||||
offset += blocksize
|
||||
r_pipe.execute ()
|
||||
|
||||
|
||||
def rcvVisuState (self, cname):
|
||||
ttl = 60
|
||||
if cname in ['HALLEA','HALLEB']:
|
||||
dbnr = 3
|
||||
blocksize = 8
|
||||
else:
|
||||
dbnr = 6
|
||||
blocksize = 2
|
||||
|
||||
offset = 0
|
||||
|
||||
val = self.conns [cname].ReadBytes (daveAreas["DB"], dbnr, offset, blocksize)
|
||||
r_pipe = self.rproxy.pipeline ()
|
||||
for i in range (blocksize / 2):
|
||||
state = (val [i * 2 + 0] << 8) + val [i * 2 + 1]
|
||||
self.log ("%s-Visu%02d = %d" % (cname, i, state), 6)
|
||||
r_pipe.setex('%s-Visu%02d' % (cname, i) , state, ttl)
|
||||
r_pipe.execute ()
|
||||
|
||||
|
||||
def rcvCamState (self, cname):
|
||||
ttl = 60
|
||||
offset = 5
|
||||
blocksize = 2
|
||||
val = self.conns [cname].ReadBytes (daveAreas["DB"], 3, offset, blocksize)
|
||||
state = (val [0] << 8) + val [1]
|
||||
self.log ("%s-Cams = %d" % (cname, state), 6)
|
||||
r_pipe = self.rproxy.pipeline ()
|
||||
r_pipe.setex('%s-Cams' % cname, state, ttl)
|
||||
r_pipe.execute ()
|
||||
|
||||
|
||||
def rcvErrorlogs (self, cname = 'ABP-Buero'):
|
||||
val = self.conns [cname].ReadBytes (daveAreas["DB"], 104, 0, 16)
|
||||
|
||||
if int (val [1]):
|
||||
mid = (val [2] << 8) + val [3]
|
||||
year = val [4]
|
||||
month = val [5]
|
||||
day = val [6]
|
||||
hour = val [7]
|
||||
minute = val [8]
|
||||
seconds = val [9]
|
||||
cnt = (val [12] << 8) + val [13]
|
||||
stat = val [14]
|
||||
act = val [15]
|
||||
ts = "20%02x-%02x-%02x %02x:%02x:%02x" % (year, month, day, hour, minute, seconds)
|
||||
self.log ("New Errorlog ID:%s Count:%s State:%s Act:%s %s" % (mid, cnt, stat, act, ts), 6)
|
||||
val = self.conns [cname].WriteBytes (daveAreas["DB"], 104, 1, 1, '0')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
#Double fork to daemonize
|
||||
from os import fork, setsid, umask, dup2
|
||||
from sys import stdin, stdout, stderr
|
||||
from os import getpid
|
||||
|
||||
simulation = 0
|
||||
if '--simulate' in sys.argv:
|
||||
simulation = 1
|
||||
|
||||
|
||||
if os.path.isfile (pid_file):
|
||||
pidfile = open (pid_file, 'r')
|
||||
pid = pidfile.read ()
|
||||
_r = 0
|
||||
try:
|
||||
#Already running
|
||||
os.kill (int (pid), 0)
|
||||
_r = 1
|
||||
except:
|
||||
#No Process on this PID
|
||||
os.remove (pid_file)
|
||||
if _r:
|
||||
sys.exit (0)
|
||||
|
||||
|
||||
pidfile = open (pid_file, 'w')
|
||||
pidfile.write('%i' % getpid())
|
||||
pidfile.close()
|
||||
|
||||
log ("========================================================================================")
|
||||
log ("ioserv.py started")
|
||||
config = ConfigParser ()
|
||||
config.read ('ioserv.cfg')
|
||||
|
||||
uid = config.get ('PARAM', 'id')
|
||||
if not uid:
|
||||
print ("Ioserv ID not defined")
|
||||
raise Warning
|
||||
|
||||
loglevel = config.get ('DEBUG', 'loglevel')
|
||||
logfile = config.get ('DEBUG', 'logfile')
|
||||
|
||||
#Instace
|
||||
app = IOSERV (loglevel = loglevel, logfile = logfile)
|
||||
if simulation:
|
||||
app.simulation ()
|
||||
|
||||
#Create SOAPServer
|
||||
host = config.get ('SOAP', 'host')
|
||||
if host:
|
||||
app.SOAPserver (host, 8089)
|
||||
|
||||
#Create Redis Proxy
|
||||
rserv = config.get ('REDIS', 'host')
|
||||
if rserv:
|
||||
app.RedisProxy (rserv)
|
||||
|
||||
#_name, _proto, _addr, _port, _mpi, _rack, _slot, _rsid1, _rsid2, _rrack, _rslot, _rmpi
|
||||
# MPI Adress 2
|
||||
app.adddaveconn ('HALLEA', 'IBH_Link', '192.168.11.99', 1099, 4, 0, 4, 0, 0, 0, 0, 0)
|
||||
# MPI Adress 3
|
||||
app.adddaveconn ('HALLEB', 'IBH_Link', '192.168.11.99', 1099, 4, 0, 4, 0, 0, 0, 0, 0)
|
||||
# MPI Adress 4
|
||||
app.adddaveconn ('ABP-Buero', 'IBH_Link', '192.168.11.99', 1099, 4, 0, 4, 0, 0, 0, 0, 0)
|
||||
|
||||
|
||||
lastcycle = 0
|
||||
#Mainloop
|
||||
while 1:
|
||||
#prepare select
|
||||
writecons= []
|
||||
sockets = []
|
||||
|
||||
#SOAP Server
|
||||
if app.server:
|
||||
sockets.append ([app.server.fileno ()])
|
||||
|
||||
writes = [i[0] for i in writecons]
|
||||
(reads, writes, xlist) = select ([i[0] for i in sockets], writes, [], 0.1)
|
||||
readcons = [i for i in sockets if i[0] in reads]
|
||||
writecons = [i for i in writecons if i[0] in writes]
|
||||
|
||||
for conn, data, client_id, tpye in writecons:
|
||||
log ("Found a write telegram", 6)
|
||||
## Serial Connection , sockets have other methods
|
||||
while 1:
|
||||
_l = os.write (conn, data)
|
||||
log ("..os.wrote %d bytes" %_l, 6)
|
||||
log ("..os.wrote data: %s" %strlog (data), 6)
|
||||
break
|
||||
|
||||
for conn in readcons:
|
||||
log ('Got Input', 6)
|
||||
if app.server and conn[0] == app.server.fileno ():
|
||||
log ("Got request on SOAP Server", 6)
|
||||
app.server.handle_request ()
|
||||
|
||||
|
||||
nt = time ()
|
||||
app.log ("Retrieve Logs", 6)
|
||||
app.rcvErrorlogs ()
|
||||
if nt - lastcycle > 5:
|
||||
app.log ("<<>> Errorstates", 6)
|
||||
app.rcvErrorstates ('ABP-Buero')
|
||||
app.log ("<<>> Camstates", 6)
|
||||
app.rcvCamState ('HALLEA')
|
||||
app.rcvCamState ('HALLEB')
|
||||
app.log ("<<>> Visustates", 6)
|
||||
app.rcvVisuState ('HALLEA')
|
||||
app.rcvVisuState ('HALLEB')
|
||||
app.rcvVisuState ('ABP-Buero')
|
||||
lastcycle = nt
|
||||
|
||||
|
||||
|
||||
31
log.py
Normal file
31
log.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from datetime import datetime
|
||||
loglevel = 6
|
||||
def log (msg, _level = 3):
|
||||
"""
|
||||
Print message if in verbose mode.
|
||||
"""
|
||||
if _level <= loglevel:
|
||||
file = open ("/tmp/log_ipoint.txt", "a")
|
||||
#file = open ("/var/log/lvr/" + os.getcwd().split('/')[-1] + '-' + lager, "a")
|
||||
dt = datetime.now ()
|
||||
file.write ("%4d-%02d-%02d %02d:%02d:%02d.%06d: " %(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)),
|
||||
file.write (msg + "\n")
|
||||
file.close ()
|
||||
|
||||
|
||||
def strlog (instr):
|
||||
"""
|
||||
Converts String into readable format
|
||||
"""
|
||||
oustr = ''
|
||||
for c in instr:
|
||||
oustr = oustr + " x%2X " %ord(c)
|
||||
return oustr
|
||||
|
||||
def doublechar (i):
|
||||
"""
|
||||
build double char val out of int
|
||||
"""
|
||||
return chr (i % 256) + chr ((i >> 8) % 256)
|
||||
|
||||
|
||||
20
serial/__init__.py
Normal file
20
serial/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env python
|
||||
#portable serial port access with python
|
||||
#this is a wrapper module for different platform implementations
|
||||
#
|
||||
# (C)2001-2002 Chris Liechti <cliechti@gmx.net>
|
||||
# this is distributed under a free software license, see license.txt
|
||||
|
||||
import sys, os, string
|
||||
VERSION = string.split("$Revision: 1.3 $")[1] #extract CVS version
|
||||
|
||||
#chose an implementation, depending on os
|
||||
if os.name == 'nt': #sys.platform == 'win32':
|
||||
from serialwin32 import *
|
||||
elif os.name == 'posix':
|
||||
from serialposix import *
|
||||
elif os.name == 'java':
|
||||
from serialjava import *
|
||||
else:
|
||||
raise Exception("Sorry: no implementation for your platform ('%s') available" % os.name)
|
||||
|
||||
212
serial/serialjava.py
Normal file
212
serial/serialjava.py
Normal file
@@ -0,0 +1,212 @@
|
||||
#!jython
|
||||
#Python Serial Port Extension for Win32, Linux, BSD, Jython
|
||||
#module for serial IO for Jython and JavaComm
|
||||
#see __init__.py
|
||||
#
|
||||
#(C) 2002-2003 Chris Liechti <cliechti@gmx.net>
|
||||
# this is distributed under a free software license, see license.txt
|
||||
|
||||
import javax.comm
|
||||
from serialutil import *
|
||||
|
||||
VERSION = "$Revision: 1.8 $".split()[1] #extract CVS version
|
||||
|
||||
|
||||
def device(portnumber):
|
||||
"""Turn a port number into a device name"""
|
||||
enum = javax.comm.CommPortIdentifier.getPortIdentifiers()
|
||||
ports = []
|
||||
while enum.hasMoreElements():
|
||||
el = enum.nextElement()
|
||||
if el.getPortType() == javax.comm.CommPortIdentifier.PORT_SERIAL:
|
||||
ports.append(el)
|
||||
return ports[portnumber].getName()
|
||||
|
||||
class Serial(SerialBase):
|
||||
"""Serial port class, implemented with javax.comm and thus usable with
|
||||
jython and the appropriate java extension."""
|
||||
|
||||
def open(self):
|
||||
"""Open port with current settings. This may throw a SerialException
|
||||
if the port cannot be opened."""
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
if type(self._port) == type(''): #strings are taken directly
|
||||
portId = javax.comm.CommPortIdentifier.getPortIdentifier(self._port)
|
||||
else:
|
||||
portId = javax.comm.CommPortIdentifier.getPortIdentifier(device(self._port)) #numbers are transformed to a comportid obj
|
||||
try:
|
||||
self.sPort = portId.open("python serial module", 10)
|
||||
except Exception, msg:
|
||||
self.sPort = None
|
||||
raise SerialException("Could not open port: %s" % msg)
|
||||
self._reconfigurePort()
|
||||
self._instream = self.sPort.getInputStream()
|
||||
self._outstream = self.sPort.getOutputStream()
|
||||
self._isOpen = True
|
||||
|
||||
def _reconfigurePort(self):
|
||||
"""Set commuication parameters on opened port."""
|
||||
if not self.sPort:
|
||||
raise SerialException("Can only operate on a valid port handle")
|
||||
|
||||
self.sPort.enableReceiveTimeout(30)
|
||||
if self._bytesize == FIVEBITS:
|
||||
jdatabits = javax.comm.SerialPort.DATABITS_5
|
||||
elif self._bytesize == SIXBITS:
|
||||
jdatabits = javax.comm.SerialPort.DATABITS_6
|
||||
elif self._bytesize == SEVENBITS:
|
||||
jdatabits = javax.comm.SerialPort.DATABITS_7
|
||||
elif self._bytesize == EIGHTBITS:
|
||||
jdatabits = javax.comm.SerialPort.DATABITS_8
|
||||
else:
|
||||
raise ValueError("unsupported bytesize: %r" % self._bytesize)
|
||||
|
||||
if self._stopbits == STOPBITS_ONE:
|
||||
jstopbits = javax.comm.SerialPort.STOPBITS_1
|
||||
elif stopbits == STOPBITS_ONE_HALVE:
|
||||
self._jstopbits = javax.comm.SerialPort.STOPBITS_1_5
|
||||
elif self._stopbits == STOPBITS_TWO:
|
||||
jstopbits = javax.comm.SerialPort.STOPBITS_2
|
||||
else:
|
||||
raise ValueError("unsupported number of stopbits: %r" % self._stopbits)
|
||||
|
||||
if self._parity == PARITY_NONE:
|
||||
jparity = javax.comm.SerialPort.PARITY_NONE
|
||||
elif self._parity == PARITY_EVEN:
|
||||
jparity = javax.comm.SerialPort.PARITY_EVEN
|
||||
elif self._parity == PARITY_ODD:
|
||||
jparity = javax.comm.SerialPort.PARITY_ODD
|
||||
#~ elif self._parity == PARITY_MARK:
|
||||
#~ jparity = javax.comm.SerialPort.PARITY_MARK
|
||||
#~ elif self._parity == PARITY_SPACE:
|
||||
#~ jparity = javax.comm.SerialPort.PARITY_SPACE
|
||||
else:
|
||||
raise ValueError("unsupported parity type: %r" % self._parity)
|
||||
|
||||
jflowin = jflowout = 0
|
||||
if self._rtscts:
|
||||
jflowin |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_IN
|
||||
jflowout |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_OUT
|
||||
if self._xonxoff:
|
||||
jflowin |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_IN
|
||||
jflowout |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_OUT
|
||||
|
||||
self.sPort.setSerialPortParams(baudrate, jdatabits, jstopbits, jparity)
|
||||
self.sPort.setFlowControlMode(jflowin | jflowout)
|
||||
|
||||
if self._timeout >= 0:
|
||||
self.sPort.enableReceiveTimeout(self._timeout*1000)
|
||||
else:
|
||||
self.sPort.disableReceiveTimeout()
|
||||
|
||||
def close(self):
|
||||
"""Close port"""
|
||||
if self._isOpen:
|
||||
if self.sPort:
|
||||
self._instream.close()
|
||||
self._outstream.close()
|
||||
self.sPort.close()
|
||||
self.sPort = None
|
||||
self._isOpen = False
|
||||
|
||||
def makeDeviceName(self, port):
|
||||
return device(port)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
def inWaiting(self):
|
||||
"""Return the number of characters currently in the input buffer."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
return self._instream.available()
|
||||
|
||||
def read(self, size=1):
|
||||
"""Read size bytes from the serial port. If a timeout is set it may
|
||||
return less characters as requested. With no timeout it will block
|
||||
until the requested number of bytes is read."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
read = ''
|
||||
if size > 0:
|
||||
while len(read) < size:
|
||||
x = self._instream.read()
|
||||
if x == -1:
|
||||
if self.timeout >= 0:
|
||||
break
|
||||
else:
|
||||
read = read + chr(x)
|
||||
return read
|
||||
|
||||
def write(self, data):
|
||||
"""Output the given string over the serial port."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self._outstream.write(data)
|
||||
|
||||
def flushInput(self):
|
||||
"""Clear input buffer, discarding all that is in the buffer."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self._instream.skip(self._instream.available())
|
||||
|
||||
def flushOutput(self):
|
||||
"""Clear output buffer, aborting the current output and
|
||||
discarding all that is in the buffer."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self._outstream.flush()
|
||||
|
||||
def sendBreak(self):
|
||||
"""Send break condition."""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.sendBreak()
|
||||
|
||||
def setRTS(self,on=1):
|
||||
"""Set terminal status line: Request To Send"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.setRTS(on)
|
||||
|
||||
def setDTR(self,on=1):
|
||||
"""Set terminal status line: Data Terminal Ready"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.setDTR(on)
|
||||
|
||||
def getCTS(self):
|
||||
"""Read terminal status line: Clear To Send"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.isCTS()
|
||||
|
||||
def getDSR(self):
|
||||
"""Read terminal status line: Data Set Ready"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.isDSR()
|
||||
|
||||
def getRI(self):
|
||||
"""Read terminal status line: Ring Indicator"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.isRI()
|
||||
|
||||
def getCD(self):
|
||||
"""Read terminal status line: Carrier Detect"""
|
||||
if not self.sPort: raise portNotOpenError
|
||||
self.sPort.isCD()
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
s = Serial(0,
|
||||
baudrate=19200, #baudrate
|
||||
bytesize=EIGHTBITS, #number of databits
|
||||
parity=PARITY_EVEN, #enable parity checking
|
||||
stopbits=STOPBITS_ONE, #number of stopbits
|
||||
timeout=3, #set a timeout value, None for waiting forever
|
||||
xonxoff=0, #enable software flow control
|
||||
rtscts=0, #enable RTS/CTS flow control
|
||||
)
|
||||
s.setRTS(1)
|
||||
s.setDTR(1)
|
||||
s.flushInput()
|
||||
s.flushOutput()
|
||||
s.write('hello')
|
||||
print repr(s.read(5))
|
||||
print s.inWaiting()
|
||||
del s
|
||||
|
||||
|
||||
|
||||
396
serial/serialposix.py
Normal file
396
serial/serialposix.py
Normal file
@@ -0,0 +1,396 @@
|
||||
#!/usr/bin/env python
|
||||
#Python Serial Port Extension for Win32, Linux, BSD, Jython
|
||||
#module for serial IO for POSIX compatible systems, like Linux
|
||||
#see __init__.py
|
||||
#
|
||||
#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
|
||||
# this is distributed under a free software license, see license.txt
|
||||
#
|
||||
#parts based on code from Grant B. Edwards <grante@visi.com>:
|
||||
# ftp://ftp.visi.com/users/grante/python/PosixSerial.py
|
||||
# references: http://www.easysw.com/~mike/serial/serial.html
|
||||
|
||||
import sys, os, fcntl, termios, struct, select
|
||||
from serialutil import *
|
||||
|
||||
VERSION = "$Revision: 1.23 $".split()[1] #extract CVS version
|
||||
|
||||
#Do check the Python version as some constants have moved.
|
||||
if (sys.hexversion < 0x020100f0):
|
||||
import TERMIOS
|
||||
else:
|
||||
TERMIOS = termios
|
||||
|
||||
if (sys.hexversion < 0x020200f0):
|
||||
import FCNTL
|
||||
else:
|
||||
FCNTL = fcntl
|
||||
|
||||
#try to detect the os so that a device can be selected...
|
||||
plat = sys.platform.lower()
|
||||
|
||||
if plat[:5] == 'linux': #Linux (confirmed)
|
||||
def device(port):
|
||||
return '/dev/ttyS%d' % port
|
||||
|
||||
elif plat == 'cygwin': #cywin/win32 (confirmed)
|
||||
def device(port):
|
||||
return '/dev/com%d' % (port + 1)
|
||||
|
||||
elif plat == 'openbsd3': #BSD (confirmed)
|
||||
def device(port):
|
||||
return '/dev/ttyp%d' % port
|
||||
|
||||
elif plat[:3] == 'bsd' or \
|
||||
plat[:7] == 'freebsd' or \
|
||||
plat[:7] == 'openbsd' or \
|
||||
plat[:6] == 'darwin': #BSD (confirmed for freebsd4: cuaa%d)
|
||||
def device(port):
|
||||
return '/dev/cuaa%d' % port
|
||||
|
||||
elif plat[:6] == 'netbsd': #NetBSD 1.6 testing by Erk
|
||||
def device(port):
|
||||
return '/dev/dty%02d' % port
|
||||
|
||||
elif plat[:4] == 'irix': #IRIX (not tested)
|
||||
def device(port):
|
||||
return '/dev/ttyf%d' % port
|
||||
|
||||
elif plat[:2] == 'hp': #HP-UX (not tested)
|
||||
def device(port):
|
||||
return '/dev/tty%dp0' % (port+1)
|
||||
|
||||
elif plat[:5] == 'sunos': #Solaris/SunOS (confirmed)
|
||||
def device(port):
|
||||
return '/dev/tty%c' % (ord('a')+port)
|
||||
|
||||
else:
|
||||
#platform detection has failed...
|
||||
print """don't know how to number ttys on this system.
|
||||
! Use an explicit path (eg /dev/ttyS1) or send this information to
|
||||
! the author of this module:
|
||||
|
||||
sys.platform = %r
|
||||
os.name = %r
|
||||
serialposix.py version = %s
|
||||
|
||||
also add the device name of the serial port and where the
|
||||
counting starts for the first serial port.
|
||||
e.g. 'first serial port: /dev/ttyS0'
|
||||
and with a bit luck you can get this module running...
|
||||
""" % (sys.platform, os.name, VERSION)
|
||||
#no exception, just continue with a brave attempt to build a device name
|
||||
#even if the device name is not correct for the platform it has chances
|
||||
#to work using a string with the real device name as port paramter.
|
||||
def device(portum):
|
||||
return '/dev/ttyS%d' % portnum
|
||||
#~ raise Exception, "this module does not run on this platform, sorry."
|
||||
|
||||
#whats up with "aix", "beos", ....
|
||||
#they should work, just need to know the device names.
|
||||
|
||||
|
||||
#load some constants for later use.
|
||||
#try to use values from TERMIOS, use defaults from linux otherwise
|
||||
TIOCMGET = hasattr(TERMIOS, 'TIOCMGET') and TERMIOS.TIOCMGET or 0x5415
|
||||
TIOCMBIS = hasattr(TERMIOS, 'TIOCMBIS') and TERMIOS.TIOCMBIS or 0x5416
|
||||
TIOCMBIC = hasattr(TERMIOS, 'TIOCMBIC') and TERMIOS.TIOCMBIC or 0x5417
|
||||
TIOCMSET = hasattr(TERMIOS, 'TIOCMSET') and TERMIOS.TIOCMSET or 0x5418
|
||||
|
||||
#TIOCM_LE = hasattr(TERMIOS, 'TIOCM_LE') and TERMIOS.TIOCM_LE or 0x001
|
||||
TIOCM_DTR = hasattr(TERMIOS, 'TIOCM_DTR') and TERMIOS.TIOCM_DTR or 0x002
|
||||
TIOCM_RTS = hasattr(TERMIOS, 'TIOCM_RTS') and TERMIOS.TIOCM_RTS or 0x004
|
||||
#TIOCM_ST = hasattr(TERMIOS, 'TIOCM_ST') and TERMIOS.TIOCM_ST or 0x008
|
||||
#TIOCM_SR = hasattr(TERMIOS, 'TIOCM_SR') and TERMIOS.TIOCM_SR or 0x010
|
||||
|
||||
TIOCM_CTS = hasattr(TERMIOS, 'TIOCM_CTS') and TERMIOS.TIOCM_CTS or 0x020
|
||||
TIOCM_CAR = hasattr(TERMIOS, 'TIOCM_CAR') and TERMIOS.TIOCM_CAR or 0x040
|
||||
TIOCM_RNG = hasattr(TERMIOS, 'TIOCM_RNG') and TERMIOS.TIOCM_RNG or 0x080
|
||||
TIOCM_DSR = hasattr(TERMIOS, 'TIOCM_DSR') and TERMIOS.TIOCM_DSR or 0x100
|
||||
TIOCM_CD = hasattr(TERMIOS, 'TIOCM_CD') and TERMIOS.TIOCM_CD or TIOCM_CAR
|
||||
TIOCM_RI = hasattr(TERMIOS, 'TIOCM_RI') and TERMIOS.TIOCM_RI or TIOCM_RNG
|
||||
#TIOCM_OUT1 = hasattr(TERMIOS, 'TIOCM_OUT1') and TERMIOS.TIOCM_OUT1 or 0x2000
|
||||
#TIOCM_OUT2 = hasattr(TERMIOS, 'TIOCM_OUT2') and TERMIOS.TIOCM_OUT2 or 0x4000
|
||||
TIOCINQ = hasattr(TERMIOS, 'FIONREAD') and TERMIOS.FIONREAD or 0x541B
|
||||
|
||||
TIOCM_zero_str = struct.pack('I', 0)
|
||||
TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
|
||||
TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)
|
||||
|
||||
|
||||
class Serial(SerialBase):
|
||||
"""Serial port class POSIX implementation. Serial port configuration is
|
||||
done with termios and fcntl. Runs on Linux and many other Un*x like
|
||||
systems."""
|
||||
|
||||
def open(self):
|
||||
"""Open port with current settings. This may throw a SerialException
|
||||
if the port cannot be opened."""
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
self.fd = None
|
||||
#open
|
||||
try:
|
||||
self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK)
|
||||
except Exception, msg:
|
||||
self.fd = None
|
||||
raise SerialException("Could not open port: %s" % msg)
|
||||
#~ fcntl.fcntl(self.fd, FCNTL.F_SETFL, 0) #set blocking
|
||||
|
||||
self._reconfigurePort()
|
||||
self._isOpen = True
|
||||
#~ self.flushInput()
|
||||
|
||||
def get_fd(self):
|
||||
"""
|
||||
Return fd
|
||||
"""
|
||||
return self.fd
|
||||
|
||||
def _reconfigurePort(self):
|
||||
"""Set commuication parameters on opened port."""
|
||||
if self.fd is None:
|
||||
raise SerialException("Can only operate on a valid port handle")
|
||||
|
||||
vmin = vtime = 0 #timeout is done via select
|
||||
try:
|
||||
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self.fd)
|
||||
except termios.error, msg: #if a port is nonexistent but has a /dev file, it'll fail here
|
||||
raise SerialException("Could not configure port: %s" % msg)
|
||||
#set up raw mode / no echo / binary
|
||||
cflag |= (TERMIOS.CLOCAL|TERMIOS.CREAD)
|
||||
lflag &= ~(TERMIOS.ICANON|TERMIOS.ECHO|TERMIOS.ECHOE|TERMIOS.ECHOK|TERMIOS.ECHONL|
|
||||
TERMIOS.ISIG|TERMIOS.IEXTEN) #|TERMIOS.ECHOPRT
|
||||
for flag in ('ECHOCTL', 'ECHOKE'): #netbsd workaround for Erk
|
||||
if hasattr(TERMIOS, flag):
|
||||
lflag &= ~getattr(TERMIOS, flag)
|
||||
|
||||
oflag &= ~(TERMIOS.OPOST)
|
||||
iflag &= ~(TERMIOS.INLCR|TERMIOS.IGNCR|TERMIOS.ICRNL|TERMIOS.IGNBRK)
|
||||
if hasattr(TERMIOS, 'IUCLC'):
|
||||
iflag &= ~TERMIOS.IUCLC
|
||||
|
||||
#setup baudrate
|
||||
try:
|
||||
ispeed = ospeed = getattr(TERMIOS,'B%s' % (self._baudrate))
|
||||
except AttributeError:
|
||||
raise ValueError('Invalid baud rate: %r' % self._baudrate)
|
||||
#setup char len
|
||||
cflag &= ~TERMIOS.CSIZE
|
||||
if self._bytesize == 8:
|
||||
cflag |= TERMIOS.CS8
|
||||
elif self._bytesize == 7:
|
||||
cflag |= TERMIOS.CS7
|
||||
elif self._bytesize == 6:
|
||||
cflag |= TERMIOS.CS6
|
||||
elif self._bytesize == 5:
|
||||
cflag |= TERMIOS.CS5
|
||||
else:
|
||||
raise ValueError('Invalid char len: %r' % self._bytesize)
|
||||
#setup stopbits
|
||||
if self._stopbits == STOPBITS_ONE:
|
||||
cflag &= ~(TERMIOS.CSTOPB)
|
||||
elif self._stopbits == STOPBITS_TWO:
|
||||
cflag |= (TERMIOS.CSTOPB)
|
||||
else:
|
||||
raise ValueError('Invalid stopit specification: %r' % self._stopbits)
|
||||
#setup parity
|
||||
iflag &= ~(TERMIOS.INPCK|TERMIOS.ISTRIP)
|
||||
if self._parity == PARITY_NONE:
|
||||
cflag &= ~(TERMIOS.PARENB|TERMIOS.PARODD)
|
||||
elif self._parity == PARITY_EVEN:
|
||||
cflag &= ~(TERMIOS.PARODD)
|
||||
cflag |= (TERMIOS.PARENB)
|
||||
elif self._parity == PARITY_ODD:
|
||||
cflag |= (TERMIOS.PARENB|TERMIOS.PARODD)
|
||||
else:
|
||||
raise ValueError('Invalid parity: %r' % self._parity)
|
||||
#setup flow control
|
||||
#xonxoff
|
||||
if hasattr(TERMIOS, 'IXANY'):
|
||||
if self._xonxoff:
|
||||
iflag |= (TERMIOS.IXON|TERMIOS.IXOFF) #|TERMIOS.IXANY)
|
||||
else:
|
||||
iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
|
||||
else:
|
||||
if self._xonxoff:
|
||||
iflag |= (TERMIOS.IXON|TERMIOS.IXOFF)
|
||||
else:
|
||||
iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF)
|
||||
#rtscts
|
||||
if hasattr(TERMIOS, 'CRTSCTS'):
|
||||
if self._rtscts:
|
||||
cflag |= (TERMIOS.CRTSCTS)
|
||||
else:
|
||||
cflag &= ~(TERMIOS.CRTSCTS)
|
||||
elif hasattr(TERMIOS, 'CNEW_RTSCTS'): #try it with alternate constant name
|
||||
if self._rtscts:
|
||||
cflag |= (TERMIOS.CNEW_RTSCTS)
|
||||
else:
|
||||
cflag &= ~(TERMIOS.CNEW_RTSCTS)
|
||||
#XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
|
||||
|
||||
#buffer
|
||||
#vmin "minimal number of characters to be read. = for non blocking"
|
||||
if vmin < 0 or vmin > 255:
|
||||
raise ValueError('Invalid vmin: %r ' % vmin)
|
||||
cc[TERMIOS.VMIN] = vmin
|
||||
#vtime
|
||||
if vtime < 0 or vtime > 255:
|
||||
raise ValueError('Invalid vtime: %r' % vtime)
|
||||
cc[TERMIOS.VTIME] = vtime
|
||||
#activate settings
|
||||
termios.tcsetattr(self.fd, TERMIOS.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
|
||||
|
||||
def close(self):
|
||||
"""Close port"""
|
||||
if self._isOpen:
|
||||
if self.fd is not None:
|
||||
os.close(self.fd)
|
||||
self.fd = None
|
||||
self._isOpen = False
|
||||
|
||||
def makeDeviceName(self, port):
|
||||
return device(port)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
def inWaiting(self):
|
||||
"""Return the number of characters currently in the input buffer."""
|
||||
#~ s = fcntl.ioctl(self.fd, TERMIOS.FIONREAD, TIOCM_zero_str)
|
||||
s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
|
||||
return struct.unpack('I',s)[0]
|
||||
|
||||
def read(self, size=1):
|
||||
"""Read size bytes from the serial port. If a timeout is set it may
|
||||
return less characters as requested. With no timeout it will block
|
||||
until the requested number of bytes is read."""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
read = ''
|
||||
inp = None
|
||||
if size > 0:
|
||||
while len(read) < size:
|
||||
#print "\tread(): size",size, "have", len(read) #debug
|
||||
ready,_,_ = select.select([self.fd],[],[], self._timeout)
|
||||
if not ready:
|
||||
break #timeout
|
||||
buf = os.read(self.fd, size-len(read))
|
||||
read = read + buf
|
||||
if self._timeout >= 0 and not buf:
|
||||
break #early abort on timeout
|
||||
return read
|
||||
|
||||
def write(self, data):
|
||||
"""Output the given string over the serial port."""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
t = len(data)
|
||||
d = data
|
||||
while t > 0:
|
||||
if self._writeTimeout is not None and self._writeTimeout > 0:
|
||||
_,ready,_ = select.select([],[self.fd],[], self._writeTimeout)
|
||||
if not ready:
|
||||
raise writeTimeoutError
|
||||
n = os.write(self.fd, d)
|
||||
if self._writeTimeout is not None and self._writeTimeout > 0:
|
||||
_,ready,_ = select.select([],[self.fd],[], self._writeTimeout)
|
||||
if not ready:
|
||||
raise writeTimeoutError
|
||||
d = d[n:]
|
||||
t = t - n
|
||||
|
||||
def flush(self):
|
||||
"""Flush of file like objects. In this case, wait until all data
|
||||
is written."""
|
||||
self.drainOutput()
|
||||
|
||||
def flushInput(self):
|
||||
"""Clear input buffer, discarding all that is in the buffer."""
|
||||
if self.fd is None:
|
||||
raise portNotOpenError
|
||||
termios.tcflush(self.fd, TERMIOS.TCIFLUSH)
|
||||
|
||||
def flushOutput(self):
|
||||
"""Clear output buffer, aborting the current output and
|
||||
discarding all that is in the buffer."""
|
||||
if self.fd is None:
|
||||
raise portNotOpenError
|
||||
termios.tcflush(self.fd, TERMIOS.TCOFLUSH)
|
||||
|
||||
def sendBreak(self):
|
||||
"""Send break condition."""
|
||||
if self.fd is None:
|
||||
raise portNotOpenError
|
||||
termios.tcsendbreak(self.fd, 0)
|
||||
|
||||
def setRTS(self,on=1):
|
||||
"""Set terminal status line: Request To Send"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
if on:
|
||||
fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
|
||||
else:
|
||||
fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)
|
||||
|
||||
def setDTR(self,on=1):
|
||||
"""Set terminal status line: Data Terminal Ready"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
if on:
|
||||
fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
|
||||
else:
|
||||
fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)
|
||||
|
||||
def getCTS(self):
|
||||
"""Read terminal status line: Clear To Send"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
|
||||
return struct.unpack('I',s)[0] & TIOCM_CTS != 0
|
||||
|
||||
def getDSR(self):
|
||||
"""Read terminal status line: Data Set Ready"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
|
||||
return struct.unpack('I',s)[0] & TIOCM_DSR != 0
|
||||
|
||||
def getRI(self):
|
||||
"""Read terminal status line: Ring Indicator"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
|
||||
return struct.unpack('I',s)[0] & TIOCM_RI != 0
|
||||
|
||||
def getCD(self):
|
||||
"""Read terminal status line: Carrier Detect"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
|
||||
return struct.unpack('I',s)[0] & TIOCM_CD != 0
|
||||
|
||||
# - - platform specific - - - -
|
||||
|
||||
def drainOutput(self):
|
||||
"""internal - not portable!"""
|
||||
if self.fd is None: raise portNotOpenError
|
||||
termios.tcdrain(self.fd)
|
||||
|
||||
def nonblocking(self):
|
||||
"""internal - not portable!"""
|
||||
if self.fd is None:
|
||||
raise portNotOpenError
|
||||
fcntl.fcntl(self.fd, FCNTL.F_SETFL, FCNTL.O_NONBLOCK)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
s = Serial(0,
|
||||
baudrate=19200, #baudrate
|
||||
bytesize=EIGHTBITS, #number of databits
|
||||
parity=PARITY_EVEN, #enable parity checking
|
||||
stopbits=STOPBITS_ONE, #number of stopbits
|
||||
timeout=3, #set a timeout value, None for waiting forever
|
||||
xonxoff=0, #enable software flow control
|
||||
rtscts=0, #enable RTS/CTS flow control
|
||||
)
|
||||
s.setRTS(1)
|
||||
s.setDTR(1)
|
||||
s.flushInput()
|
||||
s.flushOutput()
|
||||
s.write('hello')
|
||||
print repr(s.read(5))
|
||||
print s.inWaiting()
|
||||
del s
|
||||
345
serial/serialutil.py
Normal file
345
serial/serialutil.py
Normal file
@@ -0,0 +1,345 @@
|
||||
#! python
|
||||
#Python Serial Port Extension for Win32, Linux, BSD, Jython
|
||||
#see __init__.py
|
||||
#
|
||||
#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
|
||||
# this is distributed under a free software license, see license.txt
|
||||
|
||||
PARITY_NONE, PARITY_EVEN, PARITY_ODD = 'N', 'E', 'O'
|
||||
STOPBITS_ONE, STOPBITS_TWO = (1, 2)
|
||||
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5,6,7,8)
|
||||
|
||||
PARITY_NAMES = {
|
||||
PARITY_NONE: 'None',
|
||||
PARITY_EVEN: 'Even',
|
||||
PARITY_ODD: 'Odd',
|
||||
}
|
||||
|
||||
XON = chr(17)
|
||||
XOFF = chr(19)
|
||||
|
||||
#Python < 2.2.3 compatibility
|
||||
try:
|
||||
True
|
||||
except:
|
||||
True = 1
|
||||
False = not True
|
||||
|
||||
class SerialException(Exception):
|
||||
"""Base class for serial port related exceptions."""
|
||||
|
||||
portNotOpenError = SerialException('Port not open')
|
||||
|
||||
class SerialTimeoutException(SerialException):
|
||||
"""Write timeouts give an exception"""
|
||||
|
||||
writeTimeoutError = SerialTimeoutException("Write timeout")
|
||||
|
||||
class FileLike(object):
|
||||
"""An abstract file like class.
|
||||
|
||||
This class implements readline and readlines based on read and
|
||||
writelines based on write.
|
||||
This class is used to provide the above functions for to Serial
|
||||
port objects.
|
||||
|
||||
Note that when the serial port was opened with _NO_ timeout that
|
||||
readline blocks until it sees a newline (or the specified size is
|
||||
reached) and that readlines would never return and therefore
|
||||
refuses to work (it raises an exception in this case)!
|
||||
"""
|
||||
|
||||
def read(self, size): raise NotImplementedError
|
||||
def write(self, s): raise NotImplementedError
|
||||
|
||||
def readline(self, size=None, eol='\n'):
|
||||
"""read a line which is terminated with end-of-line (eol) character
|
||||
('\n' by default) or until timeout"""
|
||||
line = ''
|
||||
while 1:
|
||||
c = self.read(1)
|
||||
if c:
|
||||
line += c #not very efficient but lines are usually not that long
|
||||
if c == eol:
|
||||
break
|
||||
if size is not None and len(line) >= size:
|
||||
break
|
||||
else:
|
||||
break
|
||||
return line
|
||||
|
||||
def readlines(self, sizehint=None, eol='\n'):
|
||||
"""read a list of lines, until timeout
|
||||
sizehint is ignored"""
|
||||
if self.timeout is None:
|
||||
raise ValueError, "Serial port MUST have enabled timeout for this function!"
|
||||
lines = []
|
||||
while 1:
|
||||
line = self.readline(eol=eol)
|
||||
if line:
|
||||
lines.append(line)
|
||||
if line[-1] != eol: #was the line received with a timeout?
|
||||
break
|
||||
else:
|
||||
break
|
||||
return lines
|
||||
|
||||
def xreadlines(self, sizehint=None):
|
||||
"""just call readlines - here for compatibility"""
|
||||
return self.readlines()
|
||||
|
||||
def writelines(self, sequence):
|
||||
for line in sequence:
|
||||
self.write(line)
|
||||
|
||||
def flush(self):
|
||||
"""flush of file like objects"""
|
||||
pass
|
||||
|
||||
class SerialBase(FileLike):
|
||||
"""Serial port base class. Provides __init__ function and properties to
|
||||
get/set port settings."""
|
||||
|
||||
#default values, may be overriden in subclasses that do not support all values
|
||||
BAUDRATES = (50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,
|
||||
19200,38400,57600,115200,230400,460800,500000,576000,921600,
|
||||
1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000)
|
||||
BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
|
||||
PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD)
|
||||
STOPBITS = (STOPBITS_ONE, STOPBITS_TWO)
|
||||
|
||||
def __init__(self,
|
||||
port = None, #number of device, numbering starts at
|
||||
#zero. if everything fails, the user
|
||||
#can specify a device string, note
|
||||
#that this isn't portable anymore
|
||||
#port will be opened if one is specified
|
||||
baudrate=9600, #baudrate
|
||||
bytesize=EIGHTBITS, #number of databits
|
||||
parity=PARITY_NONE, #enable parity checking
|
||||
stopbits=STOPBITS_ONE, #number of stopbits
|
||||
timeout=None, #set a timeout value, None to wait forever
|
||||
xonxoff=0, #enable software flow control
|
||||
rtscts=0, #enable RTS/CTS flow control
|
||||
writeTimeout=None, #set a timeout for writes
|
||||
):
|
||||
"""Initialize comm port object. If a port is given, then the port will be
|
||||
opened immediately. Otherwise a Serial port object in closed state
|
||||
is returned."""
|
||||
|
||||
self._isOpen = False
|
||||
self._port = None #correct value is assigned below trough properties
|
||||
self._baudrate = None #correct value is assigned below trough properties
|
||||
self._bytesize = None #correct value is assigned below trough properties
|
||||
self._parity = None #correct value is assigned below trough properties
|
||||
self._stopbits = None #correct value is assigned below trough properties
|
||||
self._timeout = None #correct value is assigned below trough properties
|
||||
self._writeTimeout = None #correct value is assigned below trough properties
|
||||
self._xonxoff = None #correct value is assigned below trough properties
|
||||
self._rtscts = None #correct value is assigned below trough properties
|
||||
|
||||
#assign values using get/set methods using the properties feature
|
||||
self.port = port
|
||||
self.baudrate = baudrate
|
||||
self.bytesize = bytesize
|
||||
self.parity = parity
|
||||
self.stopbits = stopbits
|
||||
self.timeout = timeout
|
||||
self.writeTimeout = writeTimeout
|
||||
self.xonxoff = xonxoff
|
||||
self.rtscts = rtscts
|
||||
|
||||
if port is not None:
|
||||
self.open()
|
||||
|
||||
def isOpen(self):
|
||||
"""Check if the port is opened."""
|
||||
return self._isOpen
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
#TODO: these are not realy needed as the is the BAUDRATES etc attribute...
|
||||
#maybe i remove them before the final release...
|
||||
|
||||
def getSupportedBaudrates(self):
|
||||
return [(str(b), b) for b in self.BAUDRATES]
|
||||
|
||||
def getSupportedByteSizes(self):
|
||||
return [(str(b), b) for b in self.BYTESIZES]
|
||||
|
||||
def getSupportedStopbits(self):
|
||||
return [(str(b), b) for b in self.STOPBITS]
|
||||
|
||||
def getSupportedParities(self):
|
||||
return [(PARITY_NAMES[b], b) for b in self.PARITIES]
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
def setPort(self, port):
|
||||
"""Change the port. The attribute portstr is set to a string that
|
||||
contains the name of the port."""
|
||||
|
||||
was_open = self._isOpen
|
||||
if was_open: self.close()
|
||||
if port is not None:
|
||||
if type(port) == type(''): #strings are taken directly
|
||||
self.portstr = port
|
||||
else:
|
||||
self.portstr = self.makeDeviceName(port)
|
||||
else:
|
||||
self.portstr = None
|
||||
self._port = port
|
||||
if was_open: self.open()
|
||||
|
||||
def getPort(self):
|
||||
"""Get the current port setting. The value that was passed on init or using
|
||||
setPort() is passed back. See also the attribute portstr which contains
|
||||
the name of the port as a string."""
|
||||
return self._port
|
||||
|
||||
port = property(getPort, setPort, "Port setting")
|
||||
|
||||
|
||||
def setBaudrate(self, baudrate):
|
||||
"""Change baudrate. It raises a ValueError if the port is open and the
|
||||
baudrate is not possible. If the port is closed, then tha value is
|
||||
accepted and the exception is raised when the port is opened."""
|
||||
#~ if baudrate not in self.BAUDRATES: raise ValueError("Not a valid baudrate: %r" % baudrate)
|
||||
try:
|
||||
self._baudrate = int(baudrate)
|
||||
except TypeError:
|
||||
raise ValueError("Not a valid baudrate: %r" % baudrate)
|
||||
else:
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getBaudrate(self):
|
||||
"""Get the current baudrate setting."""
|
||||
return self._baudrate
|
||||
|
||||
baudrate = property(getBaudrate, setBaudrate, "Baudrate setting")
|
||||
|
||||
|
||||
def setByteSize(self, bytesize):
|
||||
"""Change byte size."""
|
||||
if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % bytesize)
|
||||
self._bytesize = bytesize
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getByteSize(self):
|
||||
"""Get the current byte size setting."""
|
||||
return self._bytesize
|
||||
|
||||
bytesize = property(getByteSize, setByteSize, "Byte size setting")
|
||||
|
||||
|
||||
def setParity(self, parity):
|
||||
"""Change parity setting."""
|
||||
if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % parity)
|
||||
self._parity = parity
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getParity(self):
|
||||
"""Get the current parity setting."""
|
||||
return self._parity
|
||||
|
||||
parity = property(getParity, setParity, "Parity setting")
|
||||
|
||||
|
||||
def setStopbits(self, stopbits):
|
||||
"""Change stopbits size."""
|
||||
if stopbits not in self.STOPBITS: raise ValueError("Not a valid stopbit size: %r" % stopbits)
|
||||
self._stopbits = stopbits
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getStopbits(self):
|
||||
"""Get the current stopbits setting."""
|
||||
return self._stopbits
|
||||
|
||||
stopbits = property(getStopbits, setStopbits, "Stopbits setting")
|
||||
|
||||
|
||||
def setTimeout(self, timeout):
|
||||
"""Change timeout setting."""
|
||||
if timeout is not None:
|
||||
if timeout < 0: raise ValueError("Not a valid timeout: %r" % timeout)
|
||||
try:
|
||||
timeout + 1 #test if it's a number, will throw a TypeError if not...
|
||||
except TypeError:
|
||||
raise ValueError("Not a valid timeout: %r" % timeout)
|
||||
|
||||
self._timeout = timeout
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getTimeout(self):
|
||||
"""Get the current timeout setting."""
|
||||
return self._timeout
|
||||
|
||||
timeout = property(getTimeout, setTimeout, "Timeout setting for read()")
|
||||
|
||||
|
||||
def setWriteTimeout(self, timeout):
|
||||
"""Change timeout setting."""
|
||||
if timeout is not None:
|
||||
if timeout < 0: raise ValueError("Not a valid timeout: %r" % timeout)
|
||||
try:
|
||||
timeout + 1 #test if it's a number, will throw a TypeError if not...
|
||||
except TypeError:
|
||||
raise ValueError("Not a valid timeout: %r" % timeout)
|
||||
|
||||
self._writeTimeout = timeout
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getWriteTimeout(self):
|
||||
"""Get the current timeout setting."""
|
||||
return self._writeTimeout
|
||||
|
||||
writeTimeout = property(getWriteTimeout, setWriteTimeout, "Timeout setting for write()")
|
||||
|
||||
|
||||
def setXonXoff(self, xonxoff):
|
||||
"""Change XonXoff setting."""
|
||||
self._xonxoff = xonxoff
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getXonXoff(self):
|
||||
"""Get the current XonXoff setting."""
|
||||
return self._xonxoff
|
||||
|
||||
xonxoff = property(getXonXoff, setXonXoff, "Xon/Xoff setting")
|
||||
|
||||
def setRtsCts(self, rtscts):
|
||||
"""Change RtsCts setting."""
|
||||
self._rtscts = rtscts
|
||||
if self._isOpen: self._reconfigurePort()
|
||||
|
||||
def getRtsCts(self):
|
||||
"""Get the current RtsCts setting."""
|
||||
return self._rtscts
|
||||
|
||||
rtscts = property(getRtsCts, setRtsCts, "RTS/CTS setting")
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
def __repr__(self):
|
||||
"""String representation of the current port settings and its state."""
|
||||
return "%s<id=0x%x, open=%s>(port=%r, baudrate=%r, bytesize=%r, parity=%r, stopbits=%r, timeout=%r, xonxoff=%r, rtscts=%r)" % (
|
||||
self.__class__.__name__,
|
||||
id(self),
|
||||
self._isOpen,
|
||||
self.portstr,
|
||||
self.baudrate,
|
||||
self.bytesize,
|
||||
self.parity,
|
||||
self.stopbits,
|
||||
self.timeout,
|
||||
self.xonxoff,
|
||||
self.rtscts,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
s = SerialBase()
|
||||
print s.getSupportedBaudrates()
|
||||
print s.getSupportedByteSizes()
|
||||
print s.getSupportedParities()
|
||||
print s.getSupportedStopbits()
|
||||
print s
|
||||
305
serial/serialwin32.py
Normal file
305
serial/serialwin32.py
Normal file
@@ -0,0 +1,305 @@
|
||||
#! python
|
||||
#Python Serial Port Extension for Win32, Linux, BSD, Jython
|
||||
#serial driver for win32
|
||||
#see __init__.py
|
||||
#
|
||||
#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
|
||||
# this is distributed under a free software license, see license.txt
|
||||
|
||||
import win32file # The base COM port and file IO functions.
|
||||
import win32event # We use events and the WaitFor[Single|Multiple]Objects functions.
|
||||
import win32con # constants.
|
||||
from serialutil import *
|
||||
|
||||
VERSION = "$Revision: 1.29 $".split()[1] #extract CVS version
|
||||
|
||||
#from winbase.h. these should realy be in win32con
|
||||
MS_CTS_ON = 16
|
||||
MS_DSR_ON = 32
|
||||
MS_RING_ON = 64
|
||||
MS_RLSD_ON = 128
|
||||
|
||||
def device(portnum):
|
||||
"""Turn a port number into a device name"""
|
||||
#the "//./COMx" format is required for devices >= 9
|
||||
#not all versions of windows seem to support this propperly
|
||||
#so that the first few ports are used with the DOS device name
|
||||
if portnum < 9:
|
||||
return 'COM%d' % (portnum+1) #numbers are transformed to a string
|
||||
else:
|
||||
return r'\\.\COM%d' % (portnum+1)
|
||||
|
||||
class Serial(SerialBase):
|
||||
"""Serial port implemenation for Win32. This implemenatation requires a
|
||||
win32all installation."""
|
||||
|
||||
BAUDRATES = (50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,
|
||||
19200,38400,57600,115200)
|
||||
|
||||
def open(self):
|
||||
"""Open port with current settings. This may throw a SerialException
|
||||
if the port cannot be opened."""
|
||||
if self._port is None:
|
||||
raise SerialException("Port must be configured before it can be used.")
|
||||
self.hComPort = None
|
||||
try:
|
||||
self.hComPort = win32file.CreateFile(self.portstr,
|
||||
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
|
||||
0, # exclusive access
|
||||
None, # no security
|
||||
win32con.OPEN_EXISTING,
|
||||
win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
|
||||
None)
|
||||
except Exception, msg:
|
||||
self.hComPort = None #'cause __del__ is called anyway
|
||||
raise SerialException("could not open port: %s" % msg)
|
||||
# Setup a 4k buffer
|
||||
win32file.SetupComm(self.hComPort, 4096, 4096)
|
||||
|
||||
#Save original timeout values:
|
||||
self._orgTimeouts = win32file.GetCommTimeouts(self.hComPort)
|
||||
|
||||
self._reconfigurePort()
|
||||
|
||||
# Clear buffers:
|
||||
# Remove anything that was there
|
||||
win32file.PurgeComm(self.hComPort,
|
||||
win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT |
|
||||
win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
|
||||
|
||||
self._overlappedRead = win32file.OVERLAPPED()
|
||||
self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
|
||||
self._overlappedWrite = win32file.OVERLAPPED()
|
||||
#~ self._overlappedWrite.hEvent = win32event.CreateEvent(None, 1, 0, None)
|
||||
self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)
|
||||
self._isOpen = True
|
||||
|
||||
def _reconfigurePort(self):
|
||||
"""Set commuication parameters on opened port."""
|
||||
if not self.hComPort:
|
||||
raise SerialException("Can only operate on a valid port handle")
|
||||
|
||||
#Set Windows timeout values
|
||||
#timeouts is a tuple with the following items:
|
||||
#(ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
|
||||
# ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
|
||||
# WriteTotalTimeoutConstant)
|
||||
if self._timeout is None:
|
||||
timeouts = (0, 0, 0, 0, 0)
|
||||
elif self._timeout == 0:
|
||||
timeouts = (win32con.MAXDWORD, 0, 0, 0, 0)
|
||||
else:
|
||||
timeouts = (0, 0, int(self._timeout*1000), 0, 0)
|
||||
if self._writeTimeout is None:
|
||||
pass
|
||||
elif self._writeTimeout == 0:
|
||||
timeouts = timeouts[:-2] + (0, win32con.MAXDWORD)
|
||||
else:
|
||||
timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
|
||||
win32file.SetCommTimeouts(self.hComPort, timeouts)
|
||||
|
||||
win32file.SetCommMask(self.hComPort, win32file.EV_ERR)
|
||||
|
||||
# Setup the connection info.
|
||||
# Get state and modify it:
|
||||
comDCB = win32file.GetCommState(self.hComPort)
|
||||
comDCB.BaudRate = self._baudrate
|
||||
|
||||
if self._bytesize == FIVEBITS:
|
||||
comDCB.ByteSize = 5
|
||||
elif self._bytesize == SIXBITS:
|
||||
comDCB.ByteSize = 6
|
||||
elif self._bytesize == SEVENBITS:
|
||||
comDCB.ByteSize = 7
|
||||
elif self._bytesize == EIGHTBITS:
|
||||
comDCB.ByteSize = 8
|
||||
else:
|
||||
raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
|
||||
|
||||
if self._parity == PARITY_NONE:
|
||||
comDCB.Parity = win32file.NOPARITY
|
||||
comDCB.fParity = 0 # Dis/Enable Parity Check
|
||||
elif self._parity == PARITY_EVEN:
|
||||
comDCB.Parity = win32file.EVENPARITY
|
||||
comDCB.fParity = 1 # Dis/Enable Parity Check
|
||||
elif self._parity == PARITY_ODD:
|
||||
comDCB.Parity = win32file.ODDPARITY
|
||||
comDCB.fParity = 1 # Dis/Enable Parity Check
|
||||
else:
|
||||
raise ValueError("Unsupported parity mode: %r" % self._parity)
|
||||
|
||||
if self._stopbits == STOPBITS_ONE:
|
||||
comDCB.StopBits = win32file.ONESTOPBIT
|
||||
elif self._stopbits == STOPBITS_TWO:
|
||||
comDCB.StopBits = win32file.TWOSTOPBITS
|
||||
else:
|
||||
raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
|
||||
|
||||
comDCB.fBinary = 1 # Enable Binary Transmission
|
||||
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
|
||||
if self._rtscts:
|
||||
comDCB.fRtsControl = win32file.RTS_CONTROL_HANDSHAKE
|
||||
comDCB.fDtrControl = win32file.DTR_CONTROL_HANDSHAKE
|
||||
else:
|
||||
comDCB.fRtsControl = win32file.RTS_CONTROL_ENABLE
|
||||
comDCB.fDtrControl = win32file.DTR_CONTROL_ENABLE
|
||||
comDCB.fOutxCtsFlow = self._rtscts
|
||||
comDCB.fOutxDsrFlow = self._rtscts
|
||||
comDCB.fOutX = self._xonxoff
|
||||
comDCB.fInX = self._xonxoff
|
||||
comDCB.fNull = 0
|
||||
comDCB.fErrorChar = 0
|
||||
comDCB.fAbortOnError = 0
|
||||
comDCB.XonChar = XON
|
||||
comDCB.XoffChar = XOFF
|
||||
|
||||
try:
|
||||
win32file.SetCommState(self.hComPort, comDCB)
|
||||
except win32file.error, e:
|
||||
raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % e)
|
||||
|
||||
#~ def __del__(self):
|
||||
#~ self.close()
|
||||
|
||||
def close(self):
|
||||
"""Close port"""
|
||||
if self._isOpen:
|
||||
if self.hComPort:
|
||||
#Restore original timeout values:
|
||||
win32file.SetCommTimeouts(self.hComPort, self._orgTimeouts)
|
||||
#Close COM-Port:
|
||||
win32file.CloseHandle(self.hComPort)
|
||||
self.hComPort = None
|
||||
self._isOpen = False
|
||||
|
||||
def makeDeviceName(self, port):
|
||||
return device(port)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
|
||||
def inWaiting(self):
|
||||
"""Return the number of characters currently in the input buffer."""
|
||||
flags, comstat = win32file.ClearCommError(self.hComPort)
|
||||
return comstat.cbInQue
|
||||
|
||||
def read(self, size=1):
|
||||
"""Read size bytes from the serial port. If a timeout is set it may
|
||||
return less characters as requested. With no timeout it will block
|
||||
until the requested number of bytes is read."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
if size > 0:
|
||||
win32event.ResetEvent(self._overlappedRead.hEvent)
|
||||
flags, comstat = win32file.ClearCommError(self.hComPort)
|
||||
if self.timeout == 0:
|
||||
n = min(comstat.cbInQue, size)
|
||||
if n > 0:
|
||||
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(n), self._overlappedRead)
|
||||
win32event.WaitForSingleObject(self._overlappedRead.hEvent, win32event.INFINITE)
|
||||
read = str(buf)
|
||||
else:
|
||||
read = ''
|
||||
else:
|
||||
rc, buf = win32file.ReadFile(self.hComPort, win32file.AllocateReadBuffer(size), self._overlappedRead)
|
||||
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedRead, 1)
|
||||
read = str(buf[:n])
|
||||
else:
|
||||
read = ''
|
||||
return read
|
||||
|
||||
def write(self, s):
|
||||
"""Output the given string over the serial port."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
#print repr(s),
|
||||
if s:
|
||||
#~ win32event.ResetEvent(self._overlappedWrite.hEvent)
|
||||
err, n = win32file.WriteFile(self.hComPort, s, self._overlappedWrite)
|
||||
if err: #will be ERROR_IO_PENDING:
|
||||
# Wait for the write to complete.
|
||||
#~ win32event.WaitForSingleObject(self._overlappedWrite.hEvent, win32event.INFINITE)
|
||||
n = win32file.GetOverlappedResult(self.hComPort, self._overlappedWrite, 1)
|
||||
if n != len(s):
|
||||
raise writeTimeoutError
|
||||
|
||||
|
||||
def flushInput(self):
|
||||
"""Clear input buffer, discarding all that is in the buffer."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
win32file.PurgeComm(self.hComPort, win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
|
||||
|
||||
def flushOutput(self):
|
||||
"""Clear output buffer, aborting the current output and
|
||||
discarding all that is in the buffer."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
win32file.PurgeComm(self.hComPort, win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT)
|
||||
|
||||
def sendBreak(self):
|
||||
"""Send break condition."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
import time
|
||||
win32file.SetCommBreak(self.hComPort)
|
||||
#TODO: how to set the correct duration??
|
||||
time.sleep(0.020)
|
||||
win32file.ClearCommBreak(self.hComPort)
|
||||
|
||||
def setRTS(self,level=1):
|
||||
"""Set terminal status line: Request To Send"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
if level:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.SETRTS)
|
||||
else:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.CLRRTS)
|
||||
|
||||
def setDTR(self,level=1):
|
||||
"""Set terminal status line: Data Terminal Ready"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
if level:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.SETDTR)
|
||||
else:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.CLRDTR)
|
||||
|
||||
def getCTS(self):
|
||||
"""Read terminal status line: Clear To Send"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
return MS_CTS_ON & win32file.GetCommModemStatus(self.hComPort) != 0
|
||||
|
||||
def getDSR(self):
|
||||
"""Read terminal status line: Data Set Ready"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
return MS_DSR_ON & win32file.GetCommModemStatus(self.hComPort) != 0
|
||||
|
||||
def getRI(self):
|
||||
"""Read terminal status line: Ring Indicator"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
return MS_RING_ON & win32file.GetCommModemStatus(self.hComPort) != 0
|
||||
|
||||
def getCD(self):
|
||||
"""Read terminal status line: Carrier Detect"""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
return MS_RLSD_ON & win32file.GetCommModemStatus(self.hComPort) != 0
|
||||
|
||||
# - - platform specific - - - -
|
||||
|
||||
def setXON(self, level=True):
|
||||
"""Platform specific - set flow state."""
|
||||
if not self.hComPort: raise portNotOpenError
|
||||
if level:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.SETXON)
|
||||
else:
|
||||
win32file.EscapeCommFunction(self.hComPort, win32file.SETXOFF)
|
||||
|
||||
#Nur Testfunktion!!
|
||||
if __name__ == '__main__':
|
||||
print __name__
|
||||
s = Serial()
|
||||
print s
|
||||
|
||||
s = Serial(0)
|
||||
print s
|
||||
|
||||
s.baudrate = 19200
|
||||
s.databits = 7
|
||||
s.close()
|
||||
s.port = 3
|
||||
s.open()
|
||||
print s
|
||||
|
||||
Reference in New Issue
Block a user