~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/bsddb/test/test_lock.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
Import upstream version 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
TestCases for testing the locking sub-system.
 
3
"""
 
4
 
 
5
import time
 
6
 
 
7
import unittest
 
8
from test_all import db, test_support, verbose, have_threads, \
 
9
        get_new_environment_path, get_new_database_path
 
10
 
 
11
if have_threads :
 
12
    from threading import Thread
 
13
    import sys
 
14
    if sys.version_info[0] < 3 :
 
15
        from threading import currentThread
 
16
    else :
 
17
        from threading import current_thread as currentThread
 
18
 
 
19
#----------------------------------------------------------------------
 
20
 
 
21
class LockingTestCase(unittest.TestCase):
 
22
    import sys
 
23
    if sys.version_info[:3] < (2, 4, 0):
 
24
        def assertTrue(self, expr, msg=None):
 
25
            self.failUnless(expr,msg=msg)
 
26
 
 
27
 
 
28
    def setUp(self):
 
29
        self.homeDir = get_new_environment_path()
 
30
        self.env = db.DBEnv()
 
31
        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
 
32
                                    db.DB_INIT_LOCK | db.DB_CREATE)
 
33
 
 
34
 
 
35
    def tearDown(self):
 
36
        self.env.close()
 
37
        test_support.rmtree(self.homeDir)
 
38
 
 
39
 
 
40
    def test01_simple(self):
 
41
        if verbose:
 
42
            print '\n', '-=' * 30
 
43
            print "Running %s.test01_simple..." % self.__class__.__name__
 
44
 
 
45
        anID = self.env.lock_id()
 
46
        if verbose:
 
47
            print "locker ID: %s" % anID
 
48
        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
 
49
        if verbose:
 
50
            print "Aquired lock: %s" % lock
 
51
        self.env.lock_put(lock)
 
52
        if verbose:
 
53
            print "Released lock: %s" % lock
 
54
        self.env.lock_id_free(anID)
 
55
 
 
56
 
 
57
    def test02_threaded(self):
 
58
        if verbose:
 
59
            print '\n', '-=' * 30
 
60
            print "Running %s.test02_threaded..." % self.__class__.__name__
 
61
 
 
62
        threads = []
 
63
        threads.append(Thread(target = self.theThread,
 
64
                              args=(db.DB_LOCK_WRITE,)))
 
65
        threads.append(Thread(target = self.theThread,
 
66
                              args=(db.DB_LOCK_READ,)))
 
67
        threads.append(Thread(target = self.theThread,
 
68
                              args=(db.DB_LOCK_READ,)))
 
69
        threads.append(Thread(target = self.theThread,
 
70
                              args=(db.DB_LOCK_WRITE,)))
 
71
        threads.append(Thread(target = self.theThread,
 
72
                              args=(db.DB_LOCK_READ,)))
 
73
        threads.append(Thread(target = self.theThread,
 
74
                              args=(db.DB_LOCK_READ,)))
 
75
        threads.append(Thread(target = self.theThread,
 
76
                              args=(db.DB_LOCK_WRITE,)))
 
77
        threads.append(Thread(target = self.theThread,
 
78
                              args=(db.DB_LOCK_WRITE,)))
 
79
        threads.append(Thread(target = self.theThread,
 
80
                              args=(db.DB_LOCK_WRITE,)))
 
81
 
 
82
        for t in threads:
 
83
            import sys
 
84
            if sys.version_info[0] < 3 :
 
85
                t.setDaemon(True)
 
86
            else :
 
87
                t.daemon = True
 
88
            t.start()
 
89
        for t in threads:
 
90
            t.join()
 
91
 
 
92
    def test03_lock_timeout(self):
 
93
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
 
94
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
 
95
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
 
96
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
 
97
 
 
98
        def deadlock_detection() :
 
99
            while not deadlock_detection.end :
 
100
                deadlock_detection.count = \
 
101
                    self.env.lock_detect(db.DB_LOCK_EXPIRE)
 
102
                if deadlock_detection.count :
 
103
                    while not deadlock_detection.end :
 
104
                        pass
 
105
                    break
 
106
                time.sleep(0.01)
 
107
 
 
108
        deadlock_detection.end=False
 
109
        deadlock_detection.count=0
 
110
        t=Thread(target=deadlock_detection)
 
111
        import sys
 
112
        if sys.version_info[0] < 3 :
 
113
            t.setDaemon(True)
 
114
        else :
 
115
            t.daemon = True
 
116
        t.start()
 
117
        self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
 
118
        anID = self.env.lock_id()
 
119
        anID2 = self.env.lock_id()
 
120
        self.assertNotEqual(anID, anID2)
 
121
        lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
 
122
        start_time=time.time()
 
123
        self.assertRaises(db.DBLockNotGrantedError,
 
124
                self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
 
125
        end_time=time.time()
 
126
        deadlock_detection.end=True
 
127
        self.assertTrue((end_time-start_time) >= 0.1)
 
128
        self.env.lock_put(lock)
 
129
        t.join()
 
130
 
 
131
        self.env.lock_id_free(anID)
 
132
        self.env.lock_id_free(anID2)
 
133
 
 
134
        if db.version() >= (4,6):
 
135
            self.assertTrue(deadlock_detection.count>0)
 
136
 
 
137
    def theThread(self, lockType):
 
138
        import sys
 
139
        if sys.version_info[0] < 3 :
 
140
            name = currentThread().getName()
 
141
        else :
 
142
            name = currentThread().name
 
143
 
 
144
        if lockType ==  db.DB_LOCK_WRITE:
 
145
            lt = "write"
 
146
        else:
 
147
            lt = "read"
 
148
 
 
149
        anID = self.env.lock_id()
 
150
        if verbose:
 
151
            print "%s: locker ID: %s" % (name, anID)
 
152
 
 
153
        for i in xrange(1000) :
 
154
            lock = self.env.lock_get(anID, "some locked thing", lockType)
 
155
            if verbose:
 
156
                print "%s: Aquired %s lock: %s" % (name, lt, lock)
 
157
 
 
158
            self.env.lock_put(lock)
 
159
            if verbose:
 
160
                print "%s: Released %s lock: %s" % (name, lt, lock)
 
161
 
 
162
        self.env.lock_id_free(anID)
 
163
 
 
164
 
 
165
#----------------------------------------------------------------------
 
166
 
 
167
def test_suite():
    """Build the suite for this module.

    With thread support every LockingTestCase test is included; without
    it only the tests whose names start with 'test01' are.
    """
    result = unittest.TestSuite()

    if not have_threads:
        cases = unittest.makeSuite(LockingTestCase, 'test01')
    else:
        cases = unittest.makeSuite(LockingTestCase)
    result.addTest(cases)

    return result
 
176
 
 
177
 
 
178
if __name__ == '__main__':
 
179
    unittest.main(defaultTest='test_suite')