2
* Licensed to the Apache Software Foundation (ASF) under one
3
* or more contributor license agreements. See the NOTICE file
4
* distributed with this work for additional information
5
* regarding copyright ownership. The ASF licenses this file
6
* to you under the Apache License, Version 2.0 (the
7
* "License"); you may not use this file except in compliance
8
* with the License. You may obtain a copy of the License at
10
* http://www.apache.org/licenses/LICENSE-2.0
12
* Unless required by applicable law or agreed to in writing, software
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
* See the License for the specific language governing permissions and
16
* limitations under the License.
18
package org.apache.zookeeper.retry;
20
import java.io.IOException;
21
import java.util.ArrayList;
22
import java.util.List;
24
import org.apache.zookeeper.CreateMode;
25
import org.apache.zookeeper.KeeperException;
26
import org.apache.zookeeper.Watcher;
27
import org.apache.zookeeper.ZooKeeper;
28
import org.apache.zookeeper.data.ACL;
29
import org.apache.zookeeper.data.Stat;
30
import org.apache.zookeeper.inspector.logger.LoggerFactory;
33
* A Class which extends {@link ZooKeeper} and will automatically retry calls to
34
* zookeeper if a {@link KeeperException.ConnectionLossException} occurs
36
public class ZooKeeperRetry extends ZooKeeper {
38
private boolean closed = false;
39
private final Watcher watcher;
40
private int limit = -1;
43
* @param connectString
44
* @param sessionTimeout
48
public ZooKeeperRetry(String connectString, int sessionTimeout,
49
Watcher watcher) throws IOException {
50
super(connectString, sessionTimeout, watcher);
51
this.watcher = watcher;
55
* @param connectString
56
* @param sessionTimeout
59
* @param sessionPasswd
62
public ZooKeeperRetry(String connectString, int sessionTimeout,
63
Watcher watcher, long sessionId, byte[] sessionPasswd)
65
super(connectString, sessionTimeout, watcher, sessionId, sessionPasswd);
66
this.watcher = watcher;
70
public synchronized void close() throws InterruptedException {
76
public String create(String path, byte[] data, List<ACL> acl,
77
CreateMode createMode) throws KeeperException, InterruptedException {
81
return super.create(path, data, acl, createMode);
82
} catch (KeeperException.ConnectionLossException e) {
83
LoggerFactory.getLogger().warn(
84
"ZooKeeper connection lost. Trying to reconnect.");
85
if (exists(path, false) != null) {
88
} catch (KeeperException.NodeExistsException e) {
91
} while (!closed && (limit == -1 || count++ < limit));
96
public void delete(String path, int version) throws InterruptedException,
101
super.delete(path, version);
102
} catch (KeeperException.ConnectionLossException e) {
103
LoggerFactory.getLogger().warn(
104
"ZooKeeper connection lost. Trying to reconnect.");
105
if (exists(path, false) == null) {
108
} catch (KeeperException.NoNodeException e) {
111
} while (!closed && (limit == -1 || count++ < limit));
115
public Stat exists(String path, boolean watch) throws KeeperException,
116
InterruptedException {
120
return super.exists(path, watch ? watcher : null);
121
} catch (KeeperException.ConnectionLossException e) {
122
LoggerFactory.getLogger().warn(
123
"ZooKeeper connection lost. Trying to reconnect.");
125
} while (!closed && (limit == -1 || count++ < limit));
130
public Stat exists(String path, Watcher watcher) throws KeeperException,
131
InterruptedException {
135
return super.exists(path, watcher);
136
} catch (KeeperException.ConnectionLossException e) {
137
LoggerFactory.getLogger().warn(
138
"ZooKeeper connection lost. Trying to reconnect.");
140
} while (!closed && (limit == -1 || count++ < limit));
145
public List<ACL> getACL(String path, Stat stat) throws KeeperException,
146
InterruptedException {
150
return super.getACL(path, stat);
151
} catch (KeeperException.ConnectionLossException e) {
152
LoggerFactory.getLogger().warn(
153
"ZooKeeper connection lost. Trying to reconnect.");
155
} while (!closed && (limit == -1 || count++ < limit));
160
public List<String> getChildren(String path, boolean watch)
161
throws KeeperException, InterruptedException {
165
return super.getChildren(path, watch ? watcher : null);
166
} catch (KeeperException.ConnectionLossException e) {
167
LoggerFactory.getLogger().warn(
168
"ZooKeeper connection lost. Trying to reconnect.");
170
} while (!closed && (limit == -1 || count++ < limit));
171
return new ArrayList<String>();
175
public List<String> getChildren(String path, Watcher watcher)
176
throws KeeperException, InterruptedException {
180
return super.getChildren(path, watcher);
181
} catch (KeeperException.ConnectionLossException e) {
182
LoggerFactory.getLogger().warn(
183
"ZooKeeper connection lost. Trying to reconnect.");
185
} while (!closed && (limit == -1 || count++ < limit));
186
return new ArrayList<String>();
190
public byte[] getData(String path, boolean watch, Stat stat)
191
throws KeeperException, InterruptedException {
195
return super.getData(path, watch ? watcher : null, stat);
196
} catch (KeeperException.ConnectionLossException e) {
197
LoggerFactory.getLogger().warn(
198
"ZooKeeper connection lost. Trying to reconnect.");
200
} while (!closed && (limit == -1 || count++ < limit));
205
public byte[] getData(String path, Watcher watcher, Stat stat)
206
throws KeeperException, InterruptedException {
210
return super.getData(path, watcher, stat);
211
} catch (KeeperException.ConnectionLossException e) {
212
LoggerFactory.getLogger().warn(
213
"ZooKeeper connection lost. Trying to reconnect.");
215
} while (!closed && (limit == -1 || count++ < limit));
220
public Stat setACL(String path, List<ACL> acl, int version)
221
throws KeeperException, InterruptedException {
225
return super.setACL(path, acl, version);
226
} catch (KeeperException.ConnectionLossException e) {
227
LoggerFactory.getLogger().warn(
228
"ZooKeeper connection lost. Trying to reconnect.");
229
Stat s = exists(path, false);
231
if (getACL(path, s).equals(acl)) {
238
} while (!closed && (limit == -1 || count++ < limit));
243
public Stat setData(String path, byte[] data, int version)
244
throws KeeperException, InterruptedException {
248
return super.setData(path, data, version);
249
} catch (KeeperException.ConnectionLossException e) {
250
LoggerFactory.getLogger().warn(
251
"ZooKeeper connection lost. Trying to reconnect.");
252
Stat s = exists(path, false);
254
if (getData(path, false, s) == data) {
261
} while (!closed && (limit == -1 || count++ < limit));
268
public void setRetryLimit(int limit) {
273
* @return true if successfully connected to zookeeper
275
public boolean testConnection() {
279
return super.exists("/", null) != null;
280
} catch (Exception e) {
281
LoggerFactory.getLogger().warn(
282
"ZooKeeper connection lost. Trying to reconnect.");
284
} while (count++ < 5);