~juju/pyjuju/zookeeper-vendor

« back to all changes in this revision

Viewing changes to src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java

  • Committer: Gustavo Niemeyer
  • Date: 2011-05-24 20:53:37 UTC
  • Revision ID: gustavo@niemeyer.net-20110524205337-i11yow5biap5xapo
Importing stock Zookeeper 3.3.3 without jars.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * Licensed to the Apache Software Foundation (ASF) under one
 
3
 * or more contributor license agreements.  See the NOTICE file
 
4
 * distributed with this work for additional information
 
5
 * regarding copyright ownership.  The ASF licenses this file
 
6
 * to you under the Apache License, Version 2.0 (the
 
7
 * "License"); you may not use this file except in compliance
 
8
 * with the License.  You may obtain a copy of the License at
 
9
 *
 
10
 *     http://www.apache.org/licenses/LICENSE-2.0
 
11
 *
 
12
 * Unless required by applicable law or agreed to in writing, software
 
13
 * distributed under the License is distributed on an "AS IS" BASIS,
 
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
15
 * See the License for the specific language governing permissions and
 
16
 * limitations under the License.
 
17
 */
 
18
 
 
19
package org.apache.zookeeper.test;
 
20
 
 
21
import static org.junit.Assert.assertEquals;
 
22
import static org.junit.Assert.assertNotNull;
 
23
import static org.junit.Assert.assertTrue;
 
24
 
 
25
import java.io.IOException;
 
26
import java.util.Collection;
 
27
import java.util.HashSet;
 
28
import java.util.concurrent.Semaphore;
 
29
import java.util.concurrent.TimeUnit;
 
30
import java.util.concurrent.atomic.AtomicBoolean;
 
31
import java.util.concurrent.atomic.AtomicInteger;
 
32
 
 
33
import org.apache.log4j.Logger;
 
34
import org.apache.zookeeper.AsyncCallback;
 
35
import org.apache.zookeeper.CreateMode;
 
36
import org.apache.zookeeper.KeeperException;
 
37
import org.apache.zookeeper.WatchedEvent;
 
38
import org.apache.zookeeper.Watcher;
 
39
import org.apache.zookeeper.ZooDefs;
 
40
import org.apache.zookeeper.ZooKeeper;
 
41
import org.apache.zookeeper.server.ZKDatabase;
 
42
import org.apache.zookeeper.server.quorum.Leader;
 
43
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
 
44
import org.junit.Test;
 
45
 
 
46
 
 
47
public class FollowerResyncConcurrencyTest extends QuorumBase {
 
48
    volatile int counter = 0;
 
49
    volatile int errors = 0; 
 
50
 
 
51
    private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
 
52
    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
 
53
 
 
54
 
 
55
    /**
 
56
     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
 
57
     * setting the ZXID of the SNAP packet
 
58
     * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down
 
59
     * The non-leader ZKs are writing to cluster
 
60
     * Shut down F1 again
 
61
     * Restart after sessions are expired, expect to get a snap file
 
62
     * Shut down, run some transactions through.
 
63
     * Restart to a diff while transactions are running in leader
 
64
     * @throws IOException
 
65
     * @throws InterruptedException
 
66
     * @throws KeeperException
 
67
     */
 
68
    @Test
 
69
    public void testResyncBySnapThenDiffAfterFollowerCrashes () 
 
70
    throws IOException, InterruptedException, KeeperException,  Throwable{
 
71
        final Semaphore sem = new Semaphore(0);
 
72
 
 
73
        QuorumUtil qu = new QuorumUtil(1);
 
74
        qu.startAll();
 
75
        CountdownWatcher watcher1 = new CountdownWatcher();
 
76
        CountdownWatcher watcher2 = new CountdownWatcher();
 
77
        CountdownWatcher watcher3 = new CountdownWatcher();
 
78
 
 
79
        int index = 1;
 
80
        while(qu.getPeer(index).peer.leader == null)
 
81
            index++;
 
82
 
 
83
        Leader leader = qu.getPeer(index).peer.leader;
 
84
 
 
85
        assertNotNull(leader);    
 
86
        /*
 
87
         * Reusing the index variable to select a follower to connect to
 
88
         */
 
89
        index = (index == 1) ? 2 : 1;
 
90
        qu.shutdown(index);
 
91
        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3);
 
92
        watcher3.waitForConnected(CONNECTION_TIMEOUT);
 
93
        zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 
94
 
 
95
        qu.restart(index);
 
96
        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
 
97
 
 
98
        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
 
99
    
 
100
        watcher1.waitForConnected(CONNECTION_TIMEOUT);
 
101
        watcher2.waitForConnected(CONNECTION_TIMEOUT);
 
102
        
 
103
        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);      
 
