~pythonregexp2.7/python/issue2636-11

« back to all changes in this revision

Viewing changes to Lib/bsddb/test/test_replication.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 13:47:31 UTC
  • mfrom: (39021.1.404 Regexp-2.7)
  • mto: This revision was merged to the branch mainline in revision 39030.
  • Revision ID: darklord@timehorse.com-20080921134731-rudomuzeh1b2tz1y
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
import time
6
6
import unittest
7
7
 
8
 
try:
9
 
    # For Pythons w/distutils pybsddb
10
 
    from bsddb3 import db
11
 
except ImportError:
12
 
    # For Python 2.3
13
 
    from bsddb import db
14
 
 
15
 
from test_all import get_new_environment_path, get_new_database_path
16
 
 
17
 
try:
18
 
    from bsddb3 import test_support
19
 
except ImportError:
20
 
    from test import test_support
21
 
 
22
 
from test_all import verbose
 
8
from test_all import db, test_support, have_threads, verbose, \
 
9
        get_new_environment_path, get_new_database_path
 
10
 
23
11
 
24
12
#----------------------------------------------------------------------
25
13
 
58
46
        self.dbenvMaster.set_event_notify(confirmed_master)
59
47
        self.dbenvClient.set_event_notify(client_startupdone)
60
48
 
 
49
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
 
50
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
 
51
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
 
52
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
 
53
 
 
54
        self.dbMaster = self.dbClient = None
 
55
 
 
56
 
 
57
    def tearDown(self):
 
58
        if self.dbClient :
 
59
            self.dbClient.close()
 
60
        if self.dbMaster :
 
61
            self.dbMaster.close()
 
62
        self.dbenvClient.close()
 
63
        self.dbenvMaster.close()
 
64
        test_support.rmtree(self.homeDirClient)
 
65
        test_support.rmtree(self.homeDirMaster)
 
66
 
 
67
    def test01_basic_replication(self) :
61
68
        master_port = test_support.find_unused_port()
62
69
        self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
63
70
        client_port = test_support.find_unused_port()
69
76
        self.dbenvMaster.rep_set_priority(10)
70
77
        self.dbenvClient.rep_set_priority(0)
71
78
 
 
79
        self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
 
80
        self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
 
81
        self.assertEquals(self.dbenvMaster.rep_get_timeout(
 
82
            db.DB_REP_CONNECTION_RETRY), 100123)
 
83
        self.assertEquals(self.dbenvClient.rep_get_timeout(
 
84
            db.DB_REP_CONNECTION_RETRY), 100321)
 
85
 
 
86
        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
 
87
        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
 
88
        self.assertEquals(self.dbenvMaster.rep_get_timeout(
 
89
            db.DB_REP_ELECTION_TIMEOUT), 100234)
 
90
        self.assertEquals(self.dbenvClient.rep_get_timeout(
 
91
            db.DB_REP_ELECTION_TIMEOUT), 100432)
 
92
 
 
93
        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
 
94
        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
 
95
        self.assertEquals(self.dbenvMaster.rep_get_timeout(
 
96
            db.DB_REP_ELECTION_RETRY), 100345)
 
97
        self.assertEquals(self.dbenvClient.rep_get_timeout(
 
98
            db.DB_REP_ELECTION_RETRY), 100543)
 
99
 
72
100
        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
73
101
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
74
102
 
84
112
        self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
85
113
                db.DB_REPMGR_ACKS_ALL)
86
114
 
87
 
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
88
 
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
89
 
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
90
 
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
91
 
 
92
 
        self.dbMaster = self.dbClient = None
93
 
 
94
115
        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
95
116
        # is not generated if the master has no new transactions.
96
117
        # This is solved in BDB 4.6 (#15542).
97
 
        timeout = time.time()+2
 
118
        import time
 
119
        timeout = time.time()+10
98
120
        while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
99
121
            time.sleep(0.02)
100
 
        if db.version() >= (4,6) :
 
