/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 *
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License"). You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code. If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 *
 * Contributor(s):
 *
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license." If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above. However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */
package com.sun.enterprise.jxtamgmt;

import net.jxta.document.*;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.pipe.BlockingWireOutputPipe;
import net.jxta.peer.PeerID;
import net.jxta.pipe.*;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.RouteAdvertisement;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * HealthMonitor utilizes MasterNode to determine self designation. All nodes
 * cache other nodes' states, and can act as a master node at any given
 * point in time. The intention behind the designation is that no node other
 * than the master node should determine collective state and communicate it
 * to group members.
 *
 * TODO: Convert the InDoubt Peer Determination and Failure Verification into
 * Callable FutureTask using java.util.concurrent
 */
public class HealthMonitor implements PipeMsgListener, Runnable {
    private static final Logger LOG = JxtaUtil.getLogger(HealthMonitor.class.getName());
    // Default health reporting period
    private long timeout = 10 * 1000L;
    private long verifyTimeout = 10 * 1000L;
    private int maxMissedBeats = 3;
    private final Object threadLock = new Object();
    private final ConcurrentHashMap<PeerID, HealthMessage.Entry> cache =
            new ConcurrentHashMap<PeerID, HealthMessage.Entry>();
    private MasterNode masterNode = null;
    private ClusterManager manager = null;
    private final PeerID localPeerID;
    InputPipe inputPipe = null;
    private OutputPipe outputPipe = null;
    private PipeAdvertisement pipeAdv = null;
    private final PipeService pipeService;
    private volatile boolean started = false;
    private volatile boolean stop = false;
    private Thread healthMonitorThread = null;
    private Thread failureDetectorThread = null;
    private static final String NODEADV = "NAD";
    private InDoubtPeerDetector inDoubtPeerDetector;
    private final String[] states = {"starting",
            "started",
            "alive",
            "clusterstopping",
            "peerstopping",
            "stopped",
            "dead",
            "indoubt",
            "unknown",
            "ready",
            "aliveandready"};
            // "alive_in_isolation"};
    private static final short STARTING = 0;
    private static final short ALIVE = 2;
    private static final short CLUSTERSTOPPING = 3;
    private static final short PEERSTOPPING = 4;
    private static final short STOPPED = 5;
    private static final short DEAD = 6;
    private static final short INDOUBT = 7;
    private static final short UNKNOWN = 8;
    private static final short READY = 9;
    private static final short ALIVEANDREADY = 10;
    //private static final short ALIVE_IN_ISOLATION = 10;
    private static final String HEALTHM = "HM";
    private static final String NAMESPACE = "HEALTH";
    private static final Object cacheLock = new Object();
    private static final Object verifierLock = new Object();
    private static final String MEMBER_STATE_QUERY = "MEMBERSTATEQUERY";
    private static final String MEMBER_STATE_RESPONSE = "MEMBERSTATERESPONSE";
    private boolean readyStateComplete = false;
    private Message aliveMsg = null;
    private Message aliveAndReadyMsg = null;
    private transient Map<ID, OutputPipe> pipeCache = new Hashtable<ID, OutputPipe>();
    //counter for keeping track of the seq ids of the health messages
    AtomicLong hmSeqID = new AtomicLong();
    //use LWRMulticast to send messages for getting the member state
    LWRMulticast mcast = null;
    int lwrTimeout = 6000;
    private static final Object memberStateLock = new Object();
    private String memberState;
    private final Object hwFailureDetectionthreadLock = new Object();
    private static final String CONNECTION_REFUSED = "Connection refused";
    private long failureDetectionTCPTimeout;
    private int failureDetectionTCPPort;
    private final ThreadPoolExecutor isConnectedPool;
    //private ShutdownHook shutdownHook;
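    // Peer state lifecycle, summarizing the states[] table above: a peer
    // reports "starting", then periodic "alive" heartbeats (or "aliveandready"
    // once the application has reported ready), and on a graceful exit
    // "peerstopping" or "clusterstopping" followed by "stopped". "indoubt" and
    // "dead" are never self-reported; they are assigned by the
    // InDoubtPeerDetector/FailureVerifier below. Note that the short constants
    // above are indexes into states[], so the two declarations must stay in
    // step (index 1, "started", currently has no constant).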
    /**
     * Constructor for the HealthMonitor object
     *
     * @param manager        the ClusterManager
     * @param timeout        time in milliseconds that the health monitor waits before
     *                       retrying an indoubt peer's availability
     * @param maxMissedBeats maximum missed heartbeats before a peer is suspected
     * @param verifyTimeout  timeout in milliseconds that the health monitor
     *                       waits before finalizing that the in doubt peer is dead
     * @param failureDetectionTCPTimeout timeout in milliseconds to wait for the
     *                       TCP-level connection check to the suspected peer's machine
     * @param failureDetectionTCPPort    port used for the TCP-level connection check
     */
    public HealthMonitor(final ClusterManager manager, final long timeout,
                         final int maxMissedBeats, final long verifyTimeout,
                         final long failureDetectionTCPTimeout,
                         final int failureDetectionTCPPort) {
        this.timeout = timeout;
        this.maxMissedBeats = maxMissedBeats;
        this.verifyTimeout = verifyTimeout;
        this.manager = manager;
        this.masterNode = manager.getMasterNode();
        this.localPeerID = manager.getNetPeerGroup().getPeerID();
        this.pipeService = manager.getNetPeerGroup().getPipeService();
        this.failureDetectionTCPTimeout = failureDetectionTCPTimeout;
        this.failureDetectionTCPPort = failureDetectionTCPPort;
        isConnectedPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            mcast = new LWRMulticast(manager, createPipeAdv(), this);
            mcast.setSoTimeout(lwrTimeout);
        } catch (IOException e) {
            LOG.warning("Could not instantiate LWRMulticast : " + e.getMessage());
        }
        //this.shutdownHook = new ShutdownHook();
        //Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    void fine(String msg, Object[] obj) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, msg, obj);
        }
    }

    void fine(String msg) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, msg);
        }
    }

    /**
     * In the event of a failure or planned shutdown, removes the
     * pipe from the pipeCache.
     */
    public void removePipeFromCache(ID token) {
        pipeCache.remove(token);
    }

    /**
     * Creates a Message containing this node's state
     *
     * @param state member state
     * @return a Message containing this node's state
     */
    private Message createHealthMessage(final short state) {
        Message msg = createMessage(state, HEALTHM, manager.getSystemAdvertisement());
        masterNode.addRoute(msg);
        return msg;
    }

    private Message createMessage(final short state, final String tag,
                                  final SystemAdvertisement adv) {
        final Message msg = new Message();
        final HealthMessage hm = new HealthMessage();
        hm.setSrcID(localPeerID);
        final HealthMessage.Entry entry =
                new HealthMessage.Entry(adv, states[state], hmSeqID.incrementAndGet());
        hm.add(entry);
        msg.addMessageElement(NAMESPACE,
                new TextDocumentMessageElement(tag,
                        (XMLDocument) hm.getDocument(MimeMediaType.XMLUTF8), null));
        //add this state to the local health cache.
        cache.put(entry.id, entry);
        return msg;
    }

    private Message getAliveMessage() {
        if (aliveMsg == null) {
            aliveMsg = createHealthMessage(ALIVE);
        }
        return aliveMsg;
    }

    private Message getAliveAndReadyMessage() {
        if (aliveAndReadyMsg == null) {
            aliveAndReadyMsg = createHealthMessage(ALIVEANDREADY);
        }
        return aliveAndReadyMsg;
    }

    /**
     * Given a pipe ID, returns a HealthMonitor pipe advertisement of propagate type.
     *
     * @return a HealthMonitor pipe advertisement of propagate type
     */
    private PipeAdvertisement createPipeAdv() {
        final PipeAdvertisement pipeAdv;
        // create the pipe advertisement, to be used in creating the pipe
        pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(
                PipeAdvertisement.getAdvertisementType());
        pipeAdv.setPipeID(manager.getNetworkManager().getHealthPipeID());
        pipeAdv.setType(PipeService.PropagateType);
        return pipeAdv;
    }
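    /*
     * Illustrative construction only (the values here are hypothetical; in
     * practice the monitor is presumably created by ClusterManager from its
     * own configuration):
     *
     *   HealthMonitor hm = new HealthMonitor(manager,
     *           10 * 1000L, // timeout: heartbeat interval in ms
     *           3,          // maxMissedBeats
     *           10 * 1000L, // verifyTimeout in ms
     *           10 * 1000L, // failureDetectionTCPTimeout in ms
     *           9130);      // failureDetectionTCPPort (hypothetical port)
     *   hm.start();
     *
     * With these values a silent peer is designated INDOUBT after roughly
     * maxMissedBeats * timeout = 30s of silence (subject to the TCP-level
     * machine check), and DEAD no sooner than a further verifyTimeout + 500ms.
     */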
    /**
     * {@inheritDoc}
     */
    public void pipeMsgEvent(final PipeMsgEvent event) {
        //if this peer is stopping, then stop processing incoming health messages.
        if (manager.isStopping()) {
            return;
        }
        if (started) {
            final Message msg;
            MessageElement msgElement;
            try {
                // grab the message from the event
                msg = event.getMessage();
                if (msg != null) {
                    final Message.ElementIterator iter = msg.getMessageElements();
                    while (iter.hasNext()) {
                        msgElement = iter.next();
                        if (msgElement != null && msgElement.getElementName().equals(HEALTHM)) {
                            HealthMessage hm = getHealthMessage(msgElement);
                            if (!hm.getSrcID().equals(localPeerID)) {
                                masterNode.processRoute(msg);
                            }
                            process(hm);
                        } else if (msgElement != null && msgElement.getElementName().equals(MEMBER_STATE_QUERY)) {
                            processMemberStateQuery(msg);
                        } else if (msgElement != null && msgElement.getElementName().equals(MEMBER_STATE_RESPONSE)) {
                            processMemberStateResponse(msg);
                        }
                    }
                }
            } catch (IOException ex) {
                if (LOG.isLoggable(Level.FINE)) {
                    ex.printStackTrace();
                }
                LOG.log(Level.WARNING, "HealthMonitor:Caught IOException : " + ex.getLocalizedMessage());
            } catch (Throwable e) {
                if (LOG.isLoggable(Level.FINE)) {
                    e.printStackTrace();
                }
                LOG.log(Level.WARNING, e.getLocalizedMessage());
            }
        }
    }

    private void processMemberStateQuery(Message msg) {
        boolean foundNodeAdv = false;
        LOG.fine(" received a MemberStateQuery...");
        try {
            //SystemAdvertisement adv = masterNode.processNodeAdvertisement(msg);
            Message.ElementIterator iter = msg.getMessageElements();
            MessageElement msgElement = null;
            while (iter.hasNext()) {
                msgElement = iter.next();
                if (msgElement.getElementName().equals(NODEADV)) {
                    foundNodeAdv = true;
                    break;
                }
            }
            if (foundNodeAdv) {
                final StructuredDocument asDoc;
                asDoc = StructuredDocumentFactory.newStructuredDocument(
                        msgElement.getMimeType(), msgElement.getStream());
                final SystemAdvertisement adv = new SystemAdvertisement(asDoc);
                if (!adv.getID().equals(localPeerID)) {
                    LOG.log(Level.FINER, "Received a System advertisement Name :" + adv.getName());
                }
                ID sender = adv.getID(); //sender of this query
                String state = getStateFromCache(localPeerID);
                Message response = createMemberStateResponse(state);
                LOG.fine(" sending via LWR response to " + sender.toString()
                        + " with state " + state + " for " + localPeerID);
                mcast.send((PeerID) sender, response); //send the response back to the query sender
            } else {
                LOG.warning("Don't know where this query came from. SysAdv is null");
            }
        } catch (IOException e) {
            if (LOG.isLoggable(Level.FINE)) {
                e.printStackTrace();
            }
            LOG.warning("Could not send the message via LWRMulticast : " + e.getMessage());
        }
    }

    private void processMemberStateResponse(Message msg) {
        LOG.fine("received a MemberStateResponse...");
        memberState = msg.getMessageElement(NAMESPACE, MEMBER_STATE_RESPONSE).toString();
        LOG.fine(" member state in processMemberStateResponse() is " + memberState);
        synchronized (memberStateLock) {
            memberStateLock.notify();
        }
    }

    /**
     * Creates a node query message specially for querying the member state.
     *
     * @return msg Message
     */
    private Message createMemberStateQuery() {
        Message msg = new Message();
        msg.addMessageElement(NAMESPACE,
                new TextDocumentMessageElement(NODEADV,
                        (XMLDocument) manager.getSystemAdvertisement()
                                .getDocument(MimeMediaType.XMLUTF8), null));
        msg.addMessageElement(NAMESPACE,
                new StringMessageElement(MEMBER_STATE_QUERY, "member state query", null));
        LOG.log(Level.FINE, "Created a Member State Query Message ");
        return msg;
    }

    /**
     * Creates a node response message specially for sending the member state.
     *
     * @return msg Message
     */
    private Message createMemberStateResponse(String myState) {
        Message msg = new Message();
        msg.addMessageElement(NAMESPACE,
                new StringMessageElement(MEMBER_STATE_RESPONSE, myState, null));
        msg.addMessageElement(NAMESPACE,
                new TextDocumentMessageElement(NODEADV,
                        (XMLDocument) manager.getSystemAdvertisement()
                                .getDocument(MimeMediaType.XMLUTF8), null));
        LOG.log(Level.FINE, "Created a Member State Response Message with " + myState);
        return msg;
    }

    /**
     * Processes a health message received on the wire into the cache.
     *
     * @param hm Health message to process
     */
    private void process(final HealthMessage hm) {
        //discard loopback messages
        if (!hm.getSrcID().equals(localPeerID)) {
            //current assumption is that there is only 1 entry
            //per health message
            for (HealthMessage.Entry entry : hm.getEntries()) {
                LOG.log(Level.FINEST, "Processing Health Message " + entry.getSeqID()
                        + " for entry " + entry.adv.getName());
                LOG.log(Level.FINEST, "Getting the cachedEntry " + entry.id);
                HealthMessage.Entry cachedEntry = cache.get(entry.id);
                if (cachedEntry != null) {
                    LOG.log(Level.FINEST, "cachedEntry is not null");
                    if (entry.getSeqID() <= cachedEntry.getSeqID()) {
                        LOG.log(Level.FINER, MessageFormat.format(
                                "Received an older health message sequence {0}."
                                        + " Current sequence id is {1}. ",
                                entry.getSeqID(), cachedEntry.getSeqID()));
                        if (entry.state.equals(states[CLUSTERSTOPPING])
                                || entry.state.equals(states[PEERSTOPPING])) {
                            //don't discard the message,
                            //and don't alter the state if the cachedEntry's state is stopped
                            LOG.log(Level.FINER, "Received out of order health message "
                                    + "with clusterstopping state."
                                    + " Calling handleStopEvent() to handle shutdown state.");
                            handleStopEvent(entry);
                        } else if (entry.state.equals(states[READY])) {
                            LOG.finer("Received out of order health message with Joined and Ready state. "
                                    + "Calling handleReadyEvent() for handling the peer's ready state");
                            handleReadyEvent(entry);
                        } else {
                            LOG.log(Level.FINER, "Discarding out of sequence health message");
                        }
                        return;
                    }
                }
                LOG.log(Level.FINEST, "Putting into cache " + entry.adv.getName()
                        + " state = " + entry.state + " peerid = " + entry.id);
                cache.put(entry.id, entry);
                if (!manager.getClusterViewManager().containsKey(entry.id)
                        && (!entry.state.equals(states[CLUSTERSTOPPING])
                        && !entry.state.equals(states[PEERSTOPPING])
                        && !entry.state.equals(states[STOPPED])
                        && !entry.state.equals(states[DEAD]))) {
                    try {
                        masterNode.probeNode(entry);
                    } catch (IOException e) {
                        if (LOG.isLoggable(Level.FINE)) {
                            e.printStackTrace();
                        }
                        LOG.warning("IOException occurred while sending probeNode() Message in HealthMonitor:"
                                + e.getLocalizedMessage());
                    }
                }
                if (entry.state.equals(states[READY])) {
                    handleReadyEvent(entry);
                }
                if (entry.state.equals(states[PEERSTOPPING]) || entry.state.equals(states[CLUSTERSTOPPING])) {
                    handleStopEvent(entry);
                }
                if (entry.state.equals(states[INDOUBT]) || entry.state.equals(states[DEAD])) {
                    if (entry.id.equals(localPeerID)) {
                        if (readyStateComplete) {
                            reportMyState(ALIVEANDREADY, hm.getSrcID());
                        } else {
                            reportMyState(ALIVE, hm.getSrcID());
                        }
                    } else {
                        if (entry.state.equals(states[INDOUBT])) {
                            LOG.log(Level.FINE, "Peer " + entry.id.toString()
                                    + " is suspected failed. Its state is " + entry.state);
                            notifyLocalListeners(entry.state, entry.adv);
                        }
                        if (entry.state.equals(states[DEAD])) {
                            LOG.log(Level.FINE, "Peer " + entry.id.toString()
                                    + " has failed. Its state is " + entry.state);
                            cleanAllCaches(entry);
                        }
                    }
                }
            }
        }
    }
    private void handleReadyEvent(final HealthMessage.Entry entry) {
        cache.put(entry.id, entry);
        if (entry.id.equals(masterNode.getMasterNodeID())) {
            //if this is a ready state sent out by the master, take no action here
            //as the master would send out a view to the group.
            return;
        }
        //if this is a ready state sent by a non-master, and if I am the assigned master,
        //send out a new view and notify local listeners.
        if (masterNode.isMaster() && masterNode.isMasterAssigned()) {
            LOG.log(Level.FINEST, MessageFormat.format("Handling Ready Event for peer :{0}",
                    entry.adv.getName()));
            final ClusterViewEvent cvEvent = masterNode.sendReadyEventView(entry.adv);
            manager.getClusterViewManager().notifyListeners(cvEvent);
        }
    }

    private void handleStopEvent(final HealthMessage.Entry entry) {
        LOG.log(Level.FINEST, MessageFormat.format("Handling Stop Event for peer :{0}",
                entry.adv.getName()));
        short stateByte = PEERSTOPPING;
        if (entry.state.equals(states[CLUSTERSTOPPING])) {
            stateByte = CLUSTERSTOPPING;
        }
        if (entry.adv.getID().equals(masterNode.getMasterNodeID())) {
            //if the masternode is resigning, remove the master node from the view and start discovery
            LOG.log(Level.FINER, MessageFormat.format(
                    "Removing master node {0} from view as it has stopped.", entry.adv.getName()));
            removeMasterAdv(entry, stateByte);
            masterNode.resetMaster();
            masterNode.appointMasterNode();
        } else if (masterNode.isMaster() && masterNode.isMasterAssigned()) {
            removeMasterAdv(entry, stateByte);
            LOG.log(Level.FINE, "Announcing Peer Stop Event of " + entry.adv.getName() + " to group ...");
            final ClusterViewEvent cvEvent;
            if (entry.state.equals(states[CLUSTERSTOPPING])) {
                cvEvent = new ClusterViewEvent(ClusterViewEvents.CLUSTER_STOP_EVENT, entry.adv);
            } else {
                cvEvent = new ClusterViewEvent(ClusterViewEvents.PEER_STOP_EVENT, entry.adv);
            }
            masterNode.viewChanged(cvEvent);
        }
        cleanAllCaches(entry);
    }

    private void cleanAllCaches(HealthMessage.Entry entry) {
        LOG.fine("HealthMonitor.cleanAllCaches : removing pipes and route from cache..." + entry.id);
        removePipeFromCache(entry.id);
        manager.removePipeFromCache(entry.id);
        manager.removeRouteFromCache(entry.id);
        masterNode.removePipeFromCache(entry.id);
    }

    private Map<PeerID, HealthMessage.Entry> getCacheCopy() {
        ConcurrentHashMap<PeerID, HealthMessage.Entry> clone =
                new ConcurrentHashMap<PeerID, HealthMessage.Entry>();
        clone.putAll(cache);
        return clone;
    }

    /**
     * Reports the specified state on the wire.
     *
     * @param state one of the states defined in the states array, e.g.
     *              STARTING, ALIVE, ALIVEANDREADY, READY, CLUSTERSTOPPING,
     *              PEERSTOPPING, STOPPED, DEAD or INDOUBT
     * @param id    destination node ID; if null, broadcast to group
     */
    private void reportMyState(final short state, final PeerID id) {
        LOG.log(Level.FINER, MessageFormat.format("Sending state {0} to {1}", states[state], id));
        if (state == ALIVE) {
            send(id, getAliveMessage());
        } else if (state == ALIVEANDREADY) {
            send(id, getAliveAndReadyMessage());
        } else {
            send(id, createHealthMessage(state));
        }
    }

    private void reportOtherPeerState(final short state, final SystemAdvertisement adv) {
        final Message msg = createMessage(state, HEALTHM, adv);
        LOG.log(Level.FINEST, MessageFormat.format("Reporting {0} health state as {1}",
                adv.getName(), states[state]));
        send(null, msg);
    }
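    // Heartbeat cadence: run() below multicasts this peer's state every
    // 'timeout' ms (10s by default), switching from ALIVE to ALIVEANDREADY once
    // reportJoinedAndReadyState() has set readyStateComplete. INDOUBT and DEAD
    // reports about other peers bypass this loop via reportOtherPeerState().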
    /**
     * Main processing method for the HealthMonitor object
     */
    public void run() {
        //At start, the starting state is reported. But reporting the ready state is dependent on
        //the parent application making the call that it is in the ready state, which is not
        //necessarily deterministic. Hence we move on from the starting state to reporting
        //this peer's alive state. For the join notification event, the liveness state of a peer
        //would be one of starting or ready or alive (other than peerstopping, etc.).
        reportMyState(STARTING, null);
        //System.out.println("Running HealthMonitor Thread at interval :"+actualto);
        while (!stop) {
            try {
                synchronized (threadLock) {
                    threadLock.wait(timeout);
                }
                if (!stop) {
                    if (readyStateComplete) {
                        reportMyState(ALIVEANDREADY, null);
                    } else {
                        reportMyState(ALIVE, null);
                    }
                }
            } catch (InterruptedException e) {
                stop = true;
                LOG.log(Level.FINEST, "Shoal Health Monitor Thread Stopping as the thread is now interrupted...:"
                        + e.getLocalizedMessage());
                break;
            } catch (Throwable all) {
                LOG.log(Level.WARNING, "Uncaught Throwable in healthMonitorThread "
                        + Thread.currentThread().getName() + ":" + all);
            }
        }
    }

    /**
     * Sends a message to a specific node. In case the peerid is null, the
     * message is multicast to the group.
     *
     * @param peerid Peer ID to send the message to
     * @param msg    the message to send
     */
    private void send(final PeerID peerid, final Message msg) {
        try {
            if (peerid != null) {
                // Unicast datagram: create an output pipe to the destination peer
                LOG.log(Level.FINE, "Unicasting Message to :" + peerid.toString());
                OutputPipe output = null;
                if (!pipeCache.containsKey(peerid)) {
                    RouteAdvertisement route = manager.getCachedRoute(peerid);
                    if (route != null) {
                        output = new BlockingWireOutputPipe(manager.getNetPeerGroup(), pipeAdv, peerid, route);
                    }
                    if (output == null) {
                        output = pipeService.createOutputPipe(pipeAdv, Collections.singleton(peerid), 1);
                    }
                    pipeCache.put(peerid, output);
                } else {
                    output = pipeCache.get(peerid);
                    if (output.isClosed()) {
                        output = pipeService.createOutputPipe(pipeAdv, Collections.singleton(peerid), 1);
                        pipeCache.put(peerid, output);
                    }
                }
                output.send(msg);
            } else {
                outputPipe.send(msg);
            }
        } catch (IOException io) {
            LOG.log(Level.WARNING, "Failed to send message", io);
        }
    }

    /**
     * Creates both input/output pipes, and starts the monitoring service.
     */
    void start() {
        if (!started) {
            // Ensure that discovery is done. If masterNode.probeNode(entry) is called
            // before the master's discovery is done, unexpected results can occur in
            // the master selection algorithm.
            if (masterNode.isDiscoveryInProgress()) {
                synchronized (masterNode.discoveryLock) {
                    try {
                        masterNode.discoveryLock.wait();
                        LOG.log(Level.FINEST, "start() waiting for masternode discovery to finish...");
                    } catch (InterruptedException e) {
                        LOG.log(Level.FINEST, "MasterNode's DiscoveryLock Thread is interrupted " + e);
                    }
                }
            }
            LOG.log(Level.FINE, "Starting HealthMonitor");
            try {
                // create the pipe advertisement, to be used in creating the pipe
                pipeAdv = createPipeAdv();
                // create input
                inputPipe = pipeService.createInputPipe(pipeAdv, this);
                // create output
                outputPipe = pipeService.createOutputPipe(pipeAdv, 1);
                this.healthMonitorThread = new Thread(this, "HealthMonitor");
                healthMonitorThread.start();
                inDoubtPeerDetector = new InDoubtPeerDetector();
                inDoubtPeerDetector.start();
                started = true;
            } catch (IOException ioe) {
                LOG.log(Level.WARNING, "Failed to create health monitoring pipe advertisement :" + ioe);
            }
        }
    }
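    /*
     * Typical lifecycle, as a sketch (the calls are presumably made by the GMS
     * layer that owns this monitor; the names below are this class's own methods):
     *
     *   monitor.start();                     // open pipes, begin heartbeating
     *   monitor.reportJoinedAndReadyState(); // once the application is ready
     *   ...
     *   monitor.stop(false);                 // announce PEERSTOPPING, then STOPPED,
     *                                        // then tear down pipes and threads
     *
     * Passing true to stop() announces CLUSTERSTOPPING instead, for a
     * cluster-wide shutdown.
     */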
    /**
     * Announces a Stop event to all members indicating that this peer
     * will gracefully exit the group.
     * TODO: Make this a synchronous call or a simulated synchronous call so that
     * responses from a majority of members can be collected before returning from this method
     *
     * @param isClusterShutdown boolean value indicating whether this
     *                          announcement is in the context of a clusterwide shutdown
     *                          or a shutdown of this peer only.
     */
    void announceStop(final boolean isClusterShutdown) {
        //System.out.println("Announcing Shutdown");
        LOG.log(Level.FINE, MessageFormat.format(
                "Announcing stop event to group with clusterShutdown set to {0}", isClusterShutdown));
        if (isClusterShutdown) {
            reportMyState(CLUSTERSTOPPING, null);
        } else {
            reportMyState(PEERSTOPPING, null);
        }
    }

    /**
     * Stops this service
     *
     * @param isClusterShutdown true if the cluster is shutting down
     */
    void stop(boolean isClusterShutdown) {
        stop = true;
        started = false;
        announceStop(isClusterShutdown);
        reportMyState(STOPPED, null);
        LOG.log(Level.FINE, "Stopping HealthMonitor");
        final Thread tmpThread = healthMonitorThread;
        healthMonitorThread = null;
        if (tmpThread != null) {
            tmpThread.interrupt();
        }
        inDoubtPeerDetector.stop();
        inputPipe.close();
        outputPipe.close();
        pipeCache.clear();
        manager.clearAllCaches();
        masterNode.clearPipeCache();
    }

    private HealthMessage getHealthMessage(final MessageElement msgElement) throws IOException {
        final HealthMessage hm;
        hm = new HealthMessage(getStructuredDocument(msgElement), hmSeqID.incrementAndGet());
        return hm;
    }

    private static StructuredTextDocument getStructuredDocument(
            final MessageElement msgElement) throws IOException {
        return (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(
                MimeMediaType.XMLUTF8, msgElement.getStream());
    }

    private void notifyLocalListeners(final String state, final SystemAdvertisement adv) {
        if (state.equals(states[INDOUBT])) {
            manager.getClusterViewManager().setInDoubtPeerState(adv);
        } else if (state.equals(states[ALIVE])) {
            manager.getClusterViewManager().setPeerNoLongerInDoubtState(adv);
        } else if (state.equals(states[ALIVEANDREADY])) {
            manager.getClusterViewManager().setPeerNoLongerInDoubtState(adv);
        } else if (state.equals(states[CLUSTERSTOPPING])) {
            manager.getClusterViewManager().setClusterStoppingState(adv);
        } else if (state.equals(states[PEERSTOPPING])) {
            manager.getClusterViewManager().setPeerStoppingState(adv);
        } else if (state.equals(states[READY])) {
            manager.getClusterViewManager().setPeerReadyState(adv);
        }
        /*else if (state.equals(states[ALIVE_IN_ISOLATION])) {
            manager.getClusterViewManager().setInIsolationState(adv);
        } */
    }

    public String getMemberState(final ID peerID) {
        //don't get the cached member state for that instance;
        //instead send a query to that instance to get the most up-to-date state
        LOG.fine("inside getMemberState for " + peerID.toString());
        Message msg = createMemberStateQuery();
        //send it via LWRMulticast
        try {
            mcast.send((PeerID) peerID, msg);
            LOG.fine("send message in getMemberState via LWR...");
        } catch (IOException e) {
            LOG.warning("Could not send the LWR Multicast message to get the member state of "
                    + peerID.toString() + " IOException : " + e.getMessage());
        }
        synchronized (memberStateLock) {
            try {
                memberStateLock.wait(timeout);
            } catch (InterruptedException e) {
                LOG.warning("wait() was interrupted : " + e.getMessage());
            }
        }
        if (memberState != null) {
            String state = memberState;
            //nullify this string before returning so that the next time this method is
            //accessed, memberState is null before it is assigned a value in
            //processMemberStateResponse()
            memberState = null;
            LOG.fine("inside getMemberState got state via lwr " + state);
            return state;
        } else {
            //the timeout happened even before notify() was called
            String state = getStateFromCache(peerID);
            LOG.fine("inside getMemberState got state after timeout " + state);
            return state;
        }
    }

    String getStateFromCache(final ID peerID) {
        HealthMessage.Entry entry;
        final String state;
        entry = cache.get((PeerID) peerID);
        if (entry != null) {
            state = entry.state;
        } else {
            if (((PeerID) peerID).equals(localPeerID)) {
                if (!started) {
                    state = states[STARTING];
                } else if (readyStateComplete) {
                    state = states[ALIVEANDREADY];
                } else {
                    state = states[ALIVE];
                }
            } else {
                entry = cache.get((PeerID) peerID);
                if (entry != null) {
                    state = entry.state;
                } else {
                    if (manager.getClusterViewManager().containsKey(peerID)) {
                        //we assume that the peer is in starting state,
                        //hence its state is not yet known in this peer
                        state = states[STARTING];
                    } else {
                        state = states[UNKNOWN];
                    }
                }
            }
        }
        return state;
    }

    public void reportJoinedAndReadyState() {
        //If I am the master, send out a new view with the ready event, but wait for master
        //discovery to be over. Also send out a health message with the state set to READY.
        //Recipients of the health message would only update their health state cache.
        //Master Node discovery completion and assignment is necessary for the ready state
        //of a peer to be sent out as an event with a view.
        //If I am not the master, report my state to the group as READY. MasterNode will
        //send out a JOINED_AND_READY_EVENT view to the group on receipt of this health
        //state message.
        if (masterNode.isDiscoveryInProgress()) {
            synchronized (masterNode.discoveryLock) {
                try {
                    masterNode.discoveryLock.wait();
                    LOG.log(Level.FINEST, "reportJoinedAndReadyState() waiting for masternode discovery to finish...");
                } catch (InterruptedException e) {
                    LOG.log(Level.FINEST, "MasterNode's DiscoveryLock Thread is interrupted " + e);
                }
            }
        }
        if (masterNode.isMaster() && masterNode.isMasterAssigned()) {
            LOG.log(Level.FINEST, "Sending Ready Event View for "
                    + manager.getSystemAdvertisement().getName());
            ClusterViewEvent cvEvent = masterNode.sendReadyEventView(manager.getSystemAdvertisement());
            LOG.log(Level.FINEST, MessageFormat.format("Notifying Local listeners about "
                    + "Joined and Ready Event View for peer :{0}",
                    manager.getSystemAdvertisement().getName()));
            manager.getClusterViewManager().notifyListeners(cvEvent);
        }
        LOG.log(Level.FINEST, "Calling reportMyState() with READY...");
        reportMyState(READY, null);
        readyStateComplete = true;
    }

    /**
     * Detects suspected failures and then delegates the final decision to
     * FailureVerifier.
     *
     * @author Shreedhar Ganapathy
     */
    private class InDoubtPeerDetector implements Runnable {
        void start() {
            failureDetectorThread = new Thread(this, "InDoubtPeerDetector Thread");
            LOG.log(Level.FINE, "Starting InDoubtPeerDetector Thread");
            failureDetectorThread.start();
            FailureVerifier fverifier = new FailureVerifier();
            final Thread fvThread = new Thread(fverifier, "FailureVerifier Thread");
            LOG.log(Level.FINE, "Starting FailureVerifier Thread");
            fvThread.start();
        }

        void stop() {
            final Thread tmpThread = failureDetectorThread;
            failureDetectorThread = null;
            if (tmpThread != null) {
                tmpThread.interrupt();
            }
            synchronized (verifierLock) {
                verifierLock.notify();
            }
        }

        public void run() {
            while (!stop) {
                synchronized (cacheLock) {
                    try {
                        //wait for the specified timeout or until woken up
                        cacheLock.wait(timeout);
                        //LOG.log(Level.FINEST, "Analyzing cache for health...");
                        //get the copy of the states cache
                        if (!manager.isStopping()) {
                            processCacheUpdate();
                        }
                    } catch (InterruptedException ex) {
                        LOG.log(Level.FINEST, "InDoubtPeerDetector Thread stopping as it is now interrupted :"
                                + ex.getLocalizedMessage());
                        break;
                    } catch (Throwable all) {
                        if (LOG.isLoggable(Level.FINE)) {
                            all.printStackTrace();
                        }
                        LOG.warning("Uncaught Throwable in failureDetectorThread "
                                + Thread.currentThread().getName() + ":" + all);
                    }
                }
            }
        }

        /**
         * Computes the number of heartbeats missed based on an entry's timestamp.
         *
         * @param entry the health entry
         * @return the number of heartbeats missed
         */
        int computeMissedBeat(HealthMessage.Entry entry) {
            return (int) ((System.currentTimeMillis() - entry.timestamp) / timeout);
        }
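        // Worked example: with timeout = 10000ms, a peer whose last entry
        // timestamp is 35s old yields (35000 / 10000) = 3 missed beats; with
        // maxMissedBeats = 3 this meets the in-doubt threshold checked in
        // determineInDoubtPeers(), subject to the isConnected() machine check.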
        private void processCacheUpdate() {
            final Map<PeerID, HealthMessage.Entry> cacheCopy = getCacheCopy();
            //for each peer id
            for (HealthMessage.Entry entry : cacheCopy.values()) {
                //don't check for isConnected with your own self
                if (!entry.id.equals(manager.getSystemAdvertisement().getID())) {
                    LOG.fine("processCacheUpdate : " + entry.adv.getName() + "'s state is " + entry.state);
                    if (entry.state.equals(states[ALIVE]) || entry.state.equals(states[ALIVEANDREADY])) {
                        //if there is a record, then get the number of
                        //retries performed in an earlier iteration
                        try {
                            determineInDoubtPeers(entry);
                        } catch (NumberFormatException nfe) {
                            if (LOG.isLoggable(Level.FINE)) {
                                nfe.printStackTrace();
                            }
                            LOG.log(Level.WARNING, "Exception occurred during time stamp conversion : "
                                    + nfe.getLocalizedMessage());
                        }
                    }
                }
            }
        }

        private void determineInDoubtPeers(final HealthMessage.Entry entry) {
            //if the current time exceeds the last state update timestamp from this
            //peer id by more than the specified max timeout
            if (!stop) {
                if (computeMissedBeat(entry) >= maxMissedBeats && !isConnected(entry)) {
                    LOG.log(Level.FINEST, "timeDiff > maxTime");
                    if (canProcessInDoubt(entry)) {
                        LOG.log(Level.FINER, "Designating InDoubtState");
                        designateInDoubtState(entry);
                        //delegate verification to the FailureVerifier
                        LOG.log(Level.FINER, "Notifying FailureVerifier for " + entry.adv.getName());
                        synchronized (verifierLock) {
                            verifierLock.notify();
                            LOG.log(Level.FINER, "Done Notifying FailureVerifier for " + entry.adv.getName());
                        }
                    }
                } else {
                    //don't suspect self
                    if (!entry.id.equals(localPeerID)) {
                        if (canProcessInDoubt(entry)) {
                            LOG.log(Level.FINE, MessageFormat.format(
                                    "For instance = {0}; last recorded heart-beat = {1}ms ago, heart-beat # {2} out of a max of {3}",
                                    entry.adv.getName(), (System.currentTimeMillis() - entry.timestamp),
                                    computeMissedBeat(entry), maxMissedBeats));
                        }
                    }
                }
            }
        }

        private boolean canProcessInDoubt(final HealthMessage.Entry entry) {
            boolean canProcessIndoubt = false;
            if (masterNode.getMasterNodeID().equals(entry.id)) {
                canProcessIndoubt = true;
            } else if (masterNode.isMaster()) {
                canProcessIndoubt = true;
            }
            return canProcessIndoubt;
        }

        private void designateInDoubtState(final HealthMessage.Entry entry) {
            entry.state = states[INDOUBT];
            cache.put(entry.id, entry);
            if (masterNode.isMaster()) {
                //Do this only when the masternode is not the suspect. When the
                //masternode is the suspect, all members update their states anyway,
                //so there is no need to report.
                //Send a group message announcing the InDoubt State.
                LOG.log(Level.FINE, "Sending INDOUBT state message about node ID: "
                        + entry.id + " to the cluster...");
                reportOtherPeerState(INDOUBT, entry.adv);
            }
            LOG.log(Level.FINEST, "Notifying Local Listeners of designated indoubt state for "
                    + entry.adv.getName());
            notifyLocalListeners(entry.state, entry.adv);
        }
    }

    private class FailureVerifier implements Runnable {
        private final long buffer = 500;

        public void run() {
            try {
                synchronized (verifierLock) {
                    while (!stop) {
                        LOG.log(Level.FINER, "FV: Entering verifierLock Wait....");
                        verifierLock.wait();
                        LOG.log(Level.FINER, "FV: Woken up from verifierLock Wait by a notify ....");
                        if (!stop) {
                            LOG.log(Level.FINER, "FV: Calling verify() ....");
                            verify();
                            LOG.log(Level.FINER, "FV: Done verifying ....");
                        }
                    }
                }
            } catch (InterruptedException ex) {
                LOG.log(Level.FINEST, MessageFormat.format(
                        "Failure Verifier Thread stopping as it is now interrupted: {0}",
                        ex.getLocalizedMessage()));
            }
        }

        void verify() throws InterruptedException {
            //wait for the specified timeout for verification
            Thread.sleep(verifyTimeout + buffer);
            HealthMessage.Entry entry;
            for (HealthMessage.Entry entry1 : getCacheCopy().values()) {
                entry = entry1;
                LOG.log(Level.FINER, "FV: Verifying state of " + entry.adv.getName()
                        + " state = " + entry.state);
                if (entry.state.equals(states[INDOUBT]) && !isConnected(entry)) {
                    LOG.log(Level.FINER, "FV: Assigning and reporting failure ....");
                    assignAndReportFailure(entry);
                }
            }
        }
    }
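    // Verification window: FailureVerifier.verify() above sleeps verifyTimeout
    // plus a 500ms buffer before re-checking each INDOUBT entry against a fresh
    // cache copy, so a peer whose heartbeat resumes inside that window is not
    // passed to assignAndReportFailure() below.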
    private void assignAndReportFailure(final HealthMessage.Entry entry) {
        if (entry != null) {
            entry.state = states[DEAD];
            cache.put(entry.id, entry);
            if (masterNode.isMaster()) {
                LOG.log(Level.FINE, MessageFormat.format("Reporting Failed Node {0}", entry.id.toString()));
                reportOtherPeerState(DEAD, entry.adv);
            }
            final boolean masterFailed = (masterNode.getMasterNodeID()).equals(entry.id);
            if (masterNode.isMaster() && masterNode.isMasterAssigned()) {
                LOG.log(Level.FINE, MessageFormat.format("Removing System Advertisement :{0} for name {1}",
                        entry.id.toString(), entry.adv.getName()));
                removeMasterAdv(entry, DEAD);
                LOG.log(Level.FINE, MessageFormat.format("Announcing Failure Event of {0} for name {1}...",
                        entry.id, entry.adv.getName()));
                final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.FAILURE_EVENT, entry.adv);
                masterNode.viewChanged(cvEvent);
            } else if (masterFailed) {
                //remove the failed node
                LOG.log(Level.FINE, MessageFormat.format(
                        "Master Failed. Removing System Advertisement :{0} for master named {1}",
                        entry.id.toString(), entry.adv.getName()));
                removeMasterAdv(entry, DEAD);
                //manager.getClusterViewManager().remove(entry.id);
                masterNode.resetMaster();
                masterNode.appointMasterNode();
            }
            cleanAllCaches(entry);
        }
    }

    private void removeMasterAdv(HealthMessage.Entry entry, short state) {
        manager.getClusterViewManager().remove(entry.adv);
        if (entry.adv != null) {
            switch (state) {
                case DEAD:
                    LOG.log(Level.FINER, "FV: Notifying local listeners of Failure of " + entry.adv.getName());
                    manager.getClusterViewManager().notifyListeners(
                            new ClusterViewEvent(ClusterViewEvents.FAILURE_EVENT, entry.adv));
                    break;
                case PEERSTOPPING:
                    LOG.log(Level.FINER, "FV: Notifying local listeners of Shutdown of " + entry.adv.getName());
                    manager.getClusterViewManager().notifyListeners(
                            new ClusterViewEvent(ClusterViewEvents.PEER_STOP_EVENT, entry.adv));
                    break;
                case CLUSTERSTOPPING:
                    LOG.log(Level.FINER, "FV: Notifying local listeners of Cluster_Stopping of " + entry.adv.getName());
                    manager.getClusterViewManager().notifyListeners(
                            new ClusterViewEvent(ClusterViewEvents.CLUSTER_STOP_EVENT, entry.adv));
                    break;
                default:
                    LOG.log(Level.FINEST, MessageFormat.format("Invalid State for removing adv from view {0}", state));
            }
        } else {
            LOG.log(Level.WARNING, states[state] + " peer: " + entry.id
                    + " does not exist in local ClusterView");
        }
    }

    /**
     * This method is for designating myself in network isolation
     * if my network interface is not up.
     * It does not matter if I am the master or not.
     *
     * @param entry the health entry for this peer
     */
    /*
    private void designateInIsolationState(final HealthMessage.Entry entry) {
        entry.state = states[ALIVE_IN_ISOLATION];
        cache.put(entry.id, entry);
        LOG.log(Level.FINE, "Sending ALIVE_IN_ISOLATION state message about node ID: "
                + entry.id + " to the cluster...");
        reportMyState(ALIVE_IN_ISOLATION, entry.id);
        LOG.log(Level.FINE, "Notifying Local Listeners of designated ALIVE_IN_ISOLATION state for "
                + entry.adv.getName());
        notifyLocalListeners(entry.state, entry.adv);
    }
    */

    /**
     * Determines whether a connection to a specific node exists, or one can be created
     *
     * @param pid Node ID
     * @return true, if a connection already exists, or a new one was successfully created
     */
    /*
    public boolean isConnected(PeerID pid) {
        //if the System property for InetAddress.isReachable() is set, then check for the following:
        //if InetAddress.isReachable() is true, then check for isConnected()
        //if InetAddress.isReachable() is false, then simply return false
        return masterNode.getRouteControl().isConnected(pid, manager.getCachedRoute(pid));
    }
    */
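    // Machine-level reachability check, in outline: isConnected(entry) below
    // submits one socket-connect task per URI advertised by the peer; the first
    // task that finds the machine reachable (including a "Connection refused"
    // response, which proves the peer's TCP stack is alive) wakes the waiting
    // thread. Only then is routeControl.isConnected() consulted for the
    // JXTA-level verdict.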
    public boolean isConnected(HealthMessage.Entry entry) {
        //TODO : put only during the first heartbeat that comes in for that entry
        //TODO : cleanup needed if entry goes down
        //TODO : map host to uris, and entries to host
        List<URI> list = entry.adv.getURIs();
        List<Future<Boolean>> array = new ArrayList<Future<Boolean>>();
        for (URI uri : list) {
            LOG.info("Checking machine status for network interface : " + uri.toString());
            CheckConnectionToPeerMachine connectionToPeer =
                    new CheckConnectionToPeerMachine(entry, uri.getHost());
            Future<Boolean> future = isConnectedPool.submit(connectionToPeer);
            array.add(future);
        }
        try {
            synchronized (hwFailureDetectionthreadLock) {
                hwFailureDetectionthreadLock.wait(failureDetectionTCPTimeout);
            }
        } catch (InterruptedException e) {
            fine("InterruptedException occurred..." + e.getMessage(), new Object[]{e});
        }
        //check if the interrupt() call really does interrupt the thread
        //for (int i = 0; i < array.size(); i++) {
        for (Future<Boolean> future : array) {
            try {
                //if the task has completed after waiting for the specified timeout
                if (future.isDone()) {
                    if (future.get().equals(Boolean.TRUE)) {
                        fine("Peer Machine for " + entry.adv.getName() + " is up");
                        boolean connection = masterNode.getRouteControl().isConnected(
                                entry.id, manager.getCachedRoute(entry.id));
                        fine("routeControl.isConnected() for " + entry.adv.getName()
                                + " is => " + connection);
                        return connection;
                    }
                } else {
                    //interrupt the thread which is running the task submitted to it
                    fine("Going to cancel the future task...");
                    future.cancel(true);
                    isConnectedPool.purge();
                    fine("Finished cancelling and purging the future task...");
                    //cancelling the task is as good as getting false;
                    //the underlying socket that got created will still be alive.
                }
            } catch (ExecutionException e) {
                fine("Exception occurred while getting the return value from the Future object "
                        + e.getMessage());
                return false;
            } catch (InterruptedException e) {
                fine("InterruptedException occurred..." + e.getMessage(), new Object[]{e});
            }
            return false;
        }
        fine("Peer Machine for " + entry.adv.getName() + " is down!");
        return false;
    }

    //This is the Callable that gets executed by the ExecutorService
    private class CheckConnectionToPeerMachine implements Callable<Boolean> {
        HealthMessage.Entry entry;
        String address;

        CheckConnectionToPeerMachine(HealthMessage.Entry entry, String address) {
            this.entry = entry;
            this.address = address;
        }

        public Boolean call() {
            Socket socket = null;
            try {
                fine("Going to create a socket at " + address + " port = " + failureDetectionTCPPort);
                socket = new Socket(InetAddress.getByName(address), failureDetectionTCPPort);
                fine("Socket created at " + address + " port = " + failureDetectionTCPPort);
                synchronized (hwFailureDetectionthreadLock) {
                    hwFailureDetectionthreadLock.notify();
                }
                //For whatever reason a socket got created; close it and return true,
                //i.e. this instance was able to create a socket on that machine, so it is up.
                return Boolean.valueOf(true);
            } catch (IOException e) {
                fine("IOException occurred while trying to connect to peer " + entry.adv.getName()
                        + "'s machine : " + e.getMessage(), new Object[]{e});
                if (e.getMessage() != null && e.getMessage().trim().equals(CONNECTION_REFUSED)) {
                    fine("Going to call notify since the peer machine is up");
                    synchronized (hwFailureDetectionthreadLock) {
                        hwFailureDetectionthreadLock.notify();
                    }
                    return Boolean.valueOf(true);
                }
                return Boolean.valueOf(false);
            } finally {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        fine("Could not close the socket due to " + e.getMessage());
                    }
                }
            }
        }
    }
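    // TODO: matching on the "Connection refused" message text in
    // CheckConnectionToPeerMachine is platform and locale dependent; catching
    // java.net.ConnectException directly would be a more robust signal that the
    // peer machine is up.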
    /**
     * Determines whether a connection to a specific node exists, or one can be created
     *
     * @param entry HealthMessage.Entry
     * @return true, if a connection already exists, or a new one was successfully created
     */
    /*
    public boolean isConnected(HealthMessage.Entry entry) {
        //if the System property for InetAddress.isReachable() is set, then check for the following:
        //if InetAddress.isReachable() is true, then check for isConnected()
        //if InetAddress.isReachable() is false, then simply return false

        //Check whether we are running on JDK 5 or 6; the isUp() API is available only in JDK 6.
        Method method;
        try {
            Class c = Class.forName("java.net.NetworkInterface");
            method = c.getMethod("isUp", new Class[]{});
        } catch (NoSuchMethodException nsme) {
            //we are using a JDK version < 6
            return masterNode.getRouteControl().isConnected(entry.id, manager.getCachedRoute(entry.id));
        } catch (SecurityException s) {
            return masterNode.getRouteControl().isConnected(entry.id, manager.getCachedRoute(entry.id));
        } catch (ClassNotFoundException c) {
            return masterNode.getRouteControl().isConnected(entry.id, manager.getCachedRoute(entry.id));
        }
        try {
            String ipAddr = manager.getSystemAdvertisement().getIP(); //get my IP address
            LOG.fine("ipAddr in isConnected => " + ipAddr);
            NetworkInterface ni = NetworkInterface.getByInetAddress(InetAddress.getByName(ipAddr));
            //if (ni.isUp()) {
            if (((Boolean) method.invoke(ni, new Object[]{})).booleanValue()) {
                LOG.fine("The network interface " + ni.getDisplayName() + " is up");
                return masterNode.getRouteControl().isConnected(entry.id, manager.getCachedRoute(entry.id));
            } else {
                LOG.fine("The network interface " + ni.getDisplayName() + " is NOT up");
                MasterNode.INSTANCE_IN_NETWORK_ISOLATION = true;
                //put myself in the network isolation category
                designateInIsolationState((HealthMessage.Entry) cache.get(manager.getSystemAdvertisement().getID()));
                return false;
            }
        } catch (Exception e) {
            return false;
        }
    }
    */

    /*
    private void shutdown() {
    }

    private class ShutdownHook extends Thread {
        public void run() {
            shutdown();
        }
    }
    */
}