2
* Licensed to the Apache Software Foundation (ASF) under one
3
* or more contributor license agreements. See the NOTICE file
4
* distributed with this work for additional information
5
* regarding copyright ownership. The ASF licenses this file
6
* to you under the Apache License, Version 2.0 (the
7
* "License"); you may not use this file except in compliance
8
* with the License. You may obtain a copy of the License at
10
* http://www.apache.org/licenses/LICENSE-2.0
12
* Unless required by applicable law or agreed to in writing, software
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
* See the License for the specific language governing permissions and
16
* limitations under the License.
19
package org.apache.zookeeper.test;
21
import static org.junit.Assert.assertEquals;
22
import static org.junit.Assert.assertNotNull;
23
import static org.junit.Assert.assertTrue;
25
import java.io.IOException;
26
import java.util.Collection;
27
import java.util.HashSet;
28
import java.util.concurrent.Semaphore;
29
import java.util.concurrent.TimeUnit;
30
import java.util.concurrent.atomic.AtomicBoolean;
31
import java.util.concurrent.atomic.AtomicInteger;
33
import org.apache.log4j.Logger;
34
import org.apache.zookeeper.AsyncCallback;
35
import org.apache.zookeeper.CreateMode;
36
import org.apache.zookeeper.KeeperException;
37
import org.apache.zookeeper.WatchedEvent;
38
import org.apache.zookeeper.Watcher;
39
import org.apache.zookeeper.ZooDefs;
40
import org.apache.zookeeper.ZooKeeper;
41
import org.apache.zookeeper.server.ZKDatabase;
42
import org.apache.zookeeper.server.quorum.Leader;
43
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
44
import org.junit.Test;
47
public class FollowerResyncConcurrencyTest extends QuorumBase {
48
// Shared progress counter. The asynchronous create callbacks in the
// snap-then-diff test compare it against 14200. The code that increments it
// is not visible in this listing.
// NOTE(review): volatile gives visibility only; if callbacks do counter++ the
// update is not atomic - confirm against the full file.
volatile int counter = 0;
48
// NOTE(review): no read or write of this field is visible in this listing;
// presumably it counts failed callbacks - TODO confirm.
volatile int errors = 0;
49
// log4j logger for this test class.
private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
51
// Connection timeout taken from ClientTest; passed to
// CountdownWatcher.waitForConnected(...) by the tests below.
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
56
* See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
57
* setting the ZXID of the SNAP packet
58
* Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down
59
* The non-leader ZKs are writing to cluster
61
* Restart after sessions are expired, expect to get a snap file
62
* Shut down, run some transactions through.
63
* Restart to a diff while transactions are running in leader
65
* @throws InterruptedException
66
* @throws KeeperException
69
// Test for follower resync by snapshot and then by diff (see ZOOKEEPER-962).
// NOTE(review): the bare numbers between statements are line numbers left over
// from a code viewer, and several original lines (annotation, braces, some
// statements) are missing from this listing. The comments below describe only
// what the visible lines do.
public void testResyncBySnapThenDiffAfterFollowerCrashes ()
70
throws IOException, InterruptedException, KeeperException, Throwable{
71
// Starts with zero permits; the main thread waits on it near the end of the
// test. The matching release() is not visible in this listing.
final Semaphore sem = new Semaphore(0);
73
QuorumUtil qu = new QuorumUtil(1);
75
// One connection watcher per client created below.
CountdownWatcher watcher1 = new CountdownWatcher();
76
CountdownWatcher watcher2 = new CountdownWatcher();
77
CountdownWatcher watcher3 = new CountdownWatcher();
80
// Look for the peer whose 'leader' field is set. The declaration of 'index'
// and the loop body are not visible here.
while(qu.getPeer(index).peer.leader == null)
83
Leader leader = qu.getPeer(index).peer.leader;
85
assertNotNull(leader);
87
* Reusing the index variable to select a follower to connect to
89
// Picks peer 2 when index is 1, otherwise peer 1.
index = (index == 1) ? 2 : 1;
91
// zk3 always talks to peer 3, with a 1000 ms session timeout.
final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3);
92
watcher3.waitForConnected(CONNECTION_TIMEOUT);
93
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
96
// zk and zk2 both connect to the peer selected by 'index'.
ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
98
ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
100
watcher1.waitForConnected(CONNECTION_TIMEOUT);
101
watcher2.waitForConnected(CONNECTION_TIMEOUT);
103
zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
104
// Background writer: issues 1000 asynchronous ephemeral-sequential creates
// through zk3. The run()/try headers and the statement that starts the thread
// are not visible in this listing.
Thread t = new Thread(new Runnable() {
108
for(int i = 0; i < 1000; i++) {
109
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
112
public void processResult(int rc, String path, Object ctx, String name) {
117
// 14200 is presumably the total number of async creates issued by this test;
// the action taken when it is reached is not visible here - TODO confirm.
if(counter == 14200){
127
} catch (Exception e) {
137
// Main-thread writer: 13000 asynchronous ephemeral-sequential creates through zk3.
for(int i = 0; i < 13000; i++) {
138
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
141
public void processResult(int rc, String path, Object ctx, String name) {
146
if(counter == 14200){
156
// NOTE(review): the shutdown/restart calls that these log messages announce
// are not visible in this listing.
LOG.info("Shutting down s1");
159
//Restart off of snap, then get some txns for a log, then shut down
166
LOG.info("Setting up server: " + index);
173
// Further asynchronous creates, this time through zk2 (the client connected
// to the peer selected by 'index').
zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
176
public void processResult(int rc, String path, Object ctx, String name) {
181
if(counter == 14200){
191
// Wait until all updates return
// Waits up to 20 seconds for a permit; a timeout is only logged as a warning.
192
if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
193
LOG.warn("Did not aquire semaphore fast enough");
198
// Final consistency check of the peer selected by 'index' against the other peers.
verifyState(qu, index, leader);
204
* Starts up 3 ZKs. The non-leader ZKs are writing to cluster
205
* Shut down one of the non-leader ZKs.
206
* Restart after sessions have expired but <500 txns have taken place (get a diff)
207
* Shut down immediately after restarting, start running separate thread with other transactions
208
* Restart to a diff while transactions are running in leader
211
* Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
212
* completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
213
* were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
* would be missed.
216
* This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
217
* however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
218
* during the leader's diff forwarding.
220
* @throws IOException
221
* @throws InterruptedException
222
* @throws KeeperException
227
// Test for follower resync by diff while other clients keep writing
// (see ZOOKEEPER-962).
// NOTE(review): the bare numbers between statements are line numbers left over
// from a code viewer, and several original lines (annotation, braces, some
// statements) are missing from this listing. The comments below describe only
// what the visible lines do.
public void testResyncByDiffAfterFollowerCrashes ()
228
throws IOException, InterruptedException, KeeperException, Throwable{
229
// Starts with zero permits; the main thread waits on it near the end of the
// test. The matching release() is not visible in this listing.
final Semaphore sem = new Semaphore(0);
231
QuorumUtil qu = new QuorumUtil(1);
233
// One connection watcher per client created below.
CountdownWatcher watcher1 = new CountdownWatcher();
234
CountdownWatcher watcher2 = new CountdownWatcher();
235
CountdownWatcher watcher3 = new CountdownWatcher();
239
// Look for the peer whose 'leader' field is set. The declaration of 'index'
// and the loop body are not visible here.
while(qu.getPeer(index).peer.leader == null)
242
Leader leader = qu.getPeer(index).peer.leader;
244
assertNotNull(leader);
247
* Reusing the index variable to select a follower to connect to
249
// Picks peer 2 when index is 1, otherwise peer 1.
index = (index == 1) ? 2 : 1;
251
// zk and zk2 connect to the peer selected by 'index'; zk3 always talks to
// peer 3. All three use a 1000 ms session timeout.
ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
253
ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2);
254
final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
255
watcher1.waitForConnected(CONNECTION_TIMEOUT);
256
watcher2.waitForConnected(CONNECTION_TIMEOUT);
257
watcher3.waitForConnected(CONNECTION_TIMEOUT);
258
zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
259
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
262
// NOTE(review): only the declaration of this flag is visible; presumably it
// signals the background thread to start writing - TODO confirm.
final AtomicBoolean runNow = new AtomicBoolean(false);
263
// Background writer: loops until its local inSyncCounter reaches 400, issuing
// asynchronous ephemeral-sequential creates through zk3. The run()/try headers
// and the code that advances inSyncCounter are not visible in this listing.
Thread t = new Thread(new Runnable() {
267
int inSyncCounter = 0;
268
while(inSyncCounter < 400) {
270
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
273
public void processResult(int rc, String path, Object ctx, String name) {
288
} catch (Exception e) {
301
// Main-thread writer: 5000 iterations of asynchronous ephemeral-sequential
// creates through zk2.
for(int i = 0; i < 5000; i++) {
302
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
305
public void processResult(int rc, String path, Object ctx, String name) {
321
LOG.info("Shutting down s1");
324
// NOTE(review): the checks on 'i' below presumably sit inside the 5000-iteration
// loop, but the enclosing braces and the bodies of these branches are only
// partly visible - confirm against the full file.
if(i == 1100 || i == 1150 || i == 1200) {
329
// Starts the peer selected by 'index' and shuts it down again.
qu.startThenShutdown(index);
332
LOG.info("Setting up server: " + index);
336
// From i = 1000 onward, every even i also issues an async create through zk3.
if(i>=1000 && i%2== 0) {
337
zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
340
public void processResult(int rc, String path, Object ctx, String name) {
353
if(i == 1050 || i == 1100 || i == 1150) {
358
// Wait until all updates return
// Waits up to 15 seconds for a permit; a timeout is only logged as a warning.
359
if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
360
LOG.warn("Did not aquire semaphore fast enough");
364
// Verify that server is following and has the same epoch as the leader
366
verifyState(qu, index, leader);
370
private void verifyState(QuorumUtil qu, int index, Leader leader) {
371
assertTrue("Not following", qu.getPeer(index).peer.follower != null);
372
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
373
long epochL = (leader.getEpoch() >> 32L);
374
assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() +
375
"Current epoch: " + epochF, epochF == epochL);
376
int leaderIndex = (index == 1) ? 2 : 1;
377
Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
378
Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
380
for(Long l : sessionsRestarted) {
381
assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
383
assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
384
ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
385
ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
386
ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
387
for(Long l : sessionsRestarted) {
388
assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
389
HashSet ephemerals = restarted.getEphemerals(l);
390
HashSet cleanEphemerals = clean.getEphemerals(l);
391
for(Object o : cleanEphemerals) {
392
if(!ephemerals.contains(o)) {
393
LOG.info("Restarted follower doesn't contain ephemeral " + o);
396
HashSet leadEphemerals = lead.getEphemerals(l);
397
for(Object o : leadEphemerals) {
398
if(!cleanEphemerals.contains(o)) {
399
LOG.info("Follower doesn't contain ephemeral from leader " + o);
402
assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
403
assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());