122
        # this fails on Windows as self.client_startupdone never gets set
 
123
        # to True - see bug 3892.  BUT - even though this assertion
 
124
        # fails on Windows the rest of the test passes - so to prove
 
125
        # that we let the rest of the test run.  Sadly we can't
 
126
        # make use of raising TestSkipped() here (unittest still
 
127
        # reports it as an error), so we yell to stderr.
 
128
        import sys
 
129
        if sys.platform=="win32":
 
130
            print >> sys.stderr, \
 
131
                "XXX - windows bsddb replication fails on windows and is skipped"
 
132
            print >> sys.stderr, "XXX - Please see issue #3892"
 
133
        else:
101
134
            self.assertTrue(time.time()<timeout)
102
 
        else :
103
 
            self.assertTrue(time.time()>=timeout)
104
135
 
105
136
        d = self.dbenvMaster.repmgr_site_list()
106
137
        self.assertEquals(len(d), 1)
120
151
            d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
121
152
            self.assertTrue("msgs_queued" in d)
122
153
 
 
154
        self.dbMaster=db.DB(self.dbenvMaster)
 
155
        txn=self.dbenvMaster.txn_begin()
 
156
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
 
157
        txn.commit()
 
158
 
 
159
        import time,os.path
 
160
        timeout=time.time()+10
 
161
        while (time.time()<timeout) and \
 
162
          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
 
163
            time.sleep(0.01)
 
164
 
 
165
        self.dbClient=db.DB(self.dbenvClient)
 
166
        while True :
 
167
            txn=self.dbenvClient.txn_begin()
 
168
            try :
 
169
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
 
170
                        mode=0666, txn=txn)
 
171
            except db.DBRepHandleDeadError :
 
172
                txn.abort()
 
173
                self.dbClient.close()
 
174
                self.dbClient=db.DB(self.dbenvClient)
 
175
                continue
 
176
 
 
177
            txn.commit()
 
178
            break
 
179
 
 
180
        txn=self.dbenvMaster.txn_begin()
 
181
        self.dbMaster.put("ABC", "123", txn=txn)
 
182
        txn.commit()
 
183
        import time
 
184
        timeout=time.time()+10
 
185
        v=None
 
186
        while (time.time()<timeout) and (v==None) :
 
187
            txn=self.dbenvClient.txn_begin()
 
188
            v=self.dbClient.get("ABC", txn=txn)
 
189
            txn.commit()
 
190
            if v==None :
 
191
                time.sleep(0.02)
 
192
        self.assertTrue(time.time()<timeout)
 
193
        self.assertEquals("123", v)
 
194
 
 
195
        txn=self.dbenvMaster.txn_begin()
 
196
        self.dbMaster.delete("ABC", txn=txn)
 
197
        txn.commit()
 
198
        timeout=time.time()+10
 
199
        while (time.time()<timeout) and (v!=None) :
 
200
            txn=self.dbenvClient.txn_begin()
 
201
            v=self.dbClient.get("ABC", txn=txn)
 
202
            txn.commit()
 
203
            if v==None :
 
204
                time.sleep(0.02)
 
205
        self.assertTrue(time.time()<timeout)
 
206
        self.assertEquals(None, v)
 
207
 
 
208
class DBBaseReplication(DBReplicationManager):
 
209
    def setUp(self) :
 
210
        DBReplicationManager.setUp(self)
 
211
        def confirmed_master(a,b,c) :
 
212
            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
 
213
                self.confirmed_master = True
 
214
 
 
215
        def client_startupdone(a,b,c) :
 
216
            if b == db.DB_EVENT_REP_STARTUPDONE :
 
217
                self.client_startupdone = True
 
218
 
 
219
        self.dbenvMaster.set_event_notify(confirmed_master)
 
220
        self.dbenvClient.set_event_notify(client_startupdone)
 
221
 
 
222
        import Queue
 
223
        self.m2c = Queue.Queue()
 
224
        self.c2m = Queue.Queue()
 
