~juju/pyjuju/zookeeper-vendor

« back to all changes in this revision

Viewing changes to src/contrib/zooinspector/src/java/org/apache/zookeeper/retry/ZooKeeperRetry.java

  • Committer: Gustavo Niemeyer
  • Date: 2011-05-24 20:53:37 UTC
  • Revision ID: gustavo@niemeyer.net-20110524205337-i11yow5biap5xapo
Importing stock Zookeeper 3.3.3 without jars.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * Licensed to the Apache Software Foundation (ASF) under one
 
3
 * or more contributor license agreements.  See the NOTICE file
 
4
 * distributed with this work for additional information
 
5
 * regarding copyright ownership.  The ASF licenses this file
 
6
 * to you under the Apache License, Version 2.0 (the
 
7
 * "License"); you may not use this file except in compliance
 
8
 * with the License.  You may obtain a copy of the License at
 
9
 *
 
10
 *     http://www.apache.org/licenses/LICENSE-2.0
 
11
 *
 
12
 * Unless required by applicable law or agreed to in writing, software
 
13
 * distributed under the License is distributed on an "AS IS" BASIS,
 
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
15
 * See the License for the specific language governing permissions and
 
16
 * limitations under the License.
 
17
 */
 
18
package org.apache.zookeeper.retry;
 
19
 
 
20
import java.io.IOException;
 
21
import java.util.ArrayList;
 
22
import java.util.List;
 
23
 
 
24
import org.apache.zookeeper.CreateMode;
 
25
import org.apache.zookeeper.KeeperException;
 
26
import org.apache.zookeeper.Watcher;
 
27
import org.apache.zookeeper.ZooKeeper;
 
28
import org.apache.zookeeper.data.ACL;
 
29
import org.apache.zookeeper.data.Stat;
 
30
import org.apache.zookeeper.inspector.logger.LoggerFactory;
 
31
 
 
32
/**
 
33
 * A Class which extends {@link ZooKeeper} and will automatically retry calls to
 
34
 * zookeeper if a {@link KeeperException.ConnectionLossException} occurs
 
35
 */
 
