1
##############################################################################
3
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
13
##############################################################################
14
"""Check loadSerial() on storages that support historical revisions."""
16
from ZODB.tests.MinPO import MinPO
17
from ZODB.tests.StorageTestBase import \
18
zodb_unpickle, zodb_pickle, snooze, handle_serials
19
from ZODB.utils import p64, u64
25
class RevisionStorage:
27
def checkLoadSerial(self):
28
oid = self._storage.new_oid()
31
for i in range(31, 38):
32
revid = self._dostore(oid, revid=revid, data=MinPO(i))
33
revisions[revid] = MinPO(i)
34
# Now make sure all the revisions have the correct value
35
for revid, value in revisions.items():
36
data = self._storage.loadSerial(oid, revid)
37
self.assertEqual(zodb_unpickle(data), value)
39
def checkLoadBefore(self):
40
# Store 10 revisions of one object and then make sure that we
41
# can get all the non-current revisions back.
42
oid = self._storage.new_oid()
46
# We need to ensure that successive timestamps are at least
47
# two apart, so that a timestamp exists that's unambiguously
48
# between successive timestamps. Each call to snooze()
49
# guarantees that the next timestamp will be at least one
50
# larger (and probably much more than that) than the previous
54
revid = self._dostore(oid, revid, data=MinPO(i))
55
revs.append(self._storage.load(oid, ""))
57
prev = u64(revs[0][1])
58
for i in range(1, 10):
61
middle = prev + (cur - prev) // 2
62
assert prev < middle < cur # else the snooze() trick failed
64
t = self._storage.loadBefore(oid, p64(middle))
65
self.assert_(t is not None)
67
self.assertEqual(revs[i-1][0], data)
68
self.assertEqual(tid, end)
70
def checkLoadBeforeEdges(self):
71
# Check the edges cases for a non-current load.
72
oid = self._storage.new_oid()
74
self.assertRaises(KeyError, self._storage.loadBefore,
77
revid1 = self._dostore(oid, data=MinPO(1))
79
self.assertEqual(self._storage.loadBefore(oid, p64(0)), None)
80
self.assertEqual(self._storage.loadBefore(oid, revid1), None)
82
cur = p64(u64(revid1) + 1)
83
data, start, end = self._storage.loadBefore(oid, cur)
84
self.assertEqual(zodb_unpickle(data), MinPO(1))
85
self.assertEqual(start, revid1)
86
self.assertEqual(end, None)
88
revid2 = self._dostore(oid, revid=revid1, data=MinPO(2))
89
data, start, end = self._storage.loadBefore(oid, cur)
90
self.assertEqual(zodb_unpickle(data), MinPO(1))
91
self.assertEqual(start, revid1)
92
self.assertEqual(end, revid2)
94
def checkLoadBeforeOld(self):
95
# Look for a very old revision. With the BaseStorage implementation
96
# this should require multple history() calls.
97
oid = self._storage.new_oid()
101
revid = self._dostore(oid, revid, data=MinPO(i))
104
data, start, end = self._storage.loadBefore(oid, revs[12])
105
self.assertEqual(zodb_unpickle(data), MinPO(11))
106
self.assertEqual(start, revs[11])
107
self.assertEqual(end, revs[12])
110
# Unsure: Is it okay to assume everyone testing against RevisionStorage
113
def checkLoadBeforeUndo(self):
114
# Do several transactions then undo them.
115
oid = self._storage.new_oid()
118
revid = self._dostore(oid, revid, data=MinPO(i))
121
info = self._storage.undoInfo()
123
# Always undo the most recent txn, so the value will
124
# alternate between 3 and 4.
125
self._undo(tid, [oid], note="undo %d" % i)
126
revs.append(self._storage.load(oid, ""))
129
for i, (data, tid) in enumerate(revs):
130
t = self._storage.loadBefore(oid, p64(u64(tid) + 1))
131
self.assertEqual(data, t[0])
132
self.assertEqual(tid, t[1])
134
self.assert_(prev_tid < t[1])
137
self.assertEqual(revs[i+1][1], t[2])
139
self.assertEqual(None, t[2])
141
def checkLoadBeforeConsecutiveTids(self):
142
eq = self.assertEqual
143
oid = self._storage.new_oid()
144
def helper(tid, revid, x):
145
data = zodb_pickle(MinPO(x))
146
t = transaction.Transaction()
148
self._storage.tpc_begin(t, p64(tid))
149
r1 = self._storage.store(oid, revid, data, '', t)
150
# Finish the transaction
151
r2 = self._storage.tpc_vote(t)
152
newrevid = handle_serials(oid, r1, r2)
153
self._storage.tpc_finish(t)
155
self._storage.tpc_abort(t)
158
revid1 = helper(1, None, 1)
159
revid2 = helper(2, revid1, 2)
160
revid3 = helper(3, revid2, 3)
161
data, start_tid, end_tid = self._storage.loadBefore(oid, p64(2))
162
eq(zodb_unpickle(data), MinPO(1))
163
eq(u64(start_tid), 1)
166
def checkLoadBeforeCreation(self):
167
eq = self.assertEqual
168
oid1 = self._storage.new_oid()
169
oid2 = self._storage.new_oid()
170
revid1 = self._dostore(oid1)
171
revid2 = self._dostore(oid2)
172
results = self._storage.loadBefore(oid2, revid2)
175
# TODO: There are other edge cases to handle, including pack.