104
        Thread t = new Thread(new Runnable() {
 
105
 
 
106
            @Override
 
107
            public void run() {
 
108
                for(int i = 0; i < 1000; i++) {
 
109
                    zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
110
 
 
111
                        @Override
 
112
                        public void processResult(int rc, String path, Object ctx, String name) {
 
113
                            counter++;
 
114
                            if (rc != 0) {
 
115
                                errors++;
 
116
                            }
 
117
                            if(counter == 14200){
 
118
                                sem.release();
 
119
                            }
 
120
 
 
121
 
 
122
                        }
 
123
                    }, null);
 
124
                    if(i%10==0){
 
125
                        try {
 
126
                            Thread.sleep(100);
 
127
                        } catch (Exception e) {
 
128
 
 
129
                        }
 
130
                    }
 
131
                }
 
132
 
 
133
            }
 
134
        });
 
135
 
 
136
        
 
137
        for(int i = 0; i < 13000; i++) {
 
138
            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
139
 
 
140
                @Override
 
141
                public void processResult(int rc, String path, Object ctx, String name) {
 
142
                    counter++;
 
143
                    if (rc != 0) {
 
144
                        errors++;
 
145
                    }
 
146
                    if(counter == 14200){
 
147
                        sem.release();
 
148
                    }
 
149
 
 
150
 
 
151
                }
 
152
            }, null);            
 
153
 
 
154
            if(i == 5000){
 
155
                qu.shutdown(index);               
 
156
                LOG.info("Shutting down s1");
 
157
            }
 
158
            if(i == 12000){
 
159
                //Restart off of snap, then get some txns for a log, then shut down
 
160
                qu.restart(index);       
 
161
                Thread.sleep(300);
 
162
                qu.shutdown(index);
 
163
                t.start();
 
164
                Thread.sleep(300);                
 
165
                qu.restart(index);
 
166
                LOG.info("Setting up server: " + index);
 
167
            }
 
168
            if((i % 1000) == 0){
 
169
                Thread.sleep(1000);
 
170
            }
 
171
 
 
172
            if(i%50 == 0) {
 
173
                zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
174
 
 
175
                    @Override
 
176
                    public void processResult(int rc, String path, Object ctx, String name) {
 
177
                        counter++;
 
178
                        if (rc != 0) {
 
179
                            errors++;
 
180
                        }
 
181
                        if(counter == 14200){
 
182
                            sem.release();
 
183
                        }
 
184
 
 
185
 
 
186
                    }
 
187
                }, null);
 
188
            }
 
189
        }
 
190
 
 
191
        // Wait until all updates return
 
192
        if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
 
193
            LOG.warn("Did not aquire semaphore fast enough");
 
194
        }
 
195
        t.join(10000);
 
196
        Thread.sleep(1000);
 
197
        
 
198
            verifyState(qu, index, leader);
 
199
        
 
200
    }      
 
201
    
 
202
    /**
 
203
     * This test:
 
204
     * Starts up 3 ZKs. The non-leader ZKs are writing to cluster
 
205
     * Shut down one of the non-leader ZKs. 
 
206
     * Restart after sessions have expired but <500 txns have taken place (get a diff)
 
207
     * Shut down immediately after restarting, start running separate thread with other transactions
 
208
     * Restart to a diff while transactions are running in leader
 
209
     * 
 
210
     * 
 
211
     * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
 
212
     * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
 
213
     * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
 
214
     * would be missed
 
215
     * 
 
216
     * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
 
217
     * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
 
218
     * during the leader's diff forwarding.
 
219
     * 
 
220
     * @throws IOException
 
221
     * @throws InterruptedException
 
222
     * @throws KeeperException
 
223
     * @throws Throwable
 
224
     */
 
225
 
 
226
    @Test
 
227
    public void testResyncByDiffAfterFollowerCrashes () 
 
228
    throws IOException, InterruptedException, KeeperException, Throwable{
 
229
        final Semaphore sem = new Semaphore(0);
 
230
 
 
231
        QuorumUtil qu = new QuorumUtil(1);
 
232
        qu.startAll();
 
233
        CountdownWatcher watcher1 = new CountdownWatcher();
 
234
        CountdownWatcher watcher2 = new CountdownWatcher();
 
235
        CountdownWatcher watcher3 = new CountdownWatcher();
 
236
 
 
237
 
 
238
        int index = 1;
 
239
        while(qu.getPeer(index).peer.leader == null)
 
240
            index++;
 
241
 
 
242
        Leader leader = qu.getPeer(index).peer.leader;
 
243
 
 
244
        assertNotNull(leader);
 
245
 
 
246
        /*
 
247
         * Reusing the index variable to select a follower to connect to
 
248
         */
 
249
        index = (index == 1) ? 2 : 1;
 
250
 
 
251
        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
 
252
 
 
253
        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2);
 
254
        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
 
255
        watcher1.waitForConnected(CONNECTION_TIMEOUT);
 
256
        watcher2.waitForConnected(CONNECTION_TIMEOUT);
 
257
        watcher3.waitForConnected(CONNECTION_TIMEOUT);
 
258
        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);      
 
259
        zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 
