2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
22
package org.jboss.remoting.detection;
25
import org.jboss.logging.Logger;
26
import org.jboss.remoting.ConnectionValidator;
27
import org.jboss.remoting.InvokerLocator;
28
import org.jboss.remoting.InvokerRegistry;
29
import org.jboss.remoting.ServerInvoker;
30
import org.jboss.remoting.ident.Identity;
31
import org.jboss.remoting.network.NetworkInstance;
32
import org.jboss.remoting.network.NetworkRegistryFinder;
33
import org.jboss.remoting.network.NetworkRegistryMBean;
34
import org.jboss.remoting.network.NetworkRegistryWrapper;
35
import org.w3c.dom.Element;
36
import org.w3c.dom.Node;
37
import org.w3c.dom.NodeList;
39
import javax.management.MBeanServer;
40
import javax.management.MBeanServerInvocationHandler;
41
import javax.management.ObjectName;
43
import java.security.AccessController;
44
import java.security.PrivilegedAction;
45
import java.util.ArrayList;
46
import java.util.Collection;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
52
import java.util.Timer;
53
import java.util.TimerTask;
59
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
60
* @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
61
* @version $Revision: 5116 $
63
public abstract class AbstractDetector implements AbstractDetectorMBean
65
// Logger used by this class and its inner classes.
// NOTE(review): the config and xml fields used elsewhere in this class are declared on lines
// that are not visible in this listing.
static protected final Logger log = Logger.getLogger(AbstractDetector.class);
// Milliseconds without a detection message before a known server is suspected dead and is
// checked explicitly by the FailureDetector; the setters keep it >= heartbeatTimeDelay.
67
private long defaultTimeDelay = 5000;
// Milliseconds between heartbeats; also the heartbeat timer period (getHeartbeatPeriod).
// The setter rejects non-positive values, so it is always > 0.
68
private long heartbeatTimeDelay = 1000;
// MBean server and this detector's own name, both captured in preRegister().
69
protected MBeanServer mbeanserver;
70
protected ObjectName objectName;
// NetworkRegistry lookup results from start(); networkRegistry stays null when no registry is
// registered, which is why every use of it is null-checked.
71
protected ObjectName registryObjectName;
72
protected NetworkRegistryMBean networkRegistry;
// Identity of the local server, resolved in start().
74
private Identity myself;
// Timers driving the heartbeat task and the FailureDetector (pinger) task.
75
private Timer heartbeatTimer;
76
private Timer failureTimer;
// Known remote servers keyed by their Detection; detect() and FailureDetector synchronize on it.
77
private Map servers = new HashMap();
// Accepted domains; an empty set means detections from every domain are accepted.
79
private Set domains = new HashSet();
// Whether detections originating from this same JVM are accepted.
80
private boolean acceptLocal = false;
84
// No-arg constructor; its body is not visible in this listing.
public AbstractDetector()
// Creates a detector holding a private copy of the given configuration map. That copy is later
// passed to ConnectionValidator.checkConnection() when suspected servers are probed.
// NOTE(review): putAll(null) throws NullPointerException; a line between the two statements
// below is missing from this listing and may hold a null guard - confirm.
89
public AbstractDetector(Map config)
91
this.config = new HashMap();
93
this.config.putAll(config);
97
* Sets the amount of time, in milliseconds, to wait between sending (and sometimes receiving) detection messages.
99
* @param heartbeatTimeDelay the new delay in milliseconds; must be greater than zero and strictly
* less than the current default time delay ({@link #getDefaultTimeDelay()})
100
* @throws IllegalArgumentException if heartbeatTimeDelay is zero or negative, or is greater than
* or equal to the current default time delay
102
public void setHeartbeatTimeDelay(long heartbeatTimeDelay)
// NOTE(review): the upper bound here is exclusive, while setDefaultTimeDelay() accepts a default
// equal to the heartbeat delay. The exception text below also omits the zero and "equal" cases.
104
if(heartbeatTimeDelay > 0 && heartbeatTimeDelay < defaultTimeDelay)
106
this.heartbeatTimeDelay = heartbeatTimeDelay;
110
throw new IllegalArgumentException("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to a negative number or " +
111
"to a number greater than the default time delay (" + defaultTimeDelay + ").");
116
* Returns the amount of time, in milliseconds, to wait between sending (and sometimes receiving) detection messages.
* @return the heartbeat time delay in milliseconds (initially 1000)
120
public long getHeartbeatTimeDelay()
122
return heartbeatTimeDelay;
126
* Sets the amount of time which can elapse without receiving a detection event before a server
127
* will be suspected as being dead and an explicit invocation is performed on it to verify it is alive.
129
* @param defaultTimeDelay time in milliseconds; must be greater than or equal to the current
* heartbeat time delay
130
* @throws IllegalArgumentException if defaultTimeDelay is less than the current heartbeat time delay
132
public void setDefaultTimeDelay(long defaultTimeDelay)
// The heartbeat delay is always positive, so this comparison also rejects zero and negative values.
134
if(defaultTimeDelay >= heartbeatTimeDelay)
136
this.defaultTimeDelay = defaultTimeDelay;
140
throw new IllegalArgumentException("Can not set the default time delay (" + defaultTimeDelay + ") to be less" +
141
" than that of the heartbeat time delay (" + heartbeatTimeDelay + ").");
146
* @return The amount of time, in milliseconds, which can elapse without receiving a detection event before a server
147
* will be suspected as being dead and an explicit invocation is performed on it to verify it is alive.
149
public long getDefaultTimeDelay()
151
return defaultTimeDelay;
155
* Will create a detection message based on the server invokers registered within the local InvokerRegistry.
156
* The detection message will contain the identity and array of server invoker metadata.
* Only invokers that report {@code isStarted()} are included.
* NOTE(review): the branch taken when the registry has no invokers and the final return statement
* are not visible in this listing; presumably the method returns {@code detection} (and null in the
* empty case) - confirm.
160
public Detection createDetection()
162
Detection detection = null;
164
ServerInvoker invokers[] = InvokerRegistry.getServerInvokers();
165
if(invokers == null || invokers.length <= 0)
// Collect metadata (locator plus supported subsystems) for every started server invoker.
169
List l = new ArrayList(invokers.length);
170
for(int c = 0; c < invokers.length; c++)
172
if(invokers[c].isStarted())
174
ServerInvokerMetadata serverInvoker = new ServerInvokerMetadata(invokers[c].getLocator(),
175
invokers[c].getSupportedSubsystems());
176
l.add(serverInvoker);
183
ServerInvokerMetadata metadata[] = (ServerInvokerMetadata[]) l.toArray(new ServerInvokerMetadata[l.size()]);
// The detection carries this server's Identity as resolved from the MBean server.
184
detection = new Detection(Identity.get(mbeanserver), metadata);
189
* called by MBeanServer to start the mbean lifecycle.
* Resolves the local identity and, when no domains and no xml configuration have been set,
* accepts only the local domain. It then looks up the NetworkRegistry (logging a warning when it is
* not registered) and starts the pinger and heartbeat timers.
193
public void start() throws Exception
195
// get our own identity
196
myself = Identity.get(mbeanserver);
198
// add my domain if domains empty and xml not set
199
if(domains.isEmpty() && xml == null)
201
domains.add(myself.getDomain());
204
// find our NetworkRegistry
205
registryObjectName = NetworkRegistryFinder.find(mbeanserver);
206
if(registryObjectName == null)
208
log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry is not registered");
209
log.warn("This means that only the broadcasting of detection messages will be functional and will not be able to discover other servers.");
// Presumably reached only when the registry was found: build a JMX proxy for the registry MBean
// and wrap it. Some proxy arguments are on lines that are not visible in this listing.
213
Object o = MBeanServerInvocationHandler.newProxyInstance(mbeanserver,
215
NetworkRegistryMBean.class,
217
networkRegistry = new NetworkRegistryWrapper((NetworkRegistryMBean) o);
220
startPinger(getPingerDelay(), getPingerPeriod());
221
startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod());
225
* Returns the delay in milliseconds between when the pinger timer is created and when the first
* pinger (FailureDetector) task runs. Subclasses may override this.
226
* defaults to <tt>5000</tt>
230
protected long getPingerDelay()
236
* Returns the period in milliseconds between checks of known servers against their last
* detection timestamp. Subclasses may override this.
237
* defaults to <tt>1500</tt>
241
protected long getPingerPeriod()
247
* Starts the pinger timer thread, which periodically runs a FailureDetector task to check
* servers whose detection messages have stopped arriving.
* @param delay milliseconds before the first run
* @param period milliseconds between subsequent runs
252
protected void startPinger(long delay, long period)
// Timer(false) runs its task on a non-daemon thread.
// NOTE(review): an existing failureTimer is replaced without being cancelled if this method is
// called twice without stopPinger() in between.
254
failureTimer = new Timer(false);
255
failureTimer.schedule(new FailureDetector(), delay, period);
259
* Stops the pinger timer thread by cancelling the failure timer, if one was started.
* startPinger(long, long) always creates a fresh Timer, so the pinger can be restarted afterwards.
261
protected void stopPinger()
263
if(failureTimer != null)
265
failureTimer.cancel();
271
* called by the MBeanServer to stop the mbean lifecycle.
* NOTE(review): the body is not visible in this listing; presumably it stops the pinger and
* heartbeat timers - confirm.
275
public void stop() throws Exception
// The callbacks below have the javax.management.MBeanRegistration signatures. The bodies of
// postDeregister, postRegister and preDeregister are not visible in this listing.
282
public void postDeregister()
286
public void postRegister(Boolean aBoolean)
290
public void preDeregister() throws Exception
// Captures the MBean server and this MBean's name; the return statement is not visible here.
294
public ObjectName preRegister(MBeanServer mBeanServer, ObjectName objectName) throws Exception
296
this.mbeanserver = mBeanServer;
297
this.objectName = objectName;
302
* set the configuration for the domains to be recognized by detector.
* Each {@code <domain>} element names a domain whose detections are accepted; when none are given,
* all domains are accepted. A {@code <local>} element is consulted to decide whether detections
* from this JVM are accepted.
* @param xml the configuration element
305
* @jmx.managed-attribute description="Configuration is an xml element indicating domains to be recognized by detector"
306
* access="read-write"
308
public void setConfiguration(Element xml)
313
// check configuration xml
316
// clearing collection of domains since have new ones to set
319
NodeList domainNodes = xml.getElementsByTagName("domain");
320
if(domainNodes == null || domainNodes.getLength() <= 0)
322
// no domains specified, so will accept all domains
323
log.debug("No domains specified. Will accept all domains.");
325
int len = domainNodes.getLength();
326
for(int c = 0; c < len; c++)
328
Node node = domainNodes.item(c);
// NOTE(review): getFirstChild() returns null for an empty <domain/> element, so the next line
// throws NullPointerException for such input.
329
String domain = node.getFirstChild().getNodeValue();
331
log.debug("Added domain " + domain + " to detector list.");
334
// now look to see if local server detection should be accepted
335
NodeList localNode = xml.getElementsByTagName("local");
// NOTE(review): getElementsByTagName() returns an empty NodeList, never null, so this test is
// always true. getLength() > 0 is probably what was meant - confirm against the following lines.
336
if(localNode != null)
346
* Returns this detector's domain configuration element.
* NOTE(review): the body is not visible in this listing; presumably it returns the element last
* passed to setConfiguration(Element) - confirm.
348
* @return an <code>Element</code> value
349
* @jmx.managed-attribute
351
public Element getConfiguration()
356
//----------------------- protected
// Starts sending heartbeats by scheduling a new Heartbeat task on heartbeatTimer, creating the
// timer first if there is none.
//   delay  - milliseconds before the first heartbeat
//   period - milliseconds between subsequent heartbeats
364
protected void startHeartbeat(long delay, long period)
366
if(heartbeatTimer == null)
368
heartbeatTimer = new Timer(false);
373
heartbeatTimer.schedule(new Heartbeat(), delay, period);
// Timer.schedule throws IllegalStateException once the timer has been cancelled or its thread has
// terminated (the task itself is always new). In that case fall back to a fresh Timer and reschedule.
375
catch (IllegalStateException e)
377
log.debug("Unable to schedule TimerTask on existing Timer", e);
378
heartbeatTimer = new Timer(false);
379
heartbeatTimer.schedule(new Heartbeat(), delay, period);
386
// Stops sending heartbeats: cancels heartbeatTimer (if any) and clears the reference so that a
// later startHeartbeat() creates a new Timer. Some lines of the body are not visible here.
protected void stopHeartbeat()
388
if(heartbeatTimer != null)
392
heartbeatTimer.cancel();
397
heartbeatTimer = null;
402
* Returns the initial delay in milliseconds before the first heartbeat is fired.
* Subclasses may override this.
403
* Defaults to <tt>0</tt>
407
protected long getHeartbeatDelay()
413
* Returns the period in milliseconds between subsequent heartbeats. This is the current heartbeat
* time delay (initially 1000; see setHeartbeatTimeDelay).
418
protected long getHeartbeatPeriod()
420
return heartbeatTimeDelay;
424
* Subclasses must implement this to provide the specific heartbeat protocol
425
* this server uses to send out to other servers on the network. Presumably it is invoked
* periodically by the heartbeat timer task - confirm.
427
protected abstract void heartbeat();
430
* To be used to force detection to occur in a synchronous manner
431
* instead of being passive and waiting for detection messages to
432
* come in from remote detectors. The servers returned should be
433
* the remote servers that are online at this point in time. Note, calling this
434
* method may take a few seconds to complete.
* @return the servers known to the network registry; the result when no registry is available is
* not visible in this listing
437
public NetworkInstance[] forceDetection()
// NOTE(review): the lines before this check are not visible here; presumably they call
// forceHeartbeat() - confirm.
440
if(networkRegistry != null)
// The registry is read inside a privileged action.
// NOTE(review): AccessController is deprecated for removal since Java 17.
442
return (NetworkInstance[]) AccessController.doPrivileged( new PrivilegedAction()
446
return networkRegistry.getServers();
457
* Used to force detection messages to be sent by remoting servers
458
* and consumed by the detector and registered with the network registry.
* Subclasses must implement this.
460
protected abstract void forceHeartbeat();
464
* called when a remote detection from a peer is received by a detector.
* Detections accepted by isRemoteDetection(Detection) are tracked in {@code servers}.
* A newly seen server is added to the network registry; a known server whose locators
* changed is updated there. Other detections are ignored and logged at trace level.
* @param detection the received detection; ignored when null
468
protected void detect(final Detection detection)
470
if (detection != null)
472
if (log.isTraceEnabled())
474
log.trace("Detection message received.");
// NOTE(review): operator-precedence bug. This parses as (("Id = " + identity) != null) ? ... : ...,
// so the condition is always true: the "Id = " prefix is never logged and a null identity throws
// NullPointerException. Intended: "Id = " + (detection.getIdentity() != null ? ... : "null").
475
log.trace("Id = " + detection.getIdentity() != null ? detection.getIdentity().getInstanceId() : "null");
476
log.trace("isRemoteDetection() = " + isRemoteDetection(detection));
478
// we only track detections within our own domain and not ourself
479
if (isRemoteDetection(detection))
483
boolean found = false;
484
Server server = null;
// Look up and record the detection while holding the servers lock (also taken by FailureDetector).
486
synchronized (servers)
488
server = (Server) servers.get(detection);
489
found = server != null;
492
// update either way the timestamp and the detection
493
servers.put(detection, new Server(detection));
// NOTE(review): the branching around the previous and next statements is on lines not visible in
// this listing. Presumably put() handles a new server and the next line refreshes a known one - confirm.
497
server.lastDetection = System.currentTimeMillis();
502
if (networkRegistry != null)
504
log.debug(this + " detected NEW server: " + detection);
506
AccessController.doPrivileged( new PrivilegedAction()
510
networkRegistry.addServer(detection.getIdentity(), detection.getServerInvokers());
// Known server: replace its entry and update the registry when its locator hash has changed.
518
if (server.changed(detection))
521
servers.put(detection, new Server(detection));
522
if (networkRegistry != null)
524
if (log.isTraceEnabled())
526
log.trace(this + " detected UPDATE for server: " + detection);
529
AccessController.doPrivileged( new PrivilegedAction()
533
networkRegistry.updateServer(detection.getIdentity(), detection.getServerInvokers());
// Failures are logged at warn (message only) and at debug (with the exception).
543
log.warn("Error during detection of: " + detection);
544
log.debug("Error during detection of: " + detection, e);
547
else if (log.isTraceEnabled())
549
log.trace("detection from myself - ignored");
554
// Returns true when a detection should be tracked. Its domain must be accepted (a null domain,
// an empty accepted-domain set, or a listed domain), and it must come from another JVM unless
// acceptLocal is set.
// NOTE(review): for a null detection with acceptLocal false, detection.getIdentity() in the return
// expression throws NullPointerException. In the visible code, myself is only assigned in start().
protected boolean isRemoteDetection(Detection detection)
556
String domain = null;
557
if(detection != null)
559
Identity identity = detection.getIdentity();
// Lines between here and the next statement are not visible (presumably a null check on identity).
562
domain = identity.getDomain();
565
// is detection domain in accepted domain collection and not local
566
// if domains empty, then accept all
567
return (domain == null || domains.isEmpty() || domains.contains(domain)) &&
568
(acceptLocal ? true : (myself.isSameJVM(detection.getIdentity()) == false));
571
// Probes each server invoker advertised by a suspected-dead detection with
// ConnectionValidator.checkConnection(locator, config) and collects the invokers that respond.
// When the server is judged down (presumably because no invoker responded; the condition is not
// visible in this listing), it is removed from the network registry and from servers.
// Otherwise, if only some invokers responded, detect() is re-run with a Detection holding just
// those invokers, so that an update is fired.
// NOTE(review): the cl parameter is not referenced in the visible lines, and the return statements
// are not visible. FailureDetector treats a true result as "recovered on ping".
protected boolean checkInvokerServer(final Detection detection, ClassLoader cl)
574
ServerInvokerMetadata[] invokerMetadataArray = detection.getServerInvokers();
575
ArrayList validinvokers = new ArrayList();
576
for(int c = 0; c < invokerMetadataArray.length; c++)
578
InvokerLocator locator = null;
581
ServerInvokerMetadata invokerMetadata = invokerMetadataArray[c];
582
locator = invokerMetadata.getInvokerLocator();
584
boolean isValid = ConnectionValidator.checkConnection(locator, config);
587
// the transport was successful
589
validinvokers.add(invokerMetadata);
590
if(log.isTraceEnabled())
592
log.trace("Successful connection check for " + locator);
// Exceptions from the check are logged at debug level (the catch clause itself is not visible here).
599
log.debug("failed calling ping on " + detection + " due to " + ig.getMessage());
600
if(log.isTraceEnabled())
608
// the server is down!
609
// would be nice to also remove from the invoker registry as well, but since
610
// don't know all the possible entries for the config map passed when was created,
611
// won't be able to identify it. This means that clients currently using that invoker
612
// for the server will have to find out the hard way (by getting exception calling on it).
615
if(networkRegistry != null)
617
AccessController.doPrivileged( new PrivilegedAction()
621
networkRegistry.removeServer(detection.getIdentity());
626
log.debug("Removed detection " + detection);
631
log.debug("Error removing server for detection (" + detection + "). Possible network registry does not exist.");
635
// remove this server, it isn't available any more
636
servers.remove(detection);
639
else // at least one of the server invokers is still valid
641
if(log.isTraceEnabled())
643
log.trace("Done checking all locators for suspected dead server. " +
644
"There are " + validinvokers.size() + " out of original " +
645
invokerMetadataArray.length + " still valid.");
647
// need to cause an update to be fired if any server invokers failed
648
if(validinvokers.size() != invokerMetadataArray.length)
650
ServerInvokerMetadata[] newLocators = (ServerInvokerMetadata[])validinvokers.toArray(new ServerInvokerMetadata[validinvokers.size()]);
651
Detection newDetection = new Detection(detection.getIdentity(), newLocators);
652
if(log.isTraceEnabled())
654
log.trace("Since at least one invoker failed while doing connection check, will be re-evaluating detection for:\n" + newDetection);
656
detect(newDetection);
664
// Timer task that finds known servers whose last detection is at least defaultTimeDelay ms old.
// It probes each such server with checkInvokerServer() and refreshes its timestamp when it responds.
private final class FailureDetector extends TimerTask
666
// Incremented on every run of this task instance; used only in the thread name.
private int threadCounter = 0;
670
Thread.currentThread().setName("Remoting Detector - Failure Detector Thread: " + threadCounter++);
// NOTE(review): the extent of this synchronized block is not visible in this listing. If it spans
// the loop below, checkInvokerServer() (which performs connection checks) runs while holding the
// servers lock, which would block detect() meanwhile.
672
synchronized (servers)
674
if (servers.isEmpty())
678
ClassLoader cl = (ClassLoader) AccessController.doPrivileged( new PrivilegedAction()
682
return AbstractDetector.class.getClassLoader();
685
// walk through each detection and see if it needs checking up on ...
686
Collection serverCollection = servers.values();
// Iterate over a snapshot array, presumably because checkInvokerServer() may remove entries from servers.
687
Server[] serverArray = (Server[])serverCollection.toArray(new Server[serverCollection.size()]);
688
for(int x = 0; x < serverArray.length; x++)
690
Server svr = serverArray[x];
691
Detection detect = svr.detection;
692
long lastDetection = svr.lastDetection;
693
long duration = System.currentTimeMillis() - lastDetection;
694
if (duration >= defaultTimeDelay)
696
if (log.isTraceEnabled())
698
log.trace("detection for: " + detect + " has not been received in: " + defaultTimeDelay + " ms, contacting..");
700
// OK, we've exceeded the time delay since the last time we've detected
701
// this dude, he might be down, let's walk through each of his transports and
702
// see if any of them lead to a valid invocation
703
if (checkInvokerServer(detect, cl))
705
if (log.isTraceEnabled())
707
log.trace("detection for: " + detect + " recovered on ping");
709
svr.lastDetection = System.currentTimeMillis();
718
// Tracks one known server. It holds the server's latest Detection and the time it was last heard
// from. It also caches a hash of that detection's locators, used to notice locator changes;
// presumably computed via rehash() in the constructor, but that call is not visible here.
// NOTE(review): equals() compares only this summed locator hash. Different servers whose sums
// collide therefore compare equal, and changed() cannot see a change that preserves the sum.
// Summing is also order-insensitive.
private final class Server
721
private int hashCode = 0;
// Time of the last detection or successful ping, in System.currentTimeMillis() units.
722
long lastDetection = System.currentTimeMillis();
724
Server(Detection detection)
726
this.detection = detection;
// Recomputes the cached hash from the given detection's locators.
730
private void rehash(Detection d)
732
this.hashCode = hash(d);
// Sums the hash codes of the detection's invoker locators (the accumulator is declared on a line
// not visible here).
735
private int hash(Detection d)
738
InvokerLocator locators[] = d.getLocators();
741
for(int c = 0; c < locators.length; c++)
743
hc += locators[c].hashCode();
// True when the given detection's locator hash differs from the cached one.
749
boolean changed(Detection detection)
751
return hashCode != hash(detection);
754
public boolean equals(Object obj)
756
return obj instanceof Server && hashCode == obj.hashCode();
759
public int hashCode()
765
private final class Heartbeat extends TimerTask
767
private int threadCounter = 0;
771
Thread.currentThread().setName("Remoting Detector - Heartbeat Thread: " + threadCounter++);