import bitronix.tm.BitronixTransactionManager; import bitronix.tm.TransactionManagerServices; import bitronix.tm.resource.jms.PoolingConnectionFactory; import javax.jms.*; /** * @author lorban */ public class Receive { public static void main(String[] args) throws Exception { PoolingConnectionFactory pcf = new PoolingConnectionFactory(); pcf.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory"); pcf.setUniqueName("amq"); pcf.setMaxPoolSize(1); pcf.getDriverProperties().setProperty("brokerURL", "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=5"); pcf.init(); TransactionManagerServices.getConfiguration().setJournal("null"); BitronixTransactionManager btm = TransactionManagerServices.getTransactionManager(); Connection c = pcf.createConnection(); c.start(); { btm.begin(); Session s = c.createSession(true, Session.SESSION_TRANSACTED); Queue q = s.createQueue("queue"); MessageProducer p = s.createProducer(q); p.send(s.createTextMessage("test")); s.close(); btm.commit(); } while (true) { btm.begin(); Session s = c.createSession(true, Session.SESSION_TRANSACTED); Queue q = s.createQueue("queue"); MessageConsumer consumer = s.createConsumer(q); Message msg = consumer.receive(1000); if (msg != null) { /** uncomment the suspend/resume to make the redelivery mechanism fail */ //Transaction tx = btm.suspend(); System.out.println("msg: " + msg.toString()); //btm.resume(tx); } else { System.out.println("no more message in queue"); s.close(); btm.rollback(); break; } s.close(); btm.rollback(); } c.close(); btm.shutdown(); pcf.close(); } }