Files
storage_fresco/cany_tpdu.py
2019-05-23 13:36:01 +00:00

258 lines
6.8 KiB
Python
Executable File

#!/usr/bin/python
__author__ = "Daniel Egger"
__date__ = "22 March 2002"
__email__ = "egger@interearth.com"
__version__ = "$Revision: 1.1 $"[11:-2]
from tpdu import TPDU, PARM_ACKTIME, PARM_THROUGHPUT, PARM_SIZE, PARM_SRCTSAP, PARM_DSTTSAP, PARM_CHECKSUM, PARM_VERSION, PARM_PROTECTION, PARM_OPTIONS, PARM_ALTERNATIVE
__all__ = ["CANY_TPDU"]
# Template for the header lines printed for every C type TPDU.
# It takes four numbers: the length, the destination reference,
# the source reference and the class option.
CANY_STRING = (
    "Length: %d\n"
    "Destination reference: 0x%04x\n"
    "Source reference: 0x%04x\n"
    "Class option: 0x%x\n"
    "Parameter packets:\n"
)
# Template for a parameter of a c-type TPDU that is not known.
# It takes the parameter code, the parameter length and the
# parameter argument. This should not be needed anymore because
# every parameter is at least mentioned by now.
UNKNOWN_PARM_STRING = "\n".join ((
    "Unknown param:",
    "Parameter code: 0x%x",
    "Parameter length: 0x%x",
    "Parameter argument: %s",
    "",
))
# All parameter codes that are accepted in a c-type TPDU
possible_parameters = [
    PARM_ACKTIME, PARM_THROUGHPUT, PARM_SIZE,
    PARM_SRCTSAP, PARM_DSTTSAP, PARM_CHECKSUM,
    PARM_VERSION, PARM_PROTECTION, PARM_OPTIONS,
    PARM_ALTERNATIVE,
]
def internal_formatbytes (list):
    """
    Turn a sequence of (width, value) pairs into one string.
    This is a special hack and for internal use only!

    Parameters:
        list: Iterable of (width, value) pairs. A width of 0 marks
              a 4 bit value which is shifted into the upper half
              of a single character, a width of 1 emits the value
              as one character and a width of 2 emits it as two
              characters, high byte first.

    Returns:
        The concatenation of all emitted characters; an empty
        string if there are no pairs.

    Raises:
        ValueError: if a width other than 0, 1 or 2 is given.
    """
    chunks = []
    for width, value in list:
        if width == 0:
            # Special case: seems to be a 4bit value
            chunks.append ("%c" % (value << 4))
        elif width == 1:
            chunks.append ("%c" % value)
        elif width == 2:
            chunks.append ("%c%c" % ((value >> 8) & 0xff, value & 0xff))
        else:
            # A defined exception type is required here; raising an
            # undefined name would only produce a NameError
            raise ValueError ("Can only output one or two bytes hexadecimals")
    return "".join (chunks)
