dev@shoal.java.net

The problem of sending messages

From: Bongjae Chang <carryel_at_korea.com>
Date: Wed, 20 Aug 2008 14:19:23 +0900

Hi.
I have some questions about sending messages.
I have recently been testing the sending logic, and I found some issues with it.
I think the following issues concern JXTA as well as SHOAL.

I understand that SHOAL uses synchronous P2P communication over TCP for DSCMessage and GMSMessage,
so SHOAL uses JXTA's BlockingWireOutputPipe. The following is part of the send() method in ClusterManager.java.

----------------------------------------------------
public void send(final ID peerid, final Serializable msg) throws IOException, MemberNotInViewException {
    ...
    if( peerid != null ) {
        //check if the peerid is part of the cluster view
        if( getClusterViewManager().containsKey(peerid)) {
            ...
            RouteAdvertisement route = getCachedRoute((PeerID) peerid);
            if( route != null ) {
                output = new BlockingWireOutputPipe(..., route);
                ...
            }
            if( output == null ) {
                if( route == null ) {
                    route = getCachedRoute((PeerID) peerid);
                }
                ...
                output = new BlockingWireOutputPipe(..., route);
                ...
            }
            ...
            output.send(message); // ---------------------------------------(1)
        } else {
            throw new MemberNotInViewException();
        }
    } else {
        // multicast
    }
}
----------------------------------------------------

(1) When I tested sending messages from multiple threads, some messages were occasionally lost because output.send(message) returned false. You can reproduce this with com/sun/enterprise/shoal/multithreadmessagesendertest/MultiThreadMessageSender after adding a little debugging code to ClusterManager.send(). The following code has been edited for debugging.

In ClusterManager.java
----------------------------------------------------
public void send(final ID peerid, final Serializable msg) throws IOException, MemberNotInViewException {
    ...
    if( peerid != null ) {
        //check if the peerid is part of the cluster view
        if( getClusterViewManager().containsKey(peerid)) {
            ...
            RouteAdvertisement route = getCachedRoute((PeerID) peerid);
            if( route != null ) {
                output = new BlockingWireOutputPipe(..., route);
                ...
            }
            if( output == null ) {
                if( route == null ) {
                    route = getCachedRoute((PeerID) peerid);
                }
                ...
                output = new BlockingWireOutputPipe(..., route);
                ...
            }
            ...
            boolean result = output.send(message); // ---------------------------------------(1)
            if( !result )
                System.out.println("#### failed to send this message because of overflow ####" );
        } else {
            throw new MemberNotInViewException();
        }
    } else {
        // multicast
    }
}
----------------------------------------------------

Here is the Javadoc for send() in JXTA's OutputPipe.java.

----------------------------------------------------
    /**
     * Send a message through the pipe
     *
     * <p/><b>WARNING:</b> The message object used when sending a pipe message
     * should not be reused or modified after the {@link #send(Message)} call is
     * made. Concurrent modification of messages will produce unexpected result.
     *
     * @param msg is the PipeMessage to be sent.
     * @return boolean <code>true</code> if the message has been sent otherwise
     * <code>false</code>. <code>false</code>. is commonly returned for
     * non-error related congestion, meaning that you should be able to send
     * the message after waiting some amount of time.
     * @exception IOException output pipe error
     *
     */
     public boolean send(Message msg) throws IOException;
----------------------------------------------------

As the comment above says, if OutputPipe.send() returns false, I think we should either try to resend the message or report the failure, as in the following code.

[sending sample]
----------------------------------------------------
boolean result;
do {
    result = output.send(message);
    // sleep and check max retry count
} while(!result);
----------------------------------------------------
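
The "sleep and check max retry count" part of the sample could look roughly like the sketch below. It is not SHOAL or JXTA code: MessageSender is a hypothetical stand-in for the one OutputPipe method used here, so that the example runs without JXTA, and RetryingSend, sendWithRetry, maxAttempts and initialDelayMillis are illustrative names only.

```java
import java.io.IOException;

/** Hypothetical stand-in for OutputPipe.send(Message); not a JXTA type. */
interface MessageSender<M> {
    boolean send(M message) throws IOException;
}

final class RetryingSend {
    private RetryingSend() {}

    /**
     * Calls sender.send() until it returns true or maxAttempts calls were made.
     * Sleeps between attempts and doubles the delay each time.
     * Returns true if an attempt succeeded, false if every attempt returned false.
     */
    static <M> boolean sendWithRetry(MessageSender<M> sender, M message,
                                     int maxAttempts, long initialDelayMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sender.send(message)) {
                return true;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delay);   // give the congestion some time to clear
                delay *= 2;
            }
        }
        return false;                  // caller decides how to report the failure
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fake sender that returns false twice, then accepts the message.
        MessageSender<String> flaky = m -> ++calls[0] >= 3;
        boolean sent = sendWithRetry(flaky, "hello", 5, 1L);
        System.out.println("sent=" + sent + " after " + calls[0] + " calls");
    }
}
```

Returning false after the last attempt, instead of looping forever, lets the caller log the loss or throw an exception, which matches the "resend or notify" idea above.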


I also reviewed some of the JXTA sending code to find out why OutputPipe.send() returned false.
In my humble opinion, there are two reasons.

As you know, BlockingWireOutputPipe can use two kinds of messenger.
a) TcpMessenger(BlockingMessenger)
b) ThreadedMessengerChannel(AsyncChannelMessenger)

First, I found that BlockingMessenger is not thread-safe for sending messages. BlockingMessenger doesn't use a send queue, so overflow is meaningless for it.
In other words, if OutputPipe.send() returns false, it doesn't mean overflow.
cf) AsyncChannelMessenger uses a send queue, so if the queue is full, OutputPipe.send() can return false because of overflow.

The following code is the internal sending logic in JXTA.

BlockingMessenger.java
----------------------------------------------------
public final boolean sendMessageN(Message msg, String service, String serviceParam) {
    boolean queue = false;
    ...
    synchronized(stateMachine) {
        closed = inputClosed;
        if((!closed)&&(currentMessage == null)) {
            ...
            // set currentMessage
            ...
            queue = true;
        }
    }
    if(queue) {
        // send
        ...
        synchronized(stateMachine) {
           ...
           // reset currentMessage to null
           ...
        }
        return true;
    }
    ...
    // set overflow event
    ...
    return false;
}
----------------------------------------------------

In the code above, the message is sent and the result is true only when the boolean queue is true. If queue is false, OutputPipe.send() ultimately returns false.
As you can see, queue is false when currentMessage is not null. If we send messages from multiple threads, currentMessage can be non-null because another thread's send is still in progress.
I think this case doesn't mean real overflow; it means the sending logic is not thread-safe.
I don't know whether JXTA's BlockingMessenger is meant to support multi-threaded use. If it is, I think this is a bug in JXTA. If it isn't, I think the Javadoc should explain this case (send() returning false) clearly.
Also, if BlockingMessenger doesn't support multi-threaded use, I think SHOAL should send messages while holding a send lock.
e.g.)
synchronized(sendLock) {
    output.send(message);
}
or
synchronized(output) {
    output.send(message);
}
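
To make the effect of the send lock concrete, here is a small self-contained sketch. SingleSlotMessenger is a hypothetical, heavily simplified model of the currentMessage check described above, not the real BlockingMessenger; the two latches, SendLockDemo and sendConcurrently are illustrative names that exist only so the demonstration is deterministic.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of the single-slot check; this is not JXTA code.
class SingleSlotMessenger {
    private final Object stateMachine = new Object();
    private String currentMessage;   // non-null while a send is in progress
    // Demo hooks: observe that a send has started, and hold it until released.
    final CountDownLatch inFlight = new CountDownLatch(1);
    final CountDownLatch release = new CountDownLatch(1);

    boolean send(String msg) throws InterruptedException {
        synchronized (stateMachine) {
            if (currentMessage != null) {
                return false;        // slot busy: reported as a failed send
            }
            currentMessage = msg;
        }
        inFlight.countDown();        // the slot is now occupied
        release.await();             // simulate a slow blocking write
        synchronized (stateMachine) {
            currentMessage = null;
        }
        return true;
    }
}

final class SendLockDemo {
    /** Starts two senders; thread 1 is started only once thread 0 is inside send(). */
    static boolean[] sendConcurrently(boolean useLock) throws InterruptedException {
        final SingleSlotMessenger messenger = new SingleSlotMessenger();
        final Object sendLock = new Object();
        final boolean[] results = new boolean[2];
        Thread[] threads = new Thread[2];
        for (int i = 0; i < threads.length; i++) {
            final int idx = i;
            threads[i] = new Thread(() -> {
                try {
                    if (useLock) {
                        synchronized (sendLock) {
                            results[idx] = messenger.send("m" + idx);
                        }
                    } else {
                        results[idx] = messenger.send("m" + idx);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        threads[0].start();
        messenger.inFlight.await();      // thread 0 is now inside send()
        threads[1].start();
        if (!useLock) {
            threads[1].join();           // without the lock, thread 1 returns at once
        }
        messenger.release.countDown();   // let the slow write finish
        threads[0].join();
        threads[1].join();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without lock: " + Arrays.toString(sendConcurrently(false)));
        System.out.println("with lock:    " + Arrays.toString(sendConcurrently(true)));
    }
}
```

In this model the run without the lock prints [true, false], because the second sender finds the slot busy, and the run with the lock prints [true, true], because the second sender waits for the first. The cost is that senders are serialized for the whole duration of a blocking write.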


Second, when SHOAL uses BlockingWireOutputPipe, the pipe can sometimes use ThreadedMessengerChannel(AsyncChannelMessenger).
From my review of the JXTA code, AsyncChannelMessenger is used if the RouteAdvertisement is null or the BlockingMessenger has been terminated.
If BlockingWireOutputPipe uses AsyncChannelMessenger, sending is not blocking. Is this expected in SHOAL? And does SHOAL assume that BlockingWireOutputPipe can be created without a RouteAdvertisement?

If BlockingWireOutputPipe uses AsyncChannelMessenger and SHOAL expects this case, real overflow can occur when the send queue is full, so OutputPipe.send() can return false.
As I said above, I think SHOAL should try to resend messages on overflow. See [sending sample] above.

Thanks.

--
Bongjae Chang