users@grizzly.java.net

Re: Filters and writing

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Thu, 24 Sep 2009 18:30:03 +0200

Hi Steve,

Sorry for the delay, I missed this mail somehow...

> A little background first: I thought I would try a simple example where
> clients connect to a server and can type messages (strings). The server will
> then send those messages to any client connected. Like a chat server, but
> really dumb. The MyMessage is nothing more than how many bytes are in the
> string and a bytebuffer of that size with the character data.
>
> All in all this code works, but there are a couple of things that I don't
> think are right.
>
> 1) In dispatch I am converting the object to bytes and sending it via the
> client connection's stream writer. It seems this should be in the
> MyParseFilter. I have placed the code there but can't see how to call it so
> that MyParseFilter does the converting of object to bytebuffer. How do I
> write an object back to the client using the code in the parse filter? Or is
> there a better way to do this?

I've just updated the GIOP sample, which probably has similar logic [1].
There we introduced an Encoder and a Decoder, which are used on both the
client and server sides.
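
As a rough illustration of the Encoder/Decoder idea (this is not the GIOP
sample's code), here is a minimal sketch in plain java.nio. It assumes the wire
format described above: a 4-byte body length followed by the character data.
The class name ChatMessageCodec and the choice of UTF-8 are assumptions made
for the example, not part of the original code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for the length-prefixed message described in this thread.
final class ChatMessageCodec {

    private ChatMessageCodec() {}

    /** Encodes text as [int length][UTF-8 bytes]; the buffer is returned ready for reading. */
    static ByteBuffer encode(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + body.length);
        out.putInt(body.length);
        out.put(body);
        out.flip(); // switch from filling to draining
        return out;
    }

    /**
     * Decodes one message from the buffer. Returns null, leaving the position
     * unchanged, if the buffer does not yet hold a complete message.
     */
    static String decode(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            return null; // length prefix not fully received yet
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("Negative body length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // body incomplete, rewind to before the length prefix
            return null;
        }
        byte[] body = new byte[length];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Keeping both directions in one place means the same class can serve the parse
filter when reading and the dispatch code when writing, so the conversion logic
is not duplicated between client and server.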

>
> 2) I don't think using the client connection stream writer approach is async,
> even if I do push the conversion code back to the parse filter, which in a
> real-world solution you would want it to be. How do I do the above with
> async calls, so that if I am writing to several clients at one time it is not
> blocking until each one completes? Maybe some pseudo-code or a short example?
> All the examples I can find just write byte buffers to the stream.

StreamWriter operations are asynchronous when the writer is used in
non-blocking mode, which is the default. When you call
writer.flush(<CompletionHandler>) in async mode, we first try to write the
data in the same thread. If that's not possible, we add the data to a queue,
and it is written once the connection becomes available for writing.
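
To make the "write directly, otherwise queue" behaviour concrete, here is a
small sketch of the general pattern in plain java.nio. It is not Grizzly's
implementation: the class QueuedWriter and its method names are hypothetical,
and a real transport would call onWritable() from its selector thread when the
channel reports that it is writable.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical per-connection writer illustrating the pattern only.
final class QueuedWriter {
    private final WritableByteChannel channel;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();

    QueuedWriter(WritableByteChannel channel) {
        this.channel = channel;
    }

    /**
     * Tries to write in the caller's thread. Returns true if everything was
     * written immediately, false if some or all of the data was queued.
     */
    synchronized boolean write(ByteBuffer data) throws IOException {
        if (pending.isEmpty()) {
            channel.write(data); // a non-blocking channel may accept only part
        }
        if (!data.hasRemaining()) {
            return true;
        }
        pending.add(data); // keep ordering: later writes wait behind this one
        return false;
    }

    /**
     * Drains the queue when the channel becomes writable again.
     * Returns true once nothing is left to write.
     */
    synchronized boolean onWritable() throws IOException {
        ByteBuffer head;
        while ((head = pending.peek()) != null) {
            channel.write(head);
            if (head.hasRemaining()) {
                return false; // channel is full again, wait for the next event
            }
            pending.remove();
        }
        return true;
    }
}
```

With one such writer per client, a broadcast loop never waits on a slow
connection, because that connection's data is simply queued. Note that each
client needs its own view of the bytes (for example via ByteBuffer.duplicate()),
since writing advances the buffer's position.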

>
> Here is the server code:
> public class MyChatServer implements Runnable
> {
> public static final String DEFAULT_HOST = "localhost";
> public static final int DEFAULT_PORT = 4447;
> private List<Connection> connections = new ArrayList<Connection>();
> TCPNIOTransport transport;
>
> String host;
> int port;
>
> public MyChatServer(String host, int port)
> {
> this.host = host;
> this.port = port;
> }
>
> public void dispatch(Connection connection, MyMessage message)
> {
> try
> {
> for (Connection clientConnection : connections)
> // NOTE: this does not work because message needs to be a socket address
> //clientConnection.write(message, transport.getMemoryManager().allocate(255));
>
> // NOTE: this works but we are not using async or the MyParseFilter
> writeMCPMessage(clientConnection.getStreamWriter(), message);
>
> }
> catch (Exception ex) {System.out.println("Exception dispatching message to other connections: " + ex);}
> }
>
> public void run()
> {
>
> transport = TransportFactory.getInstance().createTCPTransport();
> transport.getFilterChain().add(new TransportFilter());
> transport.getFilterChain().add(new MyParseFilter());
> transport.getFilterChain().add(new LogFilter());
> transport.getFilterChain().add(new MyConnectionFilter(this));
> transport.getFilterChain().add(new MyDispatchFilter(this));
>
> try
> {
> transport.bind(host, port);
> transport.start();
> System.out.println("[" + host + "] listening on port [" + port + "]");
> System.out.println("Press <Return> to stop the server...");
> System.in.read();
> }
> catch (IOException ioex)
> {Logger.getLogger(MyChatServer.class.getName()).log(Level.SEVERE, null, ioex);}
> finally
> {
> System.out.println("Stopping chat server...");
> try {transport.stop();}
> catch (IOException ex)
> {Logger.getLogger(MyChatServer.class.getName()).log(Level.SEVERE, null, ex);}
> TransportFactory.getInstance().close();
> System.out.println("Server stopped...");
> }
> }
>
> private int writeMCPMessage(StreamWriter writer, MyMessage message) throws IOException, InterruptedException
> {
>
> // body length
> writer.writeInt(message.bodyLength);
>
> // Body
> writer.writeByteArray(message.body);
>
> // Flush the message
> writer.flush();
>
> return 18 + message.bodyLength;
> }
>
> public static void main (String[] args)
> {
> // get our host and port that we will bind to for the server
> String host;
> try
> {
> host = args[0];
> if (host == null || host.length() == 0)
> host = DEFAULT_HOST;
> }
> catch (Exception ex) {host = DEFAULT_HOST;}
> int port;
> try {port = Integer.parseInt(args[1]);}
> catch (Exception ex) {port = DEFAULT_PORT;}
>
> ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
>
> // start thread for the client
> threadExecutor.execute( new MyChatServer(host, port) );
>
> threadExecutor.shutdown();
> }
>
> void addConnection(Connection connection) {
> if (!connections.contains(connection))
> connections.add(connection);
> }
>
> void removeConnection(Connection connection) {
> if (connections.contains(connection))
> connections.remove(connection);
> }
>
> }
>
> I can post other code bits if needed.

WBR,
Alexey.

[1] https://grizzly.dev.java.net/source/browse/grizzly/branches/2dot0/code/samples/framework-samples/src/main/java/com/sun/grizzly/samples/filterchain/
>
> Thanks,
>
> Steve