2
TestCases for testing the locking sub-system.
8
from test_all import db, test_support, verbose, have_threads, \
9
get_new_environment_path, get_new_database_path
12
from threading import Thread
14
if sys.version_info[0] < 3 :
15
from threading import currentThread
17
from threading import current_thread as currentThread
19
#----------------------------------------------------------------------
21
class LockingTestCase(unittest.TestCase):
23
if sys.version_info[:3] < (2, 4, 0):
24
def assertTrue(self, expr, msg=None):
25
self.failUnless(expr,msg=msg)
29
self.homeDir = get_new_environment_path()
31
self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
32
db.DB_INIT_LOCK | db.DB_CREATE)
37
test_support.rmtree(self.homeDir)
40
def test01_simple(self):
43
print "Running %s.test01_simple..." % self.__class__.__name__
45
anID = self.env.lock_id()
47
print "locker ID: %s" % anID
48
lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
50
print "Aquired lock: %s" % lock
51
self.env.lock_put(lock)
53
print "Released lock: %s" % lock
54
self.env.lock_id_free(anID)
57
def test02_threaded(self):
60
print "Running %s.test02_threaded..." % self.__class__.__name__
63
threads.append(Thread(target = self.theThread,
64
args=(db.DB_LOCK_WRITE,)))
65
threads.append(Thread(target = self.theThread,
66
args=(db.DB_LOCK_READ,)))
67
threads.append(Thread(target = self.theThread,
68
args=(db.DB_LOCK_READ,)))
69
threads.append(Thread(target = self.theThread,
70
args=(db.DB_LOCK_WRITE,)))
71
threads.append(Thread(target = self.theThread,
72
args=(db.DB_LOCK_READ,)))
73
threads.append(Thread(target = self.theThread,
74
args=(db.DB_LOCK_READ,)))
75
threads.append(Thread(target = self.theThread,
76
args=(db.DB_LOCK_WRITE,)))
77
threads.append(Thread(target = self.theThread,
78
args=(db.DB_LOCK_WRITE,)))
79
threads.append(Thread(target = self.theThread,
80
args=(db.DB_LOCK_WRITE,)))
84
if sys.version_info[0] < 3 :
92
def test03_lock_timeout(self):
93
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
94
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
95
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
96
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
98
def deadlock_detection() :
99
while not deadlock_detection.end :
100
deadlock_detection.count = \
101
self.env.lock_detect(db.DB_LOCK_EXPIRE)
102
if deadlock_detection.count :
103
while not deadlock_detection.end :
108
deadlock_detection.end=False
109
deadlock_detection.count=0
110
t=Thread(target=deadlock_detection)
112
if sys.version_info[0] < 3 :
117
self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
118
anID = self.env.lock_id()
119
anID2 = self.env.lock_id()
120
self.assertNotEqual(anID, anID2)
121
lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
122
start_time=time.time()
123
self.assertRaises(db.DBLockNotGrantedError,
124
self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
126
deadlock_detection.end=True
127
self.assertTrue((end_time-start_time) >= 0.1)
128
self.env.lock_put(lock)
131
self.env.lock_id_free(anID)
132
self.env.lock_id_free(anID2)
134
if db.version() >= (4,6):
135
self.assertTrue(deadlock_detection.count>0)
137
def theThread(self, lockType):
139
if sys.version_info[0] < 3 :
140
name = currentThread().getName()
142
name = currentThread().name
144
if lockType == db.DB_LOCK_WRITE:
149
anID = self.env.lock_id()
151
print "%s: locker ID: %s" % (name, anID)
153
for i in xrange(1000) :
154
lock = self.env.lock_get(anID, "some locked thing", lockType)
156
print "%s: Aquired %s lock: %s" % (name, lt, lock)
158
self.env.lock_put(lock)
160
print "%s: Released %s lock: %s" % (name, lt, lock)
162
self.env.lock_id_free(anID)
165
#----------------------------------------------------------------------
168
suite = unittest.TestSuite()
171
suite.addTest(unittest.makeSuite(LockingTestCase))
173
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
178
if __name__ == '__main__':
179
unittest.main(defaultTest='test_suite')