258 lines
6.8 KiB
Python
Executable File
258 lines
6.8 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
__author__ = "Daniel Egger"
|
|
__date__ = "22 March 2002"
|
|
__email__ = "egger@interearth.com"
|
|
__version__ = "$Revision: 1.1 $"[11:-2]
|
|
|
|
from tpdu import TPDU, PARM_ACKTIME, PARM_THROUGHPUT, PARM_SIZE, PARM_SRCTSAP, PARM_DSTTSAP, PARM_CHECKSUM, PARM_VERSION, PARM_PROTECTION, PARM_OPTIONS, PARM_ALTERNATIVE
|
|
|
|
__all__ = ["CANY_TPDU"]
|
|
|
|
# String representation for any C type TPDU.
# The template takes four values, in this order: length, destination
# reference, source reference and class option. The parameter lines are
# appended after it by the caller.
CANY_STRING = (
    "Length: %d\n"
    "Destination reference: 0x%04x\n"
    "Source reference: 0x%04x\n"
    "Class option: 0x%x\n"
    "Parameter packets:\n"
)
|
|
|
|
# String representation for an unknown parameter
# in a c-type TPDU. Shouldn't happen anymore since
# all parameters are at least mentioned now.
# The template takes three values, in this order: parameter code,
# parameter length and parameter argument.
UNKNOWN_PARM_STRING = (
    "Unknown param:\n"
    "Parameter code: 0x%x\n"
    "Parameter length: 0x%x\n"
    "Parameter argument: %s\n"
)
|
|
|
|
# List of all possible parameters in a c-type TPDU.
# Decoding accepts a parameter code only if it is a member of this list.
possible_parameters = [
    PARM_ACKTIME,
    PARM_THROUGHPUT,
    PARM_SIZE,
    PARM_SRCTSAP,
    PARM_DSTTSAP,
    PARM_CHECKSUM,
    PARM_VERSION,
    PARM_PROTECTION,
    PARM_OPTIONS,
    PARM_ALTERNATIVE,
]
|
|
|
|
def internal_formatbytes(list):
    """
    Serialise a sequence of (width, value) pairs into one string.
    This is a special hack for internal use only!

    Parameters:
      list: iterable of two-element entries ``(width, value)``.
            (The name shadows the builtin but is kept because it is
            part of the existing signature.)

            width 0 -- 4 bit value; it is shifted into the upper nibble
                       of a single output character
            width 1 -- one character; ``value`` may be an integer or a
                       single character string
            width 2 -- 16 bit integer, written most significant byte
                       first

    Returns:
      The concatenation of all encoded entries ("" for empty input).

    Raises:
      ValueError: if a width other than 0, 1 or 2 is requested.
    """
    pieces = []
    for width, value in list:
        if width == 0:
            # Special case: seems to be a 4bit value
            pieces.append("%c" % (value << 4))
        elif width == 1:
            pieces.append("%c" % value)
        elif width == 2:
            pieces.append("%c%c" % ((value >> 8) & 0xff, value & 0xff))
        else:
            # The original raised the undefined name ``Error`` here, which
            # surfaced as a NameError instead of a meaningful exception.
            raise ValueError("Can only output one or two bytes hexadecimals")

    return "".join(pieces)
|
|
|
|
def _parm_as_int(value):
    """
    Return a parameter value as an integer.

    Integers are passed through unchanged. Strings (the raw form in which
    CANY_TPDU.fromstring() stores most parameters) are folded into one
    unsigned integer, most significant byte first.
    """
    if isinstance(value, str):
        number = 0
        for char in value:
            number = (number << 8) | ord(char)
        return number
    return value


def _hexdump(value):
    """Return the characters of a string as space separated hex codes."""
    return " ".join(["0x%x" % ord(char) for char in value])


