/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved. * * The contents of this file are subject to the terms of either the GNU * General Public License Version 2 only ("GPL") or the Common Development * and Distribution License("CDDL") (collectively, the "License"). You * may not use this file except in compliance with the License. You can obtain * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific * language governing permissions and limitations under the License. * * When distributing the software, include this License Header Notice in each * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. * Sun designates this particular file as subject to the "Classpath" exception * as provided by Sun in the GPL Version 2 section of the License file that * accompanied this code. If applicable, add the following below the License * Header, with the fields enclosed by brackets [] replaced by your own * identifying information: "Portions Copyrighted [year] * [name of copyright owner]" * * Contributor(s): * * If you wish your version of this file to be governed by only the CDDL or * only the GPL Version 2, indicate your decision by adding "[Contributor] * elects to include this software in this distribution under the [CDDL or GPL * Version 2] license." If you don't indicate a single choice of license, a * recipient has the option to distribute your version of this file under * either the CDDL, the GPL Version 2 or to extend the choice of license to * its licensees as provided above. However, if you add GPL Version 2 code * and therefore, elected the GPL Version 2 license, then the option applies * only if the new code is made subject to such option by the copyright * holder. 
*/ package com.sun.enterprise.jxtamgmt; import static com.sun.enterprise.jxtamgmt.ClusterViewEvents.ADD_EVENT; import static com.sun.enterprise.jxtamgmt.JxtaUtil.getObjectFromByteArray; import net.jxta.document.AdvertisementFactory; import net.jxta.document.MimeMediaType; import net.jxta.document.StructuredDocument; import net.jxta.document.StructuredDocumentFactory; import net.jxta.document.XMLDocument; import net.jxta.endpoint.ByteArrayMessageElement; import net.jxta.endpoint.Message; import net.jxta.endpoint.MessageElement; import net.jxta.endpoint.MessageTransport; import net.jxta.endpoint.StringMessageElement; import net.jxta.endpoint.TextDocumentMessageElement; import net.jxta.id.ID; import net.jxta.impl.endpoint.router.EndpointRouter; import net.jxta.impl.endpoint.router.RouteControl; import net.jxta.impl.pipe.BlockingWireOutputPipe; import net.jxta.peergroup.PeerGroup; import net.jxta.pipe.InputPipe; import net.jxta.pipe.OutputPipe; import net.jxta.pipe.PipeMsgEvent; import net.jxta.pipe.PipeMsgListener; import net.jxta.pipe.PipeService; import net.jxta.protocol.PipeAdvertisement; import net.jxta.protocol.RouteAdvertisement; import net.jxta.peer.PeerID; import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; /** * Master Node defines a protocol by which a set of nodes connected to a JXTA infrastructure group * may dynamically organize into a group with a determinically self elected master. The protocol is * composed of a JXTA message containing a "NAD", and "ROUTE" and one of the following : *

* -"MQ", is used to query for a master node *

* -"MR", is used to respond to a "MQ", which also contains the following *

* -"AMV", is the MasterView of the cluster *

* -"CCNTL", is used to indicate collision between two nodes *

* -"NAD", contains a node's SystemAdvertisement *

* -"ROUTE", contains a node's list of current physical addresses, which is used to issue ioctl to the JXTA * endpoint to update any existing routes to the nodes (useful when a node changes physical addresses). * *

* MasterNode will attempt to discover a master node with the specified timeout (timeout * number of iterations) * after which, it determine whether to become a master, if it happens to be first node in the ordered list of discovered nodes. * Note: due to startup time, a master node may not always be the first node node. However if the master node fails, * the first node is expected to assert it's designation as the master, otherwise, all nodes will repeat the master node discovery * process. */ class MasterNode implements PipeMsgListener, Runnable { private static final Logger LOG = JxtaUtil.getLogger(MasterNode.class.getName()); private final ClusterManager manager; private InputPipe inputPipe; private OutputPipe outputPipe; private boolean masterAssigned = false; private volatile boolean discoveryInProgress = true; private ID localNodeID = ID.nullID; private final SystemAdvertisement sysAdv; private PipeAdvertisement pipeAdv = null; private final PipeService pipeService; private final MessageElement sysAdvElement; private MessageElement routeAdvElement = null; private volatile boolean started = false; private volatile boolean stop = false; private Thread thread = null; private ClusterViewManager clusterViewManager; private ClusterView discoveryView; private final AtomicLong masterViewID = new AtomicLong(); //Collision control final String MASTERLOCK = new String("MASTERLOCK"); private static final String CCNTL = "CCNTL"; private static final String MASTERNODE = "MN"; private static final String MASTERQUERY = "MQ"; private static final String NODEQUERY = "NQ"; private static final String MASTERNODERESPONSE = "MR"; private static final String NODERESPONSE = "NR"; private static final String NAMESPACE = "MASTER"; private static final String NODEADV = "NAD"; private static final String ROUTEADV = "ROUTE"; private static final String AMASTERVIEW = "AMV"; private static final String MASTERVIEWSEQ = "SEQ"; private int interval = 6; // Default master node discovery 
timeout private long timeout = 10 * 1000L; private static final String VIEW_CHANGE_EVENT = "VCE"; private RouteControl routeControl = null; private MessageTransport endpointRouter = null; private transient Map pipeCache = new Hashtable(); private boolean clusterStopping = false; final Object discoveryLock = new Object(); /** * Constructor for the MasterNode object * * @param timeout - waiting intreval to receive a response to a master * node discovery * @param interval - number of iterations to perform master node discovery * @param manager the cluster manager */ MasterNode(final ClusterManager manager, final long timeout, final int interval) { PeerGroup group = manager.getNetPeerGroup(); pipeService = group.getPipeService(); localNodeID = group.getPeerID(); if (timeout > 0) { this.timeout = timeout; } this.interval = interval; this.manager = manager; sysAdv = manager.getSystemAdvertisement(); discoveryView = new ClusterView(sysAdv); sysAdvElement = new TextDocumentMessageElement(NODEADV, (XMLDocument) manager.getSystemAdvertisement() .getDocument(MimeMediaType.XMLUTF8), null); //used to ensure up to date routes are used endpointRouter = (group.getEndpointService()).getMessageTransport("jxta"); if (endpointRouter != null) { routeControl = (RouteControl) endpointRouter.transportControl(EndpointRouter.GET_ROUTE_CONTROL, null); RouteAdvertisement route = routeControl.getMyLocalRoute(); if (route != null) { routeAdvElement = new TextDocumentMessageElement(ROUTEADV, (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8), null); } } try { // create the pipe advertisement, to be used in creating the pipe pipeAdv = createPipeAdv(); //create output outputPipe = pipeService.createOutputPipe(pipeAdv, 0); } catch (IOException io) { io.printStackTrace(); LOG.log(Level.WARNING, "Failed to create master outputPipe : " + io); } } /** * in the event of a failure or planned shutdown, remove the * pipe from the pipeCache */ void removePipeFromCache(ID token) { 
pipeCache.remove(token); } void clearPipeCache() { pipeCache.clear(); } /** * returns the cumulative MasterNode timeout * * @return timeout */ long getTimeout() { return timeout * interval; } /** * Sets the Master Node peer ID, also checks for collisions at which event * A Conflict Message is sent to the conflicting node, and master designation * is reiterated over the wire after a short timeout * * @param systemAdv the system advertisement * @return true if no collisions detected, false otherwise */ boolean checkMaster(final SystemAdvertisement systemAdv) { if (masterAssigned && isMaster()) { LOG.log(Level.FINER,"checkMaster : clusterStopping() = " + clusterStopping); if (clusterStopping) { //accept the DAS as the new Master LOG.log(Level.FINER, "Resigning Master Node role in anticipation of a master node announcement"); LOG.log(Level.FINER, "Accepting DAS as new master in the event of cluster stopping..."); clusterViewManager.setMaster(systemAdv, false); masterAssigned = true; return false; } LOG.log(Level.FINER, "Master node role collision with " + systemAdv.getName() + " .... 
attempting to resolve"); send(systemAdv.getID(), systemAdv.getName(), createMasterCollisionMessage()); //TODO add code to ensure whether this node should remain as master or resign (basically noop) if (manager.getNodeID().toString().compareTo(systemAdv.getID().toString()) >= 0) { LOG.log(Level.FINER, "Affirming Master Node role"); // 2008.06.10, added by carryel // When affirming master node role, notify MASTER_CHANGE_EVNET in the master collison clusterViewManager.notifyListeners( new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, manager.getSystemAdvertisement() ) ); } else { LOG.log(Level.FINER, "Resigning Master Node role in anticipation of a master node announcement"); clusterViewManager.setMaster(systemAdv, false); } return false; } else return true; // 2008.06.10, removed by carryel // When no collision detected, new master setting will be done with new master's view in processMasterNodeAnnouncement() /* else { clusterViewManager.setMaster(systemAdv, true); masterAssigned = true; synchronized (MASTERLOCK) { MASTERLOCK.notifyAll(); } LOG.log(Level.FINE, "Discovered a Master node :" + systemAdv.getName()); } return true; */ } /** * Creates a Master Collision Message. A collision message is used * to indicate the conflict. 
Nodes receiving this message then required to * assess the candidate master node based on their knowledge of the network * should await for an assertion of the master node candidate * * @return Master Collision Message */ private Message createMasterCollisionMessage() { final Message msg = createSelfNodeAdvertisement(); final MessageElement el = new StringMessageElement(CCNTL, localNodeID.toString(), null); msg.addMessageElement(NAMESPACE, el); LOG.log(Level.FINER, "Created a Master Collision Message"); return msg; } private Message createSelfNodeAdvertisement() { Message msg = new Message(); msg.addMessageElement(NAMESPACE, sysAdvElement); return msg; } private void sendSelfNodeAdvertisement(final ID id, final String name) { final Message msg = createSelfNodeAdvertisement(); LOG.log(Level.FINER, "Sending a Node Response Message "); final MessageElement el = new StringMessageElement(NODERESPONSE, "noderesponse", null); msg.addMessageElement(NAMESPACE, el); send(id, name, msg); } /** * Creates a Master Query Message * * @return a message containing a master query */ private Message createMasterQuery() { final Message msg = createSelfNodeAdvertisement(); final MessageElement el = new StringMessageElement(MASTERQUERY, "query", null); msg.addMessageElement(NAMESPACE, el); addRoute(msg); LOG.log(Level.FINER, "Created a Master Node Query Message "); return msg; } void addRoute(Message msg) { if (routeAdvElement != null && routeControl != null) { msg.addMessageElement(NAMESPACE, routeAdvElement); } } /** * Creates a Node Query Message * * @return a message containing a node query */ private Message createNodeQuery() { final Message msg = createSelfNodeAdvertisement(); final MessageElement el = new StringMessageElement(NODEQUERY, "nodequery", null); msg.addMessageElement(NAMESPACE, el); if (routeAdvElement != null && routeControl != null) { msg.addMessageElement(NAMESPACE, routeAdvElement); } LOG.log(Level.FINER, "Created a Node Query Message "); return msg; } /** * 
Creates a Master Response Message * * @param masterID the MasterNode ID * @param announcement if true, creates an anouncement type message, otherwise it creates a response type. * @return a message containing a MasterResponse element */ private Message createMasterResponse(boolean announcement, final ID masterID) { final Message msg = createSelfNodeAdvertisement(); String type = MASTERNODE; if (!announcement) { type = MASTERNODERESPONSE; } final MessageElement el = new StringMessageElement(type, masterID.toString(), null); msg.addMessageElement(NAMESPACE, el); if (routeAdvElement != null && routeControl != null) { msg.addMessageElement(NAMESPACE, routeAdvElement); } LOG.log(Level.FINER, "Created a Master Response Message with masterId = " + masterID.toString()); return msg; } /** * Constructs a propagated PipeAdvertisement for the MasterNode discovery * protocol * * @return MasterNode discovery protocol PipeAdvertisement */ private PipeAdvertisement createPipeAdv() { final PipeAdvertisement pipeAdv; // create the pipe advertisement, to be used in creating the pipe pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); pipeAdv.setPipeID(manager.getNetworkManager().getMasterPipeID()); pipeAdv.setType(PipeService.PropagateType); return pipeAdv; } /** * Returns the ID of a discovered Master node * * @return the MasterNode ID */ boolean discoverMaster() { masterViewID.set(clusterViewManager.getMasterViewID()); final long timeToWait = timeout; LOG.log(Level.FINER, "Attempting to discover a master node"); Message query = createMasterQuery(); send(null, null, query); LOG.log(Level.FINER, " waiting for " + timeout + " ms"); try { synchronized (MASTERLOCK) { MASTERLOCK.wait(timeToWait); } } catch (InterruptedException intr) { Thread.interrupted(); LOG.log(Level.FINER, "Thread interrupted", intr); } LOG.log(Level.FINE, "masterAssigned=" + masterAssigned); return masterAssigned; } /** * Returns true if this node is the 
master node * * @return The master value */ boolean isMaster() { LOG.log(Level.FINER, "isMaster :" + clusterViewManager.isMaster() + " MasterAssigned :" + masterAssigned + " View Size :" + clusterViewManager.getViewSize()); return clusterViewManager.isMaster(); } /** * Returns true if this node is the master node * * @return The master value */ boolean isMasterAssigned() { return masterAssigned; } /** * Returns master node ID * * @return The master node ID */ ID getMasterNodeID() { return clusterViewManager.getMaster().getID(); } /** * return true if this service has been started, false otherwise * * @return true if this service has been started, false otherwise */ synchronized boolean isStarted() { return started; } /** * Resets the master node designation to the original state. This is typically * done when an existing master leaves or fails and a new master node is to * selected. */ void resetMaster() { LOG.log(Level.FINER, "Resetting Master view"); masterAssigned = false; } /** * Parseses out the source SystemAdvertisement * * @param msg the Message * @return true if the message is a MasterNode announcement message * @throws IOException if an io error occurs */ SystemAdvertisement processNodeAdvertisement(final Message msg) throws IOException { final MessageElement msgElement = msg.getMessageElement(NAMESPACE, NODEADV); if (msgElement == null) { // no need to go any further LOG.log(Level.WARNING, "Missing NODEADV message element"); JxtaUtil.printMessageStats(msg, false); return null; } final StructuredDocument asDoc; asDoc = StructuredDocumentFactory.newStructuredDocument(msgElement.getMimeType(), msgElement.getStream()); final SystemAdvertisement adv = new SystemAdvertisement(asDoc); if (!adv.getID().equals(localNodeID)) { LOG.log(Level.FINER, "Received a System advertisment Name :" + adv.getName()); } return adv; } /** * Processes a MasterNode announcement. 
* * @param msg the Message * @param source the source node SystemAdvertisement * @return true if the message is a MasterNode announcement message * @throws IOException if an io error occurs */ boolean processMasterNodeAnnouncement(final Message msg, final SystemAdvertisement source) throws IOException { MessageElement msgElement = msg.getMessageElement(NAMESPACE, MASTERNODE); if (msgElement == null) { return false; } processRoute(msg); LOG.log(Level.FINER, "Received a Master Node Announcement from Name :" + source.getName()); if (checkMaster(source)) { msgElement = msg.getMessageElement(NAMESPACE, AMASTERVIEW); if ( msgElement == null ) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log( Level.FINE, "Discovered a Master node :" + source.getName() ); return true; } final ArrayList newLocalView = (ArrayList) getObjectFromByteArray(msgElement); if (newLocalView != null) { LOG.log(Level.FINER, MessageFormat.format("Received an authoritative view from {0}, of size {1}" + " resetting local view containing {2}", source.getName(), newLocalView.size(), clusterViewManager.getLocalView().getSize())); } long seqID = getLongFromMessage(msg, NAMESPACE, MASTERVIEWSEQ); msgElement = msg.getMessageElement(NAMESPACE, VIEW_CHANGE_EVENT); if ( msgElement == null ) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log( Level.FINE, "Discovered a Master node :" + source.getName() ); LOG.log(Level.WARNING, "New View Received without corresponding ViewChangeEvent details"); //TODO according to the implementation MasterNode does not include VIEW_CHANGE_EVENT //when it announces a Authortative master view //throw new IOException("New View Received without corresponding ViewChangeEvent details"); return true; } LOG.log(Level.FINEST, "MasterNode:PMNA: Received Master View with Seq Id="+seqID + "Current sequence is 
"+clusterViewManager.getMasterViewID()); if (seqID <= clusterViewManager.getMasterViewID()) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log( Level.FINE, "Discovered a Master node :" + source.getName() ); LOG.log(Level.FINER, MessageFormat.format("Received an older clusterView sequence {0}." + " Current sequence :{1} discarding out of sequence view", seqID, clusterViewManager.getMasterViewID())); return true; } final ClusterViewEvent cvEvent = (ClusterViewEvent) getObjectFromByteArray(msgElement); assert newLocalView != null; if (!newLocalView.contains(manager.getSystemAdvertisement())) { // 2008.06.10, added by carryel // before calling sendSelfnodeAdvertisement(), new master setting should be done with new master's view clusterViewManager.setMaster( newLocalView, source ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log( Level.FINE, "Discovered a Master node :" + source.getName() ); LOG.log(Level.FINER, "New ClusterViewManager does not contain self. 
Publishing Self"); sendSelfNodeAdvertisement(source.getID(), null); //update the view once the the master node includes this node return true; } clusterViewManager.setMasterViewID(seqID); masterViewID.set(seqID); LOG.log(Level.FINER, "MN: New MasterViewID = "+clusterViewManager.getMasterViewID()); // 2008.06.10, edited by carryel // new master setting should be done with new master's view // If master changed, MASTER_CHANGE_EVENT would be notified in setMaster() boolean masterChanged = clusterViewManager.setMaster( newLocalView, source ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log( Level.FINE, "Discovered a Master node :" + source.getName() ); if ( masterChanged && cvEvent.getEvent() != ClusterViewEvents.MASTER_CHANGE_EVENT ) clusterViewManager.notifyListeners( cvEvent ); else { clusterViewManager.addToView(newLocalView, true, cvEvent); } //LOG.log(Level.FINER, "MN: New MasterViewID = "+clusterViewManager.getMasterViewID()); //clusterViewManager.addToView(newLocalView, true, cvEvent); } //synchronized (MASTERLOCK) { // MASTERLOCK.notifyAll(); //} return true; } /** * Processes a MasterNode response. 
* * @param msg the Message * @param source the source node SystemAdvertisement * @return true if the message is a master node response message * @throws IOException if an io error occurs */ boolean processMasterNodeResponse(final Message msg, final SystemAdvertisement source) throws IOException { MessageElement msgElement = msg.getMessageElement(NAMESPACE, MASTERNODERESPONSE); if ( msgElement == null ) return false; LOG.log(Level.FINE, "Received a MasterNode Response from Name :" + source.getName()); processRoute(msg); msgElement = msg.getMessageElement(NAMESPACE, AMASTERVIEW); if ( msgElement == null ) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } return true; } final ArrayList newLocalView = (ArrayList) getObjectFromByteArray(msgElement); msgElement = msg.getMessageElement(NAMESPACE, VIEW_CHANGE_EVENT); if ( msgElement == null ) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } return true; } long seqID = getLongFromMessage(msg, NAMESPACE, MASTERVIEWSEQ); LOG.log(Level.FINEST, "MasterNode:PMNR Received Master View with Seq Id="+seqID); if (seqID <= clusterViewManager.getMasterViewID()) { clusterViewManager.setMaster( source, true ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log(Level.FINER, MessageFormat.format("Received an older clusterView sequence {0} of size :{1}" + " Current sequence :{2} discarding out of sequence view", seqID, newLocalView.size(), clusterViewManager.getMasterViewID())); return true; } else { LOG.log(Level.FINER, MessageFormat.format("Received a VIEW_CHANGE_EVENT from : {0}, seqID of :{1}, size :{2}", source.getName(), seqID, newLocalView.size())); } final ClusterViewEvent cvEvent = (ClusterViewEvent) getObjectFromByteArray(msgElement); if (!newLocalView.contains(manager.getSystemAdvertisement())) { // 2008.06.10, added by carryel // before calling 
sendSelfnodeAdvertisement(), new master setting should be done with new master's view clusterViewManager.setMaster( newLocalView, source ); masterAssigned = true; synchronized( MASTERLOCK ) { MASTERLOCK.notifyAll(); } LOG.log(Level.FINER, "Received view does not contain self. Publishing self"); sendSelfNodeAdvertisement(source.getID(), null); //update the view once the the master node includes this node return true; } clusterViewManager.setMasterViewID(seqID); masterViewID.set(seqID); // 2008.06.10, edited by carryel // new master setting should be done with new master's view // If master changed, MASTER_CHANGE_EVENT would be notified in setMatser() boolean masterChanged = clusterViewManager.setMaster( newLocalView, source ); masterAssigned = true; if ( masterChanged && cvEvent.getEvent() != ClusterViewEvents.MASTER_CHANGE_EVENT ) clusterViewManager.notifyListeners( cvEvent ); else clusterViewManager.addToView(newLocalView, true, cvEvent); synchronized (MASTERLOCK) { MASTERLOCK.notifyAll(); } return true; } /** * Processes a cluster change event. This results in adding the new * members to the cluster view, and subsequenlty in application notification. * * @param msg the Message * @param source the source node SystemAdvertisement * @return true if the message is a change event message * @throws IOException if an io error occurs */ boolean processChangeEvent(final Message msg, final SystemAdvertisement source) throws IOException { MessageElement msgElement = msg.getMessageElement(NAMESPACE, VIEW_CHANGE_EVENT); LOG.log(Level.FINER,"Inside processChangeEvent..." 
); if (msgElement != null) { final ClusterViewEvent cvEvent = (ClusterViewEvent) getObjectFromByteArray(msgElement); msgElement = msg.getMessageElement(NAMESPACE, AMASTERVIEW); if (msgElement != null && cvEvent != null) { long seqID = getLongFromMessage(msg, NAMESPACE, MASTERVIEWSEQ); if (seqID <= clusterViewManager.getMasterViewID()) { LOG.log(Level.FINER, MessageFormat.format("Received an older clusterView sequence {0}." + " Current sequence :{1} discarding out of sequence view", seqID, clusterViewManager.getMasterViewID())); return true; } final ArrayList newLocalView = (ArrayList) getObjectFromByteArray(msgElement); LOG.log(Level.FINER, MessageFormat.format("Received a new view of size :{0}, event :{1}", newLocalView.size(), cvEvent.getEvent().toString())); if (!newLocalView.contains(manager.getSystemAdvertisement())) { LOG.log(Level.FINER, "Received ClusterViewManager does not contain self. Publishing Self"); sendSelfNodeAdvertisement(source.getID(), null); //update the view once the the master node includes this node return true; } clusterViewManager.setMasterViewID(seqID); masterViewID.set(seqID); clusterViewManager.addToView(newLocalView, true, cvEvent); return true; } } return false; } /** * Processes a Masternode Query message. This results in a master node * response if this node is a master node. 
* * @param msg the Message * @param adv the source node SystemAdvertisement * @return true if the message is a query message * @throws IOException if an io error occurs */ boolean processMasterNodeQuery(final Message msg, final SystemAdvertisement adv) throws IOException { final MessageElement msgElement = msg.getMessageElement(NAMESPACE, MASTERQUERY); if (msgElement == null || adv == null) { return false; } processRoute(msg); if (isMaster() && masterAssigned) { LOG.log(Level.FINER, MessageFormat.format("Received a MasterNode Query from Name :{0} ID :{1}", adv.getName(), adv.getID())); final ClusterViewEvent cvEvent = new ClusterViewEvent(ADD_EVENT, adv); clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); clusterViewManager.notifyListeners(cvEvent); sendNewView(null, cvEvent, createMasterResponse(false, localNodeID), true); } return true; } /** * Processes a Masternode Query message. This results in a master node * response if this node is a master node. * * @param msg the Message * @param adv the source node SystemAdvertisement * @return true if the message is a query message * @throws IOException if an io error occurs */ boolean processNodeQuery(final Message msg, final SystemAdvertisement adv) throws IOException { final MessageElement msgElement = msg.getMessageElement(NAMESPACE, NODEQUERY); if (msgElement == null || adv == null) { return false; } processRoute(msg); LOG.log(Level.FINER, MessageFormat.format("Received a Node Query from Name :{0} ID :{1}", adv.getName(), adv.getID())); if (isMaster() && masterAssigned) { LOG.log(Level.FINER, MessageFormat.format("Received a Node Query from Name :{0} ID :{1}", adv.getName(), adv.getID())); final ClusterViewEvent cvEvent = new ClusterViewEvent(ADD_EVENT, adv); clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); clusterViewManager.notifyListeners(cvEvent); sendNewView(null, cvEvent, createMasterResponse(false, localNodeID), true); } else { final Message response = 
createSelfNodeAdvertisement(); final MessageElement el = new StringMessageElement(NODERESPONSE, "noderesponse", null); response.addMessageElement(NAMESPACE, el); LOG.log(Level.FINER, "Sending Node response to :" + adv.getName()); send(adv.getID(), null, response); } return true; } /** * Processes a Node Response message. * * @param msg the Message * @param adv the source node SystemAdvertisement * @return true if the message is a response message * @throws IOException if an io error occurs */ boolean processNodeResponse(final Message msg, final SystemAdvertisement adv) throws IOException { final MessageElement msgElement = msg.getMessageElement(NAMESPACE, NODERESPONSE); if (msgElement == null || adv == null) { return false; } processRoute(msg); if (isMaster() && masterAssigned) { LOG.log(Level.FINER, MessageFormat.format("Received a Node Response from Name :{0} ID :{1}", adv.getName(), adv.getID())); final ClusterViewEvent cvEvent = new ClusterViewEvent(ADD_EVENT, adv); clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); clusterViewManager.notifyListeners(cvEvent); sendNewView(null, cvEvent, createMasterResponse(false, localNodeID), true); } return true; } void processRoute(final Message msg) { LOG.log(Level.FINER,"Inside processRoute..."); try { final MessageElement routeElement = msg.getMessageElement(NAMESPACE, ROUTEADV); if (routeElement != null && routeControl != null) { XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument( routeElement.getMimeType(), routeElement.getStream()); final RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(asDoc); manager.cacheRoute(route); routeControl.addRoute(route); } } catch (IOException io) { io.printStackTrace(); LOG.log(Level.WARNING, io.getLocalizedMessage()); } } /** * Processes a MasterNode Collision. 
When two nodes assume a master role (by assertion through * a master node announcement), each node can indepedentaly and deterministically elect the master node. * This is done through electing the node atop of the NodeID sort order. If there are more than two * nodes in collision, this same process is repeated. * * @param msg the Message * @param adv the source node SystemAdvertisement * @return true if the message was indeed a collision message * @throws IOException if an io error occurs */ boolean processMasterNodeCollision(final Message msg, final SystemAdvertisement adv) throws IOException { final MessageElement msgElement = msg.getMessageElement(NAMESPACE, CCNTL); if (msgElement == null) { return false; } LOG.log(Level.FINER, MessageFormat.format("Received a MasterNode Collision from Name :{0} ID :{1}", adv.getName(), adv.getID())); final SystemAdvertisement madv = manager.getSystemAdvertisement(); LOG.log(Level.FINER, "Candidate Master :" + madv.getName()); if (madv.getID().toString().compareTo(adv.getID().toString()) >= 0) { LOG.log(Level.FINER, "Affirming Master Node role"); synchronized (MASTERLOCK) { //Ensure the view SeqID is incremented by 2 clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); announceMaster(manager.getSystemAdvertisement()); MASTERLOCK.notifyAll(); } } else { LOG.log(Level.FINER, "Resigning Master Node role"); clusterViewManager.setMaster(adv, true); } return true; } /** * Probes a node. 
* Used when a node does not exist in the local view. Only an assigned master sends the probe,
     * which is a node query carrying this node's advertisement (and route, when known).
     *
     * @param entry node entry
     * @throws IOException if an io error occurs sending the message
     */
    void probeNode(final HealthMessage.Entry entry) throws IOException {
        if (isMaster() && masterAssigned) {
            LOG.log(Level.FINER, "Probing ID = " + entry.id + ", name = " + entry.adv.getName());
            send(entry.id, null, createNodeQuery());
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Entry point for messages received on the master node pipe. Messages are ignored while the
     * manager is stopping or before this service has started. Otherwise the sender's advertisement
     * is parsed and loopback messages are dropped. The advertisement is recorded in the cluster
     * view (when this node is the assigned master) or in the discovery view (while discovery is in
     * progress). The message is then offered to each processXxx handler in turn; handling stops
     * at the first handler that returns true. IOExceptions are logged, not propagated.
     */
    public void pipeMsgEvent(final PipeMsgEvent event) {
        LOG.log(Level.FINEST, "Received a message inside pipeMsgEvent");
        if (manager.isStopping()) {
            LOG.log(Level.FINE, "Since this Peer is Stopping, returning without processing incoming master node message. ");
            return;
        }
        if (isStarted()) {
            final Message msg;
            // grab the message from the event
            msg = event.getMessage();
            if (msg == null) {
                LOG.log(Level.WARNING, "Received a null message");
                return;
            }
            try {
                // adv is null when the message carries no NAD element
                final SystemAdvertisement adv = processNodeAdvertisement(msg);
                if (adv != null && adv.getID().equals(localNodeID)) {
                    LOG.log(Level.FINEST, "Discarding loopback message");
                    return;
                }
                // add the advertisement to the list
                if (adv != null) {
                    if (isMaster() && masterAssigned) {
                        clusterViewManager.add(adv);
                    } else if (discoveryInProgress) {
                        discoveryView.add(adv);
                    }
                }
                // Order matters: the first handler that recognises the message consumes it.
                if (processMasterNodeQuery(msg, adv)) {
                    return;
                }
                if (processMasterNodeResponse(msg, adv)) {
                    return;
                }
                if (processMasterNodeAnnouncement(msg, adv)) {
                    return;
                }
                if (processMasterNodeCollision(msg, adv)) {
                    return;
                }
                if (processChangeEvent(msg, adv)) {
                    return;
                }
                if (processNodeQuery(msg, adv)) {
                    return;
                }
                if (processNodeResponse(msg, adv)) {
                    return;
                }
            } catch (IOException e) {
                // NOTE(review): printStackTrace duplicates the warning on stderr; consider
                // LOG.log(Level.WARNING, msg, e) instead.
                e.printStackTrace();
                LOG.log(Level.WARNING, e.getLocalizedMessage());
            }
            LOG.log(Level.FINER, MessageFormat.format("ClusterViewManager contains {0} entries", clusterViewManager.getViewSize()));
        } else {
            LOG.log(Level.FINER, "Started : " + isStarted());
        }
    }

    /**
     * Announces {@code adv} as master by sending a master announcement (MN element) together with
     * a MASTER_CHANGE_EVENT view, but only while this node is the assigned master.
     *
     * @param adv the advertisement of the node being announced as master
     */
    private void announceMaster(SystemAdvertisement adv) {
        final Message msg = createMasterResponse(true, adv.getID());
        final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, adv);
        if(masterAssigned && isMaster()){
            LOG.log(Level.FINER, MessageFormat.format("Announcing Master Node designation Local view contains" + " {0} entries", clusterViewManager.getViewSize()));
            sendNewView(null, cvEvent, msg, true);
        }
    }

    /**
     * MasterNode discovery thread. Starts the master node discovery protocol.
     */
    public void run() {
        startMasterNodeDiscovery();
    }

    /**
     * Starts the master node discovery protocol.
     * <p>
     * Starts the cluster view manager, then runs up to {@code interval} discovery rounds, each
     * waiting up to {@code timeout} ms. It stops early when {@code stop} is set or a master is
     * found. If no master was assigned, a master is appointed. In every case discoveryInProgress
     * is cleared and threads waiting on discoveryLock are notified.
     */
    void startMasterNodeDiscovery() {
        int count = 0;
        //assumes self as master node
        synchronized (this) {
            clusterViewManager.start();
        }
        if (masterAssigned) {
            // a master was already assigned; no discovery needed
            discoveryInProgress = false;
            synchronized (discoveryLock) {
                discoveryLock.notifyAll();
            }
            return;
        }
        while (!stop && count < interval) {
            if (!discoverMaster()) {
                // TODO: Consider changing this approach to a background reaper
                // that would reconcile the group from time to time, consider
                // using an incremental timeout interval ex. 800, 1200, 2400,
                // 4800, 9600 ms for iteration periods, then revert to 800
                count++;
            } else {
                break;
            }
        }
        // timed out
        if (!masterAssigned) {
            LOG.log(Level.FINER, "MN Discovery timeout, appointing master");
            appointMasterNode();
        }
        LOG.log(Level.FINEST,"startMasterNodeDiscovery making discoveryInProgress false");
        discoveryInProgress = false;
        synchronized (discoveryLock) {
            discoveryLock.notifyAll();
        }
    }

    /**
     * Determines a master node candidate. The candidate comes from the discovery view while
     * discovery is in progress, otherwise from the cluster view manager. If the candidate turns
     * out to be this node, this node assumes the master role and notifies local listeners of
     * MASTER_CHANGE_EVENT.
     * NOTE(review): madv is dereferenced without a null check; assumes getMasterCandidate() never
     * returns null — confirm.
     */
    void appointMasterNode() {
        if (masterAssigned) {
            return;
        }
        final SystemAdvertisement madv;
        LOG.log(Level.FINER, "MasterNode: discoveryInProgress="+discoveryInProgress);
        if (discoveryInProgress) {
            madv = discoveryView.getMasterCandidate();
        } else {
            madv = clusterViewManager.getMasterCandidate();
        }
        LOG.log(Level.FINER, "MasterNode: Master Candidate="+madv.getName());
        // 2008.06.10, removed by carryel
        // The new master should be set together with the new master's view, so setMaster() is deferred.
        // If the master changed, MASTER_CHANGE_EVENT is notified inside setMaster().
        //avoid notifying listeners
        //clusterViewManager.setMaster(madv, false);
        //masterAssigned = true;
        if (madv.getID().equals(localNodeID)) {
            LOG.log(Level.FINER, "MasterNode: Setting myself as MasterNode ");
            clusterViewManager.setMasterViewID(masterViewID.incrementAndGet());
            LOG.log(Level.FINER, "MasterNode: masterViewId ="+masterViewID );
            // generate view change event
            if (discoveryInProgress) {
                List list = discoveryView.getView();
                // 2008.06.10, edited by carryel
                // The new master is set together with the new master's view.
                // If the master changed, MASTER_CHANGE_EVENT is notified inside setMaster().
                //final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, madv);
                boolean masterChanged = clusterViewManager.setMaster( list, madv );
                if ( !masterChanged ) {
                    // If the master did not change (the normal case), notify MASTER_CHANGE_EVENT explicitly,
                    // because a member that becomes group leader must still receive its own JoinNotification.
                    final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, madv);
                    clusterViewManager.notifyListeners(cvEvent);
                }
                masterAssigned = true;
                ///clusterViewManager.addToView(list, true, cvEvent);
            } else {
                clusterViewManager.setMaster(madv, false);
                masterAssigned = true;
                LOG.log(Level.FINER, "MasterNode: Notifying Local Listeners of Master Change");
                final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, madv);
                clusterViewManager.notifyListeners(cvEvent);
            }
        } else {
            // 2008.06.10, commented by carryel
            // You must not call setMaster() here. Although this member does not set the master now,
            // it can receive the master change event later through the real group leader's announceMaster().
//clusterViewManager.setMaster(madv, false); //masterAssigned = true; } discoveryView.clear(); discoveryView.add(sysAdv); synchronized (MASTERLOCK) { if (madv.getID().equals(localNodeID)) { // this thread's job is done LOG.log(Level.FINER, "Assuming Master Node designation ..."); // 2008.06.10, edited by carryel // In a short time, though one more member joined the group, clusterViewManager.getViewSize() could be equal to 1. // so we should call announceMaster() for safety. //broadcast we are the masternode if view size is more than one //if (clusterViewManager.getViewSize() > 1) { LOG.log(Level.FINER, "MasterNode: announcing MasterNode assumption "); announceMaster(manager.getSystemAdvertisement()); //} MASTERLOCK.notifyAll(); } } } /** * Send a message to a specific node. In the case where the id is null the * message multicast * * @param peerid the destination node, if null, the message is sent to the cluster * @param msg the message to send * @param name name used for debugging messages */ private void send(final ID peerid, final String name, final Message msg) { try { if (peerid != null) { // Unicast datagram // create a op pipe to the destination peer LOG.log(Level.FINER, "Unicasting Message to :" + name + "ID=" + peerid); OutputPipe output = null; if (!pipeCache.containsKey(peerid)) { RouteAdvertisement route = manager.getCachedRoute((PeerID) peerid); if (route != null) { output = new BlockingWireOutputPipe(manager.getNetPeerGroup(), pipeAdv, (PeerID) peerid, route); } if (output == null) { // Unicast datagram // create a op pipe to the destination peer output = pipeService.createOutputPipe(pipeAdv, Collections.singleton(peerid), 1); } pipeCache.put(peerid, output); } else { output = pipeCache.get(peerid); if (output.isClosed()) { output = pipeService.createOutputPipe(pipeAdv, Collections.singleton(peerid), 1); pipeCache.put(peerid, output); } } output.send(msg); } else { // multicast LOG.log(Level.FINER, "Broadcasting Message"); outputPipe.send(msg); } } 
catch (IOException io) { LOG.log(Level.FINEST, "Failed to send message", io); } } /** * Sends the discovered view to the group indicating a new membership snapshot has been * created. This will lead to all members replacing their localviews to * this new view. * * @param event The ClusterViewEvent object containing details of the event. * @param msg The message to send * @param toID receipient ID * @param includeView if true view will be included in the message */ void sendNewView(final ID toID, final ClusterViewEvent event, final Message msg, final boolean includeView) { if (includeView) { addAuthoritativeView(msg); } final ByteArrayMessageElement cvEvent = new ByteArrayMessageElement(VIEW_CHANGE_EVENT, MimeMediaType.AOS, JxtaUtil. createByteArrayFromObject(event), null); LOG.log(Level.FINER, MessageFormat.format("Created a view element of size {0}bytes", cvEvent.getByteLength())); msg.addMessageElement(NAMESPACE, cvEvent); LOG.log(Level.FINER, "Sending new authoritative cluster view to group, event :" + event.getEvent().toString()+" viewSeqId: "+clusterViewManager.getMasterViewID()); send(toID, null, msg); } /** * Adds an authoritative message element to a Message * * @param msg The message to add the view to */ void addAuthoritativeView(final Message msg) { final List view; view = clusterViewManager.getLocalView().getView(); LOG.log(Level.FINER, "MasterNode: Adding Authoritative View of size "+view.size()+ " to view message"); final ByteArrayMessageElement bame = new ByteArrayMessageElement(AMASTERVIEW, MimeMediaType.AOS, JxtaUtil.createByteArrayFromObject(view), null); msg.addMessageElement(NAMESPACE, bame); LOG.log(Level.FINER, MessageFormat.format("Created an Authoritative view element of size {0}bytes", bame.getByteLength())); addLongToMessage(msg, NAMESPACE, MASTERVIEWSEQ, masterViewID.longValue()); } /** * Stops this service */ synchronized void stop() { LOG.log(Level.FINER, "Stopping MasterNode"); outputPipe.close(); inputPipe.close(); pipeCache.clear(); 
discoveryView.clear(); thread = null; masterAssigned = false; started = false; stop = true; discoveryInProgress = false; synchronized (discoveryLock){ discoveryLock.notifyAll(); } } /** * Starts this service. Creates the communication channels, and the MasterNode discovery thread. */ synchronized void start() { LOG.log(Level.FINER, "Starting MasterNode"); this.clusterViewManager = manager.getClusterViewManager(); try { //better set the started flag before the pipe is open //in case messages arrive started = true; inputPipe = pipeService.createInputPipe(pipeAdv, this); } catch (IOException ioe) { LOG.log(Level.SEVERE, "Failed to create service input pipe" + ioe); } thread = new Thread(this, "MasterNode"); thread.setDaemon(true); thread.start(); } /** * Sends a ViewChange event to the cluster. * * @param event VievChange event */ void viewChanged(final ClusterViewEvent event) { if (isMaster() && masterAssigned) { //increment the view seqID clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); Message msg = createSelfNodeAdvertisement(); sendNewView(null, event, msg, true); } } /** * Adds a long to a message * * @param message The message to add to * @param nameSpace The namespace of the element to add. a null value assumes default namespace. * @param elemName Name of the Element. * @param data The feature to be added to the LongToMessage attribute */ private static void addLongToMessage(Message message, String nameSpace, String elemName, long data) { message.addMessageElement(nameSpace, new StringMessageElement(elemName, Long.toString(data), null)); } /** * Returns an long from a message * * @param message The message to retrieve from * @param nameSpace The namespace of the element to get. * @param elemName Name of the Element. * @return The long value, -1 if element does not exist in the message * @throws NumberFormatException If the String does not contain a parsable int. 
*/ private static long getLongFromMessage(Message message, String nameSpace, String elemName) throws NumberFormatException { String seqStr = message.getMessageElement(nameSpace, elemName).toString(); if (seqStr != null) { return Long.parseLong(message.getMessageElement(nameSpace, elemName).toString()); } else { return -1; } } RouteControl getRouteControl() { if (routeControl == null) { routeControl = (RouteControl) endpointRouter.transportControl(EndpointRouter.GET_ROUTE_CONTROL, null); } return routeControl; } /** * This method allows the DAS to become a master by force. This * is especially important when the the DAS is going down and then * coming back up. This way only the DAS will ever be the master. */ void takeOverMasterRole() { synchronized (MASTERLOCK) { final SystemAdvertisement madv = clusterViewManager.get(localNodeID); LOG.log(Level.FINER, "MasterNode: Forcefully becoming the Master..." + madv.getName()); //avoid notifying listeners clusterViewManager.setMaster(madv, false); masterAssigned = true; clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); LOG.log(Level.FINER, "MasterNode: becomeMaster () : masterViewId ="+masterViewID ); // generate view change event LOG.log(Level.FINER, "MasterNode: becomeMaster () : Notifying Local Listeners of Master Change"); final ClusterViewEvent cvEvent = new ClusterViewEvent(ClusterViewEvents.MASTER_CHANGE_EVENT, madv); clusterViewManager.notifyListeners(cvEvent); discoveryView.clear(); discoveryView.add(sysAdv); //broadcast we are the masternode if view size is more than one if (clusterViewManager.getViewSize() > 1) { LOG.log(Level.FINER, "MasterNode: becomeMaster () : announcing MasterNode assumption "); announceMaster(manager.getSystemAdvertisement()); } MASTERLOCK.notifyAll(); manager.notifyNewMaster(); } } void setClusterStopping() { clusterStopping = true; } ClusterViewEvent sendReadyEventView(final SystemAdvertisement adv) { final ClusterViewEvent cvEvent = new 
ClusterViewEvent(ClusterViewEvents.JOINED_AND_READY_EVENT, adv); LOG.log(Level.FINEST, MessageFormat.format("Sending to Group, Joined and Ready Event View for peer :{0}", adv.getName())); clusterViewManager.setMasterViewID(masterViewID.incrementAndGet()); sendNewView(null, cvEvent, createSelfNodeAdvertisement(), true); return cvEvent; } boolean isDiscoveryInProgress() { return discoveryInProgress; } }