~sidnei/zope3/ztk-1.0a1

« back to all changes in this revision

Viewing changes to src/ZODB/Connection.py

  • Committer: Sidnei da Silva
  • Date: 2010-03-03 03:29:50 UTC
  • mfrom: (12.1.16 trunk)
  • Revision ID: sidnei.da.silva@canonical.com-20100303032950-duivfaoqsxaf9dgg
Merged newer-from-ztk [r=jkakar,bigkevmcd,free][qa=andreas][f=522474].

Update our monolithic Zope 3 tree to a kgs-based, generated,
monolithic Zope 3 tree built from eggs using the
collective.buildout.omelette recipe.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
##############################################################################
14
14
"""Database connection support
15
15
 
16
 
$Id: Connection.py 91987 2008-10-10 15:20:32Z tseaver $"""
 
16
$Id: Connection.py 106502 2009-12-14 19:27:08Z jim $"""
17
17
 
18
18
import logging
19
19
import sys
22
22
import warnings
23
23
import os
24
24
import shutil
25
 
from time import time
 
25
import time
26
26
 
27
27
from persistent import PickleCache
28
28
 
30
30
from persistent.interfaces import IPersistentDataManager
31
31
from ZODB.interfaces import IConnection
32
32
from ZODB.interfaces import IBlobStorage
33
 
from ZODB.blob import Blob, rename_or_copy_blob
 
33
from ZODB.interfaces import IMVCCStorage
 
34
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
34
35
from transaction.interfaces import ISavepointDataManager
35
36
from transaction.interfaces import IDataManagerSavepoint
36
37
from transaction.interfaces import ISynchronizer
38
39
 
39
40
import transaction
40
41
 
 
42
import ZODB
41
43
from ZODB.blob import SAVEPOINT_SUFFIX
42
44
from ZODB.ConflictResolution import ResolvedSerial
43
45
from ZODB.ExportImport import ExportImport
44
46
from ZODB import POSException
45
47
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
46
48
from ZODB.POSException import ConflictError, ReadConflictError
47
 
from ZODB.POSException import Unsupported
 
49
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
48
50
from ZODB.POSException import POSKeyError
49
51
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
50
52
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
79
81
    ##########################################################################
80
82
    # Connection methods, ZODB.IConnection
81
83
 
82
 
    def __init__(self, db, version='', cache_size=400):
 
84
    def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
83
85
        """Create a new Connection."""
84
86
 
85
87
        self._log = logging.getLogger('ZODB.Connection')
86
88
        self._debug_info = ()
87
89
 
88
90
        self._db = db
 
91
 
 
92
        # historical connection
 
93
        self.before = before
 
94
 
89
95
        # Multi-database support
90
96
        self.connections = {self._db.database_name: self}
91
97
 
92
 
        self._version = version
93
 
        self._normal_storage = self._storage = db._storage
94
 
        self.new_oid = db._storage.new_oid
 
98
        storage = db.storage
 
99
        if IMVCCStorage.providedBy(storage):
 
100
            # Use a connection-specific storage instance.
 
101
            self._mvcc_storage = True
 
102
            storage = storage.new_instance()
 
103
        else:
 
104
            self._mvcc_storage = False
 
105
 
 
106
        self._normal_storage = self._storage = storage
 
107
        self.new_oid = storage.new_oid
95
108
        self._savepoint_storage = None
96
109
 
97
110
        # Do we need to join a txn manager?
98
111
        self._needs_to_join = True
99
112
        self.transaction_manager = None
100
 
        self._opened = None # time.time() when DB.open() opened us
 
113
        self.opened = None # time.time() when DB.open() opened us
101
114
 
102
115
        self._reset_counter = global_reset_counter
103
116
        self._load_count = 0   # Number of objects unghosted
106
119
        # Cache which can ghostify (forget the state of) objects not
107
120
        # recently used. Its API is roughly that of a dict, with
108
121
        # additional gc-related and invalidation-related methods.
109
 
        self._cache = PickleCache(self, cache_size)
 
122
        self._cache = PickleCache(self, cache_size, cache_size_bytes)
110
123
 
111
124
        # The pre-cache is used by get to avoid infinite loops when
112
125
        # objects immediately load their state whern they get their
113
126
        # persistent data set.
114
127
        self._pre_cache = {}
115
 
        
116
 
        if version:
117
 
            # Caches for versions end up empty if the version
118
 
            # is not used for a while. Non-version caches
119
 
            # keep their content indefinitely.
120
 
            # Unclear:  Why do we want version caches to behave this way?
121
 
            self._cache.cache_drain_resistance = 100
122
128
 
123
129
        # List of all objects (not oids) registered as modified by the
124
130
        # persistence machinery, or by add(), or whose access caused a
140
146
        # During commit, all objects go to either _modified or _creating:
141
147
 
142
148
        # Dict of oid->flag of new objects (without serial), either
143
 
        # added by add() or implicitely added (discovered by the
 
149
        # added by add() or implicitly added (discovered by the
144
150
        # serializer during commit). The flag is True for implicit
145
151
        # adding. Used during abort to remove created objects from the
146
152
        # _cache, and by persistent_id to check that a new object isn't
151
157
        # in the cache on abort and in other connections on finish.
152
158
        self._modified = []
153
159
 
154
 
 
155
160
        # _invalidated queues invalidate messages delivered from the DB
156
161
        # _inv_lock prevents one thread from modifying the set while
157
162
        # another is processing invalidations.  All the invalidations
182
187
        # _conflicts).
183
188
        self._conflicts = {}
184
189
 
185
 
        # If MVCC is enabled, then _mvcc is True and _txn_time stores
186
 
        # the upper bound on transactions visible to this connection.
187
 
        # That is, all object revisions must be written before _txn_time.
188
 
        # If it is None, then the current revisions are acceptable.
189
 
        # If the connection is in a version, mvcc will be disabled, because
190
 
        # loadBefore() only returns non-version data.
 
190
        # _txn_time stores the upper bound on transactions visible to
 
191
        # this connection. That is, all object revisions must be
 
192
        # written before _txn_time. If it is None, then the current
 
193
        # revisions are acceptable.
191
194
        self._txn_time = None
192
195
 
193
196
        # To support importFile(), implemented in the ExportImport base
201
204
 
202
205
    def add(self, obj):
203
206
        """Add a new object 'obj' to the database and assign it an oid."""
204
 
        if self._opened is None:
 
207
        if self.opened is None:
205
208
            raise ConnectionStateError("The database connection is closed")
206
209
 
207
210
        marker = object()
225
228
 
226
229
    def get(self, oid):
227
230
        """Return the persistent object with oid 'oid'."""
228
 
        if self._opened is None:
 
231
        if self.opened is None:
229
232
            raise ConnectionStateError("The database connection is closed")
230
233
 
231
234
        obj = self._cache.get(oid, None)
233
236
            return obj
234
237
        obj = self._added.get(oid, None)
235
238
        if obj is not None:
236
 
            return obj        
 
239
            return obj
237
240
        obj = self._pre_cache.get(oid, None)
238
241
        if obj is not None:
239
242
            return obj
240
243
 
241
244
        # This appears to be an MVCC violation because we are loading
242
245
        # the must recent data when perhaps we shouldnt. The key is
243
 
        # that we are only creating a ghost!        
244
 
        p, serial = self._storage.load(oid, self._version)
 
246
        # that we are only creating a ghost!
 
247
        # A disadvantage to this optimization is that _p_serial cannot be
 
248
        # trusted until the object has been loaded, which affects both MVCC
 
249
        # and historical connections.
 
250
        p, serial = self._storage.load(oid, '')
245
251
        obj = self._reader.getGhost(p)
246
252
 
247
253
        # Avoid infiniate loop if obj tries to load its state before
294
300
 
295
301
        self._debug_info = ()
296
302
 
297
 
        if self._opened:
 
303
        if self.opened:
298
304
            self.transaction_manager.unregisterSynch(self)
299
305
 
 
306
        if self._mvcc_storage:
 
307
            self._storage.sync(force=False)
 
308
 
300
309
        if primary:
301
310
            for connection in self.connections.values():
302
311
                if connection is not self:
303
312
                    connection.close(False)
304
313
 
305
314
            # Return the connection to the pool.
306
 
            if self._opened is not None:
 
315
            if self.opened is not None:
307
316
                self._db._returnToPool(self)
308
317
 
309
 
                # _returnToPool() set self._opened to None.
 
318
                # _returnToPool() set self.opened to None.
310
319
                # However, we can't assert that here, because self may
311
320
                # have been reused (by another thread) by the time we
312
321
                # get back here.
313
322
        else:
314
 
            self._opened = None
315
 
            am = self._db._activity_monitor
316
 
            if am is not None:
317
 
                am.closedConnection(self)
 
323
            self.opened = None
318
324
 
319
325
    def db(self):
320
326
        """Returns a handle to the database this connection belongs to."""
321
327
        return self._db
322
328
 
323
329
    def isReadOnly(self):
324
 
        """Returns True if the storage for this connection is read only."""
325
 
        if self._opened is None:
 
330
        """Returns True if this connection is read only."""
 
331
        if self.opened is None:
326
332
            raise ConnectionStateError("The database connection is closed")
327
 
        return self._storage.isReadOnly()
 
333
        return self.before is not None or self._storage.isReadOnly()
328
334
 
329
335
    def invalidate(self, tid, oids):
330
336
        """Notify the Connection that transaction 'tid' invalidated oids."""
 
337
        if self.before is not None:
 
338
            # this is an historical connection.  Invalidations are irrelevant.
 
339
            return
331
340
        self._inv_lock.acquire()
332
341
        try:
333
342
            if self._txn_time is None:
334
343
                self._txn_time = tid
 
344
            elif tid < self._txn_time:
 
345
                raise AssertionError("invalidations out of order, %r < %r"
 
346
                                     % (tid, self._txn_time))
 
347
 
335
348
            self._invalidated.update(oids)
336
349
        finally:
337
350
            self._inv_lock.release()
342
355
            self._invalidatedCache = True
343
356
        finally:
344
357
            self._inv_lock.release()
345
 
        
346
358
 
 
359
    @property
347
360
    def root(self):
348
361
        """Return the database root object."""
349
 
        return self.get(z64)
350
 
 
351
 
    def getVersion(self):
352
 
        """Returns the version this connection is attached to."""
353
 
        if self._storage is None:
354
 
            raise ConnectionStateError("The database connection is closed")
355
 
        return self._version
 
362
        return RootConvenience(self.get(z64))
356
363
 
357
364
    def get_connection(self, database_name):
358
365
        """Return a Connection for the named database."""
360
367
        if connection is None:
361
368
            new_con = self._db.databases[database_name].open(
362
369
                transaction_manager=self.transaction_manager,
363
 
                version=self._version,
 
370
                before=self.before,
364
371
                )
365
372
            self.connections.update(new_con.connections)
366
373
            new_con.connections = self.connections
471
478
 
472
479
    # Process pending invalidations.
473
480
    def _flush_invalidations(self):
 
481
        if self._mvcc_storage:
 
482
            # Poll the storage for invalidations.
 
483
            invalidated = self._storage.poll_invalidations()
 
484
            if invalidated is None:
 
485
                # special value: the transaction is so old that
 
486
                # we need to flush the whole cache.
 
487
                self._cache.invalidate(self._cache.cache_data.keys())
 
488
            elif invalidated:
 
489
                self._cache.invalidate(invalidated)
 
490
 
474
491
        self._inv_lock.acquire()
475
492
        try:
476
493
            # Non-ghostifiable objects may need to read when they are
543
560
    def _commit(self, transaction):
544
561
        """Commit changes to an object"""
545
562
 
 
563
        if self.before is not None:
 
564
            raise ReadOnlyHistoryError()
 
565
 
546
566
        if self._import:
547
567
            # We are importing an export file. We alsways do this
548
568
            # while making a savepoint so we can copy export data
559
579
        self._added_during_commit = []
560
580
 
561
581
        if self._invalidatedCache:
562
 
            raise ConflictError()            
 
582
            raise ConflictError()
563
583
 
564
584
        for obj in self._registered_objects:
565
585
            oid = obj._p_oid
601
621
                 or self._savepoint_storage.creating[oid]
602
622
                 )
603
623
                ):
604
 
                
 
624
 
605
625
                # obj is a new object
606
626
 
607
627
                # Because obj was added, it is now in _creating, so it
622
642
            if isinstance(obj, Blob):
623
643
                if not IBlobStorage.providedBy(self._storage):
624
644
                    raise Unsupported(
625
 
                        "Storing Blobs in %s is not supported." % 
 
645
                        "Storing Blobs in %s is not supported." %
626
646
                        repr(self._storage))
627
647
                if obj.opened():
628
648
                    raise ValueError("Can't commit with opened blobs.")
629
649
                s = self._storage.storeBlob(oid, serial, p,
630
650
                                            obj._uncommitted(),
631
 
                                            self._version, transaction)
 
651
                                            '', transaction)
632
652
                # we invalidate the object here in order to ensure
633
653
                # that that the next attribute access of its name
634
654
                # unghostify it, which will cause its blob data
635
655
                # to be reattached "cleanly"
636
656
                obj._p_invalidate()
637
657
            else:
638
 
                s = self._storage.store(oid, serial, p, self._version,
639
 
                                        transaction)
 
658
                s = self._storage.store(oid, serial, p, '', transaction)
 
659
 
640
660
            self._store_count += 1
641
661
            # Put the object in the cache before handling the
642
662
            # response, just in case the response contains the
651
671
                else:
652
672
                    raise
653
673
 
 
674
            self._cache.update_object_size_estimation(oid, len(p))
 
675
            obj._p_estimated_size = len(p)
 
676
 
654
677
            self._handle_serial(s, oid)
655
678
 
656
679
    def _handle_serial(self, store_return, oid=None, change=1):
748
771
        """Indicate confirmation that the transaction is done."""
749
772
 
750
773
        def callback(tid):
 
774
            if self._mvcc_storage:
 
775
                # Inter-connection invalidation is not needed when the
 
776
                # storage provides MVCC.
 
777
                return
751
778
            d = dict.fromkeys(self._modified)
752
779
            self._db.invalidate(tid, d, self)
753
780
#       It's important that the storage calls the passed function
801
828
        the database."""