260
 
 
261
        
 
262
        final AtomicBoolean runNow = new AtomicBoolean(false);
 
263
        Thread t = new Thread(new Runnable() {
 
264
 
 
265
            @Override
 
266
            public void run() {                                
 
267
                int inSyncCounter = 0;
 
268
                while(inSyncCounter < 400) {    
 
269
                    if(runNow.get()) {
 
270
                        zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
271
 
 
272
                            @Override
 
273
                            public void processResult(int rc, String path, Object ctx, String name) {
 
274
                                counter++;
 
275
                                if (rc != 0) {
 
276
                                    errors++;
 
277
                                }
 
278
                                if(counter > 7300){
 
279
                                    sem.release();
 
280
                                }
 
281
 
 
282
 
 
283
                            }
 
284
                        }, null);
 
285
                        
 
286
                        try {
 
287
                            Thread.sleep(10);
 
288
                        } catch (Exception e) {
 
289
                        }
 
290
                        inSyncCounter++;
 
291
                    }
 
292
                    else {
 
293
                        Thread.yield();
 
294
                    }
 
295
                }
 
296
 
 
297
            }
 
298
        });
 
299
 
 
300
        t.start();
 
301
        for(int i = 0; i < 5000; i++) {
 
302
            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
303
 
 
304
                @Override
 
305
                public void processResult(int rc, String path, Object ctx, String name) {
 
306
                    counter++;
 
307
                    if (rc != 0) {
 
308
                        errors++;
 
309
                    }
 
310
                    if(counter > 7300){
 
311
                        sem.release();
 
312
                    }
 
313
 
 
314
 
 
315
                }
 
316
            }, null);            
 
317
 
 
318
            if(i == 1000){
 
319
                qu.shutdown(index);      
 
320
                Thread.sleep(1100);
 
321
                LOG.info("Shutting down s1");
 
322
 
 
323
            }
 
324
            if(i == 1100 || i == 1150 || i == 1200) {
 
325
                Thread.sleep(1000);
 
326
            }
 
327
            
 
328
            if(i == 1200){
 
329
                qu.startThenShutdown(index);                                
 
330
                runNow.set(true);
 
331
                qu.restart(index);
 
332
                LOG.info("Setting up server: " + index);
 
333
            }
 
334
        
 
335
 
 
336
            if(i>=1000 &&  i%2== 0) {
 
337
                zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
338
 
 
339
                    @Override
 
340
                    public void processResult(int rc, String path, Object ctx, String name) {
 
341
                        counter++;
 
342
                        if (rc != 0) {
 
343
                            errors++;
 
344
                        }
 
345
                        if(counter > 7300){
 
346
                            sem.release();
 
347
                        }
 
348
 
 
349
 
 
350
                    }
 
351
                }, null);
 
352
            }
 
353
            if(i == 1050 || i == 1100 || i == 1150) {
 
354
                Thread.sleep(1000);
 
355
            }
 
356
        }
 
357
 
 
358
        // Wait until all updates return
 
359
        if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
 
360
            LOG.warn("Did not aquire semaphore fast enough");
 
361
        }
 
362
        t.join(10000);
 
363
        Thread.sleep(1000);
 
364
        // Verify that server is following and has the same epoch as the leader
 
365
        
 
366
        verifyState(qu, index, leader);
 
367
        
 
368
    }
 
369
 
 
370
    private void verifyState(QuorumUtil qu, int index, Leader leader) {
 
371
        assertTrue("Not following", qu.getPeer(index).peer.follower != null);
 
372
        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
 
373
        long epochL = (leader.getEpoch() >> 32L);
 
374
        assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + 
 
375
                "Current epoch: " + epochF, epochF == epochL);
 
376
        int leaderIndex = (index == 1) ? 2 : 1;    
 
377
        Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
 
378
        Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
 
379
        
 
380
        for(Long l : sessionsRestarted) {
 
381
            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));        
 
382
        }      
 
383
        assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
 
384
        ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
 
385
        ZKDatabase clean =  qu.getPeer(3).peer.getActiveServer().getZKDatabase();
 
386
        ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
 
387
        for(Long l : sessionsRestarted) {
 
388
            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
 
389
            HashSet ephemerals = restarted.getEphemerals(l);
 
390
            HashSet cleanEphemerals = clean.getEphemerals(l);
 
391
            for(Object o : cleanEphemerals) {
 
392
                if(!ephemerals.contains(o)) {
 
393
                    LOG.info("Restarted follower doesn't contain ephemeral " + o);
 
394
                }
 
395
            }
 
396
            HashSet leadEphemerals = lead.getEphemerals(l);
 
397
            for(Object o : leadEphemerals) {
 
398
                if(!cleanEphemerals.contains(o)) {
 
399
                    LOG.info("Follower doesn't contain ephemeral from leader " + o);
 
400
                }
 
401
            }
 
402
            assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());            
 
403
            assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
 
404
        }
 
405
    }      
 
406
}