225
 
 
226
        # There are only two nodes, so we don't need to
 
227
        # do any routing decision
 
228
        def m2c(dbenv, control, rec, lsnp, envid, flags) :
 
229
            self.m2c.put((control, rec))
 
230
 
 
231
        def c2m(dbenv, control, rec, lsnp, envid, flags) :
 
232
            self.c2m.put((control, rec))
 
233
 
 
234
        self.dbenvMaster.rep_set_transport(13,m2c)
 
235
        self.dbenvMaster.rep_set_priority(10)
 
236
        self.dbenvClient.rep_set_transport(3,c2m)
 
237
        self.dbenvClient.rep_set_priority(0)
 
238
 
 
239
        self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
 
240
        self.assertEquals(self.dbenvClient.rep_get_priority(),0)
 
241
 
 
242
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
 
243
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
 
244
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
 
245
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
 
246
 
 
247
        def thread_master() :
 
248
            return self.thread_do(self.dbenvMaster, self.c2m, 3,
 
249
                    self.master_doing_election, True)
 
250
 
 
251
        def thread_client() :
 
252
            return self.thread_do(self.dbenvClient, self.m2c, 13,
 
253
                    self.client_doing_election, False)
 
254
 
 
255
        from threading import Thread
 
256
        t_m=Thread(target=thread_master)
 
257
        t_c=Thread(target=thread_client)
 
258
        import sys
 
259
        if sys.version_info[0] < 3 :
 
260
            t_m.setDaemon(True)
 
261
            t_c.setDaemon(True)
 
262
        else :
 
263
            t_m.daemon = True
 
264
            t_c.daemon = True
 
265
 
 
266
        self.t_m = t_m
 
267
        self.t_c = t_c
 
268
 
 
269
        self.dbMaster = self.dbClient = None
 
270
 
 
271
        self.master_doing_election=[False]
 
272
        self.client_doing_election=[False]
 
273
 
 
274
 
123
275
    def tearDown(self):
124
276
        if self.dbClient :
125
277
            self.dbClient.close()
126
278
        if self.dbMaster :
127
279
            self.dbMaster.close()
 
280
        self.m2c.put(None)
 
281
        self.c2m.put(None)
 
282
        self.t_m.join()
 
283
        self.t_c.join()
128
284
        self.dbenvClient.close()
129
285
        self.dbenvMaster.close()
130
286
        test_support.rmtree(self.homeDirClient)
131
287
        test_support.rmtree(self.homeDirMaster)
132
288
 
 
289
    def basic_rep_threading(self) :
 
290
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
 
291
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
 
292
 
 
293
        def thread_do(env, q, envid, election_status, must_be_master) :
 
294
            while True :
 
295
                v=q.get()
 
296
                if v == None : return
 
297
                env.rep_process_message(v[0], v[1], envid)
 
298
 
 
299
        self.thread_do = thread_do
 
300
 
 
301
        self.t_m.start()
 
302
        self.t_c.start()
 
303
 
133
304
    def test01_basic_replication(self) :
 
305
        self.basic_rep_threading()
 
306
 
 
307
        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
 
308
        # is not generated if the master has no new transactions.
 
309
        # This is solved in BDB 4.6 (#15542).
 
310
        import time
 
311
        timeout = time.time()+10
 
312
        while (time.time()<timeout) and not (self.confirmed_master and
 
313
                self.client_startupdone) :
 
314
            time.sleep(0.02)
 
315
        self.assertTrue(time.time()<timeout)
 
316
 
134
317
        self.dbMaster=db.DB(self.dbenvMaster)
135
318
        txn=self.dbenvMaster.txn_begin()
136
319
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
161
344
        self.dbMaster.put("ABC", "123", txn=txn)
162
345
        txn.commit()
163
346
        import time
164
 
        timeout=time.time()+1
 
347
        timeout=time.time()+10
165
348
        v=None
166
349
        while (time.time()<timeout) and (v==None) :
167
350
            txn=self.dbenvClient.txn_begin()
