The following patch takes advantage of direct messengers, since
routes are available in Shoal.
Index: ClusterManager.java
===================================================================
RCS file:
/cvs/shoal/gms/src/java/com/sun/enterprise/jxtamgmt/ClusterManager.java,v
retrieving revision 1.31
diff -u -w -d -r1.31 ClusterManager.java
--- ClusterManager.java 7 Feb 2008 17:31:16 -0000 1.31
+++ ClusterManager.java 20 Feb 2008 01:40:51 -0000
@@ -56,6 +56,8 @@
import net.jxta.pipe.PipeMsgListener;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
+import net.jxta.protocol.RouteAdvertisement;
+import net.jxta.impl.pipe.BlockingWireOutputPipe;
import java.io.IOException;
import java.io.Serializable;
@@ -67,6 +69,7 @@
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -90,6 +93,7 @@
private static final String NODEADV = "NAD";
private transient Map<String, String> identityMap;
+ private transient Map<PeerID, RouteAdvertisement> routeCache = new
ConcurrentHashMap<PeerID, RouteAdvertisement>();
private PipeAdvertisement pipeAdv;
private PipeService pipeService;
private MessageElement sysAdvElement = null;
@@ -432,11 +436,18 @@
message.addMessageElement(NAMESPACE, bame);
if (peerid != null) {
- OutputPipe output;
+ OutputPipe output = null;
if (!pipeCache.containsKey(peerid)) {
+
+ RouteAdvertisement route = getCachedRoute((PeerID)
peerid);
+ if (route != null) {
+ output = new
BlockingWireOutputPipe(getNetPeerGroup(), pipeAdv, (PeerID) peerid, route);
+ }
+ if (output == null) {
// Unicast datagram
// create a op pipe to the destination peer
output = pipeService.createOutputPipe(pipeAdv,
Collections.singleton(peerid), 1);
+ }
pipeCache.put(peerid, output);
} else {
output = pipeCache.get(peerid);
@@ -632,5 +643,25 @@
public void reportJoinedAndReadyState() {
healthMonitor.reportJoinedAndReadyState();
}
+
+ /**
+ * Caches a route for an instance
+ *
+ * @param route the route advertisement
+ */
+ public void cacheRoute(RouteAdvertisement route) {
+ routeCache.put(route.getDestPeerID(), route);
+ }
+
+ /**
+ * returns the cached route if any, null otherwise
+ *
+ * @param peerid the instance id
+ * @return the cached route if any, null otherwise
+ */
+ public RouteAdvertisement getCachedRoute(PeerID peerid) {
+ return routeCache.get(peerid);
+ }
+
}
Index: HealthMessage.java
===================================================================
RCS file:
/cvs/shoal/gms/src/java/com/sun/enterprise/jxtamgmt/HealthMessage.java,v
retrieving revision 1.8
diff -u -w -d -r1.8 HealthMessage.java
--- HealthMessage.java 9 Nov 2007 23:33:19 -0000 1.8
+++ HealthMessage.java 20 Feb 2008 01:40:51 -0000
@@ -33,8 +33,6 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
-
-
package com.sun.enterprise.jxtamgmt;
import net.jxta.document.Attributable;
Index: HealthMonitor.java
===================================================================
RCS file:
/cvs/shoal/gms/src/java/com/sun/enterprise/jxtamgmt/HealthMonitor.java,v
retrieving revision 1.63
diff -u -w -d -r1.63 HealthMonitor.java
--- HealthMonitor.java 24 Jan 2008 19:51:34 -0000 1.63
+++ HealthMonitor.java 20 Feb 2008 01:40:51 -0000
@@ -44,6 +44,8 @@
import net.jxta.peer.PeerID;
import net.jxta.pipe.*;
import net.jxta.protocol.PipeAdvertisement;
+import net.jxta.protocol.RouteAdvertisement;
+import net.jxta.impl.pipe.BlockingWireOutputPipe;
import java.io.IOException;
import java.text.MessageFormat;
@@ -410,11 +412,17 @@
// Unicast datagram
// create a op pipe to the destination peer
LOG.log(Level.FINE, "Unicasting Message to :" +
peerid.toString());
- OutputPipe output;
+ OutputPipe output = null;
if (!pipeCache.containsKey(peerid)) {
+ RouteAdvertisement route =
manager.getCachedRoute((PeerID) peerid);
+ if (route != null) {
+ output = new
BlockingWireOutputPipe(manager.getNetPeerGroup(), pipeAdv, (PeerID)
peerid, route);
+ }
+ if (output == null) {
// Unicast datagram
// create a op pipe to the destination peer
output = pipeService.createOutputPipe(pipeAdv,
Collections.singleton(peerid), 1);
+ }
pipeCache.put(peerid, output);
} else {
output = pipeCache.get(peerid);
@@ -480,6 +488,7 @@
/**
* Stops this service
+ *
* @param isClusterShutdown true if the cluster is shutting down
*/
void stop(boolean isClusterShutdown) {
@@ -532,20 +541,17 @@
entry = cache.get((PeerID) peerID);
if (entry != null) {
state = entry.state;
- }
- else {
+ } else {
if(((PeerID)peerID).equals(localPeerID)){
if(!started){
state = states[STARTING];
- }
- else {
+ } else {
state = states[ALIVE];
}
} else {
if(manager.getClusterViewManager().containsKey(peerID)){
state = states[STARTING];//we assume that the peer
is in starting state hence its state is not yet known in this peer
- }
- else {
+ } else {
entry = cache.get((PeerID) peerID);
if (entry != null) {
state = entry.state;
@@ -691,6 +697,7 @@
}
}
}
+
private boolean canProcessInDoubt(final HealthMessage.Entry
entry) {
boolean canProcessIndoubt = false;
if (masterNode.getMasterNodeID().equals(entry.id)) {
@@ -818,7 +825,7 @@
//if System property for InetAddress.isReachable() is set, then
check for the following:
//if InetAddress.isReachable() is true, then check for
isConnected()
//if InetAddress.isReachable() is false, then simply return false
- return masterNode.getRouteControl().isConnected(pid);
+ return masterNode.getRouteControl().isConnected(pid,
manager.getCachedRoute(pid));
}
/*
private void shutdown() {
Index: MasterNode.java
===================================================================
RCS file:
/cvs/shoal/gms/src/java/com/sun/enterprise/jxtamgmt/MasterNode.java,v
retrieving revision 1.61
diff -u -w -d -r1.61 MasterNode.java
--- MasterNode.java 14 Jan 2008 19:23:10 -0000 1.61
+++ MasterNode.java 20 Feb 2008 01:40:51 -0000
@@ -37,19 +37,38 @@
import static com.sun.enterprise.jxtamgmt.ClusterViewEvents.ADD_EVENT;
import static com.sun.enterprise.jxtamgmt.JxtaUtil.getObjectFromByteArray;
-import net.jxta.document.*;
-import net.jxta.endpoint.*;
+import net.jxta.document.AdvertisementFactory;
+import net.jxta.document.MimeMediaType;
+import net.jxta.document.StructuredDocument;
+import net.jxta.document.StructuredDocumentFactory;
+import net.jxta.document.XMLDocument;
+import net.jxta.endpoint.ByteArrayMessageElement;
+import net.jxta.endpoint.Message;
+import net.jxta.endpoint.MessageElement;
+import net.jxta.endpoint.MessageTransport;
+import net.jxta.endpoint.StringMessageElement;
+import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.endpoint.router.EndpointRouter;
import net.jxta.impl.endpoint.router.RouteControl;
+import net.jxta.impl.pipe.BlockingWireOutputPipe;
import net.jxta.peergroup.PeerGroup;
-import net.jxta.pipe.*;
+import net.jxta.pipe.InputPipe;
+import net.jxta.pipe.OutputPipe;
+import net.jxta.pipe.PipeMsgEvent;
+import net.jxta.pipe.PipeMsgListener;
+import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.RouteAdvertisement;
+import net.jxta.peer.PeerID;
import java.io.IOException;
import java.text.MessageFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -685,6 +704,7 @@
routeElement.getMimeType(),
routeElement.getStream());
final RouteAdvertisement route = (RouteAdvertisement)
AdvertisementFactory.newAdvertisement(asDoc);
+ manager.cacheRoute(route);
routeControl.addRoute(route);
}
} catch (IOException io) {
@@ -923,11 +943,17 @@
// Unicast datagram
// create a op pipe to the destination peer
LOG.log(Level.FINER, "Unicasting Message to :" + name +
"ID=" + peerid);
- OutputPipe output;
+ OutputPipe output = null;
if (!pipeCache.containsKey(peerid)) {
+ RouteAdvertisement route =
manager.getCachedRoute((PeerID) peerid);
+ if (route != null) {
+ output = new
BlockingWireOutputPipe(manager.getNetPeerGroup(), pipeAdv, (PeerID)
peerid, route);
+ }
+ if (output == null) {
// Unicast datagram
// create a op pipe to the destination peer
output = pipeService.createOutputPipe(pipeAdv,
Collections.singleton(peerid), 1);
+ }
pipeCache.put(peerid, output);
} else {
output = pipeCache.get(peerid);
Sheetal Vartak wrote:
> Kshitiz is also seeing these exceptions with the latest shoal-gms.jar.
>
> Begin forwarded message:
>
>> *From: *Kshitiz Saxena <Kshitiz.Saxena_at_Sun.COM
>> <mailto:Kshitiz.Saxena_at_Sun.COM>>
>> *Date: *February 15, 2008 11:03:11 PM PST
>> *To: *Sheetal Vartak <Sheetal.Vartak_at_Sun.COM
>> <mailto:Sheetal.Vartak_at_Sun.COM>>
>> *Subject: **Re: lots of exceptions while accessing admin GUI*
>>
>> Hi Sheetal,
>>
>> Getting below exception with jar provided by you. Instances are not
>> even coming up.
>>
>> [#|2008-02-16T12:29:02.637+0530|WARNING|sun-comms-appserver1.0|javax.enterprise.system.core.transaction|_ThreadID=15;_ThreadName=JTS
>> Resync Thread;_RequestID=93429b5d-34a8-4985-8fa6-1bfca8516146;|Error
>> while updating the DSC.
>> com.sun.enterprise.ee.cms.core.GMSException: java.io.IOException:
>> Unable to create a messenger to
>> jxta://uuid-3E8CB04EC4414BB495C1BDB91E98193625AFB7356AC94920B695BBF20577946A03/PipeService/urn:jxta:uuid-3E8CB04EC4414BB495C1BDB91E9819366521C3C52E4443928082812BCDC1E25B04
>> at
>> com.sun.enterprise.ee.cms.impl.jxta.GroupCommunicationProviderImpl.sendMessage(GroupCommunicationProviderImpl.java:223)
>> at
>> com.sun.enterprise.ee.cms.impl.jxta.DistributedStateCacheImpl.sendMessage(DistributedStateCacheImpl.java:467)
>> at
>> com.sun.enterprise.ee.cms.impl.jxta.DistributedStateCacheImpl.addToRemoteCache(DistributedStateCacheImpl.java:213)
>> at
>> com.sun.enterprise.ee.cms.impl.jxta.DistributedStateCacheImpl.addToCache(DistributedStateCacheImpl.java:131)
>> at
>> com.sun.enterprise.ee.cms.impl.common.GroupManagementServiceImpl.updateMemberDetails(GroupManagementServiceImpl.java:265)
>> at
>> com.sun.enterprise.ee.server.autotxrecovery.EEAutoTransactionRecoveryServiceImpl.init(EEAutoTransactionRecoveryServiceImpl.java:168)
>> at
>> com.sun.enterprise.ee.server.autotxrecovery.EEAutoTransactionRecoveryServiceImpl.start(EEAutoTransactionRecoveryServiceImpl.java:99)
>> at
>> com.sun.jts.CosTransactions.RecoveryManager.proceedWithXARecovery(RecoveryManager.java:934)
>> at
>> com.sun.jts.CosTransactions.RecoveryManager.recover(RecoveryManager.java:468)
>> at
>> com.sun.jts.CosTransactions.ResyncThread.run(RecoveryManager.java:1745)
>> Caused by: java.io.IOException: Unable to create a messenger to
>> jxta://uuid-3E8CB04EC4414BB495C1BDB91E98193625AFB7356AC94920B695BBF20577946A03/PipeService/urn:jxta:uuid-3E8CB04EC4414BB495C1BDB91E9819366521C3C52E4443928082812BCDC1E25B04
>> at
>> net.jxta.impl.pipe.BlockingWireOutputPipe.checkMessenger(BlockingWireOutputPipe.java:221)
>> at
>> net.jxta.impl.pipe.BlockingWireOutputPipe.send(BlockingWireOutputPipe.java:245)
>> at
>> com.sun.enterprise.jxtamgmt.ClusterManager.send(ClusterManager.java:448)
>> at
>> com.sun.enterprise.ee.cms.impl.jxta.GroupCommunicationProviderImpl.sendMessage(GroupCommunicationProviderImpl.java:211)
>> ... 9 more
>> |#]
>>
>> [#|2008-02-16T12:29:36.780+0530|WARNING|sun-comms-appserver1.0|ShoalLogger|_ThreadID=15;_ThreadName=JTS
>> Resync Thread;_RequestID=93429b5d-34a8-4985-8fa6-1bfca8516146;|Force
>> Syncing of DistributedStateCache failed:java.io.IOException: Unable
>> to create a messenger to
>> jxta://uuid-3E8CB04EC4414BB495C1BDB91E9819362DE658F932AB436995B78E0CB3E080DA03/PipeService/urn:jxta:uuid-3E8CB04EC4414BB495C1BDB91E9819366521C3C52E4443928082812BCDC1E25B04|#]
>>
>> [#|2008-02-16T12:29:53.846+0530|WARNING|sun-comms-appserver1.0|ShoalLogger|_ThreadID=15;_ThreadName=JTS
>> Resync Thread;_RequestID=93429b5d-34a8-4985-8fa6-1bfca8516146;|Force
>> Syncing of DistributedStateCache failed:java.io.IOException: Unable
>> to create a messenger to
>> jxta://uuid-3E8CB04EC4414BB495C1BDB91E9819362DE658F932AB436995B78E0CB3E080DA03/PipeService/urn:jxta:uuid-3E8CB04EC4414BB495C1BDB91E9819366521C3C52E4443928082812BCDC1E25B04|#]
>>
>> Thanks,
>> Kshitiz
>> Sheetal Vartak wrote:
>