users@grizzly.java.net

Re: Client side - ConcurrentModificationException - Best Practices for Grizzly

From: Neil Avery <neil.avery_at_gmail.com>
Date: Wed, 28 May 2008 21:20:33 +0100

BTW - If I use a single Controller - I still call new
Thread(controller).start() as each new connection is established,
this leads to the following exception,

Exception in thread "Thread-8" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at
com.sun.grizzly.Controller.waitUntilSeletorHandlersStop(Controller.java:1176)
    at com.sun.grizzly.Controller.start(Controller.java:915)
    at com.sun.grizzly.Controller.run(Controller.java:796)
    at java.lang.Thread.run(Thread.java:619)

Any help greatly appreciated...
Regards.

2008/5/28 :

> I seem to get many ConcurrentModificationExceptions when running TCP client
> - my testcases work ok, but running in the system I get spurious exceptions
> as follows.
>
> BTW - Im not too clear on bestpractices so may be doing something stupid
> here!
>
> Client code is below.... please note there can be multiples of these
> created in the same process...
> - Should I be only creating a single Controller and TCPSelectHandler per
> client process?
> - When can I call new Thread(controller).start() - only once after the
> first connection is created?
>
>
> java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
> at java.util.HashMap$KeyIterator.next(HashMap.java:828)
> at
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1010)
> at
> com.sun.grizzly.DefaultSelectionKeyHandler.expire(DefaultSelectionKeyHandler.java:211)
> at
> com.sun.grizzly.TCPSelectorHandler.postSelect(TCPSelectorHandler.java:475)
> at com.sun.grizzly.Controller.doSelect(Controller.java:469)
> at
> com.sun.grizzly.SelectorHandlerRunner.run(SelectorHandlerRunner.java:82)
> at
> com.sun.grizzly.Controller.startSelectorHandlerRunner(Controller.java:1099)
> at com.sun.grizzly.Controller.start(Controller.java:912)
> at com.sun.grizzly.Controller.run(Controller.java:796)
> at java.lang.Thread.run(Thread.java:619)
>
>
> public class GrizzlySender implements Sender {
>
> static final Logger LOGGER = Logger.getLogger(GrizzlySender.class);
>
> Map<URI, MyClient> clients = new ConcurrentHashMap<URI, MyClient>();
>
> TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
>
> ReentrantLock clientAccessLock = new ReentrantLock();
> HeapByteBufferCache bbCache = new HeapByteBufferCache();
> boolean blocking = false;
>
> private URI startPoint;
>
> public GrizzlySender(URI startPoint) {
> this.startPoint = startPoint;
> }
>
> public void send(URI endPoint, byte[] bytes) throws
> InterruptedException {
> try {
> clientAccessLock.lock();
> if (!clients.containsKey(endPoint)) {
> clients.put(endPoint, new MyClient(endPoint));
> }
> clientAccessLock.unlock();
>
> ByteBuffer byteBuffer = bbCache.get(HEADER_BYTE_SIZE.length +
> bytes.length);
> byteBuffer.put(HEADER_BYTES);
> byteBuffer.putInt(bytes.length);
> byteBuffer.put(bytes);
> byteBuffer.flip();
>
> MyClient client = clients.get(endPoint);
>
> if (!client.connectorHandler.isConnected()) {
> // throw new RuntimeException("Client is not connected to:"+
> endPoint);
> LOGGER.warn("**************** Client had dropped
> connection to:" + endPoint + " reconnecting");
> client.connect(endPoint);
> }
>
> if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + "
> Sending byte["+bytes.length+"] to:" + endPoint);
> long write = client.connectorHandler.write(byteBuffer,
> blocking);
> if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + "
> Bytes Written:" + write);
>
> bbCache.put(byteBuffer);
> } catch (Throwable e) {
> throw new RuntimeException("Error with endPoint:" + endPoint,
> e);
> } finally {
> if (clientAccessLock.isHeldByCurrentThread())
> clientAccessLock.unlock();
> }
> }
>
> public class MyClient implements CallbackHandler<Context> {
> private TCPConnectorHandler connectorHandler;
> Controller controller = new Controller();
> Throwable errorState = null;
>
> public MyClient(URI endPoint) throws InterruptedException,
> IOException {
> controller.addSelectorHandler(tcpSelector);
> connect(endPoint);
> if (errorState != null) throw new RuntimeException(errorState);
> }
>
> public void connect(URI endPoint) throws InterruptedException,
> IOException {
> final CountDownLatch countDownLatch = new CountDownLatch(1);
> controller.addStateListener(new
> ControllerStateListenerAdapter() {
>
> public void onException(Throwable e) {
> errorState = e;
> e.printStackTrace();
> countDownLatch.countDown();
> }
>
> public void onReady() {
> LOGGER.debug("ClientConnectorReady, port:" +
> tcpSelector.getPort());
> countDownLatch.countDown();
> }
> });
>
>
> new Thread(controller).start();
> countDownLatch.await();
>
> connectorHandler = (TCPConnectorHandler)
> controller.acquireConnectorHandler(Controller.Protocol.TCP);
> connectorHandler.connect(new
> InetSocketAddress(endPoint.getHost(), endPoint.getPort()), this);
> }
>
> public void onConnect(IOEvent<Context> e) {
> SelectionKey k = e.attachment().getSelectionKey();
> try {
> connectorHandler.finishConnect(k);
> } catch (Exception ex) {
> LOGGER.warn("Failed to connect:" + ex.getMessage());
> ex.printStackTrace();
> errorState = ex;
> }
> e.attachment().getSelectorHandler().register(k,
> SelectionKey.OP_READ);
> }
>
> public void onRead(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnRead...");
> }
> public void onWrite(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnWrite...");
> }
> }
>
>
>