1
# read and write cache, search mixin, read/write interface with offset
3
#(C) 2004 Chris Liechti <cliechti@gmx.net>
4
# this is distributed under a free software license, see license.txt
6
from framework import util
7
import string #printable characters
9
def blockSplitter(data, blocksize):
10
"""split up a an input sequence to small block of the given length"""
19
class BufferedFile(object):
20
"""access an undelying file, using a read cache"""
21
def __init__(self, fileobj, blocksize=1024*64, blocks=32):
22
self.fileobj = fileobj
24
if fileobj is not None:
27
self.blocksize = blocksize
29
self.invalidate() #init cache
31
def seek(self, position, origin=0):
33
self.offset = position
35
self.offset += position
37
self.offset = self.size() - position
39
if self.offset >= self.size():
40
self.offset = self.size()
41
#~ return self.fileobj.seek(position, origin)
45
#~ return self.fileobj.tell()
48
#clip at the end of the file
49
if self.offset + size > self._size:
50
size = max(0, self._size - self.offset)
52
start_block = self.offset / self.blocksize #starting sector
53
start_offset = self.offset % self.blocksize #offset of desired data in sector
54
num_blocks = (start_offset + size) / self.blocksize #rounded down number of required sectors
55
if (start_offset + size) % self.blocksize: #check if req data is in the next sector too
57
#seek to the start cluster and read
59
for i in xrange(num_blocks):
60
datalist.append(self._cacheRead(start_block+i))
61
data = ''.join(datalist) #TODO optimize cutting
64
#return only requested data
65
return data[start_offset:start_offset+size]
67
def write(self, data):
    """Overwrite the underlying file with `data`, starting at self.offset.

    The file object is positioned at the current offset and the data is
    written there. self.offset itself is left untouched. Afterwards
    invalidate() is called so that later reads stay consistent with what
    is now in the file.
    """
    target = self.fileobj
    target.seek(self.offset)
    target.write(data)
    # keep cached state consistent with the changed file contents
    self.invalidate()
82
def _cacheRead(self, blocknum):
83
#~ print "_cacheRead(%r)" % blocknum
84
if blocknum not in self.cacheorder:
85
#if the cache would grow larger than the allowed size, throw away the oldest entry
86
if len(self.cacheorder) > self.blocks:
87
entry = self.cacheorder[0]
88
del self.datacache[entry]
89
del self.cacheorder[0]
91
self.fileobj.seek(self.blocksize*blocknum)
92
self.datacache[blocknum] = self.fileobj.read(self.blocksize)
93
#~ self.cacheorder.append[blocknum]
94
#update order of entries to get a LRU cache
96
self.cacheorder.remove(blocknum)
99
self.cacheorder.append(blocknum)
100
#return data from cache
101
return self.datacache[blocknum]
103
def invalidate(self):
111
"""find out file size. seek and tell are used so that it work
112
with any seekable file"""
113
if self._size is None:
114
self.fileobj.seek(0, 2)
115
self._size = self.fileobj.tell()
119
class SeekableBinFile(object, util.Subject):
120
"""access an undelying file, read and write methods work with offset"""
121
def __init__(self, fileobj):
122
util.Subject.__init__(self)
123
self.fileobj = fileobj
125
def read(self, offset, size):
    """Return the bytes found at `offset`, asking for at most `size` of them.

    The file object is positioned with seek() first; whatever its read()
    hands back is returned unchanged.
    """
    source = self.fileobj
    source.seek(offset)
    chunk = source.read(size)
    return chunk
