2
* Licensed to the Apache Software Foundation (ASF) under one
3
* or more contributor license agreements. See the NOTICE file
4
* distributed with this work for additional information
5
* regarding copyright ownership. The ASF licenses this file
6
* to you under the Apache License, Version 2.0 (the
7
* "License"); you may not use this file except in compliance
8
* with the License. You may obtain a copy of the License at
10
* http://www.apache.org/licenses/LICENSE-2.0
12
* Unless required by applicable law or agreed to in writing, software
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
* See the License for the specific language governing permissions and
16
* limitations under the License.
19
package org.apache.zookeeper.server.quorum;
21
import java.io.ByteArrayInputStream;
22
import java.io.IOException;
23
import java.net.InetSocketAddress;
25
import org.apache.jute.BinaryInputArchive;
26
import org.apache.jute.Record;
27
import org.apache.zookeeper.server.ObserverBean;
28
import org.apache.zookeeper.server.Request;
29
import org.apache.zookeeper.server.util.SerializeUtils;
30
import org.apache.zookeeper.txn.TxnHeader;
33
* Observers are peers that do not take part in the atomic broadcast protocol.
34
* Instead, they are informed of successful proposals by the Leader. Observers
35
* therefore naturally act as a relay point for publishing the proposal stream
36
* and can relieve Followers of some of the connection load. Observers may
37
* submit proposals, but do not vote in their acceptance.
39
* See ZOOKEEPER-368 for a discussion of this feature.
41
public class Observer extends Learner{
43
// Constructs an Observer and stores the supplied ObserverZooKeeperServer in
// the zk field. On the lines visible here the self parameter is not used.
// NOTE(review): the embedded line numbering jumps from 43 to 45 and no closing
// brace is visible, so at least one statement appears to be missing from this
// extract - confirm against the original file.
Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) {
45
this.zk=observerZooKeeperServer;
49
// Builds a textual description of this observer: the literal "Observer "
// followed by sock, then " pendingRevalidationCount:" followed by the current
// size of pendingRevalidations.
// NOTE(review): no return statement or closing brace is visible in this
// extract (embedded numbering jumps from 53 to 58); presumably the builder's
// contents are returned - confirm against the original file.
public String toString() {
50
StringBuilder sb = new StringBuilder();
51
sb.append("Observer ").append(sock);
52
sb.append(" pendingRevalidationCount:")
53
.append(pendingRevalidations.size());
58
* The main method called by the observer to observe the leader.
60
* @throws InterruptedException
62
// Entry point for observing the leader. Visible steps, in order: register an
// ObserverBean with JMX, locate the leader, connect to it, register using the
// OBSERVERINFO type, sync to the zxid that registration returned, then loop
// while the peer reports it is running. IOExceptions are caught and logged,
// pending revalidations are cleared, and the JMX registration is removed.
// NOTE(review): the embedded numbering has gaps (63->66, 74->78, 79->82,
// 82->86, 89->93) and the try/finally keywords, loop body, and closing braces
// are not visible here - confirm the exact control flow against the original.
void observeLeader() throws InterruptedException {
63
// Expose this observer through JMX, passing the local peer's bean as well.
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
66
InetSocketAddress addr = findLeader();
67
LOG.info("Observing " + addr);
69
connectToLeader(addr);
70
// The value returned by registration is the zxid handed to syncWithLeader.
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
72
syncWithLeader(newLeaderZxid);
73
// A single QuorumPacket instance is allocated before the loop.
QuorumPacket qp = new QuorumPacket();
74
// NOTE(review): loop body missing from this extract; presumably each
// iteration reads a packet into qp and hands it to processPacket - confirm.
while (self.isRunning()) {
78
} catch (IOException e) {
79
LOG.warn("Exception when observing the leader", e);
82
// NOTE(review): the statement guarded by this second catch is not visible.
} catch (IOException e1) {
86
// Holding the pendingRevalidations monitor: empty it and wake every thread
// currently waiting on that monitor.
synchronized (pendingRevalidations) {
87
// clear pending revalidations
88
pendingRevalidations.clear();
89
pendingRevalidations.notifyAll();
93
// Undo the JMX registration performed at the top of this method.
zk.unregisterJMX(this);
98
* Controls the response of an observer to the receipt of a QuorumPacket.
100
* @throws IOException
102
// Dispatches on the type of the received QuorumPacket. Visible behaviour:
//   - PROPOSAL: logged at warn level ("Ignoring proposal").
//   - a branch that logs "Ignoring commit" at warn level (its case label is
//     not visible in this extract).
//   - UPTODATE: logged at error level.
//   - REVALIDATE: the handling statement is not visible in this extract.
//   - a branch that calls sync() on the ObserverZooKeeperServer.
//   - a branch that deserializes the packet payload into a TxnHeader and a
//     transaction Record, builds a Request from the header's client id and
//     type, and passes it to ObserverZooKeeperServer.commitRequest.
// NOTE(review): the embedded numbering has many gaps (e.g. 103->107, 108->111,
// 116->120, 120->123, 127->129, 129->132) and several case labels and all
// break statements are missing, so which packet types select the last two
// branches cannot be determined from here - presumably SYNC and INFORM;
// confirm against the original file.
protected void processPacket(QuorumPacket qp) throws IOException{
103
switch (qp.getType()) {
107
case Leader.PROPOSAL:
108
LOG.warn("Ignoring proposal");
111
LOG.warn("Ignoring commit");
113
case Leader.UPTODATE:
114
LOG.error("Received an UPTODATE message after Observer started");
116
case Leader.REVALIDATE:
120
// zk is cast to ObserverZooKeeperServer to reach its sync() method.
((ObserverZooKeeperServer)zk).sync();
123
// hdr is populated as a side effect of deserializeTxn below.
TxnHeader hdr = new TxnHeader();
124
// Wrap the packet's raw payload so it can be read as a binary archive.
BinaryInputArchive ia = BinaryInputArchive
125
.getArchive(new ByteArrayInputStream(qp.getData()));
126
Record txn = SerializeUtils.deserializeTxn(ia, hdr);
127
// NOTE(review): one constructor argument line (numbered 128) is missing from
// this extract, as are lines 130-131 where txn is presumably attached to the
// request - confirm.
Request request = new Request (null, hdr.getClientId(),
129
hdr.getType(), null, null);
132
ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
133
obs.commitRequest(request);
139
* Shutdown the Observer.
141
public void shutdown() {
142
LOG.info("shutdown called", new Exception("shutdown Observer"));