802
829
        oid = obj._p_oid
803
830
 
804
 
        if self._opened is None:
 
831
        if self.opened is None:
805
832
            msg = ("Shouldn't load state for %s "
806
833
                   "when the connection is closed" % oid_repr(oid))
807
834
            self._log.error(msg)
832
859
        # 3. Raise ConflictError.
833
860
 
834
861
        # Does anything actually use _p_independent()?  It would simplify
835
 
        # the code if we could drop support for it.  
 
862
        # the code if we could drop support for it.
836
863
        # (BTrees.Length does.)
837
864
 
838
 
        # There is a harmless data race with self._invalidated.  A
839
 
        # dict update could go on in another thread, but we don't care
840
 
        # because we have to check again after the load anyway.
841
 
 
842
 
 
843
 
        if self._invalidatedCache:
844
 
            raise ReadConflictError()
845
 
 
846
 
        if (obj._p_oid in self._invalidated and
847
 
                not myhasattr(obj, "_p_independent")):
848
 
            # If the object has _p_independent(), we will handle it below.
849
 
            self._load_before_or_conflict(obj)
850
 
            return
851
 
 
852
 
        p, serial = self._storage.load(obj._p_oid, self._version)
853
 
        self._load_count += 1
854
 
 
855
 
        self._inv_lock.acquire()