130
def write(self, offset, data):
131
"""replace the bytes at offset with the new data"""
133
self.fileobj.seek(offset)
134
self.fileobj.write(data)
135
self.notify(offset, len(data))
138
"""find out file size. seek and tell are used so that it work
139
with any seekable file"""
140
self.fileobj.seek(0, 2)
141
return self.fileobj.tell()
144
class HistoryBinFile(SeekableBinFile):
145
"""don't write directly to the undelying file. write history of chages on commit()"""
146
def __init__(self, *args, **kwargs):
147
SeekableBinFile.__init__(self, *args, **kwargs)
150
def read(self, offset, size):
151
"""read from the file, applying the history"""
152
#~ print "read(%r, %r)" % (offset, size)
154
data = SeekableBinFile.read(self, offset, size)
155
#overlay with history
157
for h_offset, h_end, h_data in self.history:
158
if offset >= h_offset and end <= h_end: #data fully contained in history block
159
data = h_data[offset-h_offset:end-h_offset]
160
elif offset <= h_offset and end >= h_end: #history block fully contained in data
161
data = ''.join([data[:h_offset-offset], h_data, data[h_end-offset:]])
162
elif offset <= h_offset <= end: #start of history block overlaping data
163
data = ''.join([data[:h_offset-offset], h_data[:end-h_offset]])
164
elif offset <= h_end <= end: #end of history block overlaping data
165
data = ''.join([h_data[offset-h_offset:], data[h_end-offset:]])
168
def write(self, offset, data):
169
"""write data to history. its not written to the file, use commit() to
170
write tothe real file
174
self.history.append( (offset, offset+length, data) )
175
self.notify(offset, length)
176
#append to last entry if consecutive write
177
#~ if self.history and self.history[-1][1] == offset:
178
#~ self.history[-1][1] = offset+len(data)
179
#~ self.history[-1][2] = self.history[-1][2] + data
181
#~ self.history.append( [offset, offset+len(data), data] )
182
#~ print "H:\n%s" % '\n'.join([str(entry) for entry in self.history])
184
#~ def comressHistory(self):
185
#~ """collect sucessive history entries"""
187
#~ for entry in self.history:
188
#~ h_offset, h_end, h_data = entry
189
#~ l_offset, l_end, l_data = last_entry
190
#~ if h_offset == l_end:
191
#~ entry = [l_offset, h_end, l_data+h_data]
193
#~ new_history.append(entry)
196
def isModified(self):
    """Tell whether the history buffer currently holds any entry.

    Returns True when self.history is non-empty, False otherwise.
    """
    if self.history:
        return True
    return False
201
"""apply history and save the changes to the file"""
202
for h_offset, h_end, h_data in self.history:
203
#~ SeekableBinFile.write(self, h_offset, h_data)
204
self.fileobj.seek(h_offset)
205
self.fileobj.write(h_data)
210
def looseChanges(self):
211
"""trow away history, loosing the changes"""
216
class SearchMixIn(object):
217
"""implement a simple text search, iterating over blocks so that the
218
entire file does not have to be loaded into RAM"""
219
def abortSearch(self):
    """Raise the abort flag.

    The flag is what _abortcheckfunction() reports, so setting it here is
    how a caller asks a search to stop.
    """
    self.abortSearchFlag = True
222
def _abortcheckfunction(self):
    """Default abort check used by find(): report the current abort flag."""
    flag = self.abortSearchFlag
    return flag
225
def blockReader(self, offset, blocksize, endoffset=None):
226
def blockReaderGenratror(offset, blocksize):
229
if endoffset is not None:
230
if offset + blocksize > endoffset:
231
blocksize = endoffset - offset
233
data = self.read(offset, blocksize)
234
if not data: raise StopIteration
236
if last_read: raise StopIteration
238
return blockReaderGenratror(offset, blocksize)
240
def find(self, startoffset, string, casesensitive=True, wrap=False, abortcheck=None):
241
if abortcheck is None:
242
abortcheck = self._abortcheckfunction
243
self.abortSearchFlag = False
246
lastoffset = startoffset
248
if not casesensitive:
249
string = string.lower()
251
for offset, block in self.blockReader(startoffset, blocksize):
253
raise StopIteration("Search Aborted")
254
if not casesensitive:
255
block = block.lower()
256
pos = (lastblock + block).find(string)
258
return lastoffset + pos
264
for offset, block in self.blockReader(0, blocksize, startoffset):
266
raise StopIteration("Search Aborted")
267
if not casesensitive:
268
block = block.lower()
269
pos = (lastblock + block).find(string)
271
return lastoffset + pos
275
def findAll(self, string, casesensitive=True):
279
offset = self.find(offset+1, string, casesensitive=casesensitive, wrap=False)
280
if offset is not None:
281
offsets.append(offset)
287
class VirtualFile(object):
288
"""pseudo file for tests, it may be of any size as it uses
289
arbitrary precision numbers for the internals and it generates
290
the read data on demand.
291
write is not supported.
294
def __init__(self, size):
298
def read(self, size):
300
#~ print self.offset, size
301
if self.offset + size >= self.size:
302
size = self.size - self.offset
303
for n in xrange(size):
304
offset = (self.offset + n)
306
# 0..255 are characters from 0..255
307
result.append(chr(offset))
309
# the rest is the offset itself as a hexadecimal number
312
result.append(("%016x" % ((self.offset + n)/16))[x])
314
return ''.join(result)
316
def write(self, data):
    """Reject the write: this pseudo file does not support writing.

    Always raises IOError; `data` is ignored.
    """
    message = "it's read only"
    raise IOError(message)
