3
# Copyright 2008 Dan Smith <dsmith@danplanet.com>
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
27
# Text markers that delimit an encoded frame: DDT2EncodedFrame.get_packed()
# wraps the encoded payload in them, and DDT2EncodedFrame.unpack() searches
# for them to locate the payload.
ENCODED_HEADER = "[SOB]"
28
ENCODED_TRAILER = "[EOB]"
30
# Single checksum update step. The body is not part of this excerpt; as used
# by calc_checksum() it takes one byte value `c` plus the running value `crc`
# and its result becomes the new running value.
def update_crc(c, crc):
49
# Compute the checksum of `data` by repeatedly applying update_crc().
# NOTE(review): the initial checksum value, the loop header and the return
# statement are on lines missing from this excerpt.
def calc_checksum(data):
# Fold the ordinal of each item `i` into the running checksum (presumably
# `i` iterates over the characters of `data` -- confirm against full file).
52
checksum = update_crc(ord(i), checksum)
# After the data, two zero bytes are fed through update_crc() as well.
54
checksum = update_crc(0, checksum)
55
checksum = update_crc(0, checksum)
60
# NOTE(review): the `def` lines owning these two returns are not part of this
# excerpt; presumably they are the encode()/decode() helpers called by
# DDT2EncodedFrame -- confirm against the full file.
# Passes `data` to yencode.yencode_buffer and returns the result.
return yencode.yencode_buffer(data)
# Passes `data` to yencode.ydecode_buffer and returns the result.
63
return yencode.ydecode_buffer(data)
65
# Base DDT2 frame: a fixed binary header (see `format`) followed by a data
# payload, with a checksum computed over header plus payload.
# NOTE(review): this excerpt omits many lines of the class (including
# __init__ and several method headers); the comments below describe only
# the statements that are visible.
class DDT2Frame(object):
# struct layout of the header: network byte order; unsigned char, unsigned
# short, two unsigned chars, two unsigned shorts, then two 8-byte strings.
# unpack() reads these as magic, seq, session, type, two fields unpacked on
# a line not shown here, and the source/destination station names.
66
format = "!BHBBHH8s8s"
# Per-frame threading events; presumably set by the transport once the
# frame has been sent / acknowledged -- confirm against callers.
79
self.sent_event = threading.Event()
80
self.ackd_event = threading.Event()
# Return the transmit rate of this frame: the packed size recorded by
# get_packed() (`_xmit_z`) divided by the difference between `_xmit_e` and
# `_xmit_s` (presumably end/start timestamps set by the sender -- confirm).
88
def get_xmit_bps(self):
# Diagnostic printed when the rate cannot be determined; the guarding
# condition and the value returned in that case are on lines not shown.
90
print "Block not sent, can't determine BPS!"
# Equal start and end values would divide by zero below, so a scaled size
# is returned instead.
93
if self._xmit_s == self._xmit_e:
94
return self._xmit_z * 100 # Fudge for sockets
96
return self._xmit_z / (self._xmit_e - self._xmit_s)
# Store the compression flag in self.compress.
98
def set_compress(self, compress=True):
99
self.compress = compress
# Serialise the frame: build the header with struct.pack, checksum header
# plus payload, and record the total packed size in self._xmit_z.
# NOTE(review): the arguments of both struct.pack calls, the branch around
# the compression step and the return statement are not visible here.
101
def get_packed(self):
# Compress the payload with zlib at level 9.
103
data = zlib.compress(self.data, 9)
# Replace the magic byte with its 8-bit complement; presumably this marks
# the frame as compressed for unpack() -- confirm.
106
self.magic = (~self.magic) & 0xFF
# Pad both station names to the 8-character width of the "8s" header
# fields, using "~" as filler (unpack() removes it again).
110
s_station = self.s_station.ljust(8, "~")
111
d_station = self.d_station.ljust(8, "~")
# First pack of the header; these bytes plus the payload are what the
# checksum below is computed over. The header is then packed a second time
# (presumably now carrying the computed checksum -- arguments not shown).
113
val = struct.pack(self.format,
123
checksum = calc_checksum(val + data)
125
val = struct.pack(self.format,
135
self._xmit_z = len(val) + len(data)
# Parse a packed frame from `val` into this object's fields, checking the
# magic byte and the checksum.
# NOTE(review): the header/payload split, the magic test, the failure
# returns and the branch around decompression are on lines not shown.
139
def unpack(self, val):
144
self.compress = False
# Diagnostic for a magic value that is not accepted.
146
print "Magic 0x%X not recognized" % magic
# Decode the fixed-size header into the frame fields, then re-pack it and
# checksum it together with the payload to obtain the locally computed
# value `_checksum`.
152
(magic, self.seq, self.session, self.type,
154
self.s_station, self.d_station) = struct.unpack(self.format, header)
156
_header = struct.pack(self.format,
166
_checksum = calc_checksum(_header + data)
# Strip every "~" from the station names, undoing the padding added by
# get_packed().
168
self.s_station = self.s_station.replace("~", "")
169
self.d_station = self.d_station.replace("~", "")
# Report a mismatch between the received and the recomputed checksum.
171
if _checksum != checksum:
172
print "Checksum failed: %s != %s" % (checksum, _checksum)
# Inflate the payload back into self.data.
176
self.data = zlib.decompress(data)
# NOTE(review): the `def` line for the next two statements is missing from
# this excerpt; they build and return a one-line textual summary of the
# frame (presumably __str__ -- confirm). Only the first 20 characters of
# the payload are included, passed through utils.filter_to_ascii.
188
data = utils.filter_to_ascii(self.data[:20])
190
return "DDT2%s: %i:%i:%i %s->%s (%s...[%i])" % (c,
# NOTE(review): the `def` line for the following statements is missing;
# they copy the session, station names and compression flag from this
# frame onto another frame object `f` (presumably a copy helper -- confirm).
202
f.session = self.session
204
f.s_station = self.s_station
205
f.d_station = self.d_station
207
f.set_compress(self.compress)
210
class DDT2EncodedFrame(DDT2Frame):
211
def get_packed(self):
212
raw = DDT2Frame.get_packed(self)
214
encoded = encode(raw)
216
return ENCODED_HEADER + encoded + ENCODED_TRAILER
218
def unpack(self, val):
220
h = val.index(ENCODED_HEADER) + len(ENCODED_TRAILER)
221
t = val.rindex(ENCODED_TRAILER)
224
print "Block has no header/trailer: %s" % e
228
decoded = decode(payload)
230
print "Unable to decode frame: %s" % e
233
return DDT2Frame.unpack(self, decoded)
235
# DDT2Frame subclass that overrides get_packed() and unpack().
# NOTE(review): both method bodies are missing from this excerpt, so their
# behaviour cannot be documented from here.
class DDT2RawData(DDT2Frame):
236
def get_packed(self):
239
def unpack(self, string):
242
# Self-test that builds a DDT2EncodedFrame with sample station names and a
# sample payload, applying the requested `compress` setting. The remaining
# steps (presumably packing `fin` and unpacking into `fout`) are on lines
# missing from this excerpt.
def test_symmetric(compress=True):
243
fin = DDT2EncodedFrame()
247
fin.s_station = "FOO"
248
fin.d_station = "BAR"
249
fin.data = "This is a test"
250
fin.set_compress(compress)
# Second frame; presumably the unpack target -- confirm against full file.
255
fout = DDT2EncodedFrame()
# NOTE(review): the statements below appear to belong to a different test
# whose `def` line is not shown; they pass unpack() a marker-wrapped string
# ("foobar") and branch on its return value.
262
f = DDT2EncodedFrame()
264
if f.unpack("[SOB]foobar[EOB]"):
271
# Script entry point: call test_symmetric() with compress=False.
# NOTE(review): any other calls made here are on lines missing from this
# excerpt.
if __name__ == "__main__":
273
test_symmetric(False)