~sidnei/zope3/ztk-1.0a1

« back to all changes in this revision

Viewing changes to src/ZEO/tests/InvalidationTests.py

  • Committer: Sidnei da Silva
  • Date: 2010-03-03 03:29:50 UTC
  • mfrom: (12.1.16 trunk)
  • Revision ID: sidnei.da.silva@canonical.com-20100303032950-duivfaoqsxaf9dgg
Merged newer-from-ztk [r=jkakar,bigkevmcd,free][qa=andreas][f=522474].

Update our monolithic Zope 3 tree to a kgs-based, generated,
monolithic Zope 3 tree built from eggs using the
collective.buildout.omelette recipe.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
from ZEO.tests.TestThread import TestThread
25
25
 
26
26
from ZODB.DB import DB
27
 
from ZODB.POSException \
28
 
     import ReadConflictError, ConflictError, VersionLockError
 
27
from ZODB.POSException import ReadConflictError, ConflictError
29
28
 
30
29
# The tests here let several threads have a go at one or more database
31
30
# instances simultaneously.  Each thread appends a disjoint (from the
235
234
        self.added_keys = keys_added.keys()
236
235
        cn.close()
237
236
 
238
 
class VersionStressThread(FailableThread):
239
 
 
240
 
    def __init__(self, db, stop, threadnum, commitdict, startnum,
241
 
                 step=2, sleep=None):
242
 
        TestThread.__init__(self)
243
 
        self.db = db
244
 
        self.stop = stop
245
 
        self.threadnum = threadnum
246
 
        self.startnum = startnum
247
 
        self.step = step
248
 
        self.sleep = sleep
249
 
        self.added_keys = []
250
 
        self.commitdict = commitdict
251
 
 
252
 
    def _testrun(self):
253
 
        commit = 0
254
 
        key = self.startnum
255
 
        while not self.stop.isSet():
256
 
            version = "%s:%s" % (self.threadnum, key)
257
 
            commit = not commit
258
 
            if self.oneupdate(version, key, commit):
259
 
                self.added_keys.append(key)
260
 
                self.commitdict[self] = 1
261
 
            key += self.step
262
 
 
263
 
    def oneupdate(self, version, key, commit=1):
264
 
        # The mess of sleeps below were added to reduce the number
265
 
        # of VersionLockErrors, based on empirical observation.
266
 
        # It looks like the threads don't switch enough without
267
 
        # the sleeps.
268
 
 
269
 
        cn = self.db.open(version)
270
 
        while not self.stop.isSet():
271
 
            try:
272
 
                tree = cn.root()["tree"]
273
 
                break
274
 
            except (ConflictError, KeyError):
275
 
                transaction.abort()
276
 
        while not self.stop.isSet():
277
 
            try:
278
 
                tree[key] = self.threadnum
279
 
                transaction.commit()
280
 
                if self.sleep:
281
 
                    time.sleep(self.sleep)
282
 
                break
283
 
            except (VersionLockError, ReadConflictError, ConflictError), msg:
284
 
                transaction.abort()
285
 
                if self.sleep:
286
 
                    time.sleep(self.sleep)
287
 
        try:
288
 
            while not self.stop.isSet():
289
 
                try:
290
 
                    if commit:
291
 
                        self.db.commitVersion(version)
292
 
                        transaction.get().note("commit version %s" % version)
293
 
                    else:
294
 
                        self.db.abortVersion(version)
295
 
                        transaction.get().note("abort version %s" % version)
296
 
                    transaction.commit()
297
 
                    if self.sleep:
298
 
                        time.sleep(self.sleep)
299
 
                    return commit
300
 
                except ConflictError, msg:
301
 
                    transaction.abort()
302
 
        finally:
303
 
            cn.close()
304
 
        return 0
305
 
 
306
237
class InvalidationTests:
307
238
 
308
 
    level = 2
309
 
 
310
239
    # Minimum # of seconds the main thread lets the workers run.  The
311
240
    # test stops as soon as this much time has elapsed, and all threads
312
241
    # have managed to commit a change.
341
270
        # the actual database state.
342
271
 
343
272
        expected_keys = []
 
273
        errormsgs = []
 
274
        err = errormsgs.append
 
275
 
344
276
        for t in threads:
345
277
            if not t.added_keys:
346
278
                err("thread %d didn't add any keys" % t.threadnum)
354
286
                break
355
287
            time.sleep(.1)
356
288
        else:
357
 
            errormsgs = []
358
 
            err = errormsgs.append
359
289
            err("expected keys != actual keys")
360
290
            for k in expected_keys:
361
291
                if k not in actual_keys:
507
437
        db1.close()
508
438
        db2.close()
509
439
 
510
 
    # TODO:  Temporarily disabled.  I know it fails, and there's no point
511
 
    # getting an endless number of reports about that.
512
 
    def xxxcheckConcurrentUpdatesInVersions(self):
513
 
        self._storage = storage1 = self.openClientStorage()
514
 
        db1 = DB(storage1)
515
 
        db2 = DB(self.openClientStorage())
516
 
        stop = threading.Event()
517
 
 
518
 
        cn = db1.open()
519
 
        tree = cn.root()["tree"] = OOBTree()
520
 
        transaction.commit()
521
 
        cn.close()
522
 
 
523
 
        # Run three threads that update the BTree.
524
 
        # Two of the threads share a single storage so that it
525
 
        # is possible for both threads to read the same object
526
 
        # at the same time.
527
 
 
528
 
        cd = {}
529
 
        t1 = VersionStressThread(db1, stop, 1, cd, 1, 3)
530
 
        t2 = VersionStressThread(db2, stop, 2, cd, 2, 3, 0.01)
531
 
        t3 = VersionStressThread(db2, stop, 3, cd, 3, 3, 0.01)
532
 
        self.go(stop, cd, t1, t2, t3)
533
 
 
534
 
        while db1.lastTransaction() != db2.lastTransaction():
535
 
            db1._storage.sync()
536
 
            db2._storage.sync()
537
 
 
538
 
 
539
 
        cn = db1.open()
540
 
        tree = cn.root()["tree"]
541
 
        self._check_tree(cn, tree)
542
 
        self._check_threads(tree, t1, t2, t3)
543
 
 
544
 
        cn.close()
545
 
        db1.close()
546
 
        db2.close()
547
 
 
548
440
    def checkConcurrentLargeUpdates(self):
549
441
        # Use 3 threads like the 2StorageMT test above.
550
442
        self._storage = storage1 = self.openClientStorage()