users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Thu, 12 Jun 2008 18:11:06 +0800

To summarize the message flow/protocol in this client, the client
interacts with the server in this way:

client                     server
  |--------request---------->|
  |<-------response----------|

The client here implements this through the following steps:

   1. ConnectorHandler.connect(SocketAddress, CallbackHandler) connects to
   the server and registers TcpConnectorCallbackHandler to handle the
   SelectionKey.OP_CONNECT interest.
   2. TcpConnectorCallbackHandler#onConnect(IOEvent) finishes the
   connection.
   3. sendRequest(TCPConnectorHandler) performs an asynchronous write
   and registers MyAsyncWriteCallbackHandler to run the follow-up work
   once the write has completed.
   4. MyAsyncWriteCallbackHandler#onWriteCompleted(SelectionKey,
   ByteBuffer) performs a synchronous read when the write has completed.
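
For readers without Grizzly on the classpath, the four steps above can be
approximated with plain java.nio. This is only a minimal sketch and not the
Grizzly API: the class name RequestResponseFlowSketch, the method roundTrip,
the in-process echo server and the 5 second deadline are all assumptions made
for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class RequestResponseFlowSketch {

    /** Sends one request to a local echo server and returns the echoed bytes. */
    public static String roundTrip(String request) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {

            // Hypothetical stand-in for the echo server, on an ephemeral loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Thread echo = new Thread(() -> echoOnce(server));
            echo.setDaemon(true);
            echo.start();

            // Step 1: start a non-blocking connect and register OP_CONNECT interest.
            client.configureBlocking(false);
            client.connect(server.getLocalAddress());
            client.register(selector, SelectionKey.OP_CONNECT);

            // Step 2: finish the connection once the selector reports progress.
            while (!client.finishConnect()) {
                selector.select(100);
                selector.selectedKeys().clear();
            }

            // Step 3: write the request; the loop handles partial writes.
            ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
            ByteBuffer in = ByteBuffer.allocate(out.remaining());
            while (out.hasRemaining()) {
                client.write(out);
            }

            // Step 4: after the write has completed, read the response.
            client.keyFor(selector).interestOps(SelectionKey.OP_READ);
            long deadline = System.currentTimeMillis() + 5000;
            while (in.hasRemaining() && System.currentTimeMillis() < deadline) {
                selector.select(100);
                selector.selectedKeys().clear();
                if (client.read(in) < 0) {
                    break; // the server closed the connection
                }
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Accepts a single connection and echoes everything it receives. */
    private static void echoOnce(ServerSocketChannel server) {
        try (SocketChannel peer = server.accept()) {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while (peer.read(buf) >= 0) {
                buf.flip();
                while (buf.hasRemaining()) {
                    peer.write(buf);
                }
                buf.clear();
            }
        } catch (IOException ignored) {
            // The client disconnected or the listener was closed.
        }
    }
}
```

Calling RequestResponseFlowSketch.roundTrip("ping") should return the same
string, since the stand-in server only echoes. In the Grizzly client, steps 3
and 4 are split across sendRequest and the write-completed callback instead of
running in one method.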

Looking at this, my question is: can I let the CLIENT send out the request by
means of a ProtocolFilter (maybe plus a ProtocolParser)? If yes, how?

Many thanks!

Regards,
-Yi Bing


2008/6/12 JianXing Yi <jianxing.yi_at_gmail.com>:

> Yes, exactly :-). Thank you again!
>
> 2008/6/11 Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
>
>> Hello Yi,
>> does it mean it is working correctly for you right now? :)
>> In 1.8 we really fixed one annoying bug with client connection timeouts.
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
>>
>> On Jun 11, 2008, at 7:40 , JianXing Yi wrote:
>>
>> After I upgraded the Grizzly framework from 1.7.3.3 to 1.8.1-rc on the
>> client side, there is no longer an implicit connection 'killer', and the
>> Grizzly-based client now creates about 20k+ connections.
>>
>> 2008/6/11 JianXing Yi <jianxing.yi_at_gmail.com>:
>>
>>> Hi Alexey,
>>>
>>> --Just checked your code on my machine... By default, I succeeded to
>>> create max 5700 connections for *both* types of client.
>>>
>>> Did you count the connections on the server side or the client side?
>>> For the Grizzly-based client, the connection count can reach 15k+ on
>>> the client side, but cannot exceed a limit of about 6k on the server
>>> side. On the client side, approximately 10k+ of the 15k+ connections are
>>> in TIME_WAIT. Because TIME_WAIT connections only occur on the peer that
>>> actively closes, I removed the close operations from the Grizzly-based
>>> client, but the situation is still the same :-(. Is there any closing
>>> operation in ConnectorHandler or somewhere else? Btw, I've already set
>>> the connection timeout to a large number through
>>> TcpConnectorHandler.setConnectionTimeout().
>>>
>>> Regards,
>>> -Yi Bing
>>>
>>>
>>> 2008/6/10 JianXing Yi <jianxing.yi_at_gmail.com>:
>>>
>>>> Hi Alexey,
>>>>
>>>> I revised the raw NIO client as you pointed out and tried the 2 clients
>>>> again. The result is the same as before :-(. The Grizzly-based one
>>>> reached 5.8k+ connections while the raw one reached 15k+ connections (I
>>>> stopped it instead of waiting any longer). In my environment, the clients
>>>> run on a SUN T3400 station, and the simple echo server runs on my laptop
>>>> and on a Linux server (the connection limits were raised on both). I'll
>>>> try them again... Anyway, many thanks for your help!
>>>>
>>>> Regards,
>>>> -Yi Bing
>>>>
>>>>
>>>> 2008/6/10 Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
>>>>
>>>>
>>>>> Hi Yi,
>>>>> Just checked your code on my machine... By default, I succeeded to
>>>>> create max 5700 connections for *both* types of client.
>>>>> After increasing the max number of open files on MacOS [1], I was able
>>>>> to increase the max number of client connections. So, IMHO, the results
>>>>> look consistent.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Also, I've updated your Raw NIO client initSocketChannel, doConnect
>>>>> methods like this:
>>>>>
>>>>> private SocketChannel initSocketChannel() throws IOException {
>>>>>     SocketChannel sc = SocketChannel.open();
>>>>>     // Switch to non-blocking mode before connect().
>>>>>     sc.configureBlocking(false);
>>>>>     sc.socket().setReuseAddress(true);
>>>>>     sc.connect(new InetSocketAddress(serverHost, serverPort));
>>>>>     sc.register(selector, SelectionKey.OP_CONNECT);
>>>>>     return sc;
>>>>> }
>>>>>
>>>>> .............................. cut .......................................
>>>>> private void doConnect(SelectionKey key) {
>>>>>     SocketChannel sc = (SocketChannel) key.channel();
>>>>>     try {
>>>>>         // Count the connection only once it is really established.
>>>>>         if (sc.finishConnect()) {
>>>>>             connectionCount.incrementAndGet();
>>>>>         }
>>>>>     } catch (IOException e) {
>>>>>         e.printStackTrace();
>>>>>     }
>>>>> }
>>>>>
>>>>> /////////////////////////////////////////////////////
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> WBR,
>>>>> Alexey.
>>>>>
>>>>> [1] sysctl -w kern.maxfiles=50000
>>>>> sysctl -w kern.maxfilesperproc=40000
>>>>>
>>>>>
>>>>> On Jun 10, 2008, at 4:23 , JianXing Yi wrote:
>>>>>
>>>>> ok. please get the code from the attachment.
>>>>>
>>>>> thanks!
>>>>> -Yi Bing
>>>>>
>>>>> 2008/6/10 Jeanfrancois Arcand <Jeanfrancois.Arcand_at_sun.com>:
>>>>>
>>>>>> Hi Yi Bing,
>>>>>>
>>>>>> can you zip your code? That will be easier to compile and test :-)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> -- Jeanfrancois
>>>>>>
>>>>>> JianXing Yi wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi Alexey,
>>>>>>>
>>>>>>>
>>>>>>>> 1. in the grizzly-based nio client, i try to establish a
>>>>>>>> relatively large number of connections to a server, e.g. 30,000.
>>>>>>>> i avoid the 30s closing of idle connections through the
>>>>>>>> SelectionKeyHandler. my problem is that about 5,000 connections
>>>>>>>> seems to be the limit. no more connections could be established
>>>>>>>> after that. btw, i've already adjusted the system tcp parameters,
>>>>>>>> such as the tcp connection number (greater than 30,000) and the
>>>>>>>> keep-alive time.
>>>>>>>>
>>>>>>> It should be some system limitation; in Grizzly we don't have any.
>>>>>>> If you provide me the code, I can test how many connections I
>>>>>>> can create on my local machine, just to compare the results.
>>>>>>> Also, it would be interesting to know what kind of scenario you
>>>>>>> have that requires 30K+ tcp connections? :))
>>>>>>>
>>>>>>>
>>>>>>> great thanks for your help!
>>>>>>>
>>>>>>>
>>>>>>> i want to generate http (may be extended) load to servers. some http
>>>>>>> servers can support persistent connections, so a relatively large
>>>>>>> number of connections to them should be established to get the
>>>>>>> performance index.
>>>>>>>
>>>>>>> i compared the result of the Grizzly-based client and a raw NIO API
>>>>>>> based one. netstat showed the number of concurrent connections for
>>>>>>> each of them: 5k and 25k respectively in my environment. code of the
>>>>>>> Grizzly-based client, the raw NIO API based client and the
>>>>>>> Grizzly-based echo server is listed below.
>>>>>>>
>>>>>>>
>>>>>>> package test.grizzly;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.InetSocketAddress;
>>>>>>> import java.net.SocketAddress;
>>>>>>> import java.nio.ByteBuffer;
>>>>>>> import java.nio.channels.SelectionKey;
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.Iterator;
>>>>>>> import java.util.Queue;
>>>>>>> import java.util.concurrent.Callable;
>>>>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>> import java.util.concurrent.ExecutorService;
>>>>>>> import java.util.concurrent.Executors;
>>>>>>> import java.util.concurrent.atomic.AtomicLong;
>>>>>>>
>>>>>>> import com.sun.grizzly.CallbackHandler;
>>>>>>> import com.sun.grizzly.Context;
>>>>>>> import com.sun.grizzly.Controller;
>>>>>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>>>>>> import com.sun.grizzly.DefaultConnectorHandlerPool;
>>>>>>> import com.sun.grizzly.DefaultPipeline;
>>>>>>> import com.sun.grizzly.DefaultSelectionKeyHandler;
>>>>>>> import com.sun.grizzly.IOEvent;
>>>>>>> import com.sun.grizzly.Pipeline;
>>>>>>> import com.sun.grizzly.TCPConnectorHandler;
>>>>>>> import com.sun.grizzly.TCPSelectorHandler;
>>>>>>> import com.sun.grizzly.Controller.Protocol;
>>>>>>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>>>>>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>>>>>>>
>>>>>>> public class GrizzlyBasedClient {
>>>>>>>
>>>>>>> public static final int CONNECTION_NUMBER = 30000;
>>>>>>> public static final int NUMBER_CONNECTION_PER_THREAD = 1000;
>>>>>>> private static final int SERVER_PORT = 11113;
>>>>>>> private static final String SERVER_HOST_OR_IP = "192.168.0.113";
>>>>>>>
>>>>>>> private static final int CONNECTION_THREADPOOL_SIZE = 100;
>>>>>>> private static final int NUMBER_LOAD_ON_THREADPOOL = 10;
>>>>>>>
>>>>>>> /**
>>>>>>> * @param args
>>>>>>> * @throws InterruptedException
>>>>>>> * @throws IOException
>>>>>>> */
>>>>>>> public static void main(String[] args) throws
>>>>>>> InterruptedException,
>>>>>>> IOException {
>>>>>>> GrizzlyBasedClient lg = new GrizzlyBasedClient();
>>>>>>> lg.start();
>>>>>>> }
>>>>>>>
>>>>>>> private Controller controller;
>>>>>>> protected CountDownLatch controllerReadyLatch = new
>>>>>>> CountDownLatch(1);
>>>>>>> public ConcurrentLinkedQueue<TCPConnectorHandler> connectors =
>>>>>>> new ConcurrentLinkedQueue<TCPConnectorHandler>();
>>>>>>>
>>>>>>> // private TCPConnectorHandler connectorHandler;
>>>>>>>
>>>>>>> class MyConnectorCallbackHandler implements
>>>>>>> CallbackHandler<Context> {
>>>>>>> private TCPConnectorHandler tcpConnectorHandler;
>>>>>>>
>>>>>>> public MyConnectorCallbackHandler(
>>>>>>> TCPConnectorHandler tcpConnectorHandler) {
>>>>>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public void onConnect(IOEvent<Context> ioEvent) {
>>>>>>> try {
>>>>>>>
>>>>>>> tcpConnectorHandler.finishConnect(ioEvent.attachment()
>>>>>>> .getSelectionKey());
>>>>>>> allConnectedLatch.countDown();
>>>>>>> connectors.add(tcpConnectorHandler);
>>>>>>> // Reporter.getInstance().collect(
>>>>>>> //     tcpConnectorHandler.getSelectorHandler().getPort(),
>>>>>>> //     System.currentTimeMillis());
>>>>>>> connectionCount.incrementAndGet();
>>>>>>>
>>>>>>> // System.out.println(tcpConnectorHandler.getSelectorHandler().getPort()
>>>>>>> //     + ", " + System.currentTimeMillis());
>>>>>>> } catch (IOException e) {
>>>>>>> // System.out.println(Reporter.getInstance().verbose());
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> public void onRead(IOEvent<Context> arg0) {
>>>>>>> // TODO Auto-generated method stub
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> public void onWrite(IOEvent<Context> arg0) {
>>>>>>> // TODO Auto-generated method stub
>>>>>>>
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> protected ByteBuffer readBuffer =
>>>>>>> ByteBuffer.allocateDirect(10240);
>>>>>>>
>>>>>>> AtomicLong transactionCount = new AtomicLong(0);
>>>>>>> AtomicLong connectionCount = new AtomicLong(0);
>>>>>>> public AtomicLong bytesRead = new AtomicLong(0);
>>>>>>> public AtomicLong minusOneCount = new AtomicLong(0);
>>>>>>> public AtomicLong zeroCount = new AtomicLong(0);
>>>>>>>
>>>>>>> class MyAsyncWriteCallbackHandler implements
>>>>>>> AsyncWriteCallbackHandler {
>>>>>>> private TCPConnectorHandler tcpConnectorHandler;
>>>>>>>
>>>>>>> public MyAsyncWriteCallbackHandler(
>>>>>>> TCPConnectorHandler tcpConnectorHandler) {
>>>>>>> this.tcpConnectorHandler = tcpConnectorHandler;
>>>>>>> }
>>>>>>>
>>>>>>> public void onIOException(IOException ioException,
>>>>>>> SelectionKey key,
>>>>>>> ByteBuffer buffer, Queue<AsyncWriteQueueRecord>
>>>>>>> remainingQueue) {
>>>>>>> // TODO Auto-generated method stub
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> public void onWriteCompleted(SelectionKey key, ByteBuffer
>>>>>>> buffer) {
>>>>>>> try {
>>>>>>>
>>>>>>> transactionCount.incrementAndGet();
>>>>>>>
>>>>>>> readBuffer.clear();
>>>>>>>
>>>>>>> long nCount =
>>>>>>> this.tcpConnectorHandler.read(readBuffer, true);
>>>>>>> if (nCount == -1) {
>>>>>>> minusOneCount.incrementAndGet();
>>>>>>> } else if (nCount == 0) {
>>>>>>> zeroCount.incrementAndGet();
>>>>>>> } else {
>>>>>>> bytesRead.addAndGet(nCount);
>>>>>>> }
>>>>>>> // readBuffer.flip();
>>>>>>> // System.out.println(Charset.defaultCharset().decode(readBuffer));
>>>>>>> // System.out.println("V");
>>>>>>> // tcpConnectorHandler.close();
>>>>>>> // controller.releaseConnectorHandler(tcpConnectorHandler);
>>>>>>> } catch (IOException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> }
>>>>>>> };
>>>>>>>
>>>>>>> private static int localPort = 12110;
>>>>>>> SocketAddress serverAddr = new
>>>>>>> InetSocketAddress(SERVER_HOST_OR_IP,
>>>>>>> SERVER_PORT);
>>>>>>> private CountDownLatch allConnectedLatch = new CountDownLatch(
>>>>>>> CONNECTION_NUMBER);
>>>>>>>
>>>>>>> public void start() throws InterruptedException, IOException {
>>>>>>> controller = createController();
>>>>>>> new Thread(controller).start();
>>>>>>> waitControllerReady();
>>>>>>>
>>>>>>> // Establish CONNECTION_NUMBER connections.
>>>>>>> long start1 = System.currentTimeMillis();
>>>>>>> for (int i = 0; i < CONNECTION_NUMBER; i++) {
>>>>>>> SocketAddress localAddr = new
>>>>>>> InetSocketAddress("localhost",
>>>>>>> localPort++);
>>>>>>> // connectorHandler.setLinger(10);
>>>>>>> TCPConnectorHandler connectorHandler =
>>>>>>> (TCPConnectorHandler) controller
>>>>>>> .acquireConnectorHandler(Protocol.TCP);
>>>>>>> // connectorHandler.setTcpNoDelay(true);
>>>>>>> connectorHandler.setReuseAddress(true);
>>>>>>>
>>>>>>> connectorHandler.connect(serverAddr, /* localAddr, */
>>>>>>> new MyConnectorCallbackHandler(connectorHandler));
>>>>>>> // Thread.sleep(20);
>>>>>>> }
>>>>>>> allConnectedLatch.await();
>>>>>>> long interval2 = System.currentTimeMillis() - start1;
>>>>>>> System.out.println("ALL CONNECTED." + " connections: "
>>>>>>> + connectionCount.get() + "," + connectors.size()
>>>>>>> + ", connection total time (s): " + interval2 /
>>>>>>> 1000);
>>>>>>>
>>>>>>> // distribute tasks
>>>>>>> Iterator<TCPConnectorHandler> itCon =
>>>>>>> connectors.iterator();
>>>>>>> ArrayList<Callable<Object>> callables2 = new
>>>>>>> ArrayList<Callable<Object>>();
>>>>>>> // int count = 0;
>>>>>>> // LinkedList<TCPConnectorHandler> tmpConnectorList = new
>>>>>>> // LinkedList<TCPConnectorHandler>();
>>>>>>> while (itCon.hasNext()) {
>>>>>>> final TCPConnectorHandler con = itCon.next();
>>>>>>> // tmpConnectorList.add(con);
>>>>>>> // count++;
>>>>>>> callables2.add(new Callable<Object>() {
>>>>>>> public Object call() throws Exception {
>>>>>>> // Perform 1 transaction on each connection (raise the loop bound for more).
>>>>>>> for (int i = 0; i < 1; i++) {
>>>>>>> executeOnConnection(con);
>>>>>>> }
>>>>>>> return null;
>>>>>>> }
>>>>>>> });
>>>>>>> }
>>>>>>>
>>>>>>> // Operate on each connection.
>>>>>>>
>>>>>>> // Callable<Object>[] callables3 = new
>>>>>>> // Callable[NUMBER_LOAD_ON_THREADPOOL];
>>>>>>>
>>>>>>> // Callable<Object>[] callables = new
>>>>>>> // Callable[NUMBER_LOAD_ON_THREADPOOL];
>>>>>>> // // Issue these connections concurrently.
>>>>>>> // for (int i = 0; i < callables.length; i++) {
>>>>>>> // callables[i] = new Callable<Object>() {
>>>>>>> // public Object call() throws Exception {
>>>>>>> // for (int i = 0; i < CONNECTION_NUMBER
>>>>>>> // / NUMBER_LOAD_ON_THREADPOOL; i++) {
>>>>>>> // // TCPConnectorHandler connectorHandler =
>>>>>>> // // (TCPConnectorHandler) controller
>>>>>>> // // .acquireConnectorHandler(Protocol.TCP);
>>>>>>> // executeOnConnection(connectorHandler);
>>>>>>> // }
>>>>>>> // return null;
>>>>>>> // }
>>>>>>> // };
>>>>>>> // }
>>>>>>> ExecutorService executor = Executors
>>>>>>> .newFixedThreadPool(CONNECTION_THREADPOOL_SIZE);
>>>>>>> // List<Callable<Object>> c = Arrays.asList(callables2);
>>>>>>>
>>>>>>> long start = System.currentTimeMillis();
>>>>>>>
>>>>>>> executor.invokeAll(callables2);
>>>>>>> executor.shutdown();
>>>>>>>
>>>>>>> long interval = System.currentTimeMillis() - start;
>>>>>>> System.out.println("transaction/s: " +
>>>>>>> transactionCount.get() * 1000
>>>>>>> / interval + ", bytes read: " + bytesRead.get()
>>>>>>> + "\n-1 times: " + minusOneCount.get() + ", 0 times: "
>>>>>>> + zeroCount);
>>>>>>>
>>>>>>> Thread.sleep(2000);
>>>>>>> itCon = connectors.iterator();
>>>>>>> while (itCon.hasNext()) {
>>>>>>> TCPConnectorHandler con = itCon.next();
>>>>>>> // con.getSelectorHandler().getPort()
>>>>>>> con.close();
>>>>>>> }
>>>>>>>
>>>>>>> controller.stop();
>>>>>>> }
>>>>>>>
>>>>>>> private void executeOnConnection(TCPConnectorHandler
>>>>>>> connectorHandler)
>>>>>>> throws IOException {
>>>>>>> try {
>>>>>>> sendRequest(connectorHandler);
>>>>>>> } finally {
>>>>>>> // connectorHandler.close();
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> private void sendRequest(TCPConnectorHandler connectorHandler)
>>>>>>> throws IOException {
>>>>>>> // Request line is "GET / HTTP/1.1", followed by the headers.
>>>>>>> String str = "GET /"
>>>>>>> + /* icons/apache_pb2.gif */ " HTTP/1.1\r\nHost: "
>>>>>>> + SERVER_HOST_OR_IP +
>>>>>>> "\r\nConnection: keep-alive\r\n\r\n";
>>>>>>>
>>>>>>> connectorHandler.writeToAsyncQueue(ByteBuffer.wrap(str.getBytes()),
>>>>>>> new MyAsyncWriteCallbackHandler(connectorHandler));
>>>>>>> }
>>>>>>>
>>>>>>> private void waitControllerReady() throws InterruptedException
>>>>>>> {
>>>>>>> controllerReadyLatch.await();
>>>>>>> }
>>>>>>>
>>>>>>> private Controller createController() {
>>>>>>> Controller tmpCtl = new Controller();
>>>>>>> DefaultConnectorHandlerPool connectorPool = new
>>>>>>> DefaultConnectorHandlerPool(
>>>>>>> tmpCtl);
>>>>>>> tmpCtl.setConnectorHandlerPool(connectorPool);
>>>>>>>
>>>>>>> DefaultSelectionKeyHandler keyHandler = new
>>>>>>> DefaultSelectionKeyHandler();
>>>>>>> keyHandler.setTimeout(30 * 1000 * 60);
>>>>>>> tmpCtl.setSelectionKeyHandler(keyHandler);
>>>>>>> tmpCtl.setSelectorHandler(new TCPSelectorHandler(true));
>>>>>>> tmpCtl.addStateListener(new
>>>>>>> ControllerStateListenerAdapter() {
>>>>>>> @Override
>>>>>>> public void onReady() {
>>>>>>> controllerReadyLatch.countDown();
>>>>>>> }
>>>>>>> });
>>>>>>>
>>>>>>> Pipeline pl = new DefaultPipeline();
>>>>>>> pl.setMaxThreads(50);
>>>>>>> tmpCtl.setPipeline(pl);
>>>>>>>
>>>>>>> // tmpCtl.getConnectorHandlerPool().
>>>>>>>
>>>>>>> return tmpCtl;
>>>>>>> }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> package test.grizzly;
>>>>>>>
>>>>>>> import java.net.InetAddress;
>>>>>>> import java.net.UnknownHostException;
>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>>
>>>>>>> import com.sun.grizzly.Controller;
>>>>>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>>>>>> import com.sun.grizzly.DefaultProtocolChain;
>>>>>>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>>>>>>> import com.sun.grizzly.ProtocolChain;
>>>>>>> import com.sun.grizzly.TCPSelectorHandler;
>>>>>>> import com.sun.grizzly.filter.EchoFilter;
>>>>>>> import com.sun.grizzly.filter.ReadFilter;
>>>>>>>
>>>>>>> public class EchoServer {
>>>>>>>
>>>>>>> private static final String SERVER_HOST_IP = "190.168.0.113";
>>>>>>>
>>>>>>> private static final int SERVER_PORT = 11113;
>>>>>>>
>>>>>>> /**
>>>>>>> * @param args
>>>>>>> * @throws InterruptedException
>>>>>>> * @throws UnknownHostException
>>>>>>> */
>>>>>>> public static void main(String[] args) throws
>>>>>>> UnknownHostException,
>>>>>>> InterruptedException {
>>>>>>> EchoServer server = new EchoServer();
>>>>>>> server.start();
>>>>>>> }
>>>>>>>
>>>>>>> private Controller controller;
>>>>>>> protected CountDownLatch controllerReadyLatch = new
>>>>>>> CountDownLatch(1);
>>>>>>>
>>>>>>> public void start() throws InterruptedException,
>>>>>>> UnknownHostException {
>>>>>>> controller = createController();
>>>>>>> new Thread(controller).start();
>>>>>>>
>>>>>>> controllerReadyLatch.await();
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> private Controller createController() throws UnknownHostException
>>>>>>> {
>>>>>>> Controller tmpCtl = new Controller();
>>>>>>> TCPSelectorHandler tcpSelectorHandler = new
>>>>>>> TCPSelectorHandler();
>>>>>>>
>>>>>>> tcpSelectorHandler.setInet(InetAddress.getByName(SERVER_HOST_IP));
>>>>>>> tcpSelectorHandler.setPort(SERVER_PORT);
>>>>>>> tmpCtl.addSelectorHandler(tcpSelectorHandler);
>>>>>>> tmpCtl.addStateListener(new ControllerStateListenerAdapter() {
>>>>>>> @Override
>>>>>>> public void onReady() {
>>>>>>> controllerReadyLatch.countDown();
>>>>>>> }
>>>>>>> });
>>>>>>> tmpCtl
>>>>>>> .setProtocolChainInstanceHandler(new
>>>>>>> DefaultProtocolChainInstanceHandler() {
>>>>>>> private ProtocolChain protocolChain;
>>>>>>>
>>>>>>> @Override
>>>>>>> public ProtocolChain poll() {
>>>>>>> if (protocolChain == null) {
>>>>>>> protocolChain = new
>>>>>>> DefaultProtocolChain();
>>>>>>> }
>>>>>>> return protocolChain;
>>>>>>> }
>>>>>>> });
>>>>>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>>>>>> new ReadFilter());
>>>>>>> tmpCtl.getProtocolChainInstanceHandler().poll().addFilter(
>>>>>>> new EchoFilter());
>>>>>>>
>>>>>>> return tmpCtl;
>>>>>>> }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> package test.grizzly;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.InetAddress;
>>>>>>> import java.net.InetSocketAddress;
>>>>>>> import java.net.UnknownHostException;
>>>>>>> import java.nio.channels.SelectionKey;
>>>>>>> import java.nio.channels.Selector;
>>>>>>> import java.nio.channels.SocketChannel;
>>>>>>> import java.nio.channels.spi.SelectorProvider;
>>>>>>> import java.util.Iterator;
>>>>>>>
>>>>>>> public class RawNioClient implements Runnable {
>>>>>>> private InetAddress serverHost;
>>>>>>> private int serverPort;
>>>>>>> private Selector selector;
>>>>>>> private int connectionCount = 0;
>>>>>>>
>>>>>>> // private SocketChannel socketChannel;
>>>>>>>
>>>>>>> public RawNioClient(InetAddress serverHost, int serverPort)
>>>>>>> throws IOException {
>>>>>>> this.serverHost = serverHost;
>>>>>>> this.serverPort = serverPort;
>>>>>>> selector = initSelector();
>>>>>>> // socketChannel = initSocketChannel();
>>>>>>> }
>>>>>>>
>>>>>>> private SocketChannel initSocketChannel() throws IOException {
>>>>>>> SocketChannel sc = SocketChannel.open();
>>>>>>> sc.connect(new InetSocketAddress(serverHost, serverPort));
>>>>>>> sc.configureBlocking(false);
>>>>>>> sc.socket().setReuseAddress(true);
>>>>>>> sc.register(selector, SelectionKey.OP_CONNECT);
>>>>>>> return sc;
>>>>>>> }
>>>>>>>
>>>>>>> private Selector initSelector() throws IOException {
>>>>>>> return SelectorProvider.provider().openSelector();
>>>>>>> }
>>>>>>>
>>>>>>> public static void main(String[] args) {
>>>>>>> try {
>>>>>>> new Thread(new RawNioClient(
>>>>>>> InetAddress.getByName("192.168.0.113"), 11113)).start();
>>>>>>> } catch (UnknownHostException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> } catch (IOException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> public void run() {
>>>>>>> while (true) {
>>>>>>> try {
>>>>>>> initSocketChannel();
>>>>>>> } catch (IOException e1) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e1.printStackTrace();
>>>>>>> }
>>>>>>> if (connectionCount >= 30000) {
>>>>>>> System.out.println("reach 30000 connections");
>>>>>>> try {
>>>>>>> Thread.sleep(5000);
>>>>>>> } catch (InterruptedException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> } else {
>>>>>>> try {
>>>>>>> selector.select();
>>>>>>>
>>>>>>> Iterator<SelectionKey> itKey =
>>>>>>> selector.selectedKeys()
>>>>>>> .iterator();
>>>>>>> while (itKey.hasNext()) {
>>>>>>> SelectionKey key = itKey.next();
>>>>>>> itKey.remove();
>>>>>>> if (!key.isValid()) {
>>>>>>> continue;
>>>>>>> }
>>>>>>> if (key.isConnectable()) {
>>>>>>> doConnect(key);
>>>>>>> }
>>>>>>> }
>>>>>>> } catch (IOException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> private void doConnect(SelectionKey key) {
>>>>>>> SocketChannel sc = (SocketChannel) key.channel();
>>>>>>> try {
>>>>>>> boolean fc = sc.finishConnect();
>>>>>>> } catch (IOException e) {
>>>>>>> // TODO Auto-generated catch block
>>>>>>> e.printStackTrace();
>>>>>>> }
>>>>>>> connectionCount++;
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> 2. i want to iterate over the ConnectorHandlerPool to perform a
>>>>>>>> transaction on each connection. but it seems that
>>>>>>>> ConnectorHandlerPool does not support iteration explicitly. so
>>>>>>>> as a workaround, i collect the ConnectorHandler after executing
>>>>>>>> ConnectorHandler.finishConnect manually.
>>>>>>>>
>>>>>>> Maybe it's the only way right now, as ConnectorHandlerPool
>>>>>>> initially wasn't introduced to serve existing live connections,
>>>>>>> but to cache/reuse ones which were released (closed).
>>>>>>>
>>>>>>> thanks.
>>>>>>>
>>>>>>> - Yi Bing
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>
>>>>>>
>>>>> <connection_test.zip>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
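
The change quoted in the thread for the raw NIO client (switch the channel to
non-blocking mode before calling connect(), and count a connection only when
finishConnect() returns true) can be tried in isolation with the sketch below.
It is a hedged illustration and not code from the attachment: the class name
NonBlockingConnectSketch, the method connectAll, the loopback listener and the
5 second deadline are assumptions. It also counts connections that complete
immediately inside connect(), a case the quoted snippet does not show.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NonBlockingConnectSketch {

    /** Opens n non-blocking connections to a local listener; returns how many completed. */
    public static int connectAll(int n) {
        List<SocketChannel> channels = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Hypothetical local listener; the kernel completes handshakes up to its backlog.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketAddress target = server.getLocalAddress();

            int connected = 0;
            int pending = 0;
            for (int i = 0; i < n; i++) {
                SocketChannel sc = SocketChannel.open();
                channels.add(sc);
                sc.configureBlocking(false);        // must come before connect()
                sc.socket().setReuseAddress(true);
                if (sc.connect(target)) {
                    connected++;                    // completed immediately
                } else {
                    sc.register(selector, SelectionKey.OP_CONNECT);
                    pending++;
                }
            }

            long deadline = System.currentTimeMillis() + 5000;
            while (pending > 0 && System.currentTimeMillis() < deadline) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid() || !key.isConnectable()) {
                        continue;
                    }
                    SocketChannel sc = (SocketChannel) key.channel();
                    try {
                        if (sc.finishConnect()) {
                            connected++;
                            pending--;
                            key.interestOps(0);     // stop selecting for OP_CONNECT
                        }
                    } catch (IOException e) {
                        pending--;                  // this connection attempt failed
                        key.cancel();
                    }
                }
            }
            return connected;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel sc : channels) {
                try {
                    sc.close();
                } catch (IOException ignored) {
                    // Nothing useful to do when closing fails in a sketch.
                }
            }
        }
    }
}
```

With a small n, such as connectAll(5), every connection is expected to
complete against the loopback listener. Much larger values run into the
listener backlog and the open-file limits discussed in the thread.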