class CANY_TPDU (TPDU):
    """
    A C type TPDU: a handful of fixed header fields plus a
    dictionary of parameters (self.parms, keyed by parameter code),
    together with the conversion to and from the binary form.
    """

    def __init__ (self, type, srctsap, dsttsap, *args, **args2):
        """
        Create a new TPDU and calculate its length.

        Parameters:
            type: The TPDU code; __repr__ writes it into the upper
                  four bits of the second octet
            srctsap: The source TSAP as a string
            dsttsap: The destination TSAP as a string
            args: Accepted but not used
            args2: Optional keywords "dstref", "srcref" and
                   "classoption"; each of them defaults to 0
        """
        self.parms = {}
        self.code = type
        self.dstref = args2.get ("dstref", 0x0000)
        self.srcref = args2.get ("srcref", 0x0000)
        self.classoption = args2.get ("classoption", 0x00)
        # Commented out, since only AK TPDUs from the CP in fetch mode
        #self.parms[PARM_OPTIONS] = 0x0
        # Printed by __str__ as 2**9 = 512
        self.parms[PARM_SIZE] = 0x9
        self.parms[PARM_SRCTSAP] = srctsap
        self.parms[PARM_DSTTSAP] = dsttsap
        self.__recalclen ()

    def srctsap (self):
        """
        Return the source TSAP parameter.
        """
        return self.parms [PARM_SRCTSAP]

    def dsttsap (self):
        """
        Return the destination TSAP parameter.
        """
        return self.parms [PARM_DSTTSAP]

    def identify (self):
        """
        Return the headline used by __str__ for this kind of TPDU.
        """
        return "C type TPDU\n"

    def __recalclen (self):
        """
        Calculate the length of the TPDU and store it in self.len.
        This function is for internal use only!
        """
        # 1 for the code, 2 for the destination reference
        # 2 for the source reference and 1 for the class
        # option -> 6
        length = 6
        for code in self.parms.keys ():
            # The code and the size each are one byte
            length += 2
            if code == PARM_SIZE or code == PARM_OPTIONS:
                # These two carry a single number
                length += 1
            else:
                length += len (self.parms[code])
        self.len = length

    def __octets_to_int (self, value):
        """
        Fold a string of octets into one number, most significant
        octet first. A value that is not a string is returned
        unchanged. This function is for internal use only!
        """
        if not isinstance (value, str):
            return value
        number = 0
        for octet in value:
            number = (number << 8) | ord (octet)
        return number

    def __hexdump (self, octets):
        """
        Return the octets of a string as space separated
        hexadecimals. This function is for internal use only!
        """
        return " ".join (["0x%x" % ord (octet) for octet in octets])

    def __repr__ (self):
        """
        Return the binary representation of the TPDU. Use repr ()
        (or a pair of backticks) to obtain the string which is to
        be transferred over the wire.
        """
        # Fixed header information
        fields = [
            (1, self.len),
            (0, self.code),
            (2, self.dstref),
            (2, self.srcref),
            (1, self.classoption)]
        # Dynamically add all remaining parms
        for code in self.parms.keys ():
            content = self.parms[code]
            fields.append ((1, code))
            if code == PARM_SIZE or code == PARM_OPTIONS:
                # A length of one followed by the number itself
                fields.append ((1, 1))
                fields.append ((1, chr (content)))
            else:
                # The length followed by every single character
                fields.append ((1, len (content)))
                for character in content:
                    fields.append ((1, character))
        return internal_formatbytes (fields)

    def fromstring (self, data):
        """
        Decode the binary representation of a TPDU from the given
        parameter and note the data in the instance of the object.
        Parameters which are already present in the instance but
        not in the data are left untouched.

        Parameters:
            data: The binary form of the TPDU

        Raises:
            Warning: if the data contains an unknown parameter code
            IndexError: if the data ends in the middle of a field
        """
        self.len = ord (data[0])
        # The code lives in the upper four bits
        self.code = ord (data[1]) >> 4
        self.dstref = (ord (data[2]) << 8) | ord (data[3])
        self.srcref = (ord (data[4]) << 8) | ord (data[5])
        self.classoption = ord (data[6])
        index = 7
        while index < len (data):
            code = ord (data[index])
            size = ord (data[index+1])
            # The acknowledge time is only used in class 4 which we don't
            # implement. The time is given in milliseconds.
            # The throughput is only set when not in class 0 which we
            # implement here. It denotes the throughput in Octets/s.
            if code in possible_parameters:
                if code == PARM_SIZE or code == PARM_OPTIONS:
                    self.parms[code] = ord (data[index+2])
                else:
                    self.parms[code] = data[index+2:index+2+size]
            else:
                raise Warning ("Unknown parameter 0x%02x" % code)
            # Skip the code, the size and the argument
            index += (size + 2)

    def __str__ (self):
        """
        Return a readable and quite verbose overview of the TPDU instance.
        Use this for debugging purposes or if you're just curious.
        """
        # Bit mask, text if the bit is set, text if it is not set
        option_texts = (
            (8, " Use network expedited (Class 1)\n",
                " Don't use network expedited (Class 1)\n"),
            (4, " Use receipt confirmation (Class 1)\n",
                " Don't use receipt confirmation (Class 1)\n"),
            (2, " 16 bit checksum (Class 4)\n",
                " No 16 bit checksum (Class 4)\n"),
            (1, " Use transport expedited data transfer service\n",
                " Don't use transport expedited data transfer service\n"))
        pieces = [self.identify () + CANY_STRING % (self.len, self.dstref, self.srcref, self.classoption)]
        for code in self.parms.keys ():
            arg = self.parms[code]
            if code == PARM_SIZE:
                pieces.append (" TPDU size: %d\n" % 2**arg)
            elif code == PARM_SRCTSAP:
                # Joining keeps the parentheses balanced even if
                # the TSAP is empty
                pieces.append (" Sender: %s (%s)\n" % (arg, self.__hexdump (arg)))
            elif code == PARM_DSTTSAP:
                pieces.append (" Receiver: %s (%s)\n" % (arg, self.__hexdump (arg)))
            elif code == PARM_ACKTIME:
                pieces.append (" Acknowledge time: %s (ms) [Not used in Class 0]\n" % arg)
            elif code == PARM_THROUGHPUT:
                # fromstring stores the raw octets, which %d cannot
                # format, hence the conversion.
                # NOTE(review): all octets are folded into a single
                # number; confirm this against the layout of the
                # throughput parameter.
                pieces.append (" Throughput: %d (octects/s) [Not used in Class 0]\n" % self.__octets_to_int (arg))
            elif code == PARM_CHECKSUM:
                pieces.append (" Checksum: %s\n" % arg)
            elif code == PARM_VERSION:
                # Same as for the throughput: raw octets need to be
                # converted before they can be printed as a number
                pieces.append (" Version: %d (default: 1)\n" % self.__octets_to_int (arg))
            elif code == PARM_PROTECTION:
                pieces.append (" Protection: %s\n" % arg)
            elif code == PARM_OPTIONS:
                pieces.append (" Options:\n")
                for mask, enabled, disabled in option_texts:
                    if arg & mask != 0:
                        pieces.append (enabled)
                    else:
                        pieces.append (disabled)
            elif code == PARM_ALTERNATIVE:
                pieces.append (" Alternative: %s\n" % arg)
            else:
                # The template wants the code, the length and the
                # argument; a value which is not a string counts as
                # a single octet
                if isinstance (arg, str):
                    size = len (arg)
                else:
                    size = 1
                pieces.append (UNKNOWN_PARM_STRING % (code, size, arg))
        return "".join (pieces)
# Self test, only executed when the module is run as a script
if __name__ == '__main__':
    SUCCESS = "Test %0d sucessful"
    testnum = 1

    # Test 1: a TPDU can be created
    tpdu = CANY_TPDU (0x1, "SOURCE", "DEST")
    print (SUCCESS % testnum)
    testnum = testnum + 1

    # Test 2: the verbose overview can be printed
    print (tpdu)
    print (SUCCESS % testnum)
    testnum = testnum + 1

    # Test 3: the binary form of one instance is decoded into
    # another instance, which must then yield the same binary
    # form as the TPDU of the first test
    tpdu2 = CANY_TPDU (0x1, "SOURCE", "DEST")
    tpdu3 = CANY_TPDU (0x1, "", "")
    tpdu3.fromstring (repr (tpdu2))
    if repr (tpdu) != repr (tpdu3):
        raise Warning ("Serialisation/Deserialisation test failed")
    print (SUCCESS % testnum)
    testnum = testnum + 1