69
76
self.dbenvMaster.rep_set_priority(10)
70
77
self.dbenvClient.rep_set_priority(0)
79
self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
80
self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
81
self.assertEquals(self.dbenvMaster.rep_get_timeout(
82
db.DB_REP_CONNECTION_RETRY), 100123)
83
self.assertEquals(self.dbenvClient.rep_get_timeout(
84
db.DB_REP_CONNECTION_RETRY), 100321)
86
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
87
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
88
self.assertEquals(self.dbenvMaster.rep_get_timeout(
89
db.DB_REP_ELECTION_TIMEOUT), 100234)
90
self.assertEquals(self.dbenvClient.rep_get_timeout(
91
db.DB_REP_ELECTION_TIMEOUT), 100432)
93
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
94
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
95
self.assertEquals(self.dbenvMaster.rep_get_timeout(
96
db.DB_REP_ELECTION_RETRY), 100345)
97
self.assertEquals(self.dbenvClient.rep_get_timeout(
98
db.DB_REP_ELECTION_RETRY), 100543)
72
100
self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
73
101
self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
84
112
self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
85
113
db.DB_REPMGR_ACKS_ALL)
87
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
88
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
89
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
90
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
92
self.dbMaster = self.dbClient = None
94
115
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
95
116
# is not generated if the master has no new transactions.
96
117
# This is solved in BDB 4.6 (#15542).
97
timeout = time.time()+2
119
timeout = time.time()+10
98
120
while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
100
if db.version() >= (4,6) :
122
# this fails on Windows as self.client_startupdone never gets set
123
# to True - see bug 3892. BUT - even though this assertion
124
# fails on Windows the rest of the test passes - so to prove
125
# that we let the rest of the test run. Sadly we can't
126
# make use of raising TestSkipped() here (unittest still
127
# reports it as an error), so we yell to stderr.
129
if sys.platform=="win32":
130
print >> sys.stderr, \
131
"XXX - windows bsddb replication fails on windows and is skipped"
132
print >> sys.stderr, "XXX - Please see issue #3892"
101
134
self.assertTrue(time.time()<timeout)
103
self.assertTrue(time.time()>=timeout)
105
136
d = self.dbenvMaster.repmgr_site_list()
106
137
self.assertEquals(len(d), 1)
120
151
d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
121
152
self.assertTrue("msgs_queued" in d)
154
self.dbMaster=db.DB(self.dbenvMaster)
155
txn=self.dbenvMaster.txn_begin()
156
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
160
timeout=time.time()+10
161
while (time.time()<timeout) and \
162
not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
165
self.dbClient=db.DB(self.dbenvClient)
167
txn=self.dbenvClient.txn_begin()
169
self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
171
except db.DBRepHandleDeadError :
173
self.dbClient.close()
174
self.dbClient=db.DB(self.dbenvClient)
180
txn=self.dbenvMaster.txn_begin()
181
self.dbMaster.put("ABC", "123", txn=txn)
184
timeout=time.time()+10
186
while (time.time()<timeout) and (v==None) :
187
txn=self.dbenvClient.txn_begin()
188
v=self.dbClient.get("ABC", txn=txn)
192
self.assertTrue(time.time()<timeout)
193
self.assertEquals("123", v)
195
txn=self.dbenvMaster.txn_begin()
196
self.dbMaster.delete("ABC", txn=txn)
198
timeout=time.time()+10
199
while (time.time()<timeout) and (v!=None) :
200
txn=self.dbenvClient.txn_begin()
201
v=self.dbClient.get("ABC", txn=txn)
205
self.assertTrue(time.time()<timeout)
206
self.assertEquals(None, v)
208
class DBBaseReplication(DBReplicationManager):
210
DBReplicationManager.setUp(self)
211
def confirmed_master(a,b,c) :
212
if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
213
self.confirmed_master = True
215
def client_startupdone(a,b,c) :
216
if b == db.DB_EVENT_REP_STARTUPDONE :
217
self.client_startupdone = True
219
self.dbenvMaster.set_event_notify(confirmed_master)
220
self.dbenvClient.set_event_notify(client_startupdone)
223
self.m2c = Queue.Queue()
224
self.c2m = Queue.Queue()
226
# There are only two nodes, so we don't need to
227
# do any routing decision
228
def m2c(dbenv, control, rec, lsnp, envid, flags) :
229
self.m2c.put((control, rec))
231
def c2m(dbenv, control, rec, lsnp, envid, flags) :
232
self.c2m.put((control, rec))
234
self.dbenvMaster.rep_set_transport(13,m2c)
235
self.dbenvMaster.rep_set_priority(10)
236
self.dbenvClient.rep_set_transport(3,c2m)
237
self.dbenvClient.rep_set_priority(0)
239
self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
240
self.assertEquals(self.dbenvClient.rep_get_priority(),0)
242
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
243
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
244
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
245
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
247
def thread_master() :
248
return self.thread_do(self.dbenvMaster, self.c2m, 3,
249
self.master_doing_election, True)
251
def thread_client() :
252
return self.thread_do(self.dbenvClient, self.m2c, 13,
253
self.client_doing_election, False)
255
from threading import Thread
256
t_m=Thread(target=thread_master)
257
t_c=Thread(target=thread_client)
259
if sys.version_info[0] < 3 :
269
self.dbMaster = self.dbClient = None
271
self.master_doing_election=[False]
272
self.client_doing_election=[False]
123
275
def tearDown(self):
124
276
if self.dbClient :
125
277
self.dbClient.close()
126
278
if self.dbMaster :
127
279
self.dbMaster.close()
128
284
self.dbenvClient.close()
129
285
self.dbenvMaster.close()
130
286
test_support.rmtree(self.homeDirClient)
131
287
test_support.rmtree(self.homeDirMaster)
289
def basic_rep_threading(self) :
290
self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
291
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
293
def thread_do(env, q, envid, election_status, must_be_master) :
296
if v == None : return
297
env.rep_process_message(v[0], v[1], envid)
299
self.thread_do = thread_do
133
304
def test01_basic_replication(self) :
305
self.basic_rep_threading()
307
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
308
# is not generated if the master has no new transactions.
309
# This is solved in BDB 4.6 (#15542).
311
timeout = time.time()+10
312
while (time.time()<timeout) and not (self.confirmed_master and
313
self.client_startupdone) :
315
self.assertTrue(time.time()<timeout)
134
317
self.dbMaster=db.DB(self.dbenvMaster)
135
318
txn=self.dbenvMaster.txn_begin()
136
319
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
161
344
self.dbMaster.put("ABC", "123", txn=txn)
164
timeout=time.time()+1
347
timeout=time.time()+10
166
349
while (time.time()<timeout) and (v==None) :
167
350
txn=self.dbenvClient.txn_begin()
168
351
v=self.dbClient.get("ABC", txn=txn)
355
self.assertTrue(time.time()<timeout)
170
356
self.assertEquals("123", v)
172
358
txn=self.dbenvMaster.txn_begin()
173
359
self.dbMaster.delete("ABC", txn=txn)
175
timeout=time.time()+1
361
timeout=time.time()+10
176
362
while (time.time()<timeout) and (v!=None) :
177
363
txn=self.dbenvClient.txn_begin()
178
364
v=self.dbClient.get("ABC", txn=txn)
368
self.assertTrue(time.time()<timeout)
180
369
self.assertEquals(None, v)
371
if db.version() >= (4,7) :
372
def test02_test_request(self) :
373
self.basic_rep_threading()
374
(minimum, maximum) = self.dbenvClient.rep_get_request()
375
self.dbenvClient.rep_set_request(minimum-1, maximum+1)
376
self.assertEqual(self.dbenvClient.rep_get_request(),
377
(minimum-1, maximum+1))
379
if db.version() >= (4,6) :
380
def test03_master_election(self) :
381
# Get ready to hold an election
382
#self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
383
self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
384
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
386
def thread_do(env, q, envid, election_status, must_be_master) :
389
if v == None : return
390
r = env.rep_process_message(v[0],v[1],envid)
391
if must_be_master and self.confirmed_master :
392
self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
393
must_be_master = False
395
if r[0] == db.DB_REP_HOLDELECTION :
400
election_status[0] = False
402
except db.DBRepUnavailError :
404
if not election_status[0] and not self.confirmed_master :
405
from threading import Thread
406
election_status[0] = True
407
t=Thread(target=elect)
409
if sys.version_info[0] < 3 :
415
self.thread_do = thread_do
420
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
421
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
422
self.client_doing_election[0] = True
425
self.dbenvClient.rep_elect(2, 1)
426
self.client_doing_election[0] = False
428
except db.DBRepUnavailError :
431
self.assertTrue(self.confirmed_master)
182
433
#----------------------------------------------------------------------
184
435
def test_suite():
185
436
suite = unittest.TestSuite()
186
if db.version() >= (4,5) :
437
if db.version() >= (4, 6) :
187
438
dbenv = db.DBEnv()
189
440
dbenv.repmgr_get_ack_policy()