~hardware-certification/zope3/certify-staging-2.5

« back to all changes in this revision

Viewing changes to src/ZODB/tests/.svn/text-base/RevisionStorage.py.svn-base

  • Committer: Marc Tardif
  • Date: 2008-04-26 19:03:34 UTC
  • Revision ID: cr3@lime-20080426190334-u16xo4llz56vliqf
Initial import.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
##############################################################################
 
2
#
 
3
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 
4
# All Rights Reserved.
 
5
#
 
6
# This software is subject to the provisions of the Zope Public License,
 
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
 
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 
11
# FOR A PARTICULAR PURPOSE.
 
12
#
 
13
##############################################################################
 
14
"""Check loadSerial() on storages that support historical revisions."""
 
15
 
 
16
from ZODB.tests.MinPO import MinPO
 
17
from ZODB.tests.StorageTestBase import \
 
18
     zodb_unpickle, zodb_pickle, snooze, handle_serials
 
19
from ZODB.utils import p64, u64
 
20
 
 
21
import transaction
 
22
 
 
23
ZERO = '\0'*8
 
24
 
 
25
class RevisionStorage:
 
26
 
 
27
    def checkLoadSerial(self):
 
28
        oid = self._storage.new_oid()
 
29
        revid = ZERO
 
30
        revisions = {}
 
31
        for i in range(31, 38):
 
32
            revid = self._dostore(oid, revid=revid, data=MinPO(i))
 
33
            revisions[revid] = MinPO(i)
 
34
        # Now make sure all the revisions have the correct value
 
35
        for revid, value in revisions.items():
 
36
            data = self._storage.loadSerial(oid, revid)
 
37
            self.assertEqual(zodb_unpickle(data), value)
 
38
 
 
39
    def checkLoadBefore(self):
 
40
        # Store 10 revisions of one object and then make sure that we
 
41
        # can get all the non-current revisions back.
 
42
        oid = self._storage.new_oid()
 
43
        revs = []
 
44
        revid = None
 
45
        for i in range(10):
 
46
            # We need to ensure that successive timestamps are at least
 
47
            # two apart, so that a timestamp exists that's unambiguously
 
48
            # between successive timestamps.  Each call to snooze()
 
49
            # guarantees that the next timestamp will be at least one
 
50
            # larger (and probably much more than that) than the previous
 
51
            # one.
 
52
            snooze()
 
53
            snooze()
 
54
            revid = self._dostore(oid, revid, data=MinPO(i))
 
55
            revs.append(self._storage.load(oid, ""))
 
56
 
 
57
        prev = u64(revs[0][1])
 
58
        for i in range(1, 10):
 
59
            tid = revs[i][1]
 
60
            cur = u64(tid)
 
61
            middle = prev + (cur - prev) // 2
 
62
            assert prev < middle < cur  # else the snooze() trick failed
 
63
            prev = cur
 
64
            t = self._storage.loadBefore(oid, p64(middle))
 
65
            self.assert_(t is not None)
 
66
            data, start, end = t
 
67
            self.assertEqual(revs[i-1][0], data)
 
68
            self.assertEqual(tid, end)
 
69
 
 
70
    def checkLoadBeforeEdges(self):
 
71
        # Check the edges cases for a non-current load.
 
72
        oid = self._storage.new_oid()
 
73
 
 
74
        self.assertRaises(KeyError, self._storage.loadBefore,
 
75
                          oid, p64(0))
 
76
 
 
77
        revid1 = self._dostore(oid, data=MinPO(1))
 
78
 
 
79
        self.assertEqual(self._storage.loadBefore(oid, p64(0)), None)
 
80
        self.assertEqual(self._storage.loadBefore(oid, revid1), None)
 
81
 
 
82
        cur = p64(u64(revid1) + 1)
 
83
        data, start, end = self._storage.loadBefore(oid, cur)
 
84
        self.assertEqual(zodb_unpickle(data), MinPO(1))
 
85
        self.assertEqual(start, revid1)
 
86
        self.assertEqual(end, None)
 
87
 
 
88
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2))
 