168
351
            v=self.dbClient.get("ABC", txn=txn)
169
352
            txn.commit()
 
353
            if v==None :
 
354
                time.sleep(0.02)
 
355
        self.assertTrue(time.time()<timeout)
170
356
        self.assertEquals("123", v)
171
357
 
172
358
        txn=self.dbenvMaster.txn_begin()
173
359
        self.dbMaster.delete("ABC", txn=txn)
174
360
        txn.commit()
175
 
        timeout=time.time()+1
 
361
        timeout=time.time()+10
176
362
        while (time.time()<timeout) and (v!=None) :
177
363
            txn=self.dbenvClient.txn_begin()
178
364
            v=self.dbClient.get("ABC", txn=txn)
179
365
            txn.commit()
 
366
            if v==None :
 
367
                time.sleep(0.02)
 
368
        self.assertTrue(time.time()<timeout)
180
369
        self.assertEquals(None, v)
181
370
 
 
371
    if db.version() >= (4,7) :
 
372
        def test02_test_request(self) :
 
373
            self.basic_rep_threading()
 
374
            (minimum, maximum) = self.dbenvClient.rep_get_request()
 
375
            self.dbenvClient.rep_set_request(minimum-1, maximum+1)
 
376
            self.assertEqual(self.dbenvClient.rep_get_request(),
 
377
                    (minimum-1, maximum+1))
 
378
 
 
379
    if db.version() >= (4,6) :
 
380
        def test03_master_election(self) :
 
381
            # Get ready to hold an election
 
382
            #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
 
383
            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
 
384
            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
 
385
 
 
386
            def thread_do(env, q, envid, election_status, must_be_master) :
 
387
                while True :
 
388
                    v=q.get()
 
389
                    if v == None : return
 
390
                    r = env.rep_process_message(v[0],v[1],envid)
 
391
                    if must_be_master and self.confirmed_master :
 
392
                        self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
 
393
                        must_be_master = False
 
394
 
 
395
                    if r[0] == db.DB_REP_HOLDELECTION :
 
396
                        def elect() :
 
397
                            while True :
 
398
                                try :
 
399
                                    env.rep_elect(2, 1)
 
400
                                    election_status[0] = False
 
401
                                    break
 
402
                                except db.DBRepUnavailError :
 
403
                                    pass
 
404
                        if not election_status[0] and not self.confirmed_master :
 
405
                            from threading import Thread
 
406
                            election_status[0] = True
 
407
                            t=Thread(target=elect)
 
408
                            import sys
 
409
                            if sys.version_info[0] < 3 :
 
410
                                t.setDaemon(True)
 
411
                            else :
 
412
                                t.daemon = True
 
413
                            t.start()
 
414
 
 
415
            self.thread_do = thread_do
 
416
 
 
417
            self.t_m.start()
 
418
            self.t_c.start()
 
419
 
 
420
            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
 
421
            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
 
422
            self.client_doing_election[0] = True
 
423
            while True :
 
424
                try :
 
425
                    self.dbenvClient.rep_elect(2, 1)
 
426
                    self.client_doing_election[0] = False
 
427
                    break
 
428
                except db.DBRepUnavailError :
 
429
                    pass
 
430
 
 
431
            self.assertTrue(self.confirmed_master)
 
432
 
182
433
#----------------------------------------------------------------------
183
434
 
184
435
def test_suite():
185
436
    suite = unittest.TestSuite()
186
 
    if db.version() >= (4,5) :
 
437
    if db.version() >= (4, 6) :
187
438
        dbenv = db.DBEnv()
188
439
        try :
189
440
            dbenv.repmgr_get_ack_policy()
194
445
        del dbenv
195
446
        if ReplicationManager_available :
196
447
            suite.addTest(unittest.makeSuite(DBReplicationManager))
 
448
 
 
449
        if have_threads :
 
450
            suite.addTest(unittest.makeSuite(DBBaseReplication))
 
451
 
197
452
    return suite
198
453
 
199
454