persistence@glassfish.java.net

RE: broken connection during commit

From: Christianne Carvalho <ccarvalho_at_mhave.com>
Date: Tue, 27 Feb 2007 11:15:40 -0800

No, no firewall running. If I have only process 2, for example,
everything works fine and the app commits.
 
Christianne Carvalho

________________________________

From: Adam Leftik [mailto:adam.leftik_at_oracle.com]
Sent: Tuesday, February 27, 2007 3:33 PM
To: persistence_at_glassfish.dev.java.net; Christianne Carvalho
Subject: RE: broken connection during commit


Do you have a firewall between the db and the jvm running TLE?

        -----Original Message-----
        From: Christianne Carvalho [mailto:ccarvalho_at_mhave.com]
        Sent: Tuesday, February 27, 2007 10:13 AM
        To: persistence_at_glassfish.dev.java.net
        Subject: broken connection during commit
        
        

        Hi.
        
        I'm having a problem when commiting two threads at the same
time. This is my use case:
        
        - thread-1/process 1 starts running
        - process 1 spawns thread-2/process 2
        - process 1 spawns thread-3/process 3
        - process 2 and process 3 starts running
        - process 2 ends and tries to synchronize. Process 1 is
terminated. As process 1 is not terminated yet, thread 2 is put to sleep
        - process 1 ends and tries to synchronize. Process 2 is
terminated. As all synchronization point's children are terminated,
interrupts sleeping threads
        - process 1 commits
        - process 2 commits
        
        Obs.: process 2 and process 3 update the same table, different
records. It's also the same pu, but each called an
entitymanagerfactory.createEntityManager())
        
        Everything works fine until the last commit, when I get the