36
public class ZooKeeperRetry extends ZooKeeper {
 
37
 
 
38
    private boolean closed = false;
 
39
    private final Watcher watcher;
 
40
    private int limit = -1;
 
41
 
 
42
    /**
 
43
     * @param connectString
 
44
     * @param sessionTimeout
 
45
     * @param watcher
 
46
     * @throws IOException
 
47
     */
 
48
    public ZooKeeperRetry(String connectString, int sessionTimeout,
 
49
            Watcher watcher) throws IOException {
 
50
        super(connectString, sessionTimeout, watcher);
 
51
        this.watcher = watcher;
 
52
    }
 
53
 
 
54
    /**
 
55
     * @param connectString
 
56
     * @param sessionTimeout
 
57
     * @param watcher
 
58
     * @param sessionId
 
59
     * @param sessionPasswd
 
60
     * @throws IOException
 
61
     */
 
62
    public ZooKeeperRetry(String connectString, int sessionTimeout,
 
63
            Watcher watcher, long sessionId, byte[] sessionPasswd)
 
64
            throws IOException {
 
65
        super(connectString, sessionTimeout, watcher, sessionId, sessionPasswd);
 
66
        this.watcher = watcher;
 
67
    }
 
68
 
 
69
    @Override
 
70
    public synchronized void close() throws InterruptedException {
 
71
        this.closed = true;
 
72
        super.close();
 
73
    }
 
74
 
 
75
    @Override
 
76
    public String create(String path, byte[] data, List<ACL> acl,
 
77
            CreateMode createMode) throws KeeperException, InterruptedException {
 
78
        int count = 0;
 
79
        do {
 
80
            try {
 
81
                return super.create(path, data, acl, createMode);
 
82
            } catch (KeeperException.ConnectionLossException e) {
 
83
                LoggerFactory.getLogger().warn(
 
84
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
85
                if (exists(path, false) != null) {
 
86
                    return path;
 
87
                }
 
88
            } catch (KeeperException.NodeExistsException e) {
 
89
                return path;
 
90
            }
 
91
        } while (!closed && (limit == -1 || count++ < limit));
 
92
        return null;
 
93
    }
 
94
 
 
95
    @Override
 
96
    public void delete(String path, int version) throws InterruptedException,
 
97
            KeeperException {
 
98
        int count = 0;
 
99
        do {
 
100
            try {
 
101
                super.delete(path, version);
 
102
            } catch (KeeperException.ConnectionLossException e) {
 
103
                LoggerFactory.getLogger().warn(
 
104
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
105
                if (exists(path, false) == null) {
 
106
                    return;
 
107
                }
 
108
            } catch (KeeperException.NoNodeException e) {
 
109
                break;
 
110
            }
 
111
        } while (!closed && (limit == -1 || count++ < limit));
 
112
    }
 
113
 
 
114
    @Override
 
115
    public Stat exists(String path, boolean watch) throws KeeperException,
 
116
            InterruptedException {
 
117
        int count = 0;
 
118
        do {
 
119
            try {
 
120
                return super.exists(path, watch ? watcher : null);
 
121
            } catch (KeeperException.ConnectionLossException e) {
 
122
                LoggerFactory.getLogger().warn(
 
123
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
124
            }
 
125
        } while (!closed && (limit == -1 || count++ < limit));
 
126
        return null;
 
127
    }
 
128
 
 
129
    @Override
 
130
    public Stat exists(String path, Watcher watcher) throws KeeperException,
 
131
            InterruptedException {
 
132
        int count = 0;
 
133
        do {
 
134
            try {
 
135
                return super.exists(path, watcher);
 
136
            } catch (KeeperException.ConnectionLossException e) {
 
137
                LoggerFactory.getLogger().warn(
 
138
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
139
            }
 
140
        } while (!closed && (limit == -1 || count++ < limit));
 
141
        return null;
 
142
    }
 
143
 
 
144
    @Override
 
145
    public List<ACL> getACL(String path, Stat stat) throws KeeperException,
 
146
            InterruptedException {
 
147
        int count = 0;
 
148
        do {
 
149
            try {
 
150
                return super.getACL(path, stat);
 
151
            } catch (KeeperException.ConnectionLossException e) {
 
152
                LoggerFactory.getLogger().warn(
 
153
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
154
            }
 
155
        } while (!closed && (limit == -1 || count++ < limit));
 
156
        return null;
 
157
    }
 
158
 
 
159
    @Override
 
160
    public List<String> getChildren(String path, boolean watch)
 
161
            throws KeeperException, InterruptedException {
 
162
        int count = 0;
 
163
        do {
 
164
            try {
 
165
                return super.getChildren(path, watch ? watcher : null);
 
166
            } catch (KeeperException.ConnectionLossException e) {
 
167
                LoggerFactory.getLogger().warn(
 
168
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
169
            }
 
170
        } while (!closed && (limit == -1 || count++ < limit));
 
171
        return new ArrayList<String>();
 
172
    }
 
173
 
 
174
    @Override
 
175
    public List<String> getChildren(String path, Watcher watcher)
 
176
            throws KeeperException, InterruptedException {
 
177
        int count = 0;
 
178
        do {
 
179
            try {
 
180
                return super.getChildren(path, watcher);
 
181
            } catch (KeeperException.ConnectionLossException e) {
 
182
                LoggerFactory.getLogger().warn(
 
183
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
184
            }
 
185
        } while (!closed && (limit == -1 || count++ < limit));
 
186
        return new ArrayList<String>();
 
187
    }
 
188
 
 
189
    @Override
 
190
    public byte[] getData(String path, boolean watch, Stat stat)
 
191
            throws KeeperException, InterruptedException {
 
192
        int count = 0;
 
193
        do {
 
194
            try {
 
195
                return super.getData(path, watch ? watcher : null, stat);
 
196
            } catch (KeeperException.ConnectionLossException e) {
 
197
                LoggerFactory.getLogger().warn(
 
198
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
199
            }
 
200
        } while (!closed && (limit == -1 || count++ < limit));
 
201
        return null;
 
202
    }
 
203
 
 
204
    @Override
 
205
    public byte[] getData(String path, Watcher watcher, Stat stat)
 
206
            throws KeeperException, InterruptedException {
 
207
        int count = 0;
 
208
        do {
 
209
            try {
 
210
                return super.getData(path, watcher, stat);
 
211
            } catch (KeeperException.ConnectionLossException e) {
 
212
                LoggerFactory.getLogger().warn(
 
213
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
214
            }
 
215
        } while (!closed && (limit == -1 || count++ < limit));
 
216
        return null;
 
217
    }
 
218
 
 
219
    @Override
 
220
    public Stat setACL(String path, List<ACL> acl, int version)
 
221
            throws KeeperException, InterruptedException {
 
222
        int count = 0;
 
223
        do {
 
224
            try {
 
225
                return super.setACL(path, acl, version);
 
226
            } catch (KeeperException.ConnectionLossException e) {
 
227
                LoggerFactory.getLogger().warn(
 
228
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
229
                Stat s = exists(path, false);
 
230
                if (s != null) {
 
231
                    if (getACL(path, s).equals(acl)) {
 
232
                        return s;
 
233
                    }
 
234
                } else {
 
235
                    return null;
 
236
                }
 
237
            }
 
238
        } while (!closed && (limit == -1 || count++ < limit));
 
239
        return null;
 
240
    }
 
241
 
 
242
    @Override
 
243
    public Stat setData(String path, byte[] data, int version)
 
244
            throws KeeperException, InterruptedException {
 
245
        int count = 0;
 
246
        do {
 
247
            try {
 
248
                return super.setData(path, data, version);
 
249
            } catch (KeeperException.ConnectionLossException e) {
 
250
                LoggerFactory.getLogger().warn(
 
251
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
252
                Stat s = exists(path, false);
 
253
                if (s != null) {
 
254
                    if (getData(path, false, s) == data) {
 
255
                        return s;
 
256
                    }
 
257
                } else {
 
258
                    return null;
 
259
                }
 
260
            }
 
261
        } while (!closed && (limit == -1 || count++ < limit));
 
262
        return null;
 
263
    }
 
264
 
 
265
    /**
 
266
     * @param limit
 
267
     */
 
268
    public void setRetryLimit(int limit) {
 
269
        this.limit = limit;
 
270
    }
 
271
 
 
272
    /**
 
273
     * @return true if successfully connected to zookeeper
 
274
     */
 
275
    public boolean testConnection() {
 
276
        int count = 0;
 
277
        do {
 
278
            try {
 
279
                return super.exists("/", null) != null;
 
280
            } catch (Exception e) {
 
281
                LoggerFactory.getLogger().warn(
 
282
                        "ZooKeeper connection lost.  Trying to reconnect.");
 
283
            }
 
284
        } while (count++ < 5);
 
285
        return false;
 
286
    }
 
287
 
 
288
}