persistence@glassfish.java.net

Re: broken connection during commit

From: Kenneth Saks <Kenneth.Saks_at_Sun.COM>
Date: Wed, 28 Feb 2007 11:09:54 -0500

Christianne Carvalho wrote:

> Hi.
>
Hi Christianne,

Explicit thread management is not allowed within EJBs. It's difficult to say
whether that has any bearing on the error you're seeing, but it's possible.
The EJB API was designed to hide threading details from the developer. Using
threads at the application level can interfere with the container's ability
to properly manage the runtime environment.
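
As one illustration of how application-level thread handling can leak into
later work on the same thread, here is a minimal plain-Java sketch. The class
and method names are made up for the example, and nothing in it comes from
your application or from the container. It only shows standard
java.lang.Thread behavior: an interrupt delivered to a thread that is not
currently blocked stays pending, and the next blocking call on that thread
fails immediately. I can't tell whether that is related to the error you're
seeing.

```java
public class PendingInterruptDemo {

    // Returns true if an interrupt delivered while the thread is NOT blocked
    // is still pending afterwards.
    public static boolean interruptStaysPending() {
        Thread.currentThread().interrupt();      // interrupt arrives while running
        boolean pending = Thread.currentThread().isInterrupted();
        Thread.interrupted();                    // clear the flag again for the caller
        return pending;
    }

    // Returns true if a later blocking call fails right away because an
    // earlier interrupt is still pending.
    public static boolean laterBlockingCallFails() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(10000);                 // would normally block for 10 seconds
            return false;
        } catch (InterruptedException e) {
            return true;                         // thrown immediately; the flag is cleared
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupt stays pending: " + interruptStaysPending());
        System.out.println("later blocking call fails: " + laterBlockingCallFails());
    }
}
```

If a thread were interrupted after it had already woken up, or without ever
having slept, a pending flag like this could still be set when the thread
moves on to other blocking work. That is only a guess on my part.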

Regarding the exception, can you post the full stack trace?

>
> I'm having a problem when committing two threads at the same time. This
> is my use case:
>
> - thread-1/process 1 starts running
> - process 1 spawns thread-2/process 2
> - process 1 spawns thread-3/process 3
> - process 2 and process 3 start running
> - process 2 ends and tries to synchronize. Process 1 is terminated. As
> process 1 is not terminated yet, thread 2 is put to sleep
> - process 1 ends and tries to synchronize. Process 2 is terminated. As
> all of the synchronization point's children are terminated, the sleeping
> threads are interrupted
> - process 1 commits
> - process 2 commits
>
> Note: process 2 and process 3 update the same table but different
> records. They also use the same persistence unit (PU), but each one
> called EntityManagerFactory.createEntityManager().
>
> Everything works fine until the last commit, when I get the following
> error in the server log:
>
> [#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|javax.enterprise.resource.resourceadapter|_ThreadID=44;_ThreadName=p:
> thread-pool-1; w: 17;Connection could not be allocated because: Io
> exception: The Network Adapter could not establish the
> connection;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|RAR5117 :
> Failed to obtain/create connection. Reason : Connection could not be
> allocated because: Io exception: The Network Adapter could not
> establish the connection|#]
>
> [#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|javax.enterprise.resource.resourceadapter|_ThreadID=44;_ThreadName=p:
> thread-pool-1; w: 17;Error in allocating a connection. Cause:
> Connection could not be allocated because: Io exception: The Network
> Adapter could not establish the
> connection;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|RAR5114 :
> Error allocating connection : [Error in allocating a connection.
> Cause: Connection could not be allocated because: Io exception: The
> Network Adapter could not establish the connection]|#]
>
> [#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|oracle.toplink.essentials.session.file:/C:/java/glassfish/v2/domains/domain1/applications/j2ee-apps/bizlogic-ear/consumer-pu.jar-consumer|_ThreadID=44;_ThreadName=p:
> thread-pool-1; w: 17;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|
> Local Exception Stack:
> Exception [TOPLINK-4002] (Oracle TopLink Essentials - 9.1 (Build
> b32)): oracle.toplink.essentials.exceptions.DatabaseException
> Internal Exception: java.sql.SQLException: Error in allocating a
> connection. Cause: Connection could not be allocated because: Io
> exception: The Network Adapter could not establish the connection
> Error Code: 0
> Call:SQLCall(UPDATE ACCOUNT SET BALANCE = ?, RECORD_VERSION = ? WHERE
> ((ACCOUNT_ID = ?) AND (RECORD_VERSION = ?)))
> Query:UpdateObjectQuery(mhave.consumer.model.entities.Account[accountId=1260])
> at
> oracle.toplink.essentials.exceptions.DatabaseException.sqlException(DatabaseException.java:289)
> at
> oracle.toplink.essentials.jndi.JNDIConnector.connect(JNDIConnector.java:135)
> at
> oracle.toplink.essentials.sessions.DatasourceLogin.connectToDatasource(DatasourceLogin.java:170)
> ...
>
> I already checked my connection pool setup/config and that shouldn't
> be the problem.
>
> Does anyone have any idea why this is happening?
>
> Below is a sample of my implementation, in case it helps.
>
> I have two main classes:
> - Processor
> Receives a message and processes it.
> 1. Processor class is called
> 2. At some point of the processing, a new thread with a new call to
> Processor is spawned
> 3. When Processor finishes, if the msg parameter contains
> synchronization information, it calls the Synchronizer class
> synchronize method
> 4. If Synchronizer releases the thread, the process commits
>
> - Synchronizer
> Singleton that executes synchronization processes.
> 1. Synchronizer has 'synchronization points' that group the processes
> being executed by the Processor that need to be synchronized.
> 2. When Processor calls Synchronizer to synchronize, Synchronizer
> checks whether all involved processes have already terminated.
> 2.a If they have, it interrupts all threads to let the processing continue.
> 2.b If not, it puts the current thread to sleep and waits for a new
> 'synchronize' call.
>
> /**
> * PROCESSORIMPL
> */
> @Stateless
> @TransactionManagement(TransactionManagementType.BEAN)
> public class ProcessorImpl implements Processor{
>
> @Resource
> private UserTransaction utx = null;
>
> @Resource
> private SessionContext sessionContext = null;
>
> (...)
>
> public void run(ProcessorMsg msg) {
>
> // get current thread
> Thread currentThread = Thread.currentThread();
>
> // prepare new workflow context
> MyContext context = newContext(msg.getInfo());
>
> // start new JTA transaction
> if (context.isTransacted()) {
> utx.begin();
> context.getEntityManagers().joinTransaction();
> }
>
> // general processing
> (...)
>
> // synchronize this process
> if (msg.getSynchronizationId() != null) {
> Synchronizer.getInstance()
>
> .synchronize(processorMsg.getParentSynchronizationId(),
> processorMsg.getSynchronizationId(),
> currentThread, context);
> }
>
> // commit transaction if no error happened
> if (context.isTransacted()) {
> utx.commit();
> }
>
> }
>
> private MyContext newContext(
> HashMap<String, Object> info) {
>
> // create workflow context
> MyContext context = new MyContext();
>
> EntityManagerFactory emf = null ;
>
> // lookup emf instead of inject it for project specific reasons
> emf = (EntityManagerFactory) sessionContext.lookup("pu/" +
> info.get("EntityManagerFactory.name"));
>
> // register entity managers in context
> context.putEntityManager(emName, emf.createEntityManager());
>
> return context;
> }
>
> }
>
>
> /**
> * SYNCHRONIZER
> */
> public class Synchronizer {
>
> private static Synchronizer synchronizer;
>
> HashMap syncPoints = new HashMap();
>
> public static Synchronizer getInstance() {
> if (synchronizer == null) {
> synchronizer = new SynchronizerBean();
> }
> return synchronizer;
> }
>
> /**
> * Indicates that the workflow has finished and releases it for the
> * sync point
> */
> public WorkflowContext synchronize(String parentSyncId, String
> childSyncId,
> Thread thread, WorkflowContext childWorkflow) {
>
> logger.finest("Called synchronize for parent sync point: " +
> parentSyncId + " and child sync id: " + childSyncId);
>
> SynchronizationPoint point = syncMap.get(parentSyncId);
>
> // 1. Updates context status to finished (indicates that
> // processing has terminated)
> point.terminate(childSyncId, logger);
>
> // 2. Verifies if all are terminated
> boolean isTerminated = point.isTerminated();
>
> // 3. If all are terminated, notifies all indicating that
> // process has ended and removes synchronization point
> if (isTerminated) {
> point.release(logger);
> }
>
> // 4. If not all are terminated, wait until notification
> else {
> boolean timeoutOcurred = true;
>
> try {
> logger.finest("Sleeping thread " + thread.getId() +
> " for parent sync point: " + parentSyncId +
> " and child sync id: " + childSyncId);
> thread.sleep(DEFAULT_TIMEOUT);
> } catch (InterruptedException e) {
> timeoutOcurred = false;
> logger.finer("Child sync id: " + childSyncId +
> "for parent sync point: " + parentSyncId + " released ");
> }
>
> // timeout occurred
> if (timeoutOcurred) {
> logger.finer(
> "Timeout occurred for parent sync point: " +
> parentSyncId +
> " and child sync id: " + childSyncId);
> childWorkflow.setResult(WorkflowContext.Result.ERROR);
> childWorkflow.setErrorCode(new
> ErrorCode("THREAD_TIMEOUT", ""));
> return childWorkflow;
> // point.release(WorkflowContext.Result.ERROR, logger);
> // point.clean(thread, logger);
> }
> }
>
> childWorkflow.setResult(WorkflowContext.Result.SUCCESS);
>
> return childWorkflow;
> }
>
> // other class methods
> (...)
>
> private class SynchronizationPoint() {
>
> HashMap childProcesses = new HashMap() ;
>
> // terminate the child process for the informed id by changing
> // its status
> public boolean terminated(String id) {
> (...)
> }
>
> // check if the current sync point is terminated by checking
> // the status for child processes
> public boolean isTerminated() {
> (...)
> }
>
> // Awakes all threads
> public void release() {
> Set<Map.Entry<String, Thread>> threads =
> childProcesses.entrySet();
> for (Map.Entry<String, Thread> entry : threads) {
> logger.finest("Releasing thread " +
> entry.getValue().getId() +
> " for sync point " + this.id);
> // updates context with informed status
> entry.getValue().interrupt();
> }
> }
>
> // other inner class methods
> (...)
>
> }
> }
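
For what it's worth, the wait-until-everyone-has-finished step that
Synchronizer implements with sleep() and interrupt() can be sketched with
java.util.concurrent.CountDownLatch. This is plain Java SE, not something I'm
suggesting you run inside an EJB, and LatchSyncPoint and its methods are
hypothetical names for the example, not part of your code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSyncPoint {

    // Counts the children that have not reached the sync point yet.
    private final CountDownLatch remaining;

    public LatchSyncPoint(int children) {
        this.remaining = new CountDownLatch(children);
    }

    // Marks the calling child as finished, then waits for the others.
    // Returns true if every child arrived before the timeout, false on
    // timeout or if the waiting thread was interrupted.
    public boolean synchronize(long timeoutMillis) {
        remaining.countDown();
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final LatchSyncPoint point = new LatchSyncPoint(2);
        final boolean[] released = new boolean[2];
        Thread[] workers = new Thread[2];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> released[id] = point.synchronize(5000));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();                             // join makes the workers' writes visible
        }
        System.out.println("both released: " + (released[0] && released[1]));
    }
}
```

With a latch nobody has to interrupt anybody, so no thread can be left with a
pending interrupt, and a timeout is reported through the return value rather
than inferred from the absence of an InterruptedException.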
>
> /*Christianne Carvalho
> **mHave Software, Ltda. */