class CANY_TPDU(TPDU):
    """
    Generic "C type" TPDU.

    The wire form produced by repr() consists of a fixed header (length
    byte, TPDU code in the upper nibble of the second byte, 16 bit
    destination reference, 16 bit source reference, class option byte)
    followed by the parameters stored in ``self.parms``, each encoded as
    code byte, length byte and value.
    """

    def __init__(self, type, srctsap, dsttsap, *args, **args2):
        """
        Create a TPDU with default size parameter and the given TSAPs.

        Parameters:
          type:    TPDU code; written into the upper nibble of the
                   second header byte
          srctsap: source TSAP (string)
          dsttsap: destination TSAP (string)

        Keyword parameters:
          dstref:      destination reference, default 0x0000
          srcref:      source reference, default 0x0000
          classoption: class option byte, default 0x00

        Additional positional arguments are accepted and ignored.
        """
        self.parms = {}
        self.code = type

        self.dstref = args2.get("dstref", 0x0000)
        self.srcref = args2.get("srcref", 0x0000)
        self.classoption = args2.get("classoption", 0x00)

        # Disabled, since only AK TPDUs from the CP in fetch mode
        # (translated from the original German remark).
        #self.parms[PARM_OPTIONS] = 0x0
        # The size parameter is an exponent: __str__ reports 2**value.
        self.parms[PARM_SIZE] = 0x9
        self.parms[PARM_SRCTSAP] = srctsap
        self.parms[PARM_DSTTSAP] = dsttsap
        self.__recalclen()

    def srctsap(self):
        """Return the source TSAP parameter."""
        return self.parms[PARM_SRCTSAP]

    def dsttsap(self):
        """Return the destination TSAP parameter."""
        return self.parms[PARM_DSTTSAP]

    def identify(self):
        """Return the heading line used by the verbose string form."""
        return "C type TPDU\n"

    def __recalclen(self):
        """
        Recompute the length of the TPDU from the current parameters and
        store it in ``self.len``. This function is for internal use only!
        """
        # 1 for the code, 2 for the destination reference
        # 2 for the source reference and 1 for the class
        # option -> 6
        length = 6

        for code, content in self.parms.items():
            # The code and the size each are one byte
            length += 2
            if code == PARM_SIZE or code == PARM_OPTIONS:
                # These two are stored as integers and occupy one byte.
                length += 1
            else:
                length += len(content)

        self.len = length

    def __repr__(self):
        """
        Return the binary representation of the TPDU. Use this method
        via repr() to transfer it over the wire.
        """
        # Fixed header information
        fields = [
            (1, self.len),
            (0, self.code),
            (2, self.dstref),
            (2, self.srcref),
            (1, self.classoption),
        ]

        # Dynamically add all remaining parms
        for code, content in self.parms.items():
            fields.append((1, code))
            if code == PARM_SIZE or code == PARM_OPTIONS:
                # Integer valued parameter: length 1, then the value.
                fields.append((1, 1))
                fields.append((1, chr(content)))
            else:
                # String valued parameter: length, then every character.
                fields.append((1, len(content)))
                for char in content:
                    fields.append((1, char))

        return internal_formatbytes(fields)

    def fromstring(self, data):
        """
        Decode the binary representation of a TPDU from the given
        parameter and note the data in the instance of the object.

        Parameters:
          data: The binary form of the TPDU

        Raises:
          Warning: if a parameter code is not in possible_parameters.

        NOTE(review): parameters already present in ``self.parms`` are
        overwritten when they occur in ``data`` but are not removed when
        they do not -- confirm whether callers rely on that.
        """
        self.len = ord(data[0])
        self.code = ord(data[1]) >> 4
        self.dstref = (ord(data[2]) << 8) | ord(data[3])
        self.srcref = (ord(data[4]) << 8) | ord(data[5])
        self.classoption = ord(data[6])

        index = 7
        while index < len(data):
            code = ord(data[index])
            size = ord(data[index + 1])
            # The acknowledge time is only used in class 4 which we don't
            # implement. The time is given in milliseconds.
            # The throughput is only set when not in class 0 which we
            # implement here. It denotes the throughput in Octets/s.

            if code not in possible_parameters:
                raise Warning("Unknown parameter 0x%02x" % code)

            if code == PARM_SIZE or code == PARM_OPTIONS:
                # Kept as an integer; every other parameter stays raw.
                self.parms[code] = ord(data[index + 2])
            else:
                self.parms[code] = data[index + 2:index + 2 + size]

            # Skip code byte, size byte and the value itself.
            index += (size + 2)

    def __str__(self):
        """
        Return a readable and quite verbose overview of the TPDU instance.
        Use this for debugging purposes or if you're just curious.
        """
        # (bit mask, text when set, text when clear) for the options byte
        option_flags = (
            (8, " Use network expedited (Class 1)\n",
                " Don't use network expedited (Class 1)\n"),
            (4, " Use receipt confirmation (Class 1)\n",
                " Don't use receipt confirmation (Class 1)\n"),
            (2, " 16 bit checksum (Class 4)\n",
                " No 16 bit checksum (Class 4)\n"),
            (1, " Use transport expedited data transfer service\n",
                " Don't use transport expedited data transfer service\n"),
        )

        header = CANY_STRING % (self.len, self.dstref, self.srcref,
                                self.classoption)
        lines = [self.identify(), header]

        for code, arg in self.parms.items():
            if code == PARM_SIZE:
                lines.append(" TPDU size: %d\n" % (2 ** arg))
            elif code == PARM_SRCTSAP:
                # Joining the hex codes keeps the "(" intact for an empty
                # TSAP; the former string[:-1] trick removed it.
                lines.append(" Sender: %s (" % arg + _hexdump(arg) + ")\n")
            elif code == PARM_DSTTSAP:
                lines.append(" Receiver: %s (" % arg + _hexdump(arg) + ")\n")
            elif code == PARM_ACKTIME:
                lines.append(" Acknowledge time: %s (ms) [Not used in Class 0]\n" % arg)
            elif code == PARM_THROUGHPUT:
                # fromstring() stores the raw bytes, which "%d" rejects.
                # NOTE(review): folding them into one number assumes a
                # single big-endian value -- confirm against the spec.
                lines.append(" Throughput: %d (octects/s) [Not used in Class 0]\n"
                             % _parm_as_int(arg))
            elif code == PARM_CHECKSUM:
                lines.append(" Checksum: %s\n" % arg)
            elif code == PARM_VERSION:
                # Raw byte string after fromstring(); convert for "%d".
                lines.append(" Version: %d (default: 1)\n" % _parm_as_int(arg))
            elif code == PARM_PROTECTION:
                lines.append(" Protection: %s\n" % arg)
            elif code == PARM_OPTIONS:
                lines.append(" Options:\n")
                for mask, if_set, if_clear in option_flags:
                    if arg & mask != 0:
                        lines.append(if_set)
                    else:
                        lines.append(if_clear)
            elif code == PARM_ALTERNATIVE:
                lines.append(" Alternative: %s\n" % arg)
            else:
                # The template needs code, length and argument; it used
                # to be given the code only, which raised a TypeError.
                try:
                    arglen = len(arg)
                except TypeError:
                    # Integer valued parameters are encoded as one byte.
                    arglen = 1
                lines.append(UNKNOWN_PARM_STRING % (code, arglen, arg))

        return "".join(lines)
|
|
|
|
# Integrated tests start here
if __name__ == '__main__':
    testnum = 1

    # Test 1: a TPDU can be constructed.
    tpdu = CANY_TPDU(0x1, "SOURCE", "DEST")
    print("Test %0d sucessful" % testnum)
    testnum += 1

    # Test 2: the verbose string form can be generated and printed.
    print(tpdu)
    print("Test %0d sucessful" % testnum)
    testnum += 1

    # Test 3: serialise one TPDU, decode it into a second instance and
    # check that both produce the same wire form again.
    tpdu2 = CANY_TPDU(0x1, "SOURCE", "DEST")
    tpdu3 = CANY_TPDU(0x1, "", "")
    tpdu3.fromstring(repr(tpdu2))

    # Compare against tpdu2, the object that was actually serialised
    # (tpdu is built from the same arguments, so the result is the same).
    if repr(tpdu2) != repr(tpdu3):
        raise Warning("Serialisation/Deserialisation test failed")

    print("Test %0d sucessful" % testnum)
    testnum += 1
|