24
24
from ZEO.tests.TestThread import TestThread
26
26
from ZODB.DB import DB
27
from ZODB.POSException \
28
import ReadConflictError, ConflictError, VersionLockError
27
from ZODB.POSException import ReadConflictError, ConflictError
30
29
# The tests here let several threads have a go at one or more database
31
30
# instances simultaneously. Each thread appends a disjoint (from the
235
234
self.added_keys = keys_added.keys()
238
class VersionStressThread(FailableThread):
240
def __init__(self, db, stop, threadnum, commitdict, startnum,
242
TestThread.__init__(self)
245
self.threadnum = threadnum
246
self.startnum = startnum
250
self.commitdict = commitdict
255
while not self.stop.isSet():
256
version = "%s:%s" % (self.threadnum, key)
258
if self.oneupdate(version, key, commit):
259
self.added_keys.append(key)
260
self.commitdict[self] = 1
263
def oneupdate(self, version, key, commit=1):
264
# The mess of sleeps below was added to reduce the number
265
# of VersionLockErrors, based on empirical observation.
266
# It looks like the threads don't switch enough without
269
cn = self.db.open(version)
270
while not self.stop.isSet():
272
tree = cn.root()["tree"]
274
except (ConflictError, KeyError):
276
while not self.stop.isSet():
278
tree[key] = self.threadnum
281
time.sleep(self.sleep)
283
except (VersionLockError, ReadConflictError, ConflictError), msg:
286
time.sleep(self.sleep)
288
while not self.stop.isSet():
291
self.db.commitVersion(version)
292
transaction.get().note("commit version %s" % version)
294
self.db.abortVersion(version)
295
transaction.get().note("abort version %s" % version)
298
time.sleep(self.sleep)
300
except ConflictError, msg:
306
237
class InvalidationTests:
310
239
# Minimum # of seconds the main thread lets the workers run. The
311
240
# test stops as soon as this much time has elapsed, and all threads
312
241
# have managed to commit a change.
510
# TODO: Temporarily disabled. I know it fails, and there's no point
511
# getting an endless number of reports about that.
512
def xxxcheckConcurrentUpdatesInVersions(self):
513
self._storage = storage1 = self.openClientStorage()
515
db2 = DB(self.openClientStorage())
516
stop = threading.Event()
519
tree = cn.root()["tree"] = OOBTree()
523
# Run three threads that update the BTree.
524
# Two of the threads share a single storage so that it
525
# is possible for both threads to read the same object
529
t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
530
t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
531
t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
532
self.go(stop, cd, t1, t2, t3)
534
while db1.lastTransaction() != db2.lastTransaction():
540
tree = cn.root()["tree"]
541
self._check_tree(cn, tree)
542
self._check_threads(tree, t1, t2, t3)
548
440
def checkConcurrentLargeUpdates(self):
549
441
# Use 3 threads like the 2StorageMT test above.
550
442
self._storage = storage1 = self.openClientStorage()