users@grizzly.java.net

Filters and writing

From: Steve Stacha <sstacha_at_spe.org>
Date: Wed, 16 Sep 2009 15:56:35 -0500

A little background first: I thought I would try a simple example where
clients connect to a server and type messages (strings). The server then
sends each message to every connected client, like a very basic chat
server. MyMessage is nothing more than the number of bytes in the string
plus a byte buffer of that size holding the character data.
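
To make the wire format concrete, here is a minimal sketch of that
length-plus-body framing using only java.nio. MessageFraming and its two
methods are hypothetical names for illustration, not Grizzly classes, and
the UTF-8 encoding is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Grizzly): frames a string as a
// 4-byte big-endian length followed by that many UTF-8 bytes.
final class MessageFraming {

    // Builds one frame, flipped and ready to be read or written out.
    static ByteBuffer encode(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length);   // body length
        buf.put(body);             // body
        buf.flip();
        return buf;
    }

    // Returns the decoded text, or null if the buffer does not yet hold
    // a complete frame (the buffer position is left untouched then).
    static String decode(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return null;
        }
        buf.mark();
        int length = buf.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative body length: " + length);
        }
        if (buf.remaining() < length) {
            buf.reset();           // wait for more bytes
            return null;
        }
        byte[] body = new byte[length];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

A parse filter would presumably do the decode half on incoming bytes; the
open question is where the encode half should be hooked in.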

All in all this code works but there are a couple of things that I don't
think are right.
  
1) In dispatch I am converting the object to bytes and sending it via the
client connection's stream writer. It seems this should be done in
MyParseFilter. I have placed the code there but can't see how to invoke it
so that MyParseFilter converts the object to a byte buffer. How do I write
an object back to the client using the code in the parse filter? Or is
there a better way to do this?

2) I don't think the client connection stream writer approach is async,
even if I push the conversion code back into the parse filter, and in a
real-world solution you would want it to be. How do I do the above with
async calls, so that writing to several clients at once does not block
until each write completes? Maybe some pseudo-code or a short example?
All the examples I can find just write byte buffers to the stream.
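
To show the behaviour I am after in (2), here is a rough,
library-agnostic sketch. It does not use any Grizzly API: ClientSink and
Broadcaster are hypothetical names, and a plain java.util.concurrent
executor stands in for whatever async write mechanism Grizzly provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical stand-in for "something that can write to one client".
interface ClientSink {
    void send(String message) throws Exception;
}

// Hypothetical broadcaster: starts one write per client and returns
// without waiting for any of them to finish.
final class Broadcaster {
    private final Executor executor;

    Broadcaster(Executor executor) {
        this.executor = executor;
    }

    // Each returned future completes (or fails) independently, so one
    // slow or broken client does not hold up the others.
    List<CompletableFuture<Void>> broadcast(List<ClientSink> clients, String message) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (ClientSink client : clients) {
            pending.add(CompletableFuture.runAsync(() -> {
                try {
                    client.send(message);
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor));
        }
        return pending;
    }
}
```

The caller can ignore the futures, or attach a callback to each one to log
failures and drop dead connections.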

Here is the server code:
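// JDK imports; the Grizzly imports are omitted here
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyChatServer implements Runnable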
{
    public static final String DEFAULT_HOST = "localhost";
    public static final int DEFAULT_PORT = 4447;
    private List<Connection> connections = new ArrayList<Connection>();
    TCPNIOTransport transport;

    String host;
    int port;

    public MyChatServer(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public void dispatch(Connection connection, MyMessage message)
    {
        try
        {
            for (Connection clientConnection : connections)
            {
                // NOTE: this does not work because message needs to be a socket address
                //clientConnection.write(message, transport.getMemoryManager().allocate(255));

                // NOTE: this works but we are not using async or the MyParseFilter
                writeMCPMessage(clientConnection.getStreamWriter(), message);
            }

        }
        catch (Exception ex)
        {
            System.out.println("Exception dispatching message to other connections: " + ex);
        }
    }

    public void run()
    {

        transport = TransportFactory.getInstance().createTCPTransport();
        transport.getFilterChain().add(new TransportFilter());
        transport.getFilterChain().add(new MyParseFilter());
        transport.getFilterChain().add(new LogFilter());
        transport.getFilterChain().add(new MyConnectionFilter(this));
        transport.getFilterChain().add(new MyDispatchFilter(this));

        try
        {
            transport.bind(host, port);
            transport.start();
            System.out.println("[" + host + "] listening on port [" + port + "]");
            System.out.println("Press <Return> to stop the server...");
            System.in.read();
        }
        catch (IOException ioex)
        {
            Logger.getLogger(MyChatServer.class.getName()).log(Level.SEVERE, null, ioex);
        }
        finally
        {
            System.out.println("Stopping chat server...");
            try {transport.stop();}
            catch (IOException ex)
            {
                Logger.getLogger(MyChatServer.class.getName()).log(Level.SEVERE, null, ex);
            }
            TransportFactory.getInstance().close();
            System.out.println("Server stopped...");
        }
    }

    private int writeMCPMessage(StreamWriter writer, MyMessage message)
            throws IOException, InterruptedException
    {

        // body length
        writer.writeInt(message.bodyLength);

        // Body
        writer.writeByteArray(message.body);

        // Flush the message
        writer.flush();

        // bytes written: 4-byte length prefix plus the body
        return 4 + message.bodyLength;
    }

    public static void main (String[] args)
    {
        // get our host and port that we will bind to for the server
        String host;
        try
        {
            host = args[0];
            if (host == null || host.length() == 0)
                host = DEFAULT_HOST;
        }
        catch (Exception ex) {host = DEFAULT_HOST;}
        int port;
        try {port = Integer.parseInt(args[1]);}
        catch (Exception ex) {port = DEFAULT_PORT;}

        ExecutorService threadExecutor = Executors.newSingleThreadExecutor();

        // start thread for the server
        threadExecutor.execute( new MyChatServer(host, port) );

        threadExecutor.shutdown();
    }

    void addConnection(Connection connection) {
        if (!connections.contains(connection))
            connections.add(connection);
    }

    void removeConnection(Connection connection) {
        if (connections.contains(connection))
            connections.remove(connection);
    }

}
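
A side note on the connections list: assuming the filters can run on more
than one thread, a plain ArrayList could be modified by addConnection or
removeConnection while dispatch is iterating over it. A minimal sketch of
a safer registry, using only the JDK, might look like this.
ConnectionRegistry is a hypothetical name and is generic so it does not
depend on the Grizzly Connection type.

```java
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical replacement for the ArrayList plus contains() checks.
final class ConnectionRegistry<C> {
    private final CopyOnWriteArrayList<C> connections = new CopyOnWriteArrayList<>();

    // Adds the connection unless it is already registered.
    boolean add(C connection) {
        return connections.addIfAbsent(connection);
    }

    // Removes the connection; returns false if it was not registered.
    boolean remove(C connection) {
        return connections.remove(connection);
    }

    // Iteration works on a snapshot of the list, so concurrent adds
    // and removes do not throw ConcurrentModificationException.
    Iterable<C> snapshot() {
        return connections;
    }
}
```

CopyOnWriteArrayList copies the backing array on every change, which
suits a chat server where broadcasts are far more frequent than connects
and disconnects.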

I can post other code bits if needed.

Thanks,

Steve