dev@shoal.java.net

Re: [Shoal-Dev] The problem of sending messages

From: Bongjae Chang <carryel_at_korea.com>
Date: Thu, 21 Aug 2008 15:38:46 +0900

Hi Joe.

Joe wrote:
I did get confirmation from a jxta developer on thread-safety of Messenger.

Thanks for confirming that!

So I would like to ask a JXTA developer about my doubt regarding BlockingMessenger (the "first" case below).

Should I send my question to the JXTA community? Or is that JXTA developer a member of the Shoal community?

Please advise.

Thanks.

--
Bongjae Chang


  ----- Original Message -----
  From: Joseph Fialli
  To: dev_at_shoal.dev.java.net
  Sent: Thursday, August 21, 2008 7:08 AM
  Subject: Re: [Shoal-Dev] The problem of sending messages


  Bongjae,

  Thanks for catching this.
  I have filed this issue at the following URL, referencing this forum post.
  https://shoal.dev.java.net/issues/show_bug.cgi?id=75



  I did get confirmation from a jxta developer on thread-safety of Messenger.

> All subclasses of Messenger are Thread-safe and can be shared between threads.

  There will be no need to add synchronization in the shoal layer.
  Thus, the fix will be to add logic to retry sending a message when false is returned.
  I proposed an initial attempt of the fix in the bug report for shoal issue 75.

  -Joe

  Bongjae Chang wrote:
    Hi.
    I have some questions about sending messages.
    I have been testing the sending logic recently and found some issues.
    I think the following issues concern JXTA as well as SHOAL.

    As I understand it, SHOAL uses synchronous P2P communication over TCP for DSCMessage and GMSMessage.
    So SHOAL uses JXTA's BlockingWireOutputPipe. The following is part of the send() method in ClusterManager.java.

    ----------------------------------------------------
    public void send(final ID peerid, final Serializable msg) throws IOException, MemberNotInViewException {
        ...
        if( peerid != null ) {
            //check if the peerid is part of the cluster view
            if( getClusterViewManager().containsKey(peerid)) {
                ...
                RouteAdvertisement route = getCachedRoute((PeerID) peerid);
                if( route != null ) {
                    output = new BlockingWireOutputPipe(..., route);
                    ...
                }
                if( output == null ) {
                     if( route == null ) {
                        route = getCachedRoute((PeerID) peerid);
                    }
                    ...
                    output = new BlockingWireOutputPipe(..., route);
                    ...
                }
                ...
                output.send(message); // ---------------------------------------(1)
            } else {
                throw new MemberNotInViewException();
            }
        } else {
            // multicast
        }
    }
    ----------------------------------------------------

    (1) When I tested sending messages from multiple threads, some messages were occasionally lost because output.send(message) returned false. You can reproduce this with com/sun/enterprise/shoal/multithreadmessagesendertest/MultiThreadMessageSender after adding a simple debugging check to ClusterManager.send(). The edited code is shown below.

    In ClusterManager.java
    ----------------------------------------------------
    public void send(final ID peerid, final Serializable msg) throws IOException, MemberNotInViewException {
        ...
        if( peerid != null ) {
            //check if the peerid is part of the cluster view
            if( getClusterViewManager().containsKey(peerid)) {
                ...
                RouteAdvertisement route = getCachedRoute((PeerID) peerid);
                if( route != null ) {
                    output = new BlockingWireOutputPipe(..., route);
                    ...
                }
                if( output == null ) {
                     if( route == null ) {
                        route = getCachedRoute((PeerID) peerid);
                    }
                    ...
                    output = new BlockingWireOutputPipe(..., route);
                    ...
                }
                ...
                boolean result = output.send(message); // ---------------------------------------(1)
                if( !result )
                    System.out.println("#### failed to send this message because of overflow ####" );
            } else {
                throw new MemberNotInViewException();
            }
        } else {
            // multicast
        }
    }
    ----------------------------------------------------

    Here is the Javadoc of send() in JXTA's OutputPipe.java:

    ----------------------------------------------------
        /**
         * Send a message through the pipe
         *
         * <p/><b>WARNING:</b> The message object used when sending a pipe message
         * should not be reused or modified after the {@link #send(Message)} call is
         * made. Concurrent modification of messages will produce unexpected result.
         *
         * @param msg is the PipeMessage to be sent.
         * @return boolean <code>true</code> if the message has been sent otherwise
         * <code>false</code>. <code>false</code>. is commonly returned for
         * non-error related congestion, meaning that you should be able to send
         * the message after waiting some amount of time.
         * @exception IOException output pipe error
         *
         */
         public boolean send(Message msg) throws IOException;
    ----------------------------------------------------

    As the comment above says, if OutputPipe.send() returns false, I think we should either retry the send or report the error, as in the following code.

    [sending sample]
    ----------------------------------------------------
    boolean result;
    do {
        result = output.send(message);
        // sleep and check max retry count
    } while (!result);
    ----------------------------------------------------
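
    The retry idea in [sending sample] can be made bounded so that a send never spins forever. The following is only a minimal sketch under assumptions: Pipe is a hypothetical stand-in for the single send() method of JXTA's OutputPipe, and RetryingSender, maxAttempts and initialDelayMillis are illustrative names that appear in neither SHOAL nor JXTA.

```java
import java.io.IOException;

/** Hypothetical stand-in for the one OutputPipe method used here (assumption, not the JXTA type). */
interface Pipe<M> {
    boolean send(M msg) throws IOException;
}

final class RetryingSender {
    private RetryingSender() {}

    /**
     * Calls pipe.send(msg) until it returns true or maxAttempts is reached,
     * sleeping between attempts and doubling the delay each time.
     * Returns true if some attempt succeeded, false otherwise.
     */
    static <M> boolean sendWithRetry(Pipe<M> pipe, M msg, int maxAttempts, long initialDelayMillis)
            throws IOException {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (pipe.send(msg)) {
                return true;
            }
            if (attempt == maxAttempts) {
                break; // no point sleeping after the last attempt
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
            delay *= 2; // simple exponential backoff
        }
        return false; // caller decides whether to log, throw, or drop
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        Pipe<String> flaky = m -> ++calls[0] >= 3; // rejects twice, then accepts
        boolean sent = sendWithRetry(flaky, "hello", 5, 1L);
        System.out.println("sent=" + sent + " after " + calls[0] + " attempts");
    }
}
```

    Returning false after the last attempt, instead of looping without limit, leaves it to the caller to notify the application about the lost message.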


    I also reviewed some of the JXTA sending code to find out why OutputPipe.send() returned false.
    In my humble opinion, there are two reasons.

    As you know, BlockingWireOutputPipe can use two messengers.
    a) TcpMessenger(BlockingMessenger)
    b) ThreadedMessengerChannel(AsyncChannelMessenger)

    First, I found that BlockingMessenger is not thread-safe for sending messages. BlockingMessenger doesn't use a sending queue, so overflow is meaningless for it.
    In other words, if OutputPipe.send() returns false, it doesn't mean overflow.
    cf) AsyncChannelMessenger uses a sending queue. So if the queue is full, OutputPipe.send() can return false because of overflow.

    The following code is the internal sending logic in JXTA.

    BlockingMessenger.java
    ----------------------------------------------------
    public final boolean sendMessageN(Message msg, String service, String serviceParam) {
        boolean queue = false;
        ...
        synchronized(stateMachine) {
            closed = inputClosed;
            if((!closed)&&(currentMessage == null)) {
                ...
                // set currentMessage
                ...
                queue = true;
            }
        }
        if(queue) {
            // send
            ...
            synchronized(stateMachine) {
               ...
               // clear currentMessage as null
               ...
            }
            return true;
        }
        ...
        // set overflow event
        ...
        return false;
    }
    ----------------------------------------------------

    In the code above, a message is sent and the result is true only when the boolean queue is true. If queue is false, OutputPipe.send() ultimately returns false.
    As you can see, queue stays false whenever currentMessage is not null. If we send messages from multiple threads, currentMessage can be non-null because another thread is still in the middle of a send.
    I think this case doesn't indicate real overflow; it means the sending logic is not thread-safe.
    I don't know whether JXTA's BlockingMessenger supports multi-threaded use or not. If it does, I think this is a bug in JXTA. If it doesn't, I think the Javadoc should clearly explain this case (send() returning false).
    Also, if BlockingMessenger doesn't support multi-threaded use, I think SHOAL should send messages while holding a send lock.
    e.g.)
    synchronized(sendLock) {
        output.send(message);
    }
    or
    synchronized(output) {
        output.send(message);
    }
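
    To make the currentMessage check described above concrete, here is a small self-contained model of it. This is a sketch under assumptions, not JXTA code: SingleSlotMessenger and SingleSlotDemo are hypothetical classes, and the two latches exist only to hold one send "in flight" so that the second call deterministically sees a non-null currentMessage.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of the single-slot logic; it only mirrors the currentMessage check. */
final class SingleSlotMessenger {
    private final Object stateMachine = new Object();
    private Object currentMessage;                             // guarded by stateMachine
    final CountDownLatch sendStarted = new CountDownLatch(1);  // a send now holds the slot
    final CountDownLatch finishSend = new CountDownLatch(1);   // lets that send complete

    boolean send(Object msg) {
        boolean queue = false;
        synchronized (stateMachine) {
            if (currentMessage == null) {
                currentMessage = msg;
                queue = true;
            }
        }
        if (!queue) {
            return false; // slot busy: another thread is in the middle of a send
        }
        sendStarted.countDown();
        awaitQuietly(finishSend); // stands in for the blocking network write
        synchronized (stateMachine) {
            currentMessage = null;
        }
        return true;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

final class SingleSlotDemo {
    private SingleSlotDemo() {}

    /** Returns {result of a send made during another send, result of a send made afterwards}. */
    static boolean[] run() {
        SingleSlotMessenger messenger = new SingleSlotMessenger();
        Thread first = new Thread(() -> messenger.send("first"));
        first.start();
        SingleSlotMessenger.awaitQuietly(messenger.sendStarted); // "first" now occupies the slot
        boolean concurrent = messenger.send("second");           // rejected although no queue exists
        messenger.finishSend.countDown();                        // let "first" complete
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean afterwards = messenger.send("third");            // slot is free again
        return new boolean[] {concurrent, afterwards};
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println("send during another send: " + results[0]);
        System.out.println("send after it finished:   " + results[1]);
    }
}
```

    In this model the rejected call has nothing to do with a full queue; it only reflects that another thread held the single slot at that moment, which is the situation a send lock or a retry would cover.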


    Second, when SHOAL uses BlockingWireOutputPipe, BlockingWireOutputPipe can sometimes use ThreadedMessengerChannel(AsyncChannelMessenger).
    From my review of the JXTA code, AsyncChannelMessenger can be used if the RouteAdvertisement is null or the BlockingMessenger has been terminated.
    If BlockingWireOutputPipe uses AsyncChannelMessenger, sending is not blocking. Is that expected in SHOAL? And does SHOAL assume that BlockingWireOutputPipe can be created without a RouteAdvertisement?

    If BlockingWireOutputPipe uses AsyncChannelMessenger and SHOAL expects this case, real overflow can occur when the sending queue is full, so OutputPipe.send() can return false.
    So, as I said above, I think SHOAL should try to resend messages on overflow. See [sending sample] above.
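
    For contrast with the first case, real overflow with a queue-backed messenger can be pictured with a bounded queue from java.util.concurrent. This is only an illustrative sketch: QueuedSender is a hypothetical class, and JXTA's actual AsyncChannelMessenger may manage its queue differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical queue-backed sender; it models the described behaviour, not JXTA's classes. */
final class QueuedSender {
    private final BlockingQueue<String> queue;

    QueuedSender(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking: returns false immediately when the queue is full (overflow). */
    boolean send(String msg) {
        return queue.offer(msg);
    }

    /** Stands in for the background thread that writes one queued message to the wire. */
    String drainOne() {
        return queue.poll();
    }

    public static void main(String[] args) {
        QueuedSender sender = new QueuedSender(2);
        System.out.println(sender.send("a")); // accepted
        System.out.println(sender.send("b")); // accepted, queue is now full
        System.out.println(sender.send("c")); // rejected: overflow
        sender.drainOne();                    // one message leaves the queue
        System.out.println(sender.send("c")); // accepted on retry
    }
}
```

    Here a false return is genuine congestion, so waiting briefly and retrying is the natural reaction.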

    Thanks.

    --
    Bongjae Chang


