/* * Simple.java * * Created on January 17, 2007, 2:52 PM * * To change this template, choose Tools | Template Manager * and open the template in the editor. */ package wstx.sample.service; import java.util.logging.Logger; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.ejb.EJB; import javax.ejb.SessionContext; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.XASession; import javax.jws.WebMethod; import javax.jws.WebService; import com.sun.xml.ws.api.tx.ATTransaction; import com.sun.xml.ws.api.tx.TXException; import com.sun.xml.ws.api.tx.Participant; import com.sun.xml.ws.api.tx.Protocol; import javax.annotation.Resource; import javax.jws.WebService; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import javax.transaction.UserTransaction; import com.sun.xml.ws.api.tx.TransactionManagerFactory; import javax.transaction.xa.XAResource; import sb.MessageVerifierLocal; import sb.PublisherLocal; import sb.PublisherRemote; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; /** * * @author jf39279 */ //wsdlLocation = "META-INF/wsdl/wsit-wstx.sample.service.Simple.xml" @WebService(portName = "SimpleServiceBinding", targetNamespace = "http://tempuri.org/" ) @Stateless @TransactionManagement(TransactionManagementType.CONTAINER) @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) public class Simple { static final Logger logger = Logger.getLogger("PublisherBean"); //@Resource //UserTransaction ut; @EJB(name = "PublisherLocal") PublisherLocal publisher; @EJB(name = "MessageVerifierLocal") MessageVerifierLocal verifier; @Resource private SessionContext sc; Connection connection = null; @Resource(mappedName = "jms/ConnectionFactory") private ConnectionFactory connectionFactory; @Resource(mappedName = "jms/Queue") private Queue queue; public enum AccessMode { NONE, DIRECT, VIA_EJB }; private AccessMode accessMode; final int TIMEOUT_READING_MESSAGE = 10000; /** Creates a new instance */ public Simple() { accessMode = AccessMode.NONE; } @WebMethod @TransactionAttribute(TransactionAttributeType.NEVER) public void setAccessMode(AccessMode accessMode) { this.accessMode = accessMode; if (connection == null) { System.out.println("assertion failure: connection should be non-null. preConstruct failed. Fixing..."); } getConnection(); clearJMSQueue(); // try { // ut.setTransactionTimeout(600); // } catch (SystemException ex) { // ex.printStackTrace(); // } } @WebMethod @TransactionAttribute(TransactionAttributeType.REQUIRED) public void publishRequired(Long id, String description) { final String METHODNAME = "publishRequired"; System.out.println(METHODNAME + " ENTER [id=" + id + " description="+ description + "]"); // getATTransaction().enlistParticipant(new BaseParticipant(description, getATTransaction())); publish(id, description); System.out.println(METHODNAME + " Exit [id=" + id + " description="+ description + "]"); } @WebMethod @TransactionAttribute(TransactionAttributeType.SUPPORTS) public void publishSupports(Long id, String description) { final String METHODNAME = "publishSupports"; System.out.println(METHODNAME + "[id=" + id + " description="+ description + "]"); // getATTransaction().enlistParticipant(new BaseParticipant(description, getATTransaction())); publish(id, description); } @WebMethod @TransactionAttribute(TransactionAttributeType.REQUIRED) public void publishRequiredAlwaysRollsback(Long id, String description) { final String METHODNAME = "publishRequiredAlwaysRollsback"; System.out.println(METHODNAME + "[id=" + id + " description="+ description + "]"); // getATTransaction().enlistParticipant(new BaseParticipant(description, getATTransaction())); publish(id, description); try { sc.setRollbackOnly(); } catch (IllegalStateException ex) { ex.printStackTrace(); } } @WebMethod @TransactionAttribute(TransactionAttributeType.REQUIRED) public boolean verify(Long id, String description) { boolean result = false; try { result = verifyMessage(TIMEOUT_READING_MESSAGE, id, description); } catch (JMSException je) { je.printStackTrace(); } return result; } static private TransactionManager wsatTm = null; private static ATTransaction getATTransaction() { try { return (ATTransaction)TransactionManagerFactory.getTransactionManager().getTransaction(); } catch (SystemException ex) { ex.printStackTrace(); } return null; } public void publish(Long id, String description) { switch (accessMode) { case VIA_EJB: publisher.publish(id, description); return; case DIRECT: // directly access XATransaction resources in web service below } Throwable rethrow = null; Session session = null; MessageProducer publisher = null; TextMessage message = null; String messageType = null; XAResource xares = null; try { // getConnection(); session = connection.createSession(true, 0); publisher = session.createProducer(queue); message = session.createTextMessage(); message.setText("Item " + id + ": " + description); logger.info( "PUBLISHER: Setting " + "message text to: " + message.getText()); publisher.send(message); } catch (Throwable t) { // JMSException could be thrown logger.severe( "PublisherBean.publish: " + "Exception: " + t.toString()); rethrow = t; } finally { if (session != null) { try { session.close(); // endConnection(); } catch (JMSException e) { } } } } private void getConnection() { if (connection == null) { makeConnection(); } } public Message getMessage(int timeout) throws JMSException { Message result = null; Session session = null; try { //getConnection(); session = connection.createSession(false, 0); MessageConsumer msgConsumer = session.createConsumer(queue); Message message = null; connection.start(); result = msgConsumer.receive(timeout); } finally { if (session != null) { try { session.close(); //endConnection(); } catch (JMSException e) { } } return result; } } public boolean verifyMessage(int timeout, Long id, String description) throws JMSException { switch (accessMode) { case NONE: return false; case VIA_EJB: return verifier.verifyMessage(TIMEOUT_READING_MESSAGE, id, description); case DIRECT: // directly access in web service below } Message msg = getMessage(timeout); if (msg == null) { System.out.println("timed out reading message from jms/Queue"); } else if (msg instanceof TextMessage) { TextMessage txtMsg = (TextMessage)msg; System.out.println("read message from jms/Queue with text=|" + txtMsg.getText()); final String expectedMsgText = "Item " + id + ": " + description; if (txtMsg.getText().equals(expectedMsgText)) { return true; } else { System.out.println("verify failed. Expected=" + expectedMsgText + " Received msg txt=" + txtMsg.getText()); } } else { logger.warning("unhandled message type in verifyMessage"); } return false; } /** * Creates the connection. */ @PostConstruct public void makeConnection() { logger.info("acquiring JMS connection for instance " + this.toString()); try { if (connectionFactory == null) { System.out.println("assertion failure: connectionFactory should not be null, resource injection for connectionFactory has not occurred yet."); } connection = connectionFactory.createConnection(); } catch (Throwable t) { // JMSException could be thrown logger.severe( "Simple.makeConnection:" + "Exception: " + t.toString()); } } /** * Closes the connection. */ @PreDestroy public void endConnection() throws RuntimeException { logger.info("release JMS connection for instance " + toString()); if (connection != null) { try { connection.close(); connection = null; } catch (Exception e) { e.printStackTrace(); } } } public static class BaseParticipant implements Participant { final private Protocol protocol; Participant.STATE prepareResult = null; final String participantName; final Transaction txn; public BaseParticipant(String participantName, Protocol p, Participant.STATE prepareResult, Transaction txn) { protocol = p; this.prepareResult = prepareResult; this.participantName = participantName; this.txn = txn; } public BaseParticipant(String participantName, Transaction txn) { this(participantName, Protocol.DURABLE, Participant.STATE.P_OK, txn); } public void abort() { report("abort"); } public void commit() { report("commit"); } public Protocol getProtocol() { return protocol; } public STATE prepare() throws TXException { report("prepare"); if (prepareResult == null) { throw new TXException("Participant " + participantName + " initiated rollback in prepare"); } return prepareResult; } private void report(String methodName){ System.out.println(protocol + "Participant." + methodName + "()" + "PName:" + participantName + " JTA TxnId:" + txn.toString()); } } private void clearJMSQueue() { // clear jms queue Session session = null; try { // getConnection(); session = connection.createSession(false, 0); MessageConsumer msgConsumer = session.createConsumer(queue); Message message = null; connection.start(); while (true) { Message result = msgConsumer.receive(1000); if (result == null) { break; } else { System.out.println("WARNING: clear unexpected unread message from queue:" + ((TextMessage)result).getText()); } } } catch (JMSException je) { je.printStackTrace(); } finally { if (session != null) { try { session.close(); //endConnection(); } catch (JMSException e) { } } } } }