users@grizzly.java.net

Client side - ConcurrentModificationException - Best Practices for Grizzly

From: Neil Avery <neil.avery_at_gmail.com>
Date: Wed, 28 May 2008 21:07:42 +0100

I seem to get many ConcurrentModificationExceptions when running my TCP
client. My test cases work OK, but when running in the full system I get
spurious exceptions, as follows.

BTW - I'm not too clear on best practices, so I may be doing something
stupid here!

The client code is below. Please note that multiple instances of it can be
created in the same process.
 - Should I only be creating a single Controller and TCPSelectorHandler per
client process?
 - When can I call new Thread(controller).start() - only once, after the
first connection is created?


java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
    at java.util.HashMap$KeyIterator.next(HashMap.java:828)
    at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1010)
    at com.sun.grizzly.DefaultSelectionKeyHandler.expire(DefaultSelectionKeyHandler.java:211)
    at com.sun.grizzly.TCPSelectorHandler.postSelect(TCPSelectorHandler.java:475)
    at com.sun.grizzly.Controller.doSelect(Controller.java:469)
    at com.sun.grizzly.SelectorHandlerRunner.run(SelectorHandlerRunner.java:82)
    at com.sun.grizzly.Controller.startSelectorHandlerRunner(Controller.java:1099)
    at com.sun.grizzly.Controller.start(Controller.java:912)
    at com.sun.grizzly.Controller.run(Controller.java:796)
    at java.lang.Thread.run(Thread.java:619)


public class GrizzlySender implements Sender {

    static final Logger LOGGER = Logger.getLogger(GrizzlySender.class);

    Map<URI, MyClient> clients = new ConcurrentHashMap<URI, MyClient>();

    TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);

    ReentrantLock clientAccessLock = new ReentrantLock();
    HeapByteBufferCache bbCache = new HeapByteBufferCache();
    boolean blocking = false;

    private URI startPoint;

    /**
     * Creates a sender and records the URI it was created for.
     *
     * @param startPoint stored in the {@code startPoint} field; it is not
     *                   otherwise used by this constructor
     */
    public GrizzlySender(URI startPoint) {
        this.startPoint = startPoint;
    }

    public void send(URI endPoint, byte[] bytes) throws InterruptedException
{
        try {
            clientAccessLock.lock();
            if (!clients.containsKey(endPoint)) {
                clients.put(endPoint, new MyClient(endPoint));
            }
            clientAccessLock.unlock();

            ByteBuffer byteBuffer = bbCache.get(HEADER_BYTE_SIZE.length +
bytes.length);
            byteBuffer.put(HEADER_BYTES);
            byteBuffer.putInt(bytes.length);
            byteBuffer.put(bytes);
            byteBuffer.flip();

            MyClient client = clients.get(endPoint);

            if (!client.connectorHandler.isConnected()) {
// throw new RuntimeException("Client is not connected to:"+
endPoint);
                LOGGER.warn("**************** Client had dropped connection
to:" + endPoint + " reconnecting");
                client.connect(endPoint);
            }

            if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + "
Sending byte["+bytes.length+"] to:" + endPoint);
            long write = client.connectorHandler.write(byteBuffer,
blocking);
            if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + " Bytes
Written:" + write);

            bbCache.put(byteBuffer);
        } catch (Throwable e) {
            throw new RuntimeException("Error with endPoint:" + endPoint,
e);
        } finally {
            if (clientAccessLock.isHeldByCurrentThread())
clientAccessLock.unlock();
        }
    }

    public class MyClient implements CallbackHandler<Context> {
        private TCPConnectorHandler connectorHandler;
        Controller controller = new Controller();
        Throwable errorState = null;

        public MyClient(URI endPoint) throws InterruptedException,
IOException {
            controller.addSelectorHandler(tcpSelector);
            connect(endPoint);
            if (errorState != null) throw new RuntimeException(errorState);
        }

        public void connect(URI endPoint) throws InterruptedException,
IOException {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            controller.addStateListener(new ControllerStateListenerAdapter()
{

                public void onException(Throwable e) {
                    errorState = e;
                    e.printStackTrace();
                    countDownLatch.countDown();
                }

                public void onReady() {
                    LOGGER.debug("ClientConnectorReady, port:" +
tcpSelector.getPort());
                    countDownLatch.countDown();
                }
            });


            new Thread(controller).start();
            countDownLatch.await();

            connectorHandler = (TCPConnectorHandler)
controller.acquireConnectorHandler(Controller.Protocol.TCP);
            connectorHandler.connect(new
InetSocketAddress(endPoint.getHost(), endPoint.getPort()), this);
        }

        public void onConnect(IOEvent<Context> e) {
            SelectionKey k = e.attachment().getSelectionKey();
            try {
                connectorHandler.finishConnect(k);
            } catch (Exception ex) {
                LOGGER.warn("Failed to connect:" + ex.getMessage());
                ex.printStackTrace();
                errorState = ex;
            }
            e.attachment().getSelectorHandler().register(k,
 SelectionKey.OP_READ);
        }

        public void onRead(IOEvent<Context> e) {
            System.out.println("Callbackhandler: OnRead...");
        }
        public void onWrite(IOEvent<Context> e) {
            System.out.println("Callbackhandler: OnWrite...");
        }
    }