users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Tue, 10 Jun 2008 18:03:21 +0800

Hi Alexey,

I revised the raw NIO client as you pointed out and ran both clients
again. The result is the same as before :-(. The Grizzly-based one
reached 5.8k+ connections while the raw one reached 15k+ connections (I
stopped it rather than wait any longer). In my environment, the clients run
on a Sun T3400 workstation, and the simple echo server runs on my laptop and
on a Linux server (connection limits have been raised on both). I'll try
again... Anyway, many thanks for your help!
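
Since the open-file limit of the client process also caps the number of sockets it can hold, it can be read from inside the JVM. The following is only a minimal sketch, assuming a Unix-like JVM that exposes com.sun.management.UnixOperatingSystemMXBean; the class name FdLimitCheck and the helper descriptorCounts are made up for illustration and are not part of either client.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdLimitCheck {

    /**
     * Returns {open, max} file descriptor counts for this JVM process,
     * or null when the platform bean does not expose them (assumption:
     * only Unix-like JVMs implement UnixOperatingSystemMXBean).
     */
    static long[] descriptorCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] counts = descriptorCounts();
        if (counts == null) {
            System.out.println("descriptor counts not available on this JVM");
        } else {
            // Each open socket consumes one descriptor, so "max" bounds
            // the number of connections this process can hold.
            System.out.println("open=" + counts[0] + ", max=" + counts[1]);
        }
    }
}
```

Running this on the client machine before each test shows whether both clients start from the same per-process limit.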

Regards,
-Yi Bing
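
For reference, the connect pattern discussed in this thread (switch the channel to non-blocking mode before connect(), register for OP_CONNECT, and count a connection only when finishConnect() returns true) can be sketched as a small self-contained program. This is an illustration under assumptions, not the code from the attachment: the class name NonBlockingConnectSketch, the method connectAll, the timeout, and the loopback test server in main are all hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NonBlockingConnectSketch {

    /** Opens `target` non-blocking connections; returns how many completed before the timeout. */
    static int connectAll(InetSocketAddress server, int target, long timeoutMillis)
            throws IOException {
        List<SocketChannel> channels = new ArrayList<SocketChannel>();
        Selector selector = Selector.open();
        int connected = 0;
        try {
            for (int i = 0; i < target; i++) {
                SocketChannel sc = SocketChannel.open();
                channels.add(sc);
                sc.configureBlocking(false);      // must happen before connect()
                if (sc.connect(server)) {
                    connected++;                  // completed immediately (can happen on loopback)
                } else {
                    sc.register(selector, SelectionKey.OP_CONNECT);
                }
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (connected < target && System.currentTimeMillis() < deadline) {
                selector.select(100);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid() || !key.isConnectable()) {
                        continue;
                    }
                    try {
                        if (((SocketChannel) key.channel()).finishConnect()) {
                            key.interestOps(0);   // stop further OP_CONNECT events
                            connected++;          // count only established connections
                        }
                    } catch (IOException e) {
                        key.cancel();             // failed connect: do not count it
                    }
                }
            }
            return connected;
        } finally {
            for (SocketChannel sc : channels) {
                try { sc.close(); } catch (IOException ignored) { }
            }
            selector.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local listener so the sketch runs without an external server.
        ServerSocketChannel server = ServerSocketChannel.open();
        try {
            InetAddress loopback = InetAddress.getLoopbackAddress();
            server.socket().bind(new InetSocketAddress(loopback, 0));
            InetSocketAddress addr =
                    new InetSocketAddress(loopback, server.socket().getLocalPort());
            System.out.println("connected: " + connectAll(addr, 20, 5000));
        } finally {
            server.close();
        }
    }
}
```

Because failed attempts are not counted, the returned number should match what netstat reports for established connections, which makes the two clients easier to compare.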


2008/6/10 Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:

>
> Hi Yi,
> Just checked your code on my machine... By default, I was able to create
> at most 5700 connections with *both* types of client.
> After increasing the max number of open files on MacOS [1], I was able to
> raise the max number of client connections. So, IMHO, the results look
> consistent.
>
> Hope this helps.
>
> Also, I've updated the initSocketChannel and doConnect methods of your
> raw NIO client like this:
>
> private SocketChannel initSocketChannel() throws IOException {
>     SocketChannel sc = SocketChannel.open();
>     sc.configureBlocking(false);
>     sc.socket().setReuseAddress(true);
>     sc.connect(new InetSocketAddress(serverHost, serverPort));
>     sc.register(selector, SelectionKey.OP_CONNECT);
>     return sc;
> }
>
> .............................. cut .......................................
> private void doConnect(SelectionKey key) {
>     SocketChannel sc = (SocketChannel) key.channel();
>     try {
>         if (sc.finishConnect()) {
>             connectionCount.incrementAndGet();
>         }
>
>     } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>     }
> }
>
> /////////////////////////////////////////////////////
>
> Hope this helps.
>
> WBR,
> Alexey.
>
> [1] sysctl -w kern.maxfiles=50000
> sysctl -w kern.maxfilesperproc=40000
>
>
> On Jun 10, 2008, at 4:23 , JianXing Yi wrote:
>
> ok. please get the code from the attachment.
>
> thanks!
> -Yi Bing
>
> 2008/6/10 Jeanfrancois Arcand <Jeanfrancois.Arcand_at_sun.com>:
>
>> Hi Yi Bing,
>>
>> can you zip your code? That will be easier to compile and test :-)
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>> JianXing Yi wrote:
>>
>>>
>>>
>>> Hi Alexey,
>>>
>>>
>>>> 1. in the grizzly-based nio client, i try to establish a
>>>> relatively large number of connections to a server, e.g. 30,000. i
>>>> raised the SelectionKeyHandler timeout so that idle connections
>>>> are not closed after 30s. my problem is that about 5,000
>>>> connections seems to be the limit: no more connections can be
>>>> established after that. btw, i've already adjusted the system tcp
>>>> parameters, such as the maximum number of tcp connections (greater
>>>> than 30,000) and the keep-alive time.
>>>>
>>> It is probably a system limitation; Grizzly itself doesn't impose any.
>>> If you provide the code, I can test how many connections I
>>> can create on my local machine, just to compare the results.
>>> Also, I'm curious what kind of scenario you have that requires
>>> 30K+ TCP connections? :))
>>>
>>>
>>> great thanks for your help!
>>>
>>>
>>> i want to generate http (may be extended) load to servers. some http
>>> servers can support persistent connections, so a relatively large
>>> number of connections to them should be established to get the
>>> performance index.
>>>
>>> i compared the result of the Grizzly-based client and a raw NIO API
>>> based one. netstat showed the number of concurrent connections for
>>> each of them, they are 5k and 25k in my environment. code of the
>>> Grizzly-based client, the raw NIO API based client and the
>>> Grizzly-based echo server is listed below.
>>>
>>>
>>> package test.grizzly;
>>>
>>> import java.io.IOException;
>>> import java.net.InetSocketAddress;
>>> import java.net.SocketAddress;
>>> import java.nio.ByteBuffer;
>>> import java.nio.channels.SelectionKey;
>>> import java.util.ArrayList;
>>> import java.util.Iterator;
>>> import java.util.Queue;
>>> import java.util.concurrent.Callable;
>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>> import java.util.concurrent.CountDownLatch;
>>> import java.util.concurrent.ExecutorService;
>>> import java.util.concurrent.Executors;
>>> import java.util.concurrent.atomic.AtomicLong;
>>>
>>> import com.sun.grizzly.CallbackHandler;
>>> import com.sun.grizzly.Context;
>>> import com.sun.grizzly.Controller;
>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>> import com.sun.grizzly.DefaultConnectorHandlerPool;
>>> import com.sun.grizzly.DefaultPipeline;
>>> import com.sun.grizzly.DefaultSelectionKeyHandler;
>>> import com.sun.grizzly.IOEvent;
>>> import com.sun.grizzly.Pipeline;
>>> import com.sun.grizzly.TCPConnectorHandler;
>>> import com.sun.grizzly.TCPSelectorHandler;
>>> import com.sun.grizzly.Controller.Protocol;
>>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>>>
>>> public class GrizzlyBasedClient {
>>>
>>> public static final int CONNECTION_NUMBER = 30000;
>>> public static final int NUMBER_CONNECTION_PER_THREAD = 1000;
>>> private static final int SERVER_PORT = 11113;
>>> private static final String SERVER_HOST_OR_IP = "192.168.0.113";
>>>
>>> private static final int CONNECTION_THREADPOOL_SIZE = 100;
>>> private static final int NUMBER_LOAD_ON_THREADPOOL = 10;
>>>
>>> /**
>>> * @param args
>>> * @throws InterruptedException
>>> * @throws IOException
>>> */
>>> public static void main(String[] args) throws
>>> InterruptedException,
>>> IOException {
>>> GrizzlyBasedClient lg = new GrizzlyBasedClient();
>>> lg.start();
>>> }
>>>
>>> private Controller controller;
>>> protected CountDownLatch controllerReadyLatch = new
>>> CountDownLatch(1);
>>> public ConcurrentLinkedQueue<TCPConnectorHandler> connectors =
>>> new ConcurrentLinkedQueue<TCPConnectorHandler>();
>>>
>>> // private TCPConnectorHandler connectorHandler;
>>>
>>> class MyConnectorCallbackHandler implements
>>> CallbackHandler<Context> {
>>> private TCPConnectorHandler tcpConnectorHandler;
>>>
>>> public MyConnectorCallbackHandler(
>>> TCPConnectorHandler tcpConnectorHandler) {
>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>> }
>>>
>>> public void onConnect(IOEvent<Context> ioEvent) {
>>> try {
>>> tcpConnectorHandler.finishConnect(ioEvent.attachment()
>>> .getSelectionKey());
>>> allConnectedLatch.countDown();
>>> connectors.add(tcpConnectorHandler);
>>> // Reporter.getInstance().collect(
>>> // tcpConnectorHandler.getSelectorHandler().getPort(),
>>> // System.currentTimeMillis());
>>> connectionCount.incrementAndGet();
>>>
>>> // System.out.println(tcpConnectorHandler.getSelectorHandler().getPort()
>>> // + ", " + System.currentTimeMillis());
>>> } catch (IOException e) {
>>> // System.out.println(Reporter.getInstance().verbose());
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> public void onRead(IOEvent<Context> arg0) {
>>> // TODO Auto-generated method stub
>>>
>>> }
>>>
>>> public void onWrite(IOEvent<Context> arg0) {
>>> // TODO Auto-generated method stub
>>>
>>> }
>>> }
>>>
>>> protected ByteBuffer readBuffer =
>>> ByteBuffer.allocateDirect(10240);
>>>
>>> AtomicLong transactionCount = new AtomicLong(0);
>>> AtomicLong connectionCount = new AtomicLong(0);
>>> public AtomicLong bytesRead = new AtomicLong(0);
>>> public AtomicLong minusOneCount = new AtomicLong(0);
>>> public AtomicLong zeroCount = new AtomicLong(0);
>>>
>>> class MyAsyncWriteCallbackHandler implements
>>> AsyncWriteCallbackHandler {
>>> private TCPConnectorHandler tcpConnectorHandler;
>>>
>>> public MyAsyncWriteCallbackHandler(
>>> TCPConnectorHandler tcpConnectorHandler) {
>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>> }
>>>
>>> public void onIOException(IOException ioException,
>>> SelectionKey key,
>>> ByteBuffer buffer, Queue<AsyncWriteQueueRecord>
>>> remainingQueue) {
>>> // TODO Auto-generated method stub
>>>
>>> }
>>>
>>> public void onWriteCompleted(SelectionKey key, ByteBuffer
>>> buffer) {
>>> try {
>>>
>>> transactionCount.incrementAndGet();
>>>
>>> readBuffer.clear();
>>>
>>> long nCount =
>>> this.tcpConnectorHandler.read(readBuffer, true);
>>> if (nCount == -1) {
>>> minusOneCount.incrementAndGet();
>>> } else if (nCount == 0) {
>>> zeroCount.incrementAndGet();
>>> } else {
>>> bytesRead.addAndGet(nCount);
>>> }
>>> // readBuffer.flip();
>>> // System.out.println(Charset.defaultCharset().decode(readBuffer));
>>> // System.out.println("V");
>>> // tcpConnectorHandler.close();
>>> // controller.releaseConnectorHandler(tcpConnectorHandler);
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> };
>>>
>>> private static int localPort = 12110;
>>> SocketAddress serverAddr = new
>>> InetSocketAddress(SERVER_HOST_OR_IP,
>>> SERVER_PORT);
>>> private CountDownLatch allConnectedLatch = new CountDownLatch(
>>> CONNECTION_NUMBER);
>>>
>>> public void start() throws InterruptedException, IOException {
>>> controller = createController();
>>> new Thread(controller).start();
>>> waitControllerReady();
>>>
>>> // Establish CONNECTION_NUMBER connections.
>>> long start1 = System.currentTimeMillis();
>>> for (int i = 0; i < CONNECTION_NUMBER; i++) {
>>> SocketAddress localAddr = new
>>> InetSocketAddress("localhost",
>>> localPort++);
>>> // connectorHandler.setLinger(10);
>>> TCPConnectorHandler connectorHandler =
>>> (TCPConnectorHandler) controller
>>> .acquireConnectorHandler(Protocol.TCP);
>>> // connectorHandler.setTcpNoDelay(true);
>>> connectorHandler.setReuseAddress(true);
>>>
>>> connectorHandler.connect(serverAddr, /* localAddr, */
>>> new MyConnectorCallbackHandler(connectorHandler));
>>> // Thread.sleep(20);
>>> }
>>> allConnectedLatch.await();
>>> long interval2 = System.currentTimeMillis() - start1;
>>> System.out.println("ALL CONNECTED." + " connections: "
>>> + connectionCount.get() + "," + connectors.size()
>>> + ", connection total time (s): " + interval2 / 1000);
>>>
>>> // distribute tasks
>>> Iterator<TCPConnectorHandler> itCon = connectors.iterator();
>>> ArrayList<Callable<Object>> callables2 = new
>>> ArrayList<Callable<Object>>();
>>> // int count = 0;
>>> // LinkedList<TCPConnectorHandler> tmpConnectorList = new
>>> // LinkedList<TCPConnectorHandler>();
>>> while (itCon.hasNext()) {
>>> final TCPConnectorHandler con = itCon.next();
>>> // tmpConnectorList.add(con);
>>> // count++;
>>> callables2.add(new Callable<Object>() {
>>> public Object call() throws Exception {
>>> // Perform 1 transaction on each connection (the loop bound below is 1).
>>> for (int i = 0; i < 1; i++) {
>>> executeOnConnection(con);
>>> }
>>> return null;
>>> }
>>> });
>>> }
>>>
>>> // Operate on each connection.
>>>
>>> // Callable<Object>[] callables3 = new
>>> // Callable[NUMBER_LOAD_ON_THREADPOOL];
>>>
>>> // Callable<Object>[] callables = new
>>> // Callable[NUMBER_LOAD_ON_THREADPOOL];
>>> // // Issue these connections concurrently.
>>> // for (int i = 0; i < callables.length; i++) {
>>> // callables[i] = new Callable<Object>() {
>>> // public Object call() throws Exception {
>>> // for (int i = 0; i < CONNECTION_NUMBER
>>> // / NUMBER_LOAD_ON_THREADPOOL; i++) {
>>> // // TCPConnectorHandler connectorHandler =
>>> // // (TCPConnectorHandler) controller
>>> // // .acquireConnectorHandler(Protocol.TCP);
>>> // executeOnConnection(connectorHandler);
>>> // }
>>> // return null;
>>> // }
>>> // };
>>> // }
>>> ExecutorService executor = Executors
>>> .newFixedThreadPool(CONNECTION_THREADPOOL_SIZE);
>>> // List<Callable<Object>> c = Arrays.asList(callables2);
>>>
>>> long start = System.currentTimeMillis();
>>>
>>> executor.invokeAll(callables2);
>>> executor.shutdown();
>>>
>>> long interval = System.currentTimeMillis() - start;
>>> System.out.println("transaction/s: " +
>>> transactionCount.get() * 1000
>>> / interval + ", bytes read: " + bytesRead.get()
>>> + "\n-1 times: " + minusOneCount.get() + ", 0 times: "
>>> + zeroCount);
>>>
>>> Thread.sleep(2000);
>>> itCon = connectors.iterator();
>>> while (itCon.hasNext()) {
>>> TCPConnectorHandler con = itCon.next();
>>> // con.getSelectorHandler().getPort()
>>> con.close();
>>> }
>>>
>>> controller.stop();
>>> }
>>>
>>> private void executeOnConnection(TCPConnectorHandler
>>> connectorHandler)
>>> throws IOException {
>>> try {
>>> sendRequest(connectorHandler);
>>> } finally {
>>> // connectorHandler.close();
>>> }
>>> }
>>>
>>> private void sendRequest(TCPConnectorHandler connectorHandler)
>>> throws IOException {
>>> String str = "GET /"
>>> + /* icons/apache_pb2.gif */"\r\nHTTP 1.1\r\nHOST:"
>>> + SERVER_HOST_OR_IP +
>>> "\r\nConnection:keep-alive\r\n\r\n";
>>>
>>> connectorHandler.writeToAsyncQueue(ByteBuffer.wrap(str.getBytes()),
>>> new MyAsyncWriteCallbackHandler(connectorHandler));
>>> }
>>>
>>> private void waitControllerReady() throws InterruptedException {
>>> controllerReadyLatch.await();
>>> }
>>>
>>> private Controller createController() {
>>> Controller tmpCtl = new Controller();
>>> DefaultConnectorHandlerPool connectorPool = new
>>> DefaultConnectorHandlerPool(
>>> tmpCtl);
>>> tmpCtl.setConnectorHandlerPool(connectorPool);
>>>
>>> DefaultSelectionKeyHandler keyHandler = new
>>> DefaultSelectionKeyHandler();
>>> keyHandler.setTimeout(30 * 1000 * 60);
>>> tmpCtl.setSelectionKeyHandler(keyHandler);
>>> tmpCtl.setSelectorHandler(new TCPSelectorHandler(true));
>>> tmpCtl.addStateListener(new ControllerStateListenerAdapter() {
>>> @Override
>>> public void onReady() {
>>> controllerReadyLatch.countDown();
>>> }
>>> });
>>>
>>> Pipeline pl = new DefaultPipeline();
>>> pl.setMaxThreads(50);
>>> tmpCtl.setPipeline(pl);
>>>
>>> // tmpCtl.getConnectorHandlerPool().
>>>
>>> return tmpCtl;
>>> }
>>>
>>> }
>>>
>>>
>>> package test.grizzly;
>>>
>>> import java.net.InetAddress;
>>> import java.net.UnknownHostException;
>>> import java.util.concurrent.CountDownLatch;
>>>
>>> import com.sun.grizzly.Controller;
>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>> import com.sun.grizzly.DefaultProtocolChain;
>>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>>> import com.sun.grizzly.ProtocolChain;
>>> import com.sun.grizzly.TCPSelectorHandler;
>>> import com.sun.grizzly.filter.EchoFilter;
>>> import com.sun.grizzly.filter.ReadFilter;
>>>
>>> public class EchoServer {
>>>
>>> private static final String SERVER_HOST_IP = "190.168.0.113";
>>>
>>> private static final int SERVER_PORT = 11113;
>>>
>>> /**
>>> * @param args
>>> * @throws InterruptedException
>>> * @throws UnknownHostException
>>> */
>>> public static void main(String[] args) throws UnknownHostException,
>>> InterruptedException {
>>> EchoServer server = new EchoServer();
>>> server.start();
>>> }
>>>
>>> private Controller controller;
>>> protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
>>>
>>> public void start() throws InterruptedException, UnknownHostException
>>> {
>>> controller = createController();
>>> new Thread(controller).start();
>>>
>>> controllerReadyLatch.await();
>>>
>>> }
>>>
>>> private Controller createController() throws UnknownHostException {
>>> Controller tmpCtl = new Controller();
>>> TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler();
>>> tcpSelectorHandler.setInet(InetAddress.getByName(SERVER_HOST_IP));
>>> tcpSelectorHandler.setPort(SERVER_PORT);
>>> tmpCtl.addSelectorHandler(tcpSelectorHandler);
>>> tmpCtl.addStateListener(new ControllerStateListenerAdapter() {
>>> @Override
>>> public void onReady() {
>>> controllerReadyLatch.countDown();
>>> }
>>> });
>>> tmpCtl
>>> .setProtocolChainInstanceHandler(new
>>> DefaultProtocolChainInstanceHandler() {
>>> private ProtocolChain protocolChain;
>>>
>>> @Override
>>> public ProtocolChain poll() {
>>> if (protocolChain == null) {
>>> protocolChain = new DefaultProtocolChain();
>>> }
>>> return protocolChain;
>>> }
>>> });
>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>> new ReadFilter());
>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>> new EchoFilter());
>>>
>>> return tmpCtl;
>>> }
>>>
>>> }
>>>
>>>
>>> package test.grizzly;
>>>
>>> import java.io.IOException;
>>> import java.net.InetAddress;
>>> import java.net.InetSocketAddress;
>>> import java.net.UnknownHostException;
>>> import java.nio.channels.SelectionKey;
>>> import java.nio.channels.Selector;
>>> import java.nio.channels.SocketChannel;
>>> import java.nio.channels.spi.SelectorProvider;
>>> import java.util.Iterator;
>>>
>>> public class RawNioClient implements Runnable {
>>> private InetAddress serverHost;
>>> private int serverPort;
>>> private Selector selector;
>>> private int connectionCount = 0;
>>>
>>> // private SocketChannel socketChannel;
>>>
>>> public RawNioClient(InetAddress serverHost, int serverPort)
>>> throws IOException {
>>> this.serverHost = serverHost;
>>> this.serverPort = serverPort;
>>> selector = initSelector();
>>> // socketChannel = initSocketChannel();
>>> }
>>>
>>> private SocketChannel initSocketChannel() throws IOException {
>>> SocketChannel sc = SocketChannel.open();
>>> sc.connect(new InetSocketAddress(serverHost, serverPort));
>>> sc.configureBlocking(false);
>>> sc.socket().setReuseAddress(true);
>>> sc.register(selector, SelectionKey.OP_CONNECT);
>>> return sc;
>>> }
>>>
>>> private Selector initSelector() throws IOException {
>>> return SelectorProvider.provider().openSelector();
>>> }
>>>
>>> public static void main(String[] args) {
>>> try {
>>> new Thread(new RawNioClient(
>>> InetAddress.getByName("192.168.0.113"), 11113)).start();
>>> } catch (UnknownHostException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> public void run() {
>>> while (true) {
>>> try {
>>> initSocketChannel();
>>> } catch (IOException e1) {
>>> // TODO Auto-generated catch block
>>> e1.printStackTrace();
>>> }
>>> if (connectionCount >= 30000) {
>>> System.out.println("reach 30000 connections");
>>> try {
>>> Thread.sleep(5000);
>>> } catch (InterruptedException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> } else {
>>> try {
>>> selector.select();
>>>
>>> Iterator<SelectionKey> itKey = selector.selectedKeys()
>>> .iterator();
>>> while (itKey.hasNext()) {
>>> SelectionKey key = itKey.next();
>>> itKey.remove();
>>> if (!key.isValid()) {
>>> continue;
>>> }
>>> if (key.isConnectable()) {
>>> doConnect(key);
>>> }
>>> }
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> }
>>> }
>>>
>>> private void doConnect(SelectionKey key) {
>>> SocketChannel sc = (SocketChannel) key.channel();
>>> try {
>>> boolean fc = sc.finishConnect();
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> connectionCount++;
>>> }
>>> }
>>>
>>>
>>>
>>>> 2. i want to iterate over the ConnectorHandlerPool to perform a
>>>> transaction on each connection. but it seems that
>>>> ConnectorHandlerPool does not support iteration explicitly. so
>>>> as a workaround, i collect each ConnectorHandler manually after
>>>> calling ConnectorHandler.finishConnect.
>>>>
>>> Maybe it's the only way right now. ConnectorHandlerPool
>>> wasn't originally introduced to serve existing live connections, but
>>> to cache and reuse ones that were released (closed).
>>>
>>> thanks.
>>>
>>> - Yi Bing
>>>
>>>
>>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>
>>
> <connection_test.zip>