users@grizzly.java.net

Multi-threaded client example using callbacks

From: Johan Maasing <johan_at_zoom.nu>
Date: Tue, 10 Jun 2014 11:56:21 +0200

I'm learning Grizzly, and with help from the mailing list I finally got a
little multi-threaded client running. I hope this example follows "best
practice" and is informative to others.

public class CallbackClient {
    private final static int NUMBER_OF_CLIENT_THREADS = 10;
    private final CountDownLatch finishLatch = new
CountDownLatch(NUMBER_OF_CLIENT_THREADS);
    private TCPNIOTransport transport;
    private final Attribute<CallbackListener> listenerAttribute =

Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("listenerAttribute");

    public static void main(String... args) throws InterruptedException,
IOException {
        CallbackClient app = new CallbackClient();
        app.startup();
        app.run();
    }

    private void run() throws InterruptedException {
        for (int n = 0; n < NUMBER_OF_CLIENT_THREADS; n++) {
            final String listenerID = "Listener: " + n ;
            final String message = "Hello from client: " + n + "\n";
            Thread clientThread = new Thread(() -> {
                final GrizzlyFuture<Connection> connectionGrizzlyFuture =
transport.connect("127.0.0.1", 5431);
                try {
                    final Connection connection =
connectionGrizzlyFuture.get();
                    listenerAttribute.set(connection, (CallbackResult
result) -> {
                        System.out.println(listenerID + " callback result:
" + result.message);
                        finishLatch.countDown();
                    });
                    connection.write(message);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            );
            clientThread.start();
        }

        finishLatch.await();
    }

    private void startup() throws IOException {

        FilterChainBuilder clientFilterChainBuilder =
FilterChainBuilder.stateless();
        clientFilterChainBuilder.add(new TransportFilter());
        clientFilterChainBuilder.add(new
StringFilter(Charset.defaultCharset(), "\n"));
        clientFilterChainBuilder.add(new CallbackFilter(listenerAttribute));
        transport = TCPNIOTransportBuilder.newInstance().build();
        transport.setProcessor(clientFilterChainBuilder.build());
        transport.start();
    }

    public static class CallbackFilter extends BaseFilter {
        private final Attribute<CallbackListener> listenerAttribute;

        public CallbackFilter(Attribute<CallbackListener>
listenerAttribute) {
            this.listenerAttribute = listenerAttribute;
        }

        @Override
        public NextAction handleRead(FilterChainContext ctx) throws
IOException {
            final String message = ctx.getMessage();
            final CallbackListener callbackListener =
listenerAttribute.get(ctx.getConnection());
            callbackListener.callback(new CallbackResult(message));
            return ctx.getStopAction();
        }
    }
}


Here is an "old-school" blocking IO server that can be used to test the
client.
/**
 * Minimal blocking-I/O test server listening on port 5431.
 *
 * <p>For every accepted connection a new thread reads one UTF-8 line, prints
 * {@code "You said: <line>"} to standard output, sends the same text back
 * followed by a newline, and closes the connection.
 */
public class EchoServer {
    /**
     * Accept-loop switch. Volatile so that a change made from another thread
     * is seen by the thread running the accept loop. The loop only re-checks
     * it after the next connection has been accepted.
     */
    volatile boolean keepRunning = true;

    /**
     * Starts the server and blocks in the accept loop.
     *
     * @param args ignored
     * @throws Exception if the listening socket cannot be opened or accept fails
     */
    public static void main(String[] args) throws Exception {
        EchoServer app = new EchoServer();
        app.run();
    }

    /**
     * Accepts connections until {@link #keepRunning} is cleared, handling each
     * one on its own thread. The listening socket is closed on exit.
     *
     * @throws IOException if the listening socket cannot be opened or accept fails
     */
    private void run() throws IOException {
        try (ServerSocket ss = new ServerSocket(5431)) {
            while (keepRunning) {
                final Socket socket = ss.accept();
                System.out.println("Server accepted a connection");
                Thread clientThread = new Thread(() -> handleClient(socket));
                clientThread.start();
            }
        }
    }

    /**
     * Serves a single client: reads one line and echoes it back.
     *
     * <p>The socket and its streams are always closed, also on failure. An I/O
     * error is reported but affects only this connection; the server keeps
     * accepting other clients.
     *
     * @param socket the accepted client connection; closed by this method
     */
    private void handleClient(Socket socket) {
        try (Socket client = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), "UTF-8"));
             OutputStream outs = client.getOutputStream()) {
            final String clientMessage = in.readLine();
            if (clientMessage == null) {
                // Client disconnected without sending a line: nothing to echo.
                return;
            }
            final String outMessage = "You said: " + clientMessage + "\n";
            System.out.print(outMessage);
            outs.write(outMessage.getBytes("UTF-8"));
            outs.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

Cheers,
Johan