2
TestCases for exercising a Queue DB.
6
from pprint import pprint
9
from test_all import db, verbose, get_new_database_path
11
#----------------------------------------------------------------------
13
# unittest.TestCase subclass holding the Queue DB tests in this module.
class SimpleQueueTestCase(unittest.TestCase):
15
# Stores a database path obtained from get_new_database_path() (imported
# from test_all) on the instance for the tests to open.
# NOTE(review): the enclosing def is not visible here -- presumably setUp().
self.filename = get_new_database_path()
19
# Deletes the file at self.filename.
# NOTE(review): the enclosing def is not visible here -- presumably tearDown().
os.remove(self.filename)
24
def test01_basic(self):
25
# Basic Queue tests using the deprecated DBCursor.consume method.
29
# Progress banner naming the concrete test class.
print "Running %s.test01_basic..." % self.__class__.__name__
32
d.set_re_len(40) # Queues must be fixed length
33
# Open the database file as a DB_QUEUE, passing the DB_CREATE flag.
d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
36
print "before appends" + '-' * 30
39
# One iteration per character of string.letters.
# NOTE(review): the loop body is not visible in this view.
for x in string.letters:
42
# After the loop the database must hold exactly one record per letter.
self.assertEqual(len(d), len(string.letters))
44
# Writes at explicit record numbers (100, 101, 75, 1) instead of appending.
# The count assertion below expects only three new records from these four
# puts, which is consistent with the put at record 1 overwriting an existing
# record (its payload is "replacement data").
d.put(100, "some more data")
45
d.put(101, "and some more ")
46
d.put(75, "out of order")
47
d.put(1, "replacement data")
49
self.assertEqual(len(d), len(string.letters)+3)
52
print "before close" + '-' * 30
61
print "after open" + '-' * 30
64
# Test "txn" as a positional parameter
65
d.append("one more", None)
66
# Test "txn" as a keyword parameter
67
d.append("another one", txn=None)
72
print "after append" + '-' * 30
83
print "after consume loop" + '-' * 30
86
# The database is expected to be empty at this point. The failure message
# attributes a non-zero length to an unpatched Berkeley DB 3.1.17 and points
# at patches/qam_stat.diff.
self.assertEqual(len(d), 0, \
87
"if you see this message then you need to rebuild " \
88
"Berkeley DB 3.1.17 with the patch in patches/qam_stat.diff")
94
def test02_basicPost32(self):
95
# Basic Queue tests using the new DB.consume method in DB 3.2+
100
# Progress banner naming the concrete test class.
print "Running %s.test02_basicPost32..." % self.__class__.__name__
102
# Version guard: db.version() is compared as a tuple against (3, 2, 0), and
# a notice is printed when the library is older.
# NOTE(review): the early exit that presumably follows is not visible here.
if db.version() < (3, 2, 0):
104
print "Test not run, DB not new enough..."
108
d.set_re_len(40) # Queues must be fixed length
109
# Open the database file as a DB_QUEUE, passing the DB_CREATE flag.
d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
112
print "before appends" + '-' * 30
115
# One iteration per character of string.letters.
# NOTE(review): the loop body is not visible in this view.
for x in string.letters:
118
# After the loop the database must hold exactly one record per letter.
self.assertEqual(len(d), len(string.letters))
120
# Writes at explicit record numbers (100, 101, 75, 1) instead of appending.
# The count assertion below expects only three new records from these four
# puts, which is consistent with the put at record 1 overwriting an existing
# record (its payload is "replacement data").
d.put(100, "some more data")
121
d.put(101, "and some more ")
122
d.put(75, "out of order")
123
d.put(1, "replacement data")
125
self.assertEqual(len(d), len(string.letters)+3)
128
print "before close" + '-' * 30
134
# Reopen the file by name only; no access method or flags are passed here.
d.open(self.filename)
135
#d.set_get_returns_none(true)
138
print "after open" + '-' * 30
144
print "after append" + '-' * 30
154
print "after consume loop" + '-' * 30
161
#----------------------------------------------------------------------
164
# Builds a suite from the test methods of SimpleQueueTestCase.
# NOTE(review): the enclosing def is not visible here -- presumably
# test_suite(), the name passed as defaultTest below.
return unittest.makeSuite(SimpleQueueTestCase)
167
# When executed as a script, run unittest with 'test_suite' as the default test.
if __name__ == '__main__':
168
unittest.main(defaultTest='test_suite')