users@grizzly.java.net

Re: Client side - ConcurrentModificationException - Best Practices for Grizzly

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Thu, 29 May 2008 11:13:22 +0200

Hello Neil,

>
> Client code is below.... please note there can be multiples of these
> created in the same process...
> - Should I be only creating a single Controller and
> TCPSelectHandler per client process?
Yes :)

>
> - When can I call new Thread(controller).start() - only once after
> the first connection is created?
You need to start controller *before* doing first connect.

Please take a look at TCPConnectorHandlerTest is grizzly-framework
module.

If you will have other questions - let us know :)

Thanks.

WBR,
Alexey.

>
>
>
> java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
> at java.util.HashMap$KeyIterator.next(HashMap.java:828)
> at java.util.Collections$UnmodifiableCollection
> $1.next(Collections.java:1010)
> at
> com
> .sun
> .grizzly
> .DefaultSelectionKeyHandler.expire(DefaultSelectionKeyHandler.java:
> 211)
> at
> com
> .sun.grizzly.TCPSelectorHandler.postSelect(TCPSelectorHandler.java:
> 475)
> at com.sun.grizzly.Controller.doSelect(Controller.java:469)
> at
> com.sun.grizzly.SelectorHandlerRunner.run(SelectorHandlerRunner.java:
> 82)
> at
> com
> .sun.grizzly.Controller.startSelectorHandlerRunner(Controller.java:
> 1099)
> at com.sun.grizzly.Controller.start(Controller.java:912)
> at com.sun.grizzly.Controller.run(Controller.java:796)
> at java.lang.Thread.run(Thread.java:619)
>
>
> public class GrizzlySender implements Sender {
>
> static final Logger LOGGER =
> Logger.getLogger(GrizzlySender.class);
>
> Map<URI, MyClient> clients = new ConcurrentHashMap<URI,
> MyClient>();
>
> TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
>
> ReentrantLock clientAccessLock = new ReentrantLock();
> HeapByteBufferCache bbCache = new HeapByteBufferCache();
> boolean blocking = false;
>
> private URI startPoint;
>
> public GrizzlySender(URI startPoint) {
> this.startPoint = startPoint;
> }
>
> public void send(URI endPoint, byte[] bytes) throws
> InterruptedException {
> try {
> clientAccessLock.lock();
> if (!clients.containsKey(endPoint)) {
> clients.put(endPoint, new MyClient(endPoint));
> }
> clientAccessLock.unlock();
>
> ByteBuffer byteBuffer =
> bbCache.get(HEADER_BYTE_SIZE.length + bytes.length);
> byteBuffer.put(HEADER_BYTES);
> byteBuffer.putInt(bytes.length);
> byteBuffer.put(bytes);
> byteBuffer.flip();
>
> MyClient client = clients.get(endPoint);
>
> if (!client.connectorHandler.isConnected()) {
> // throw new RuntimeException("Client is not
> connected to:"+ endPoint);
> LOGGER.warn("**************** Client had dropped
> connection to:" + endPoint + " reconnecting");
> client.connect(endPoint);
> }
>
> if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() +
> " Sending byte["+bytes.length+"] to:" + endPoint);
> long write = client.connectorHandler.write(byteBuffer,
> blocking);
> if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() +
> " Bytes Written:" + write);
>
> bbCache.put(byteBuffer);
> } catch (Throwable e) {
> throw new RuntimeException("Error with endPoint:" +
> endPoint, e);
> } finally {
> if (clientAccessLock.isHeldByCurrentThread())
> clientAccessLock.unlock();
> }
> }
>
> public class MyClient implements CallbackHandler<Context> {
> private TCPConnectorHandler connectorHandler;
> Controller controller = new Controller();
> Throwable errorState = null;
>
> public MyClient(URI endPoint) throws InterruptedException,
> IOException {
> controller.addSelectorHandler(tcpSelector);
> connect(endPoint);
> if (errorState != null) throw new
> RuntimeException(errorState);
> }
>
> public void connect(URI endPoint) throws
> InterruptedException, IOException {
> final CountDownLatch countDownLatch = new
> CountDownLatch(1);
> controller.addStateListener(new
> ControllerStateListenerAdapter() {
>
> public void onException(Throwable e) {
> errorState = e;
> e.printStackTrace();
> countDownLatch.countDown();
> }
>
> public void onReady() {
> LOGGER.debug("ClientConnectorReady, port:" +
> tcpSelector.getPort());
> countDownLatch.countDown();
> }
> });
>
>
> new Thread(controller).start();
> countDownLatch.await();
>
> connectorHandler = (TCPConnectorHandler)
> controller.acquireConnectorHandler(Controller.Protocol.TCP);
> connectorHandler.connect(new
> InetSocketAddress(endPoint.getHost(), endPoint.getPort()), this);
> }
>
> public void onConnect(IOEvent<Context> e) {
> SelectionKey k = e.attachment().getSelectionKey();
> try {
> connectorHandler.finishConnect(k);
> } catch (Exception ex) {
> LOGGER.warn("Failed to connect:" + ex.getMessage());
> ex.printStackTrace();
> errorState = ex;
> }
> e.attachment().getSelectorHandler().register(k,
> SelectionKey.OP_READ);
> }
>
> public void onRead(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnRead...");
> }
> public void onWrite(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnWrite...");
> }
> }
>
>