856
 
        try:
857
 
            invalid = obj._p_oid in self._invalidated
858
 
        finally:
859
 
            self._inv_lock.release()
860
 
 
861
 
        if invalid:
862
 
            if myhasattr(obj, "_p_independent"):
863
 
                # This call will raise a ReadConflictError if something
864
 
                # goes wrong
865
 
                self._handle_independent(obj)
866
 
            else:
 
865
 
 
866
        if self.before is not None:
 
867
            # Load data that was current before the time we have.
 
868
            before = self.before
 
869
            t = self._storage.loadBefore(obj._p_oid, before)
 
870
            if t is None:
 
871
                raise POSKeyError() # historical connection!
 
872
            p, serial, end = t
 
873
 
 
874
        else:
 
875
            # There is a harmless data race with self._invalidated.  A
 
876
            # dict update could go on in another thread, but we don't care
 
877
            # because we have to check again after the load anyway.
 
878
 
 
879
            if self._invalidatedCache:
 
880
                raise ReadConflictError()
 
881
 
 
882
            if (obj._p_oid in self._invalidated and
 
883
                    not myhasattr(obj, "_p_independent")):
 
884
                # If the object has _p_independent(), we will handle it below.
867
885
                self._load_before_or_conflict(obj)
868
886
                return
869
887
 
 
888
            p, serial = self._storage.load(obj._p_oid, '')
 
