~juju/pyjuju/zookeeper-vendor

« back to all changes in this revision

Viewing changes to src/java/main/org/apache/zookeeper/server/quorum/Observer.java

  • Committer: Gustavo Niemeyer
  • Date: 2011-05-24 20:53:37 UTC
  • Revision ID: gustavo@niemeyer.net-20110524205337-i11yow5biap5xapo
Importing stock Zookeeper 3.3.3 without jars.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * Licensed to the Apache Software Foundation (ASF) under one
 
3
 * or more contributor license agreements.  See the NOTICE file
 
4
 * distributed with this work for additional information
 
5
 * regarding copyright ownership.  The ASF licenses this file
 
6
 * to you under the Apache License, Version 2.0 (the
 
7
 * "License"); you may not use this file except in compliance
 
8
 * with the License.  You may obtain a copy of the License at
 
9
 *
 
10
 *     http://www.apache.org/licenses/LICENSE-2.0
 
11
 *
 
12
 * Unless required by applicable law or agreed to in writing, software
 
13
 * distributed under the License is distributed on an "AS IS" BASIS,
 
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
15
 * See the License for the specific language governing permissions and
 
16
 * limitations under the License.
 
17
 */
 
18
 
 
19
package org.apache.zookeeper.server.quorum;
 
20
 
 
21
import java.io.ByteArrayInputStream;
 
22
import java.io.IOException;
 
23
import java.net.InetSocketAddress;
 
24
 
 
25
import org.apache.jute.BinaryInputArchive;
 
26
import org.apache.jute.Record;
 
27
import org.apache.zookeeper.server.ObserverBean;
 
28
import org.apache.zookeeper.server.Request;
 
29
import org.apache.zookeeper.server.util.SerializeUtils;
 
30
import org.apache.zookeeper.txn.TxnHeader;
 
31
 
 
32
/**
 
33
 * Observers are peers that do not take part in the atomic broadcast protocol.
 
34
 * Instead, they are informed of successful proposals by the Leader. Observers
 
35
 * therefore naturally act as a relay point for publishing the proposal stream
 
36
 * and can relieve Followers of some of the connection load. Observers may
 
37
 * submit proposals, but do not vote in their acceptance. 
 
38
 *
 
39
 * See ZOOKEEPER-368 for a discussion of this feature. 
 
40
 */
 
41
public class Observer extends Learner{      
 
42
 
 
43
    Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) {
 
44
        this.self = self;
 
45
        this.zk=observerZooKeeperServer;
 
46
    }
 
47
 
 
48
    @Override
 
49
    public String toString() {
 
50
        StringBuilder sb = new StringBuilder();
 
51
        sb.append("Observer ").append(sock);        
 
52
        sb.append(" pendingRevalidationCount:")
 
53
            .append(pendingRevalidations.size());
 
54
        return sb.toString();
 
55
    }
 
56
    
 
57
    /**
 
58
     * the main method called by the observer to observe the leader
 
59
     *
 
60
     * @throws InterruptedException
 
61
     */
 
62
    void observeLeader() throws InterruptedException {
 
63
        zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
64
 
 
65
        try {
 
66
            InetSocketAddress addr = findLeader();
 
67
            LOG.info("Observing " + addr);
 
68
            try {
 
69
                connectToLeader(addr);
 
70
                long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
 
71
                
 
72
                syncWithLeader(newLeaderZxid);
 
73
                QuorumPacket qp = new QuorumPacket();
 
74
                while (self.isRunning()) {
 
75
                    readPacket(qp);
 
76
                    processPacket(qp);                   
 
77
                }
 
78
            } catch (IOException e) {
 
79
                LOG.warn("Exception when observing the leader", e);
 
80
                try {
 
81
                    sock.close();
 
82
                } catch (IOException e1) {
 
83
                    e1.printStackTrace();
 
84
                }
 
85
    
 
86
                synchronized (pendingRevalidations) {
 
87
                    // clear pending revalidations
 
88
                    pendingRevalidations.clear();
 
89
                    pendingRevalidations.notifyAll();
 
90
                }
 
91
            }
 
92
        } finally {
 
93
            zk.unregisterJMX(this);
 
94
        }
 
95
    }
 
96
    
 
97
    /**
 
98
     * Controls the response of an observer to the receipt of a quorumpacket
 
99
     * @param qp
 
100
     * @throws IOException
 
101
     */
 
102
    protected void processPacket(QuorumPacket qp) throws IOException{
 
103
        switch (qp.getType()) {
 
104
        case Leader.PING:
 
105
            ping(qp);
 
106
            break;
 
107
        case Leader.PROPOSAL:
 
108
            LOG.warn("Ignoring proposal");
 
109
            break;
 
110
        case Leader.COMMIT:
 
111
            LOG.warn("Ignoring commit");            
 
112
            break;            
 
113
        case Leader.UPTODATE:
 
114
            LOG.error("Received an UPTODATE message after Observer started");
 
115
            break;
 
116
        case Leader.REVALIDATE:
 
117
            revalidate(qp);
 
118
            break;
 
119
        case Leader.SYNC:
 
120
            ((ObserverZooKeeperServer)zk).sync();
 
121
            break;
 
122
        case Leader.INFORM:            
 
123
            TxnHeader hdr = new TxnHeader();
 
124
            BinaryInputArchive ia = BinaryInputArchive
 
125
                    .getArchive(new ByteArrayInputStream(qp.getData()));
 
126
            Record txn = SerializeUtils.deserializeTxn(ia, hdr);
 
127
            Request request = new Request (null, hdr.getClientId(), 
 
128
                                           hdr.getCxid(),
 
129
                                           hdr.getType(), null, null);
 
130
            request.txn = txn;
 
131
            request.hdr = hdr;
 
132
            ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
 
133
            obs.commitRequest(request);            
 
134
            break;
 
135
        }
 
136
    }
 
137
 
 
138
    /**
 
139
     * Shutdown the Observer.
 
140
     */
 
141
    public void shutdown() {       
 
142
        LOG.info("shutdown called", new Exception("shutdown Observer"));
 
143
        super.shutdown();
 
144
    }
 
145
}
 
146