following error in the server log:
        
        
[#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|javax.enterpris
e.resource.resourceadapter|_ThreadID=44;_ThreadName=p: thread-pool-1; w:
17;Connection could not be allocated because: Io exception: The Network
Adapter could not establish the
connection;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|RAR5117 :
Failed to obtain/create connection. Reason : Connection could not be
allocated because: Io exception: The Network Adapter could not establish
the connection|#]
        
        
[#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|javax.enterpris
e.resource.resourceadapter|_ThreadID=44;_ThreadName=p: thread-pool-1; w:
17;Error in allocating a connection. Cause: Connection could not be
allocated because: Io exception: The Network Adapter could not establish
the connection;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|RAR5114
: Error allocating connection : [Error in allocating a connection.
Cause: Connection could not be allocated because: Io exception: The
Network Adapter could not establish the connection]|#]
        
        
[#|2007-02-23T10:15:32.906-0300|WARNING|sun-appserver9.1|oracle.toplink.
essentials.session.file:/C:/java/glassfish/v2/domains/domain1/applicatio
ns/j2ee-apps/bizlogic-ear/consumer-pu.jar-consumer|_ThreadID=44;_ThreadN
ame=p: thread-pool-1; w:
17;_RequestID=1826fd21-66b0-4da3-91bf-57c2bdb6b07f;|
        Local Exception Stack:
        Exception [TOPLINK-4002] (Oracle TopLink Essentials - 9.1 (Build
b32)): oracle.toplink.essentials.exceptions.DatabaseException
        Internal Exception: java.sql.SQLException: Error in allocating a
connection. Cause: Connection could not be allocated because: Io
exception: The Network Adapter could not establish the connectionError
Code: 0
        Call:SQLCall(UPDATE ACCOUNT SET BALANCE = ?, RECORD_VERSION = ?
WHERE ((ACCOUNT_ID = ?) AND (RECORD_VERSION = ?)))
        
Query:UpdateObjectQuery(mhave.consumer.model.entities.Account[accountId=
1260])
                at
oracle.toplink.essentials.exceptions.DatabaseException.sqlException(Data
baseException.java:289)
                at
oracle.toplink.essentials.jndi.JNDIConnector.connect(JNDIConnector.java:
135)
                at
oracle.toplink.essentials.sessions.DatasourceLogin.connectToDatasource(D
atasourceLogin.java:170)
                ...
        
        I already checked my connection pool setup/config and that
shouldn't be problem.
        
        Does anyone have any idea why is this happening?
        
        Below is a sample of my implementation, in case it helps..
        
        I have two main classes:
        - Processor
        Receives a message and processes it.
        1. Processor class is called
        2. At some point of the processing, a new thread with a new call
to Processor is spawned
        3. When Processor finishes, if the msg parameter contains
synchronization information, it calls the Synchronizer class synchronize
method
        4. If Synchronizer releases the thread, commits process
        
        - Synchronizer
        Singleton that executes synchronization processes.
        1. Synchronizer has 'synchronized points' that groups processes
being executed by the Processor and that needs to be synchronized.
        2. When Processor calls Synchronizer to synchronize,
Synchronizer checks if all involved processes are already terminated.
        2.a If they are, it interrupts all threads to let the processing
continue.
        2.b If not, it sleeps the current thread and wait for a new
'synchronize' call.
        
        /**
        * PROCESSORIMPL
        */
        @Stateless
        @TransactionManagement(TransactionManagementType.BEAN)
        public class ProcessorImpl implements Processor{
        
            @Resource
            private UserTransaction utx = null;
        
            @Resource
            private SessionContext sessionContext = null;
        
            (...)
        
            public void run(ProcessorMsg msg) {
        
                // get current thread
                Thread currentThread = Thread.currentThread();
        
                // prepare new workflow context
                MyContext context = newContext(msg.getInfo());
        
                // start new JTA transaction
                if (context.isTransacted()) {
                    utx.begin();
                context.getEntityManagers().joinTransaction();
                }
        
                // general processing
                (...)
        
                // synchronize this process
                if (msg.getSynchronizationId() != null) {
                        Synchronizer.getInstance()
        
.synchronize(processorMsg.getParentSynchronizationId(),
                                processorMsg.getSynchronizationId(),
                                currentThread, context);
                }
        
                // commit transaction if no error happened
                if (context.isTransacted()) {
                    utx.commit();
                }
        
            }
        
           private MyContext newContext(
                HashMap<String, Object> info) {
        
                // create workflow context
                MyContext context = new MyContext();
        
                EntityManagerFactory emf = null ;
        
                // lookup emf instead of inject it for project specific
reasons
                emf = (EntityManagerFactory) sessionContext.lookup("pu/"
+ info.get("EntityManagerFactory.name"));
        
                // register entity managers in context
                context.putEntityManager(emName,
emf.createEntityManager());
        
                return context;
            }
        
        }
        
        
        /**
        * SYNCHRONIZER
        */
        public class Synchronizer {
           
            private static Synchronizer synchronizer;
        
            HashMap syncPoints = new HashMap();
        
            public static Synchronizer getInstance() {
                if (synchronizer == null) {
                    synchronizer = new SynchronizerBean();
                }
                return synchronizer;
            }
        
           /**
             * Indicates that the workflow has finished and releases it
for the sync
             * point
            */
            public WorkflowContext synchronize(String parentSyncId,
String childSyncId,
                Thread thread, WorkflowContext childWorkflow) {
        
                logger.finest("Called synchronize for parent sync point:
" +
                    parentSyncId + " and child sync id: " +
childSyncId);
        
                SynchronizationPoint point = syncMap.get(parentSyncId);
        
                // 1. Updates context status to finished (indicates that
processing has terminated)
                point.terminate(childSyncId, logger);
        
                // 2. Verifies if all are terminated
                boolean isTerminated = point.isTerminated();
        
                // 3. If all are terminated, notifies all indicating
that process has ended and removes synchronization point
                if (isTerminated) {
                    point.release(logger);
                }
        
                // 4. If not all are terminated, wait until notification
                else {
                    boolean timeoutOcurred = true;
        
                    try {
                        logger.finest("Sleeping thread " +
thread.getId() +
                            " for parent sync point: " + parentSyncId +
                            " and child sync id: " + childSyncId);
                        thread.sleep(DEFAULT_TIMEOUT);
                    } catch (InterruptedException e) {
                        timeoutOcurred = false;
                        logger.finer("Child sync id: " + childSyncId +
                            "for parent sync point: " + parentSyncId + "
released ");
                    }
        
                    // timeout ocurred
                    if (timeoutOcurred) {
                        logger.finer(
                            "Timeout occurred for parent sync point: " +
parentSyncId +
                                " and child sync id: " + childSyncId);
        
childWorkflow.setResult(WorkflowContext.Result.ERROR);
                        childWorkflow.setErrorCode(new
ErrorCode("THREAD_TIMEOUT", ""));
                        return childWorkflow;
        // point.release(WorkflowContext.Result.ERROR,
logger);
        // point.clean(thread, logger);
                    }
                }
        
                childWorkflow.setResult(WorkflowContext.Result.SUCCESS);
        
                return childWorkflow;
            }
        
            // other class methods
           (...)
        
            private class SynchronizationPoint() {
        
                HashMap childProcesses = new HashMap() ;
        
                // terminate the child process for the informed id by
changing its status
                public boolean terminated(String id) {
                (...)
                }
        
                // check if the current sync point is terminated by
checking the status for child processes
                public boolean isTerminated() {
                (...)
                }
        
                // Awakes all threads
                public void release() {
                    Set<Map.Entry<String, Thread>> threads =
                        childProcesses.entrySet();
                    for (Map.Entry<String, Thread> entry : threads) {
                        logger.finest("Releasing thread " +
entry.getValue().getId() +
                            " for sync point " + this.id);
                        // updates context with informed status
                        entry.getValue().interrupt();
                    }
                }
        
                // other inner class methods
                (...)
        
            }
        }
        
        

        Christianne Carvalho
        mHave Software, Ltda.