I seem to get many ConcurrentModificationExceptions when running a TCP client
- my test cases work OK, but when running in the full system I get spurious
exceptions as follows.
BTW - I'm not too clear on best practices, so I may be doing something stupid
here!
The client code is below. Please note that multiple instances of it can be
created in the same process.
- Should I be creating only a single Controller and TCPSelectorHandler per
client process?
- When can I call new Thread(controller).start() - only once, after the
first connection is created?
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
at java.util.HashMap$KeyIterator.next(HashMap.java:828)
at
java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1010)
at
com.sun.grizzly.DefaultSelectionKeyHandler.expire(DefaultSelectionKeyHandler.java:211)
at
com.sun.grizzly.TCPSelectorHandler.postSelect(TCPSelectorHandler.java:475)
at com.sun.grizzly.Controller.doSelect(Controller.java:469)
at
com.sun.grizzly.SelectorHandlerRunner.run(SelectorHandlerRunner.java:82)
at
com.sun.grizzly.Controller.startSelectorHandlerRunner(Controller.java:1099)
at com.sun.grizzly.Controller.start(Controller.java:912)
at com.sun.grizzly.Controller.run(Controller.java:796)
at java.lang.Thread.run(Thread.java:619)
public class GrizzlySender implements Sender {
// Logger for this class.
static final Logger LOGGER = Logger.getLogger(GrizzlySender.class);
// One MyClient per destination end point, keyed by the end point URI.
Map<URI, MyClient> clients = new ConcurrentHashMap<URI, MyClient>();
// Selector handler owned by this sender.
// NOTE(review): the `true` argument presumably selects client mode - confirm against the Grizzly API.
TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
// Held by send() while it looks up / inserts an entry in `clients`.
ReentrantLock clientAccessLock = new ReentrantLock();
// Buffer pool: send() takes an outgoing ByteBuffer from here and puts it back afterwards.
HeapByteBufferCache bbCache = new HeapByteBufferCache();
// Passed as the second argument of connectorHandler.write(...) in send().
boolean blocking = false;
// Value handed to the constructor.
private URI startPoint;
/**
 * Creates a sender and remembers the given start point.
 *
 * @param startPoint URI stored in this sender's {@code startPoint} field
 */
public GrizzlySender(final URI startPoint) {
    this.startPoint = startPoint;
}
public void send(URI endPoint, byte[] bytes) throws InterruptedException
{
try {
clientAccessLock.lock();
if (!clients.containsKey(endPoint)) {
clients.put(endPoint, new MyClient(endPoint));
}
clientAccessLock.unlock();
ByteBuffer byteBuffer = bbCache.get(HEADER_BYTE_SIZE.length +
bytes.length);
byteBuffer.put(HEADER_BYTES);
byteBuffer.putInt(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
MyClient client = clients.get(endPoint);
if (!client.connectorHandler.isConnected()) {
// throw new RuntimeException("Client is not connected to:"+
endPoint);
LOGGER.warn("**************** Client had dropped connection
to:" + endPoint + " reconnecting");
client.connect(endPoint);
}
if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + "
Sending byte["+bytes.length+"] to:" + endPoint);
long write = client.connectorHandler.write(byteBuffer,
blocking);
if (LOGGER.isDebugEnabled()) LOGGER.debug(getAddress() + " Bytes
Written:" + write);
bbCache.put(byteBuffer);
} catch (Throwable e) {
throw new RuntimeException("Error with endPoint:" + endPoint,
e);
} finally {
if (clientAccessLock.isHeldByCurrentThread())
clientAccessLock.unlock();
}
}
public class MyClient implements CallbackHandler<Context> {
private TCPConnectorHandler connectorHandler;
Controller controller = new Controller();
Throwable errorState = null;
public MyClient(URI endPoint) throws InterruptedException,
IOException {
controller.addSelectorHandler(tcpSelector);
connect(endPoint);
if (errorState != null) throw new RuntimeException(errorState);
}
public void connect(URI endPoint) throws InterruptedException,
IOException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
controller.addStateListener(new ControllerStateListenerAdapter()
{
public void onException(Throwable e) {
errorState = e;
e.printStackTrace();
countDownLatch.countDown();
}
public void onReady() {
LOGGER.debug("ClientConnectorReady, port:" +
tcpSelector.getPort());
countDownLatch.countDown();
}
});
new Thread(controller).start();
countDownLatch.await();
connectorHandler = (TCPConnectorHandler)
controller.acquireConnectorHandler(Controller.Protocol.TCP);
connectorHandler.connect(new
InetSocketAddress(endPoint.getHost(), endPoint.getPort()), this);
}
public void onConnect(IOEvent<Context> e) {
SelectionKey k = e.attachment().getSelectionKey();
try {
connectorHandler.finishConnect(k);
} catch (Exception ex) {
LOGGER.warn("Failed to connect:" + ex.getMessage());
ex.printStackTrace();
errorState = ex;
}
e.attachment().getSelectorHandler().register(k,
SelectionKey.OP_READ);
}
public void onRead(IOEvent<Context> e) {
System.out.println("Callbackhandler: OnRead...");
}
public void onWrite(IOEvent<Context> e) {
System.out.println("Callbackhandler: OnWrite...");
}
}