319
def seek(self, position, origin=0):
321
self.offset = position
323
self.offset += position
325
self.offset = self.size - position
327
if self.offset >= self.size:
328
self.offset = self.size
337
class Formater(object):
338
"""hex dump formater"""
339
def __init__(self, width=16, address_width=8):
340
self.setWidths(width, address_width)
342
def setWidths(self, width, address_width):
344
self.address_width = address_width
345
self.template = '%%0%sx %%-%ss %%s' % (address_width, 3*width)
347
def bestFit(self, characterwidth):
348
a, b = divmod((characterwidth - self.address_width - 5) / 4, 4)
351
def getPositions(self):
    """Return the column positions of the parts of one dump line.

    The result is the tuple
    (address_end, hex_start, hex_end, ascii_start, ascii_end), computed
    from self.address_width and self.width:

    - address_end is the address width
    - hex_start lies 2 past address_end
    - hex_end lies 3*width - 1 past hex_start
    - ascii_start lies 2 past hex_end
    - ascii_end lies width past ascii_start
    """
    bytes_per_line = self.width
    address_end = self.address_width
    hex_start = address_end + 2
    hex_end = hex_start + 3*bytes_per_line - 1
    ascii_start = hex_end + 2
    return (
        address_end,
        hex_start,
        hex_end,
        ascii_start,
        ascii_start + bytes_per_line,
    )
359
def formatDump(self, data):
    """Render `data` as two-digit lowercase hex codes separated by blanks.

    Each item of `data` is converted with ord(); empty input gives an
    empty string.
    """
    hex_codes = []
    for character in data:
        hex_codes.append('%02x' % ord(character))
    return ' '.join(hex_codes)
362
def formatASCII(self, data):
    """Render `data` as text, replacing everything unprintable by a dot.

    A character is kept only when it is not below the space character and
    is contained in string.printable; all others (control characters, tab
    and newline included) show up as '.'.
    """
    visible = []
    for character in data:
        if character >= ' ' and character in string.printable:
            visible.append(character)
        else:
            visible.append('.')
    return ''.join(visible)
365
def format(self, offset, data):
    """Build one dump line for `data` located at `offset`.

    The line is self.template filled with the offset, the hex dump of the
    data (formatDump) and its text rendering (formatASCII).

    Raises ValueError when `data` is longer than self.width.
    """
    if len(data) <= self.width:
        hex_part = self.formatDump(data)
        ascii_part = self.formatASCII(data)
        return self.template % (offset, hex_part, ascii_part)
    raise ValueError("too much data for the width")