89
        data, start, end = self._storage.loadBefore(oid, cur)
 
90
        self.assertEqual(zodb_unpickle(data), MinPO(1))
 
91
        self.assertEqual(start, revid1)
 
92
        self.assertEqual(end, revid2)
 
93
 
 
94
    def checkLoadBeforeOld(self):
 
95
        # Look for a very old revision.  With the BaseStorage implementation
 
96
        # this should require multple history() calls.
 
97
        oid = self._storage.new_oid()
 
98
        revs = []
 
99
        revid = None
 
100
        for i in range(50):
 
101
            revid = self._dostore(oid, revid, data=MinPO(i))
 
102
            revs.append(revid)
 
103
 
 
104
        data, start, end = self._storage.loadBefore(oid, revs[12])
 
105
        self.assertEqual(zodb_unpickle(data), MinPO(11))
 
106
        self.assertEqual(start, revs[11])
 
107
        self.assertEqual(end, revs[12])
 
108
 
 
109
 
 
110
    # Unsure:  Is it okay to assume everyone testing against RevisionStorage
 
111
    # implements undo?
 
112
 
 
113
    def checkLoadBeforeUndo(self):
 
114
        # Do several transactions then undo them.
 
115
        oid = self._storage.new_oid()
 
116
        revid = None
 
117
        for i in range(5):
 
118
            revid = self._dostore(oid, revid, data=MinPO(i))
 
119
        revs = []
 
120
        for i in range(4):
 
121
            info = self._storage.undoInfo()
 
122
            tid = info[0]["id"]
 
123
            # Always undo the most recent txn, so the value will
 
124
            # alternate between 3 and 4.
 
125
            self._undo(tid, [oid], note="undo %d" % i)
 
126
            revs.append(self._storage.load(oid, ""))
 
127
 
 
128
        prev_tid = None
 
129
        for i, (data, tid) in enumerate(revs):
 
130
            t = self._storage.loadBefore(oid, p64(u64(tid) + 1))
 
131
            self.assertEqual(data, t[0])
 
132
            self.assertEqual(tid, t[1])
 
133
            if prev_tid:
 
134
                self.assert_(prev_tid < t[1])
 
135
            prev_tid = t[1]
 
136
            if i < 3:
 
137
                self.assertEqual(revs[i+1][1], t[2])
 
138
            else:
 
139
                self.assertEqual(None, t[2])
 
140
 
 
141
    def checkLoadBeforeConsecutiveTids(self):
 
142
        eq = self.assertEqual
 
143
        oid = self._storage.new_oid()
 
144
        def helper(tid, revid, x):
 
145
            data = zodb_pickle(MinPO(x))
 
146
            t = transaction.Transaction()
 
147
            try:
 
148
                self._storage.tpc_begin(t, p64(tid))
 
149
                r1 = self._storage.store(oid, revid, data, '', t)
 
150
                # Finish the transaction
 
151
                r2 = self._storage.tpc_vote(t)
 
152
                newrevid = handle_serials(oid, r1, r2)
 
153
                self._storage.tpc_finish(t)
 
154
            except:
 
155
                self._storage.tpc_abort(t)
 
156
                raise
 
157
            return newrevid
 
158
        revid1 = helper(1, None, 1)
 
159
        revid2 = helper(2, revid1, 2)
 
160
        revid3 = helper(3, revid2, 3)
 
161
        data, start_tid, end_tid = self._storage.loadBefore(oid, p64(2))
 
162
        eq(zodb_unpickle(data), MinPO(1))
 
163
        eq(u64(start_tid), 1)
 
164
        eq(u64(end_tid), 2)
 
165
 
 
166
    def checkLoadBeforeCreation(self):
 
167
        eq = self.assertEqual
 
168
        oid1 = self._storage.new_oid()
 
169
        oid2 = self._storage.new_oid()
 
170
        revid1 = self._dostore(oid1)
 
171
        revid2 = self._dostore(oid2)
 
172
        results = self._storage.loadBefore(oid2, revid2)
 
173
        eq(results, None)
 
174
 
 
175
    # TODO:  There are other edge cases to handle, including pack.