889
            self._load_count += 1
 
890
 
 
891
            self._inv_lock.acquire()
 
892
            try:
 
893
                invalid = obj._p_oid in self._invalidated
 
894
            finally:
 
895
                self._inv_lock.release()
 
896
 
 
897
            if invalid:
 
898
                if myhasattr(obj, "_p_independent"):
 
899
                    # This call will raise a ReadConflictError if something
 
900
                    # goes wrong
 
901
                    self._handle_independent(obj)
 
902
                else:
 
903
                    self._load_before_or_conflict(obj)
 
904
                    return
 
905
 
870
906
        self._reader.setGhostState(obj, p)
871
907
        obj._p_serial = serial
 
908
        self._cache.update_object_size_estimation(obj._p_oid, len(p))
 
909
        obj._p_estimated_size = len(p)
872
910
 
873
911
        # Blob support
874
912
        if isinstance(obj, Blob):
877
915
 
878
916
    def _load_before_or_conflict(self, obj):
879
917
        """Load non-current state for obj or raise ReadConflictError."""
880
 
        if not ((not self._version) and self._setstate_noncurrent(obj)):
 
918
        if not self._setstate_noncurrent(obj):
881
919
            self._register(obj)
882
920
            self._conflicts[obj._p_oid] = True
883
921
            raise ReadConflictError(object=obj)
