users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Tue, 10 Jun 2008 10:49:08 +0200

Hi Yi,

I just ran your code on my machine. With the default settings I could
create a maximum of 5700 connections with *both* types of client.
After raising the max number of open files on MacOS [1], I was able to
create more client connections. So, IMHO, the results look
consistent.
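
As a quick way to see how close a JVM is to that limit, here is a
minimal sketch (not part of your code or of Grizzly) that reports the
open and maximum file descriptor counts. It assumes a JVM on a
Unix-like OS that exposes com.sun.management.UnixOperatingSystemMXBean;
the class name FdLimitCheck is only for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdLimitCheck {

    /**
     * Returns {open, max} file descriptor counts for this JVM process,
     * or null when the platform bean does not expose them.
     */
    static long[] descriptorCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null; // assumption: non-Unix or non-supporting JVM
    }

    public static void main(String[] args) {
        long[] counts = descriptorCounts();
        if (counts == null) {
            System.out.println("File descriptor counts are not available on this JVM");
        } else {
            System.out.println("open=" + counts[0] + ", max=" + counts[1]);
        }
    }
}
```

Every socket uses one descriptor, so if "max" is well below the number
of connections you want, the client will stop short regardless of the
NIO framework used.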

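Also, I've updated the initSocketChannel and doConnect methods of your
raw NIO client like this (the channel is now switched to non-blocking
mode before connect(), and a connection is counted only when
finishConnect() succeeds):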

     private SocketChannel initSocketChannel() throws IOException {
         SocketChannel sc = SocketChannel.open();
         sc.configureBlocking(false);
         sc.socket().setReuseAddress(true);
         sc.connect(new InetSocketAddress(serverHost, serverPort));
         sc.register(selector, SelectionKey.OP_CONNECT);
         return sc;
     }

.............................. cut .......................................
     private void doConnect(SelectionKey key) {
         SocketChannel sc = (SocketChannel) key.channel();
         try {
             if (sc.finishConnect()) {
                 connectionCount.incrementAndGet();
             }

         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }

/////////////////////////////////////////////////////
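
For reference, here is the same pattern as a self-contained sketch that
can be run without your server: it connects a handful of non-blocking
channels to a throwaway listener on the loopback interface and counts
how many complete. The class name NonBlockingConnectDemo, the local
listener and the 5 second deadline are assumptions made for the
example, they are not part of your client.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NonBlockingConnectDemo {

    /** Opens 'count' non-blocking connections to a local listener; returns how many completed. */
    static int connectAll(int count) throws IOException {
        List<SocketChannel> open = new ArrayList<>();
        int connected = 0;
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Throwaway listener on an ephemeral loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            InetSocketAddress target = (InetSocketAddress) server.getLocalAddress();

            int pending = 0;
            for (int i = 0; i < count; i++) {
                SocketChannel sc = SocketChannel.open();
                open.add(sc);
                sc.configureBlocking(false);      // must happen before connect()
                if (sc.connect(target)) {
                    connected++;                  // completed immediately (possible on loopback)
                } else {
                    sc.register(selector, SelectionKey.OP_CONNECT);
                    pending++;
                }
            }

            long deadline = System.currentTimeMillis() + 5000;
            while (pending > 0 && System.currentTimeMillis() < deadline) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        if (accepted != null) {
                            open.add(accepted);   // keep it so it gets closed below
                        }
                    } else if (key.isConnectable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        if (sc.finishConnect()) { // count only completed connections
                            key.interestOps(0);
                            connected++;
                            pending--;
                        }
                    }
                }
            }
        } finally {
            for (SocketChannel sc : open) {
                try {
                    sc.close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
        return connected;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectAll(20));
    }
}
```

The count is incremented only inside the finishConnect() check, so a
failed or still-pending connect is never reported as established.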

Hope this helps.

WBR,
Alexey.

[1] sysctl -w kern.maxfiles=50000
      sysctl -w kern.maxfilesperproc=40000


On Jun 10, 2008, at 4:23 , JianXing Yi wrote:
> ok. pls. get the code from the attached.
>
> thanks!
> -Yi Bing
>
> 2008/6/10 Jeanfrancois Arcand <Jeanfrancois.Arcand_at_sun.com>:
> Hi Yi Bing,
>
> can you zip your code? That will be easier to compile and test :-)
>
> Thanks!
>
> -- Jeanfrancois
>
> JianXing Yi wrote:
>
>
> Hi Alexey,
>
>
> 1. in the grizzly-based nio client, i try to establish a
> relatively large number of connections to a server, e.g. 30,000. i
> avoid 30s closing idle connections through the
> SelectionKeyHandler. my problem is that it seems that about 5,000
> connections is the limit. no more connections could be established
> after that. btw, i've already adjusted the system tcp parameters,
> such as the tcp connection number (greater than 30,000) and the
> keep-alive time.
> It should be some system limitation; in Grizzly we don't have any.
> If you provide me the code, I can test how many connections I
> can create on my local machine, just to compare the results.
> Also, I'm curious what kind of scenario you have that requires
> 30K+ TCP connections? :))
>
>
> great thanks for your help!
>
>
> i want to generate http (may be extended) load to servers. some
> http
> servers can support persistent connections, so a relatively large
> number of connections to them should be established to get the
> performance index.
>
> i compared the result of the Grizzly-based client and a raw NIO API
> based one. netstat showed the number of concurrent connections for
> each of them, they are 5k and 25k in my environment. code of the
> Grizzly-based client, the raw NIO API based client and the
> Grizzly-based echo server is listed below.
>
>
> package test.grizzly;
>
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.Queue;
> import java.util.concurrent.Callable;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.atomic.AtomicLong;
>
> import com.sun.grizzly.CallbackHandler;
> import com.sun.grizzly.Context;
> import com.sun.grizzly.Controller;
> import com.sun.grizzly.ControllerStateListenerAdapter;
> import com.sun.grizzly.DefaultConnectorHandlerPool;
> import com.sun.grizzly.DefaultPipeline;
> import com.sun.grizzly.DefaultSelectionKeyHandler;
> import com.sun.grizzly.IOEvent;
> import com.sun.grizzly.Pipeline;
> import com.sun.grizzly.TCPConnectorHandler;
> import com.sun.grizzly.TCPSelectorHandler;
> import com.sun.grizzly.Controller.Protocol;
> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>
> public class GrizzlyBasedClient {
>
> public static final int CONNECTION_NUMBER = 30000;
> public static final int NUMBER_CONNECTION_PER_THREAD = 1000;
> private static final int SERVER_PORT = 11113;
> private static final String SERVER_HOST_OR_IP = "192.168.0.113";
>
> private static final int CONNECTION_THREADPOOL_SIZE = 100;
> private static final int NUMBER_LOAD_ON_THREADPOOL = 10;
>
> /**
> * @param args
> * @throws InterruptedException
> * @throws IOException
> */
> public static void main(String[] args) throws
> InterruptedException,
> IOException {
> GrizzlyBasedClient lg = new GrizzlyBasedClient();
> lg.start();
> }
>
> private Controller controller;
> protected CountDownLatch controllerReadyLatch = new
> CountDownLatch(1);
> public ConcurrentLinkedQueue<TCPConnectorHandler> connectors =
> new ConcurrentLinkedQueue<TCPConnectorHandler>();
>
> // private TCPConnectorHandler connectorHandler;
>
> class MyConnectorCallbackHandler implements
> CallbackHandler<Context> {
> private TCPConnectorHandler tcpConnectorHandler;
>
> public MyConnectorCallbackHandler(
> TCPConnectorHandler tcpConnectorHandler) {
> this.tcpConnectorHandler = tcpConnectorHandler;
> }
>
> public void onConnect(IOEvent<Context> ioEvent) {
> try {
>
> tcpConnectorHandler.finishConnect(ioEvent.attachment()
> .getSelectionKey());
> allConnectedLatch.countDown();
> connectors.add(tcpConnectorHandler);
> // Reporter.getInstance().collect(
> // tcpConnectorHandler.getSelectorHandler().getPort(),
> // System.currentTimeMillis());
> connectionCount.incrementAndGet();
>
> // System.out.println(tcpConnectorHandler.getSelectorHandler().getPort()
> // + ", " + System.currentTimeMillis());
> } catch (IOException e) {
> // System.out.println(Reporter.getInstance().verbose());
> e.printStackTrace();
> }
> }
>
> public void onRead(IOEvent<Context> arg0) {
> // TODO Auto-generated method stub
>
> }
>
> public void onWrite(IOEvent<Context> arg0) {
> // TODO Auto-generated method stub
>
> }
> }
>
> protected ByteBuffer readBuffer =
> ByteBuffer.allocateDirect(10240);
>
> AtomicLong transactionCount = new AtomicLong(0);
> AtomicLong connectionCount = new AtomicLong(0);
> public AtomicLong bytesRead = new AtomicLong(0);
> public AtomicLong minusOneCount = new AtomicLong(0);
> public AtomicLong zeroCount = new AtomicLong(0);
>
> class MyAsyncWriteCallbackHandler implements
> AsyncWriteCallbackHandler {
> private TCPConnectorHandler tcpConnectorHandler;
>
> public MyAsyncWriteCallbackHandler(
> TCPConnectorHandler tcpConnectorHandler) {
> this.tcpConnectorHandler = tcpConnectorHandler;
> }
>
> public void onIOException(IOException ioException,
> SelectionKey key,
> ByteBuffer buffer, Queue<AsyncWriteQueueRecord>
> remainingQueue) {
> // TODO Auto-generated method stub
>
> }
>
> public void onWriteCompleted(SelectionKey key, ByteBuffer
> buffer) {
> try {
>
> transactionCount.incrementAndGet();
>
> readBuffer.clear();
>
> long nCount =
> this.tcpConnectorHandler.read(readBuffer, true);
> if (nCount == -1) {
> minusOneCount.incrementAndGet();
> } else if (nCount == 0) {
> zeroCount.incrementAndGet();
> } else {
> bytesRead.addAndGet(nCount);
> }
> // readBuffer.flip();
> // System.out.println(Charset.defaultCharset().decode(readBuffer));
> // System.out.println("V");
> // tcpConnectorHandler.close();
> // controller.releaseConnectorHandler(tcpConnectorHandler);
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> };
>
> private static int localPort = 12110;
> SocketAddress serverAddr = new
> InetSocketAddress(SERVER_HOST_OR_IP,
> SERVER_PORT);
> private CountDownLatch allConnectedLatch = new CountDownLatch(
> CONNECTION_NUMBER);
>
> public void start() throws InterruptedException, IOException {
> controller = createController();
> new Thread(controller).start();
> waitControllerReady();
>
> // Establish CONNECTION_NUMBER connections.
> long start1 = System.currentTimeMillis();
> for (int i = 0; i < CONNECTION_NUMBER; i++) {
> SocketAddress localAddr = new
> InetSocketAddress("localhost",
> localPort++);
> // connectorHandler.setLinger(10);
> TCPConnectorHandler connectorHandler =
> (TCPConnectorHandler) controller
> .acquireConnectorHandler(Protocol.TCP);
> // connectorHandler.setTcpNoDelay(true);
> connectorHandler.setReuseAddress(true);
>
> connectorHandler.connect(serverAddr, /* localAddr, */
> new MyConnectorCallbackHandler(connectorHandler));
> // Thread.sleep(20);
> }
> allConnectedLatch.await();
> long interval2 = System.currentTimeMillis() - start1;
> System.out.println("ALL CONNECTED." + " connections: "
> + connectionCount.get() + "," + connectors.size()
> + ", connection total time (s): " + interval2 /
> 1000);
>
> // distribute tasks
> Iterator<TCPConnectorHandler> itCon =
> connectors.iterator();
> ArrayList<Callable<Object>> callables2 = new
> ArrayList<Callable<Object>>();
> // int count = 0;
> // LinkedList<TCPConnectorHandler> tmpConnectorList = new
> // LinkedList<TCPConnectorHandler>();
> while (itCon.hasNext()) {
> final TCPConnectorHandler con = itCon.next();
> // tmpConnectorList.add(con);
> // count++;
> callables2.add(new Callable<Object>() {
> public Object call() throws Exception {
> // Perform 1 transaction on each connection (raise the loop bound for more).
> for (int i = 0; i < 1; i++) {
> executeOnConnection(con);
> }
> return null;
> }
> });
> }
>
> // Operate on each connection.
>
> // Callable<Object>[] callables3 = new
> // Callable[NUMBER_LOAD_ON_THREADPOOL];
>
> // Callable<Object>[] callables = new
> // Callable[NUMBER_LOAD_ON_THREADPOOL];
> // // Issue these connections concurrently.
> // for (int i = 0; i < callables.length; i++) {
> // callables[i] = new Callable<Object>() {
> // public Object call() throws Exception {
> // for (int i = 0; i < CONNECTION_NUMBER
> // / NUMBER_LOAD_ON_THREADPOOL; i++) {
> // // TCPConnectorHandler connectorHandler =
> // // (TCPConnectorHandler) controller
> // // .acquireConnectorHandler(Protocol.TCP);
> // executeOnConnection(connectorHandler);
> // }
> // return null;
> // }
> // };
> // }
> ExecutorService executor = Executors
> .newFixedThreadPool(CONNECTION_THREADPOOL_SIZE);
> // List<Callable<Object>> c = Arrays.asList(callables2);
>
> long start = System.currentTimeMillis();
>
> executor.invokeAll(callables2);
> executor.shutdown();
>
> long interval = System.currentTimeMillis() - start;
> System.out.println("transaction/s: " +
> transactionCount.get() * 1000
> / interval + ", bytes read: " + bytesRead.get()
> + "\n-1 times: " + minusOneCount.get() + ", 0 times: "
> + zeroCount);
>
> Thread.sleep(2000);
> itCon = connectors.iterator();
> while (itCon.hasNext()) {
> TCPConnectorHandler con = itCon.next();
> // con.getSelectorHandler().getPort()
> con.close();
> }
>
> controller.stop();
> }
>
> private void executeOnConnection(TCPConnectorHandler
> connectorHandler)
> throws IOException {
> try {
> sendRequest(connectorHandler);
> } finally {
> // connectorHandler.close();
> }
> }
>
> private void sendRequest(TCPConnectorHandler connectorHandler)
> throws IOException {
> String str = "GET /" /* icons/apache_pb2.gif */
> + " HTTP/1.1\r\nHost: " + SERVER_HOST_OR_IP
> + "\r\nConnection: keep-alive\r\n\r\n";
>
> connectorHandler.writeToAsyncQueue(ByteBuffer.wrap(str.getBytes()),
> new MyAsyncWriteCallbackHandler(connectorHandler));
> }
>
> private void waitControllerReady() throws
> InterruptedException {
> controllerReadyLatch.await();
> }
>
> private Controller createController() {
> Controller tmpCtl = new Controller();
> DefaultConnectorHandlerPool connectorPool = new
> DefaultConnectorHandlerPool(
> tmpCtl);
> tmpCtl.setConnectorHandlerPool(connectorPool);
>
> DefaultSelectionKeyHandler keyHandler = new
> DefaultSelectionKeyHandler();
> keyHandler.setTimeout(30 * 1000 * 60);
> tmpCtl.setSelectionKeyHandler(keyHandler);
> tmpCtl.setSelectorHandler(new TCPSelectorHandler(true));
> tmpCtl.addStateListener(new
> ControllerStateListenerAdapter() {
> @Override
> public void onReady() {
> controllerReadyLatch.countDown();
> }
> });
>
> Pipeline pl = new DefaultPipeline();
> pl.setMaxThreads(50);
> tmpCtl.setPipeline(pl);
>
> // tmpCtl.getConnectorHandlerPool().
>
> return tmpCtl;
> }
>
> }
>
>
> package test.grizzly;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.concurrent.CountDownLatch;
>
> import com.sun.grizzly.Controller;
> import com.sun.grizzly.ControllerStateListenerAdapter;
> import com.sun.grizzly.DefaultProtocolChain;
> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
> import com.sun.grizzly.ProtocolChain;
> import com.sun.grizzly.TCPSelectorHandler;
> import com.sun.grizzly.filter.EchoFilter;
> import com.sun.grizzly.filter.ReadFilter;
>
> public class EchoServer {
>
> private static final String SERVER_HOST_IP = "190.168.0.113";
>
> private static final int SERVER_PORT = 11113;
>
> /**
> * @param args
> * @throws InterruptedException
> * @throws UnknownHostException
> */
> public static void main(String[] args) throws UnknownHostException,
> InterruptedException {
> EchoServer server = new EchoServer();
> server.start();
> }
>
> private Controller controller;
> protected CountDownLatch controllerReadyLatch = new
> CountDownLatch(1);
>
> public void start() throws InterruptedException,
> UnknownHostException {
> controller = createController();
> new Thread(controller).start();
>
> controllerReadyLatch.await();
>
> }
>
> private Controller createController() throws UnknownHostException {
> Controller tmpCtl = new Controller();
> TCPSelectorHandler tcpSelectorHandler = new
> TCPSelectorHandler();
>
> tcpSelectorHandler.setInet(InetAddress.getByName(SERVER_HOST_IP));
> tcpSelectorHandler.setPort(SERVER_PORT);
> tmpCtl.addSelectorHandler(tcpSelectorHandler);
> tmpCtl.addStateListener(new ControllerStateListenerAdapter() {
> @Override
> public void onReady() {
> controllerReadyLatch.countDown();
> }
> });
> tmpCtl
> .setProtocolChainInstanceHandler(new
> DefaultProtocolChainInstanceHandler() {
> private ProtocolChain protocolChain;
>
> @Override
> public ProtocolChain poll() {
> if (protocolChain == null) {
> protocolChain = new DefaultProtocolChain();
> }
> return protocolChain;
> }
> });
> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
> new ReadFilter());
> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
> new EchoFilter());
>
> return tmpCtl;
> }
>
> }
>
>
> package test.grizzly;
>
> import java.io.IOException;
> import java.net.InetAddress;
> import java.net.InetSocketAddress;
> import java.net.UnknownHostException;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.Selector;
> import java.nio.channels.SocketChannel;
> import java.nio.channels.spi.SelectorProvider;
> import java.util.Iterator;
>
> public class RawNioClient implements Runnable {
> private InetAddress serverHost;
> private int serverPort;
> private Selector selector;
> private int connectionCount = 0;
>
> // private SocketChannel socketChannel;
>
> public RawNioClient(InetAddress serverHost, int serverPort)
> throws IOException {
> this.serverHost = serverHost;
> this.serverPort = serverPort;
> selector = initSelector();
> // socketChannel = initSocketChannel();
> }
>
> private SocketChannel initSocketChannel() throws IOException {
> SocketChannel sc = SocketChannel.open();
> sc.connect(new InetSocketAddress(serverHost, serverPort));
> sc.configureBlocking(false);
> sc.socket().setReuseAddress(true);
> sc.register(selector, SelectionKey.OP_CONNECT);
> return sc;
> }
>
> private Selector initSelector() throws IOException {
> return SelectorProvider.provider().openSelector();
> }
>
> public static void main(String[] args) {
> try {
> new Thread(new RawNioClient(
> InetAddress.getByName("192.168.0.113"), 11113)).start();
> } catch (UnknownHostException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> public void run() {
> while (true) {
> try {
> initSocketChannel();
> } catch (IOException e1) {
> // TODO Auto-generated catch block
> e1.printStackTrace();
> }
> if (connectionCount >= 30000) {
> System.out.println("reach 30000 connections");
> try {
> Thread.sleep(5000);
> } catch (InterruptedException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> } else {
> try {
> selector.select();
>
> Iterator<SelectionKey> itKey =
> selector.selectedKeys()
> .iterator();
> while (itKey.hasNext()) {
> SelectionKey key = itKey.next();
> itKey.remove();
> if (!key.isValid()) {
> continue;
> }
> if (key.isConnectable()) {
> doConnect(key);
> }
> }
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
> }
>
> private void doConnect(SelectionKey key) {
> SocketChannel sc = (SocketChannel) key.channel();
> try {
> boolean fc = sc.finishConnect();
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> connectionCount++;
> }
> }
>
>
>
> 2. i want to iterate over the ConnectorHandlerPool to perform
> transaction on each connection. but it seems that
> ConnectorHandlerPool does not support the iteration explicitly. so
> as a workaround, i collect the ConnectorHandler after executing
> ConnectorHandler.finishConnect manually.
> Maybe it's the only way right now, as ConnectorHandlerPool wasn't
> initially introduced to serve existing live connections, but to
> cache and reuse ones that were released (closed).
>
> thanks.
>
> - Yi Bing
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
>
> <connection_test.zip>