370
def formatBlock(self, offset, data):
372
for block in blockSplitter(data, self.width):
373
lines.append('%s\n' % self.format(offset, block))
378
class BinFile(HistoryBinFile, SearchMixIn):
381
class HexEdit(object, util.SimpleEventSource, util.Observer):
382
"""hex editor, managing the binary file"""
384
util.SimpleEventSource.__init__(self)
389
self._modified = False
391
def isReadonly(self):
394
def isModified(self):
395
if self.bin is not None:
396
return self.bin.isModified()
400
def new(self, filename):
401
"""create a new, empty file in binary mode for read and write"""
403
self.readonly = False
404
self.fileobj = file(filename, 'w+b')
405
self.bin = BinFile(self.fileobj)
406
self.bin.attach(self)
407
self.send_event('OPEN', filename)
410
"""reload current file, loosing all changes"""
411
# for buffered files, forget read cache
413
self.bin.fileobj.invalidate()
414
except AttributeError, e:
417
# for HistoryBinFile, forget about changes
418
self.bin.looseChanges()
419
# send notification event
420
self._modified = False
421
self.send_event('MODIFY', self._modified)
423
def load(self, filename, readonly=False):
425
self.filename = filename
427
self.fileobj = file(filename, 'rb')
430
self.fileobj = file(filename, 'r+b')
431
self.readonly = False
432
self.bin = BinFile(self.fileobj)
433
self.bin.readonly = readonly
434
self.bin.attach(self)
435
self.send_event('OPEN', filename)
437
def useFile(self, fileobj, readonly=True, filename=None):
439
self.filename = filename
440
self.readonly = readonly
441
self.fileobj = fileobj
442
self.bin = BinFile(self.fileobj)
443
self.bin.readonly = readonly
444
self.bin.attach(self)
445
self.send_event('OPEN', filename)
448
if self.bin is not None:
449
self.send_event('CLOSING')
450
self.bin.detach(self)
452
if self.fileobj is not None:
453
#~ self.fileobj.close()
457
self.send_event('CLOSED')
460
if self.bin is not None:
461
return self.bin.size()
465
def update(self, model, offset=None, length=None):
466
if offset is not None and not self._modified:
467
self._modified = True
468
self.send_event('MODIFY', self._modified)
470
self._modified = self.isModified()
471
self.send_event('MODIFY', self._modified)
472
self.send_event('CHANGE', offset, length)
473
#~ self.notify(offset, length)
476
return "%s(fileobj=%r, bin=%r, filename=%r, readonly=%r, _modified=%r)" % (
477
self.__class__.__name__,
485
if __name__ == '__main__':
486
import sys, unittest, random
487
from cStringIO import StringIO
489
class TestSplitter(unittest.TestCase):
    """Check blockSplitter() with empty, odd-length and even-length input.

    assertEqual is used instead of the deprecated failUnlessEqual alias.
    """

    def test_empty(self):
        # empty input must not produce any block
        self.assertEqual(list(blockSplitter("", 2)), [])

    def test_oddEnd(self):
        # length is not a multiple of the block size: last block is shorter
        self.assertEqual(
            list(blockSplitter("aabbccdde", 2)),
            ['aa', 'bb', 'cc', 'dd', 'e'],
        )

    def test_evenEnd(self):
        # length is a multiple of the block size: all blocks are full
        self.assertEqual(
            list(blockSplitter("aabbccddee", 2)),
            ['aa', 'bb', 'cc', 'dd', 'ee'],
        )
500
class TestHistoryBinFile(unittest.TestCase):
501
TESTDATA = "test data in a file"
503
self.fileobj = StringIO() #w/o init as we need a IO object and not just an I
504
self.fileobj.write(self.TESTDATA) #init with test data
505
self.bin = HistoryBinFile(self.fileobj)
508
self.failUnlessEqual(self.bin.read(0, 7), self.TESTDATA[0:7])
509
self.failUnlessEqual(self.bin.read(7, 6), self.TESTDATA[7:7+6])
510
self.failUnlessEqual(self.bin.read(3, 5), self.TESTDATA[3:3+5])
511
self.failUnlessEqual(self.bin.read(0, 999), self.TESTDATA) #end is clipped
512
self.failUnlessEqual(self.bin.read(999, 99), '') #read after EOF -> empty str
515
self.failUnlessEqual(len(self.bin.history), 0)
516
self.bin.write(3, 'hello world!')
517
self.failUnlessEqual(self.bin.read(0,999), 'teshello world!file') #history blk smaller than read
518
self.failUnlessEqual(self.bin.read(0,4), 'tesh') #history blk start overlapping
519
self.failUnlessEqual(self.bin.read(4,4), 'ello') #history blk end overlapping
520
self.failUnlessEqual(self.bin.read(13,4), 'd!fi') #full read within history blk
521
#writes are kept in the history until a commit follows
522
self.failUnlessEqual(self.fileobj.getvalue(), self.TESTDATA)
523
self.failUnlessEqual(len(self.bin.history), 1)
525
self.failUnlessEqual(len(self.bin.history), 0)
526
self.failUnlessEqual(self.fileobj.getvalue(), 'teshello world!file')
528
class TestBinFile(unittest.TestCase):
529
TESTDATA = "test data in a file"
531
self.fileobj = StringIO() #w/o init as we need a IO object and not just an I
532
self.fileobj.write(self.TESTDATA) #init with test data
533
self.bin = BinFile(self.fileobj)
536
self.failUnlessEqual(self.bin.find(0, "data"), self.TESTDATA.find("data"))
537
self.failUnlessEqual(self.bin.find(1, "data"), self.TESTDATA.find("data"))
538
self.failUnlessEqual(self.bin.find(3, "data"), self.TESTDATA.find("data"))
539
self.failUnlessEqual(self.bin.find(10, "data", wrap=True), self.TESTDATA.find("data"))
542
class TestFormater(unittest.TestCase):
544
self.formater = Formater()
546
def testBestFit(self):
547
#~ for i in range(0, 200, 10):
548
#~ print i, self.formater.bestFit(i)
549
self.failUnlessEqual(self.formater.bestFit(80), 16)
550
self.failUnlessEqual(self.formater.bestFit(160), 36)
552
def testPositions(self):
553
self.failUnlessEqual(self.formater.getPositions(), (8, 10, 57, 59, 75))
555
class TestBufferedFile(unittest.TestCase):
557
self.unbuffered = StringIO()
558
self.unbuffered.write(''.join([chr(random.randrange(0,255)) for x in range(2000)]))
559
self.buffered = BufferedFile(self.unbuffered, blocksize=7, blocks=13)
561
def testSimpleRead(self):
562
for pos in range(2000):
563
self.unbuffered.seek(pos)
564
ub = self.unbuffered.read(20)
565
self.buffered.seek(pos)
566
bu = self.buffered.read(20)
567
self.failUnlessEqual(bu, ub, 'failed at pos %d' % pos)
569
def testRandomRead(self):
570
for i in range(2000):
571
pos = random.randrange(0,2000)
572
size = random.randrange(1, 12)
573
self.unbuffered.seek(pos)
574
ub = self.unbuffered.read(size)
575
self.buffered.seek(pos)
576
bu = self.buffered.read(size)
577
self.failUnlessEqual(bu, ub, 'failed at pos %d' % pos)
578
#~ print self.buffered.datacache
579
#~ print self.buffered.cacheorder
582
testdata = "yet an other test string"
583
self.buffered.seek(0)
584
self.buffered.write(testdata)
585
self.buffered.seek(0)
586
self.failUnlessEqual(self.buffered.read(len(testdata)), testdata)
587
self.unbuffered.seek(0)
588
self.failUnlessEqual(self.unbuffered.read(len(testdata)), testdata)
590
class TestVirtualFile(unittest.TestCase):
592
self.failUnlessEqual(len(VirtualFile(123).read(1000)), 123)
593
self.failUnlessEqual(len(VirtualFile(120).read(1000)), 120)
595
def testReadEOF(self):
596
f = VirtualFile(2**80L)
598
self.failUnlessEqual(f.read(1000), '')
600
self.failUnlessEqual(f.read(1000), '')
602
def testAlignment(self):
603
f = VirtualFile(2**80L)
604
a = f.read(100)[20:20+16]
607
self.failUnlessEqual(a, b)
610
sys.argv = sys.argv[0:1] + ['-v']
615
formater = Formater()
616
f = BinFile(file('ramp.bin', 'r+b'))
617
#~ print '%r' % f.read(0,256)
618
#~ for offset, block in f.blockReader(0, 16):
619
#~ print formater.format(offset, block)
620
#~ print f.find(0, "abc")
621
#~ print f.findAll("abc")
622
for offset in f.findAll("abc", casesensitive=False):
623
print formater.format(offset, f.read(offset, 16))