904
942
        assert self._txn_time <= end, (u64(self._txn_time), u64(end))
905
943
        self._reader.setGhostState(obj, data)
906
944
        obj._p_serial = start
 
945
 
 
946
        # MVCC Blob support
 
947
        if isinstance(obj, Blob):
 
948
            obj._p_blob_uncommitted = None
 
949
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)
 
950
 
907
951
        return True
908
952
 
909
953
    def _handle_independent(self, obj):
994
1038
        register for afterCompletion() calls.
995
1039
        """
996
1040
 
997
 
        self._opened = time()
 
1041
        self.opened = time.time()
998
1042
 
999
1043
        if transaction_manager is None:
1000
1044
            transaction_manager = transaction.manager
1027
1071
        self._invalidated.clear()
1028
1072
        self._invalidatedCache = False
1029
1073
        cache_size = self._cache.cache_size
1030
 
        self._reader._cache = self._cache = PickleCache(self, cache_size)
 
1074
        cache_size_bytes = self._cache.cache_size_bytes
 
1075
        self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
 
1076
        if getattr(self, '_reader', None) is not None:
 
1077
            self._reader._cache = cache
 
1078
 
 
1079
    def _releaseStorage(self):
 
1080
        """Tell the storage to release resources it's using"""
 
1081
        if self._mvcc_storage:
 
1082
            self._storage.release()
1031
1083
 
1032
1084
    ##########################################################################
1033
1085
    # Python protocol
