After I replace the Grizzly framework from to 1.8.1-rc on the
client-side, there is no implicit connection 'killer' and the grizzly-based
client created about 20k+ connections now.
2008/6/11 JianXing Yi <>:
> Hi Alexey,
> --Just checked your code on my machine... By default, I succeeded to create
> max 5700 connections for *both* types of client.
> Did you statitstic the connection numbers on the server-side or
> client-side?
> For the grizzly-based client, the connection number can reach 15k+ on
> client-side, but can not exceed about 6k+ limit on the server-side. On
> client-side, within 15k+ connections, there are approximately 10k+ TIME_WAIT
> connections. Because TIME_WAIT connections only occur in actively closing
> peer, so I remove the closing operations of the grizzly-based client but it
> is still the same situation :-(. Is there any closing operation in
> ConnectorHandler or something else? Btw, I've set the connection timeout to
> a large number through TcpConnectorHandler.setConnectionTimeout() already.
> Regards,
> -Yi Bing
> 2008/6/10 JianXing Yi <>:
> Hi Alexey,
>> I revised the raw NIO client as you pointed out and tried the 2 clients
>> again. The result is the same as previously got :-(. The grizzly-based one
>> reached 5.8k+ connections while the raw one reached 15k+ connections (I
>> stopped it instead of waiting any more). In my env., the clients run on SUN
>> T3400 station, the simple echo server runs on my laptop and on a Linux
>> server (both connection limits are enlarged). I'll try them again... Anyway,
>> great thanks for your help!
>> Regards,
>> -Yi Bing
>> 2008/6/10 Oleksiy Stashok <>:
>>> Hi Yi,
>>> Just checked your code on my machine... By default, I succeeded to create
>>> max 5700 connections for *both* types of client.
>>> Increasing max number of open files for MacOS [1], I succeeded to
>>> increase the max number of client connections. So, IMHO, results look
>>> consistent.
>>> Hope this helps.
>>> Also, I've updated your Raw NIO client initSocketChannel, doConnect
>>> methods like this:
>>> private SocketChannel initSocketChannel() throws IOException {
>>> SocketChannel sc =;
>>> sc.configureBlocking(false);
>>> sc.socket().setReuseAddress(true);
>>> sc.connect(new InetSocketAddress(serverHost, serverPort));
>>> sc.register(selector, SelectionKey.OP_CONNECT);
>>> return sc;
>>> }
>>> .............................. cut
>>> .......................................
>>> private void doConnect(SelectionKey key) {
>>> SocketChannel sc = (SocketChannel);
>>> try {
>>> if (sc.finishConnect()) {
>>> connectionCount.incrementAndGet();
>>> }
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> /////////////////////////////////////////////////////
>>> Hope this helps.
>>> WBR,
>>> Alexey.
>>> [1] sysctl -w kern.maxfiles=50000
>>> sysctl -w kern.maxfilesperproc=40000
>>> On Jun 10, 2008, at 4:23 , JianXing Yi wrote:
>>> ok. pls. get the code from the attached.
>>> thanks!
>>> -Yi Bing
>>> 2008/6/10 Jeanfrancois Arcand <>:
>>>> Hi Yi Bing,
>>>> can you zip your code? That will be easier to compile ans test :-)
>>>> Thanks!
>>>> -- Jeanfrancois
>>>> JianXing Yi wrote:
>>>>> Hi Alexey,
>>>>>> 1. in the grizzly-based nio client, i try to establish a
>>>>>> relatively large number of connections to a server, e.g. 30,000. i
>>>>>> avoid 30s closing idle connections through the
>>>>>> SelectionKeyHandler. my problem is that it seems that about 5,000
>>>>>> connections is the limit. no more connections could be established
>>>>>> after that. btw, i've already adjusted the system tcp parameters,
>>>>>> such as the tcp connection number (greater than 30,000) and the
>>>>>> keep-alive time.
>>>>> It should be some system limitations, in Grizzly we don't have any.
>>>>> If you will provide me the code, I can test how many connections I
>>>>> can create on my local machine, just to compare the results.
>>>>> Also it's interesting what kind of scenario you have, which requires
>>>>> 30K+ tcp connections? :))
>>>>> great thanks for your help!
>>>>> i want to generate http (may be extended) load to servers. some http
>>>>> servers can support persistent connections, so a relatively large
>>>>> number of connections to them should be established to get the
>>>>> performance index.
>>>>> i compared the result of the Grizzly-based client and a raw NIO API
>>>>> based one. netstat showed the number of concurrent connections for
>>>>> each of them, they are 5k and 25k in my environment. code of the
>>>>> Grizzly-based client, the raw NIO API based client and the
>>>>> Grizzly-based echo server is listed below.
>>>>> package test.grizzly;
>>>>> import;
>>>>> import;
>>>>> import;
>>>>> import java.nio.ByteBuffer;
>>>>> import java.nio.channels.SelectionKey;
>>>>> import java.util.ArrayList;
>>>>> import java.util.Iterator;
>>>>> import java.util.Queue;
>>>>> import java.util.concurrent.Callable;
>>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>>> import java.util.concurrent.CountDownLatch;
>>>>> import java.util.concurrent.ExecutorService;
>>>>> import java.util.concurrent.Executors;
>>>>> import java.util.concurrent.atomic.AtomicLong;
>>>>> import com.sun.grizzly.CallbackHandler;
>>>>> import com.sun.grizzly.Context;
>>>>> import com.sun.grizzly.Controller;
>>>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>>>> import com.sun.grizzly.DefaultConnectorHandlerPool;
>>>>> import com.sun.grizzly.DefaultPipeline;
>>>>> import com.sun.grizzly.DefaultSelectionKeyHandler;
>>>>> import com.sun.grizzly.IOEvent;
>>>>> import com.sun.grizzly.Pipeline;
>>>>> import com.sun.grizzly.TCPConnectorHandler;
>>>>> import com.sun.grizzly.TCPSelectorHandler;
>>>>> import com.sun.grizzly.Controller.Protocol;
>>>>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>>>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>>>>> public class GrizzlyBasedClient {
>>>>> public static final int CONNECTION_NUMBER = 30000;
>>>>> public static final int NUMBER_CONNECTION_PER_THREAD = 1000;
>>>>> private static final int SERVER_PORT = 11113;
>>>>> private static final String SERVER_HOST_OR_IP = "
>>>>> <>";
>>>>> private static final int CONNECTION_THREADPOOL_SIZE = 100;
>>>>> private static final int NUMBER_LOAD_ON_THREADPOOL = 10;
>>>>> /**
>>>>> * @param args
>>>>> * @throws InterruptedException
>>>>> * @throws IOException
>>>>> */
>>>>> public static void main(String[] args) throws
>>>>> InterruptedException,
>>>>> IOException {
>>>>> GrizzlyBasedClient lg = new GrizzlyBasedClient();
>>>>> lg.start();
>>>>> }
>>>>> private Controller controller;
>>>>> protected CountDownLatch controllerReadyLatch = new
>>>>> CountDownLatch(1);
>>>>> public ConcurrentLinkedQueue<TCPConnectorHandler> connectors =
>>>>> new ConcurrentLinkedQueue<TCPConnectorHandler>();
>>>>> // private TCPConnectorHandler connectorHandler;
>>>>> class MyConnectorCallbackHandler implements
>>>>> CallbackHandler<Context> {
>>>>> private TCPConnectorHandler tcpConnectorHandler;
>>>>> public MyConnectorCallbackHandler(
>>>>> TCPConnectorHandler tcpConnectorHandler) {
>>>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>>>> }
>>>>> public void onConnect(IOEvent<Context> ioEvent) {
>>>>> try {
>>>>> tcpConnectorHandler.finishConnect(ioEvent.attachment()
>>>>> .getSelectionKey());
>>>>> allConnectedLatch.countDown();
>>>>> connectors.add(tcpConnectorHandler);
>>>>> // Reporter.getInstance().collect(
>>>>> //
>>>>> tcpConnectorHandler.getSelectorHandler().getPort(),
>>>>> // System.currentTimeMillis());
>>>>> connectionCount.incrementAndGet();
>>>>> //
>>>>> System.out.println(tcpConnectorHandler.getSelectorHandler().getPort()+",
>>>>> // "+System.currentTimeMillis());
>>>>> } catch (IOException e) {
>>>>> //
>>>>> System.out.println(Reporter.getInstance().verbose());
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>> public void onRead(IOEvent<Context> arg0) {
>>>>> // TODO Auto-generated method stub
>>>>> }
>>>>> public void onWrite(IOEvent<Context> arg0) {
>>>>> // TODO Auto-generated method stub
>>>>> }
>>>>> }
>>>>> protected ByteBuffer readBuffer =
>>>>> ByteBuffer.allocateDirect(10240);
>>>>> AtomicLong transactionCount = new AtomicLong(0);
>>>>> AtomicLong connectionCount = new AtomicLong(0);
>>>>> public AtomicLong bytesRead = new AtomicLong(0);
>>>>> public AtomicLong minusOneCount = new AtomicLong(0);
>>>>> public AtomicLong zeroCount = new AtomicLong(0);
>>>>> class MyAsyncWriteCallbackHandler implements
>>>>> AsyncWriteCallbackHandler {
>>>>> private TCPConnectorHandler tcpConnectorHandler;
>>>>> public MyAsyncWriteCallbackHandler(
>>>>> TCPConnectorHandler tcpConnectorHandler) {
>>>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>>>> }
>>>>> public void onIOException(IOException ioException,
>>>>> SelectionKey key,
>>>>> ByteBuffer buffer, Queue<AsyncWriteQueueRecord>
>>>>> remainingQueue) {
>>>>> // TODO Auto-generated method stub
>>>>> }
>>>>> public void onWriteCompleted(SelectionKey key, ByteBuffer
>>>>> buffer) {
>>>>> try {
>>>>> transactionCount.incrementAndGet();
>>>>> readBuffer.clear();
>>>>> long nCount =
>>>>>, true);
>>>>> if (nCount == -1) {
>>>>> minusOneCount.incrementAndGet();
>>>>> } else if (nCount == 0) {
>>>>> zeroCount.incrementAndGet();
>>>>> } else {
>>>>> bytesRead.addAndGet(nCount);
>>>>> }
>>>>> // readBuffer.flip();
>>>>> //
>>>>> System.out.println(Charset.defaultCharset().decode(readBuffer));
>>>>> // System.out.println("V");
>>>>> // tcpConnectorHandler.close();
>>>>> //
>>>>> controller.releaseConnectorHandler(tcpConnectorHandler);
>>>>> } catch (IOException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>> };
>>>>> private static int localPort = 12110;
>>>>> SocketAddress serverAddr = new
>>>>> InetSocketAddress(SERVER_HOST_OR_IP,
>>>>> private CountDownLatch allConnectedLatch = new CountDownLatch(
>>>>> public void start() throws InterruptedException, IOException {
>>>>> controller = createController();
>>>>> new Thread(controller).start();
>>>>> waitControllerReady();
>>>>> // Establish CONNECTION_NUMBER connections.
>>>>> long start1 = System.currentTimeMillis();
>>>>> for (int i = 0; i < CONNECTION_NUMBER; i++) {
>>>>> SocketAddress localAddr = new
>>>>> InetSocketAddress("localhost",
>>>>> localPort++);
>>>>> // connectorHandler.setLinger(10);
>>>>> TCPConnectorHandler connectorHandler =
>>>>> (TCPConnectorHandler) controller
>>>>> .acquireConnectorHandler(Protocol.TCP);
>>>>> // connectorHandler.setTcpNoDelay(true);
>>>>> connectorHandler.setReuseAddress(true);
>>>>> connectorHandler.connect(serverAddr, /* localAddr, */
>>>>> new MyConnectorCallbackHandler(connectorHandler));
>>>>> // Thread.sleep(20);
>>>>> }
>>>>> allConnectedLatch.await();
>>>>> long interval2 = System.currentTimeMillis() - start1;
>>>>> System.out.println("ALL CONNECTED." + " connections: "
>>>>> + connectionCount.get() + "," + connectors.size()
>>>>> + ", connection total time (s): " + interval2 /
>>>>> 1000);
>>>>> // distribute tasks
>>>>> Iterator<TCPConnectorHandler> itCon = connectors.iterator();
>>>>> ArrayList<Callable<Object>> callables2 = new
>>>>> ArrayList<Callable<Object>>();
>>>>> // int count = 0;
>>>>> // LinkedList<TCPConnectorHandler> tmpConnectorList = new
>>>>> // LinkedList<TCPConnectorHandler>();
>>>>> while (itCon.hasNext()) {
>>>>> final TCPConnectorHandler con =;
>>>>> // tmpConnectorList.add(con);
>>>>> // count++;
>>>>> callables2.add(new Callable<Object>() {
>>>>> public Object call() throws Exception {
>>>>> // Perform 3 transactions on each connection.
>>>>> for (int i = 0; i < 1; i++) {
>>>>> executeOnConnection(con);
>>>>> }
>>>>> return null;
>>>>> }
>>>>> });
>>>>> }
>>>>> // Operate on each connection.
>>>>> // Callable<Object>[] callables3 = new
>>>>> // Callable<Object>[] callables = new
>>>>> // // Issue these connections concurrently.
>>>>> // for (int i = 0; i < callables.length; i++) {
>>>>> // callables[i] = new Callable<Object>() {
>>>>> // public Object call() throws Exception {
>>>>> // for (int i = 0; i < CONNECTION_NUMBER
>>>>> // / NUMBER_LOAD_ON_THREADPOOL; i++) {
>>>>> // // TCPConnectorHandler connectorHandler =
>>>>> // // (TCPConnectorHandler) controller
>>>>> // // .acquireConnectorHandler(Protocol.TCP);
>>>>> // executeOnConnection(connectorHandler);
>>>>> // }
>>>>> // return null;
>>>>> // }
>>>>> // };
>>>>> // }
>>>>> ExecutorService executor = Executors
>>>>> .newFixedThreadPool(CONNECTION_THREADPOOL_SIZE);
>>>>> // List<Callable<Object>> c = Arrays.asList(callables2);
>>>>> long start = System.currentTimeMillis();
>>>>> executor.invokeAll(callables2);
>>>>> executor.shutdown();
>>>>> long interval = System.currentTimeMillis() - start;
>>>>> System.out.println("transaction/s: " +
>>>>> transactionCount.get() * 1000
>>>>> / interval + ", bytes read: " + bytesRead.get()
>>>>> + "\n-1 times: " + minusOneCount.get() + ", 0 times:
>>>>> "
>>>>> + zeroCount);
>>>>> Thread.sleep(2000);
>>>>> itCon = connectors.iterator();
>>>>> while (itCon.hasNext()) {
>>>>> TCPConnectorHandler con =;
>>>>> // con.getSelectorHandler().getPort()
>>>>> con.close();
>>>>> }
>>>>> controller.stop();
>>>>> }
>>>>> private void executeOnConnection(TCPConnectorHandler
>>>>> connectorHandler)
>>>>> throws IOException {
>>>>> try {
>>>>> sendRequest(connectorHandler);
>>>>> } finally {
>>>>> // connectorHandler.close();
>>>>> }
>>>>> }
>>>>> private void sendRequest(TCPConnectorHandler connectorHandler)
>>>>> throws IOException {
>>>>> String str = "GET /"
>>>>> + /* icons/apache_pb2.gif */"\r\nHTTP 1.1\r\nHOST:"
>>>>> "\r\nConnection:keep-alive\r\n\r\n";
>>>>> connectorHandler.writeToAsyncQueue(ByteBuffer.wrap(str.getBytes()),
>>>>> new MyAsyncWriteCallbackHandler(connectorHandler));
>>>>> }
>>>>> private void waitControllerReady() throws InterruptedException {
>>>>> controllerReadyLatch.await();
>>>>> }
>>>>> private Controller createController() {
>>>>> Controller tmpCtl = new Controller();
>>>>> DefaultConnectorHandlerPool connectorPool = new
>>>>> DefaultConnectorHandlerPool(
>>>>> tmpCtl);
>>>>> tmpCtl.setConnectorHandlerPool(connectorPool);
>>>>> DefaultSelectionKeyHandler keyHandler = new
>>>>> DefaultSelectionKeyHandler();
>>>>> keyHandler.setTimeout(30 * 1000 * 60);
>>>>> tmpCtl.setSelectionKeyHandler(keyHandler);
>>>>> tmpCtl.setSelectorHandler(new TCPSelectorHandler(true));
>>>>> tmpCtl.addStateListener(new ControllerStateListenerAdapter()
>>>>> {
>>>>> @Override
>>>>> public void onReady() {
>>>>> controllerReadyLatch.countDown();
>>>>> }
>>>>> });
>>>>> Pipeline pl = new DefaultPipeline();
>>>>> pl.setMaxThreads(50);
>>>>> tmpCtl.setPipeline(pl);
>>>>> // tmpCtl.getConnectorHandlerPool().
>>>>> return tmpCtl;
>>>>> }
>>>>> }
>>>>> package test.grizzly;
>>>>> import;
>>>>> import;
>>>>> import java.util.concurrent.CountDownLatch;
>>>>> import com.sun.grizzly.Controller;
>>>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>>>> import com.sun.grizzly.DefaultProtocolChain;
>>>>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>>>>> import com.sun.grizzly.ProtocolChain;
>>>>> import com.sun.grizzly.TCPSelectorHandler;
>>>>> import com.sun.grizzly.filter.EchoFilter;
>>>>> import com.sun.grizzly.filter.ReadFilter;
>>>>> public class EchoServer {
>>>>> private static final String SERVER_HOST_IP = " <
>>>>> private static final int SERVER_PORT = 11113;
>>>>> /**
>>>>> * @param args
>>>>> * @throws InterruptedException
>>>>> * @throws UnknownHostException
>>>>> */
>>>>> public static void main(String[] args) throws UnknownHostException,
>>>>> InterruptedException {
>>>>> EchoServer server = new EchoServer();
>>>>> server.start();
>>>>> }
>>>>> private Controller controller;
>>>>> protected CountDownLatch controllerReadyLatch = new
>>>>> CountDownLatch(1);
>>>>> public void start() throws InterruptedException,
>>>>> UnknownHostException {
>>>>> controller = createController();
>>>>> new Thread(controller).start();
>>>>> controllerReadyLatch.await();
>>>>> }
>>>>> private Controller createController() throws UnknownHostException {
>>>>> Controller tmpCtl = new Controller();
>>>>> TCPSelectorHandler tcpSelectorHandler = new
>>>>> TCPSelectorHandler();
>>>>> tcpSelectorHandler.setInet(InetAddress.getByName(SERVER_HOST_IP));
>>>>> tcpSelectorHandler.setPort(SERVER_PORT);
>>>>> tmpCtl.addSelectorHandler(tcpSelectorHandler);
>>>>> tmpCtl.addStateListener(new ControllerStateListenerAdapter() {
>>>>> @Override
>>>>> public void onReady() {
>>>>> controllerReadyLatch.countDown();
>>>>> }
>>>>> });
>>>>> tmpCtl
>>>>> .setProtocolChainInstanceHandler(new
>>>>> DefaultProtocolChainInstanceHandler() {
>>>>> private ProtocolChain protocolChain;
>>>>> @Override
>>>>> public ProtocolChain poll() {
>>>>> if (protocolChain == null) {
>>>>> protocolChain = new DefaultProtocolChain();
>>>>> }
>>>>> return protocolChain;
>>>>> }
>>>>> });
>>>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>>>> new ReadFilter());
>>>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>>>> new EchoFilter());
>>>>> return tmpCtl;
>>>>> }
>>>>> }
>>>>> package test.grizzly;
>>>>> import;
>>>>> import;
>>>>> import;
>>>>> import;
>>>>> import java.nio.channels.SelectionKey;
>>>>> import java.nio.channels.Selector;
>>>>> import java.nio.channels.SocketChannel;
>>>>> import java.nio.channels.spi.SelectorProvider;
>>>>> import java.util.Iterator;
>>>>> public class RawNioClient implements Runnable {
>>>>> private InetAddress serverHost;
>>>>> private int serverPort;
>>>>> private Selector selector;
>>>>> private int connectionCount = 0;
>>>>> // private SocketChannel socketChannel;
>>>>> public RawNioClient(InetAddress serverHost, int serverPort)
>>>>> throws IOException {
>>>>> this.serverHost = serverHost;
>>>>> this.serverPort = serverPort;
>>>>> selector = initSelector();
>>>>> // socketChannel = initSocketChannel();
>>>>> }
>>>>> private SocketChannel initSocketChannel() throws IOException {
>>>>> SocketChannel sc =;
>>>>> sc.connect(new InetSocketAddress(serverHost, serverPort));
>>>>> sc.configureBlocking(false);
>>>>> sc.socket().setReuseAddress(true);
>>>>> sc.register(selector, SelectionKey.OP_CONNECT);
>>>>> return sc;
>>>>> }
>>>>> private Selector initSelector() throws IOException {
>>>>> return SelectorProvider.provider().openSelector();
>>>>> }
>>>>> public static void main(String[] args) {
>>>>> try {
>>>>> new Thread(new RawNioClient(InetAddress.getByName("
>>>>> <>"),
>>>>> 11113)).start();
>>>>> } catch (UnknownHostException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> } catch (IOException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>> public void run() {
>>>>> while (true) {
>>>>> try {
>>>>> initSocketChannel();
>>>>> } catch (IOException e1) {
>>>>> // TODO Auto-generated catch block
>>>>> e1.printStackTrace();
>>>>> }
>>>>> if (connectionCount >= 30000) {
>>>>> System.out.println("reach 30000 connections");
>>>>> try {
>>>>> Thread.sleep(5000);
>>>>> } catch (InterruptedException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> } else {
>>>>> try {
>>>>> Iterator<SelectionKey> itKey =
>>>>> selector.selectedKeys()
>>>>> .iterator();
>>>>> while (itKey.hasNext()) {
>>>>> SelectionKey key =;
>>>>> itKey.remove();
>>>>> if (!key.isValid()) {
>>>>> continue;
>>>>> }
>>>>> if (key.isConnectable()) {
>>>>> doConnect(key);
>>>>> }
>>>>> }
>>>>> } catch (IOException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> private void doConnect(SelectionKey key) {
>>>>> SocketChannel sc = (SocketChannel);
>>>>> try {
>>>>> boolean fc = sc.finishConnect();
>>>>> } catch (IOException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> connectionCount++;
>>>>> }
>>>>> }
>>>>>> 2. i want to iterate over the ConnectorHandlerPool to perform
>>>>>> transaction on each connection. but it seems that
>>>>>> ConnectorHandlerPool does not support the iteration explicitly. so
>>>>>> as a walk-around, i collect the ConnectorHandler after executing
>>>>>> ConnectorHandler.finishConnect manually.
>>>>> May be it's the only way right now. As ConnectorHandlerPool
>>>>> initially wasn't introduced to serve existing live connections, but
>>>>> to cache/reuse ones, which were released (closed).
>>>>> thanks.
>>>>> - Yi Bing
