30
30
from persistent.interfaces import IPersistentDataManager
31
31
from ZODB.interfaces import IConnection
32
32
from ZODB.interfaces import IBlobStorage
33
from ZODB.blob import Blob, rename_or_copy_blob
33
from ZODB.interfaces import IMVCCStorage
34
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
34
35
from transaction.interfaces import ISavepointDataManager
35
36
from transaction.interfaces import IDataManagerSavepoint
36
37
from transaction.interfaces import ISynchronizer
41
43
from ZODB.blob import SAVEPOINT_SUFFIX
42
44
from ZODB.ConflictResolution import ResolvedSerial
43
45
from ZODB.ExportImport import ExportImport
44
46
from ZODB import POSException
45
47
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
46
48
from ZODB.POSException import ConflictError, ReadConflictError
47
from ZODB.POSException import Unsupported
49
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
48
50
from ZODB.POSException import POSKeyError
49
51
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
50
52
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
79
81
##########################################################################
80
82
# Connection methods, ZODB.IConnection
82
def __init__(self, db, version='', cache_size=400):
84
def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
83
85
"""Create a new Connection."""
85
87
self._log = logging.getLogger('ZODB.Connection')
86
88
self._debug_info = ()
92
# historical connection
89
95
# Multi-database support
90
96
self.connections = {self._db.database_name: self}
92
self._version = version
93
self._normal_storage = self._storage = db._storage
94
self.new_oid = db._storage.new_oid
99
if IMVCCStorage.providedBy(storage):
100
# Use a connection-specific storage instance.
101
self._mvcc_storage = True
102
storage = storage.new_instance()
104
self._mvcc_storage = False
106
self._normal_storage = self._storage = storage
107
self.new_oid = storage.new_oid
95
108
self._savepoint_storage = None
97
110
# Do we need to join a txn manager?
98
111
self._needs_to_join = True
99
112
self.transaction_manager = None
100
self._opened = None # time.time() when DB.open() opened us
113
self.opened = None # time.time() when DB.open() opened us
102
115
self._reset_counter = global_reset_counter
103
116
self._load_count = 0 # Number of objects unghosted
106
119
# Cache which can ghostify (forget the state of) objects not
107
120
# recently used. Its API is roughly that of a dict, with
108
121
# additional gc-related and invalidation-related methods.
109
self._cache = PickleCache(self, cache_size)
122
self._cache = PickleCache(self, cache_size, cache_size_bytes)
111
124
# The pre-cache is used by get to avoid infinite loops when
112
125
# objects immediately load their state when they get their
113
126
# persistent data set.
114
127
self._pre_cache = {}
117
# Caches for versions end up empty if the version
118
# is not used for a while. Non-version caches
119
# keep their content indefinitely.
120
# Unclear: Why do we want version caches to behave this way?
121
self._cache.cache_drain_resistance = 100
123
129
# List of all objects (not oids) registered as modified by the
124
130
# persistence machinery, or by add(), or whose access caused a
140
146
# During commit, all objects go to either _modified or _creating:
142
148
# Dict of oid->flag of new objects (without serial), either
143
# added by add() or implicitely added (discovered by the
149
# added by add() or implicitly added (discovered by the
144
150
# serializer during commit). The flag is True for implicit
145
151
# adding. Used during abort to remove created objects from the
146
152
# _cache, and by persistent_id to check that a new object isn't
183
188
self._conflicts = {}
185
# If MVCC is enabled, then _mvcc is True and _txn_time stores
186
# the upper bound on transactions visible to this connection.
187
# That is, all object revisions must be written before _txn_time.
188
# If it is None, then the current revisions are acceptable.
189
# If the connection is in a version, mvcc will be disabled, because
190
# loadBefore() only returns non-version data.
190
# _txn_time stores the upper bound on transactions visible to
191
# this connection. That is, all object revisions must be
192
# written before _txn_time. If it is None, then the current
193
# revisions are acceptable.
191
194
self._txn_time = None
193
196
# To support importFile(), implemented in the ExportImport base
234
237
obj = self._added.get(oid, None)
235
238
if obj is not None:
237
240
obj = self._pre_cache.get(oid, None)
238
241
if obj is not None:
241
244
# This appears to be an MVCC violation because we are loading
242
245
# the most recent data when perhaps we shouldn't. The key is
243
# that we are only creating a ghost!
244
p, serial = self._storage.load(oid, self._version)
246
# that we are only creating a ghost!
247
# A disadvantage to this optimization is that _p_serial cannot be
248
# trusted until the object has been loaded, which affects both MVCC
249
# and historical connections.
250
p, serial = self._storage.load(oid, '')
245
251
obj = self._reader.getGhost(p)
247
253
# Avoid infinite loop if obj tries to load its state before
295
301
self._debug_info = ()
298
304
self.transaction_manager.unregisterSynch(self)
306
if self._mvcc_storage:
307
self._storage.sync(force=False)
301
310
for connection in self.connections.values():
302
311
if connection is not self:
303
312
connection.close(False)
305
314
# Return the connection to the pool.
306
if self._opened is not None:
315
if self.opened is not None:
307
316
self._db._returnToPool(self)
309
# _returnToPool() set self._opened to None.
318
# _returnToPool() set self.opened to None.
310
319
# However, we can't assert that here, because self may
311
320
# have been reused (by another thread) by the time we
315
am = self._db._activity_monitor
317
am.closedConnection(self)
320
326
"""Returns a handle to the database this connection belongs to."""
323
329
def isReadOnly(self):
324
"""Returns True if the storage for this connection is read only."""
325
if self._opened is None:
330
"""Returns True if this connection is read only."""
331
if self.opened is None:
326
332
raise ConnectionStateError("The database connection is closed")
327
return self._storage.isReadOnly()
333
return self.before is not None or self._storage.isReadOnly()
329
335
def invalidate(self, tid, oids):
330
336
"""Notify the Connection that transaction 'tid' invalidated oids."""
337
if self.before is not None:
338
# this is an historical connection. Invalidations are irrelevant.
331
340
self._inv_lock.acquire()
333
342
if self._txn_time is None:
334
343
self._txn_time = tid
344
elif tid < self._txn_time:
345
raise AssertionError("invalidations out of order, %r < %r"
346
% (tid, self._txn_time))
335
348
self._invalidated.update(oids)
337
350
self._inv_lock.release()
342
355
self._invalidatedCache = True
344
357
self._inv_lock.release()
348
361
"""Return the database root object."""
351
def getVersion(self):
352
"""Returns the version this connection is attached to."""
353
if self._storage is None:
354
raise ConnectionStateError("The database connection is closed")
362
return RootConvenience(self.get(z64))
357
364
def get_connection(self, database_name):
358
365
"""Return a Connection for the named database."""
472
479
# Process pending invalidations.
473
480
def _flush_invalidations(self):
481
if self._mvcc_storage:
482
# Poll the storage for invalidations.
483
invalidated = self._storage.poll_invalidations()
484
if invalidated is None:
485
# special value: the transaction is so old that
486
# we need to flush the whole cache.
487
self._cache.invalidate(self._cache.cache_data.keys())
489
self._cache.invalidate(invalidated)
474
491
self._inv_lock.acquire()
476
493
# Non-ghostifiable objects may need to read when they are
543
560
def _commit(self, transaction):
544
561
"""Commit changes to an object"""
563
if self.before is not None:
564
raise ReadOnlyHistoryError()
547
567
# We are importing an export file. We always do this
548
568
# while making a savepoint so we can copy export data
622
642
if isinstance(obj, Blob):
623
643
if not IBlobStorage.providedBy(self._storage):
624
644
raise Unsupported(
625
"Storing Blobs in %s is not supported." %
645
"Storing Blobs in %s is not supported." %
626
646
repr(self._storage))
628
648
raise ValueError("Can't commit with opened blobs.")
629
649
s = self._storage.storeBlob(oid, serial, p,
630
650
obj._uncommitted(),
631
self._version, transaction)
632
652
# we invalidate the object here in order to ensure
633
653
# that the next attribute access of its name
634
654
# unghostify it, which will cause its blob data
635
655
# to be reattached "cleanly"
636
656
obj._p_invalidate()
638
s = self._storage.store(oid, serial, p, self._version,
658
s = self._storage.store(oid, serial, p, '', transaction)
640
660
self._store_count += 1
641
661
# Put the object in the cache before handling the
642
662
# response, just in case the response contains the
832
859
# 3. Raise ConflictError.
834
861
# Does anything actually use _p_independent()? It would simplify
835
# the code if we could drop support for it.
862
# the code if we could drop support for it.
836
863
# (BTrees.Length does.)
838
# There is a harmless data race with self._invalidated. A
839
# dict update could go on in another thread, but we don't care
840
# because we have to check again after the load anyway.
843
if self._invalidatedCache:
844
raise ReadConflictError()
846
if (obj._p_oid in self._invalidated and
847
not myhasattr(obj, "_p_independent")):
848
# If the object has _p_independent(), we will handle it below.
849
self._load_before_or_conflict(obj)
852
p, serial = self._storage.load(obj._p_oid, self._version)
853
self._load_count += 1
855
self._inv_lock.acquire()
857
invalid = obj._p_oid in self._invalidated
859
self._inv_lock.release()
862
if myhasattr(obj, "_p_independent"):
863
# This call will raise a ReadConflictError if something
865
self._handle_independent(obj)
866
if self.before is not None:
867
# Load data that was current before the time we have.
869
t = self._storage.loadBefore(obj._p_oid, before)
871
raise POSKeyError() # historical connection!
875
# There is a harmless data race with self._invalidated. A
876
# dict update could go on in another thread, but we don't care
877
# because we have to check again after the load anyway.
879
if self._invalidatedCache:
880
raise ReadConflictError()
882
if (obj._p_oid in self._invalidated and
883
not myhasattr(obj, "_p_independent")):
884
# If the object has _p_independent(), we will handle it below.
867
885
self._load_before_or_conflict(obj)
888
p, serial = self._storage.load(obj._p_oid, '')
889
self._load_count += 1
891
self._inv_lock.acquire()
893
invalid = obj._p_oid in self._invalidated
895
self._inv_lock.release()
898
if myhasattr(obj, "_p_independent"):
899
# This call will raise a ReadConflictError if something
901
self._handle_independent(obj)
903
self._load_before_or_conflict(obj)
870
906
self._reader.setGhostState(obj, p)
871
907
obj._p_serial = serial
908
self._cache.update_object_size_estimation(obj._p_oid, len(p))
909
obj._p_estimated_size = len(p)
874
912
if isinstance(obj, Blob):
878
916
def _load_before_or_conflict(self, obj):
879
917
"""Load non-current state for obj or raise ReadConflictError."""
880
if not ((not self._version) and self._setstate_noncurrent(obj)):
918
if not self._setstate_noncurrent(obj):
881
919
self._register(obj)
882
920
self._conflicts[obj._p_oid] = True
883
921
raise ReadConflictError(object=obj)
904
942
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
905
943
self._reader.setGhostState(obj, data)
906
944
obj._p_serial = start
947
if isinstance(obj, Blob):
948
obj._p_blob_uncommitted = None
949
obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)
909
953
def _handle_independent(self, obj):
1027
1071
self._invalidated.clear()
1028
1072
self._invalidatedCache = False
1029
1073
cache_size = self._cache.cache_size
1030
self._reader._cache = self._cache = PickleCache(self, cache_size)
1074
cache_size_bytes = self._cache.cache_size_bytes
1075
self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
1076
if getattr(self, '_reader', None) is not None:
1077
self._reader._cache = cache
1079
def _releaseStorage(self):
1080
"""Tell the storage to release resources it's using"""
1081
if self._mvcc_storage:
1082
self._storage.release()
1032
1084
##########################################################################
1033
1085
# Python protocol
1035
1087
def __repr__(self):
1037
ver = ' (in version %s)' % `self._version`
1040
return '<Connection at %08x%s>' % (positive_id(self), ver)
1088
return '<Connection at %08x>' % (positive_id(self),)
1042
1090
# Python protocol
1043
1091
##########################################################################
1048
1096
__getitem__ = get
1050
def modifiedInVersion(self, oid):
1051
"""Returns the version the object with the given oid was modified in.
1053
If it wasn't modified in a version, the current version of this
1054
connection is returned.
1057
return self._db.modifiedInVersion(oid)
1059
return self.getVersion()
1061
1098
def exchange(self, old, new):
1062
1099
# called by a ZClasses method that isn't executed by the test suite
1063
1100
oid = old._p_oid
1126
1163
for oid in oids:
1127
1164
data, serial = src.load(oid, src)
1165
obj = self._cache.get(oid, None)
1167
self._cache.update_object_size_estimation(obj._p_oid, len(data))
1168
obj._p_estimated_size = len(data)
1128
1169
if isinstance(self._reader.getGhost(data), Blob):
1129
1170
blobfilename = src.loadBlob(oid, serial)
1130
1171
s = self._storage.storeBlob(oid, serial, data, blobfilename,
1131
self._version, transaction)
1132
1173
# we invalidate the object here in order to ensure
1133
1174
# that the next attribute access of its name
1134
1175
# unghostify it, which will cause its blob data
1136
1177
self.invalidate(s, {oid:True})
1138
1179
s = self._storage.store(oid, serial, data,
1139
self._version, transaction)
1141
1182
self._handle_serial(s, oid, change=False)
1187
1228
implements(IBlobStorage)
1189
def __init__(self, base_version, storage):
1230
def __init__(self, storage):
1190
1231
self._storage = storage
1191
1232
for method in (
1192
1233
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
1194
1236
setattr(self, method, getattr(storage, method))
1197
supportsVersions = storage.supportsVersions
1198
except AttributeError:
1201
if supportsVersions():
1202
self.modifiedInVersion = storage.modifiedInVersion
1203
self.versionEmpty = storage.versionEmpty
1205
self._base_version = base_version
1206
1238
self._file = tempfile.TemporaryFile()
1207
1239
# position: current file position
1208
1240
# _tpos: file position at last commit point
1211
1243
self.index = {}
1212
1244
self.creating = {}
1246
self._blob_dir = None
1214
1248
def __len__(self):
1215
1249
return len(self.index)
1217
1251
def close(self):
1218
1252
self._file.close()
1253
if self._blob_dir is not None:
1254
remove_committed_dir(self._blob_dir)
1255
self._blob_dir = None
1220
1257
def load(self, oid, version):
1221
1258
pos = self.index.get(oid)
1222
1259
if pos is None:
1223
return self._storage.load(oid, self._base_version)
1260
return self._storage.load(oid, '')
1224
1261
self._file.seek(pos)
1225
1262
h = self._file.read(8)
1226
1263
oidlen = u64(h)
1250
1287
def storeBlob(self, oid, serial, data, blobfilename, version,
1252
serial = self.store(oid, serial, data, version, transaction)
1289
assert version == ''
1290
serial = self.store(oid, serial, data, '', transaction)
1254
1292
targetpath = self._getBlobPath()
1255
1293
if not os.path.exists(targetpath):
1270
1308
return self._storage.loadBlob(oid, serial)
1271
1309
return filename
1311
def openCommittedBlobFile(self, oid, serial, blob=None):
1312
blob_filename = self.loadBlob(oid, serial)
1314
return open(blob_filename, 'rb')
1316
return ZODB.blob.BlobFile(blob_filename, 'r', blob)
1273
1318
def _getBlobPath(self):
1274
return os.path.join(self.temporaryDirectory(), 'savepoints')
1319
blob_dir = self._blob_dir
1320
if blob_dir is None:
1321
blob_dir = tempfile.mkdtemp(dir=self.temporaryDirectory(),
1322
prefix='savepoints')
1323
self._blob_dir = blob_dir
1276
1326
def _getCleanFilename(self, oid, tid):
1277
return os.path.join(self._getBlobPath(),
1278
"%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid), SAVEPOINT_SUFFIX,)
1327
return os.path.join(
1328
self._getBlobPath(),
1329
"%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid),
1281
1333
def temporaryDirectory(self):
1282
1334
return self._storage.temporaryDirectory()
1294
1346
# a copy of the index here. An alternative would be to ensure that
1295
1347
# all callers pass copies. As is, our callers do not make copies.
1296
1348
self.index = index.copy()
1350
class RootConvenience(object):
1352
def __init__(self, root):
1353
self.__dict__['_root'] = root
1355
def __getattr__(self, name):
1357
return self._root[name]
1359
raise AttributeError(name)
1361
def __setattr__(self, name, v):
1362
self._root[name] = v
1364
def __delattr__(self, name):
1366
del self._root[name]
1368
raise AttributeError(name)
1374
names = " ".join(sorted(self._root))
1376
names = names[:57].rsplit(' ', 1)[0] + ' ...'
1377
return "<root: %s>" % names