1034
1086
 
1035
1087
    def __repr__(self):
1036
 
        if self._version:
1037
 
            ver = ' (in version %s)' % `self._version`
1038
 
        else:
1039
 
            ver = ''
1040
 
        return '<Connection at %08x%s>' % (positive_id(self), ver)
 
1088
        return '<Connection at %08x>' % (positive_id(self),)
1041
1089
 
1042
1090
    # Python protocol
1043
1091
    ##########################################################################
1047
1095
 
1048
1096
    __getitem__ = get
1049
1097
 
1050
 
    def modifiedInVersion(self, oid):
1051
 
        """Returns the version the object with the given oid was modified in.
1052
 
 
1053
 
        If it wasn't modified in a version, the current version of this
1054
 
        connection is returned.
1055
 
        """
1056
 
        try:
1057
 
            return self._db.modifiedInVersion(oid)
1058
 
        except KeyError:
1059
 
            return self.getVersion()
1060
 
 
1061
1098
    def exchange(self, old, new):
1062
1099
        # called by a ZClasses method that isn't executed by the test suite
1063
1100
        oid = old._p_oid
1083
1120
 
1084
1121
    def savepoint(self):
1085
1122
        if self._savepoint_storage is None:
1086
 
            tmpstore = TmpStore(self._version, self._normal_storage)
 
1123
            tmpstore = TmpStore(self._normal_storage)
1087
1124
            self._savepoint_storage = tmpstore
1088
1125
            self._storage = self._savepoint_storage
1089
1126
 
1125
1162
 
1126
1163
        for oid in oids:
1127
1164
            data, serial = src.load(oid, src)
 
1165
            obj = self._cache.get(oid, None)
 
1166
            if obj is not None:
 
1167
                self._cache.update_object_size_estimation(obj._p_oid, len(data))
 
1168
                obj._p_estimated_size = len(data)
1128
1169
            if isinstance(self._reader.getGhost(data), Blob):
1129
1170
                blobfilename = src.loadBlob(oid, serial)
1130
1171
                s = self._storage.storeBlob(oid, serial, data, blobfilename,
1131
 
                                            self._version, transaction)
 
1172
                                            '', transaction)
1132
1173
                # we invalidate the object here in order to ensure
1133
1174
                # that that the next attribute access of its name
1134
1175
                # unghostify it, which will cause its blob data
1136
1177
                self.invalidate(s, {oid:True})
1137
1178
            else:
1138
1179
                s = self._storage.store(oid, serial, data,
1139
 
                                        self._version, transaction)
 
1180
                                        '', transaction)
1140
1181
 
1141
1182
            self._handle_serial(s, oid, change=False)
1142
1183
        src.close()
1186
1227
 
1187
1228
    implements(IBlobStorage)
1188
1229
 
1189
 
    def __init__(self, base_version, storage):
 
1230
    def __init__(self, storage):
1190
1231
        self._storage = storage
1191
1232
        for method in (
1192
1233
            'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
 
1234
            'isReadOnly'
1193
1235
            ):
1194
1236
            setattr(self, method, getattr(storage, method))
1195
1237
 
1196
 
        try:
1197
 
            supportsVersions = storage.supportsVersions
1198
 
        except AttributeError:
1199
 
            pass
1200
 
        else:
1201
 
            if supportsVersions():
1202
 
                self.modifiedInVersion = storage.modifiedInVersion
1203
 
                self.versionEmpty = storage.versionEmpty
1204
 
 
1205
 
        self._base_version = base_version
1206
1238
        self._file = tempfile.TemporaryFile()
1207
1239
        # position: current file position
1208
1240
        # _tpos: file position at last commit point
1211
1243
        self.index = {}
1212
1244
        self.creating = {}
1213
1245
 
 
1246
        self._blob_dir = None
 
1247
 
1214
1248
    def __len__(self):
1215
1249
        return len(self.index)
1216
1250
 
1217
1251
    def close(self):
1218
1252
        self._file.close()
 
1253
        if self._blob_dir is not None:
 
1254
            remove_committed_dir(self._blob_dir)
 
1255
            self._blob_dir = None
1219
1256
 
1220
1257
    def load(self, oid, version):
1221
1258
        pos = self.index.get(oid)
1222
1259
        if pos is None:
1223
 
            return self._storage.load(oid, self._base_version)
 
1260
            return self._storage.load(oid, '')
1224
1261
        self._file.seek(pos)
1225
1262
        h = self._file.read(8)
1226
1263
        oidlen = u64(h)
1235
1272
    def store(self, oid, serial, data, version, transaction):
1236
1273
        # we have this funny signature so we can reuse the normal non-commit
1237
1274
        # commit logic
1238
 
        assert version == self._base_version
 
1275
        assert version == ''
1239
1276
        self._file.seek(self.position)
1240
1277
        l = len(data)
1241
1278
        if serial is None:
1249
1286
 
1250
1287
    def storeBlob(self, oid, serial, data, blobfilename, version,
1251
1288
                  transaction):
1252
 
        serial = self.store(oid, serial, data, version, transaction)
 
1289
        assert version == ''
 
1290
        serial = self.store(oid, serial, data, '', transaction)
1253
1291
 
1254
1292
        targetpath = self._getBlobPath()
1255
1293
        if not os.path.exists(targetpath):
1270
1308
            return self._storage.loadBlob(oid, serial)
1271
1309
        return filename
1272
1310
 
 
1311
    def openCommittedBlobFile(self, oid, serial, blob=None):
 
1312
        blob_filename = self.loadBlob(oid, serial)
 
1313
        if blob is None:
 
1314
            return open(blob_filename, 'rb')
 
1315
        else:
 
1316
            return ZODB.blob.BlobFile(blob_filename, 'r', blob)
 
1317
 
1273
1318
    def _getBlobPath(self):
1274
 
        return os.path.join(self.temporaryDirectory(), 'savepoints')
 
1319
        blob_dir = self._blob_dir
 
1320
        if blob_dir is None:
 
1321
            blob_dir = tempfile.mkdtemp(dir=self.temporaryDirectory(),
 
1322
                                        prefix='savepoints')
 
1323
            self._blob_dir = blob_dir
 
1324
        return blob_dir
1275
1325
 
1276
1326
    def _getCleanFilename(self, oid, tid):
1277
 
        return os.path.join(self._getBlobPath(),
1278
 
                            "%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid), SAVEPOINT_SUFFIX,)
1279
 
                            )
 
1327
        return os.path.join(
 
1328
            self._getBlobPath(),
 
1329
            "%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid),
 
1330
                         SAVEPOINT_SUFFIX,)
 
1331
            )
1280
1332
 
1281
1333
    def temporaryDirectory(self):
1282
1334
        return self._storage.temporaryDirectory()
1294
1346
        # a copy of the index here.  An alternative would be to ensure that
1295
1347
        # all callers pass copies.  As is, our callers do not make copies.
1296
1348
        self.index = index.copy()
 
1349
 
 
1350
class RootConvenience(object):
 
1351
 
 
1352
    def __init__(self, root):
 
1353
        self.__dict__['_root'] = root
 
1354
 
 
1355
    def __getattr__(self, name):
 
1356
        try:
 
1357
            return self._root[name]
 
1358
        except KeyError:
 
1359
            raise AttributeError(name)
 
1360
 
 
1361
    def __setattr__(self, name, v):
 
1362
        self._root[name] = v
 
1363
 
 
1364
    def __delattr__(self, name):
 
1365
        try:
 
1366
            del self._root[name]
 
1367
        except KeyError:
 
1368
            raise AttributeError(name)
 
1369
 
 
1370
    def __call__(self):
 
1371
        return self._root
 
1372
 
 
1373
    def __repr__(self):
 
1374
        names = " ".join(sorted(self._root))
 
1375
        if len(names) > 60:
 
1376
            names = names[:57].rsplit(' ', 1)[0] + ' ...'
 
1377
        return "<root: %s>" % names