Transacted sessions are used when an application needs to group certain messages together. For example, an application may need to acknowledge the receipt of one message and send a response to another queue in one transaction. There are two sample programs for this section: TxSender and TxReceiver.

1 TxSender

A transactional sender differs from a regular, non-transacted sender in two ways:

- When creating the session, the first parameter of the createQueueSession method is set to true to indicate a transacted session.
- Sending and discarding messages is controlled using the session's commit and rollback methods, respectively. Messages are not sent until the session is committed.

As is always the case for message producers, the acknowledge mode on the session is not used.

Below is the full source for the TxSender class:

    package txSession;

    import javax.naming.InitialContext;

    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.QueueSender;
    import javax.jms.DeliveryMode;
    import javax.jms.QueueSession;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;

    public class TxSender
    {
        public static void main(String[] args) throws Exception
        {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the queue object
            Queue queue = (Queue) ctx.lookup("queue/queue0");

            // lookup the queue connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
                lookup("queue/connectionFactory");

            // create a queue connection
            QueueConnection queueConn = connFactory.createQueueConnection();

            // create a transacted queue session (first argument is true);
            // the acknowledge mode is not used by a producer
            QueueSession queueSession = queueConn.createQueueSession(true,
                Session.DUPS_OK_ACKNOWLEDGE);

            // create a queue sender
            QueueSender queueSender = queueSession.createSender(queue);
            queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage part1 = queueSession.createTextMessage("part one");
            TextMessage part2 = queueSession.createTextMessage("part two");
            TextMessage part3 = queueSession.createTextMessage("part three");
            TextMessage last = queueSession.createTextMessage("final");

            // nothing reaches the queue until the session is committed
            queueSender.send(part1);
            queueSender.send(part2);
            queueSender.send(part3);
            queueSender.send(last);

            // commit the send (actually put messages on queue)
            queueSession.commit();

            System.out.println("TxSender committed 4 messages");

            // close the queue connection
            queueConn.close();
        }
    }

Here we set the delivery mode to NON_PERSISTENT, which means that the four messages are not persistent, but they are still put onto the target queue in one unit of work.

In the Novell exteNd Messaging Platform's JMS server, persistent messages in a transactional message producer are entered into the database in one database transaction. This guarantees the all-or-nothing semantics of a transacted session. When commit succeeds, all messages are guaranteed to be delivered.

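
The commit and rollback behavior of a transacted producer can be pictured with a small in-memory model. The sketch below is not the JMS API and makes no claim about how the exteNd JMS server is implemented; the class ToyTxProducer and all of its methods are hypothetical names invented for illustration. It only assumes what the text above states: sends are held back until commit, and rollback discards them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a transacted producer (hypothetical, NOT the JMS API).
class ToyTxProducer {
    // stands in for the target queue
    private final List<String> queue = new ArrayList<>();
    // messages sent since the last commit or rollback
    private final List<String> pending = new ArrayList<>();

    // Buffer the message; it is not yet visible on the queue.
    void send(String text) {
        pending.add(text);
    }

    // Put every buffered message on the queue as one unit of work.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Discard every message sent since the last commit.
    void rollback() {
        pending.clear();
    }

    // Return a copy of what consumers could currently see.
    List<String> queueContents() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        ToyTxProducer session = new ToyTxProducer();
        session.send("part one");
        session.send("part two");
        System.out.println("before commit: " + session.queueContents());
        session.commit();
        System.out.println("after commit: " + session.queueContents());
        session.send("discarded");
        session.rollback();
        session.commit();
        System.out.println("after rollback: " + session.queueContents());
    }
}
```

Running the model prints an empty list before the first commit and [part one, part two] after it; the message sent before the rollback never appears on the queue.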
2 TxReceiver

A transactional receiver is similar to a non-transacted receiver, with the following two exceptions:

- As was the case for the sender, the first parameter of the createQueueSession method is set to true to indicate a transacted session. The second argument is ignored.
- Acknowledgement of messages is controlled using the session's commit and rollback methods.

Transactional message acknowledgement works similarly to CLIENT_ACKNOWLEDGE in that all messages received since the last call to commit will be acknowledged each time commit is invoked. However, as opposed to CLIENT_ACKNOWLEDGE, committing a transacted session affects all its producers and consumers.

Rolling back a transacted session is similar to calling the session's recover method on a receiver that uses CLIENT_ACKNOWLEDGE. All messages that were received since the last call to commit will be re-delivered to the session's consumers. Also, rollback will discard all messages sent by a session's producers as described above.

Below is the source for the TxReceiver class:

    package txSession;

    import javax.naming.InitialContext;

    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.QueueSession;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;

    public class TxReceiver
    {
        public static void main(String[] args) throws Exception
        {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the queue object
            Queue queue = (Queue) ctx.lookup("queue/queue0");

            // lookup the queue connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
                lookup("queue/connectionFactory");

            // create a queue connection
            QueueConnection queueConn = connFactory.createQueueConnection();

            // create a transacted queue session (first argument is true);
            // the acknowledge mode in the second argument is ignored
            QueueSession queueSession = queueConn.createQueueSession(true,
                Session.AUTO_ACKNOWLEDGE);

            // create a queue receiver
            QueueReceiver queueReceiver = queueSession.createReceiver(queue);

            // start the connection
            queueConn.start();

            // receive until the message with the text "final" arrives
            while (true)
            {
                TextMessage msg = (TextMessage) queueReceiver.receive();
                System.out.println(msg.getText());
                if (msg.getText().equals("final"))
                {
                    queueSession.commit(); // acknowledge all
                    break;
                }
            }

            // close the queue connection
            queueConn.close();
        }
    }

The TxReceiver performs the familiar initial steps to start receiving messages. The loop keeps receiving messages until the last message (the one with the text "final") arrives on the queue. At that point the session is committed to signal that the consumer has received the entire unit of work.
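
The redelivery behavior on the receiving side can be pictured with a similar in-memory model. As before, this is a sketch under stated assumptions and not the JMS API: ToyTxConsumer and its methods are hypothetical names, and unlike a real receive() call, the model's receive() returns null instead of blocking when the queue is empty. It assumes only what is described above: commit acknowledges everything received since the last commit, and rollback makes those messages available again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of a transacted consumer (hypothetical, NOT the JMS API).
class ToyTxConsumer {
    // stands in for the source queue
    private final Deque<String> queue = new ArrayDeque<>();
    // messages received since the last commit or rollback
    private final List<String> unacknowledged = new ArrayList<>();

    ToyTxConsumer(List<String> messages) {
        queue.addAll(messages);
    }

    // Take the next message, or return null if the queue is empty.
    String receive() {
        String msg = queue.pollFirst();
        if (msg != null) {
            unacknowledged.add(msg);
        }
        return msg;
    }

    // Acknowledge everything received since the last commit.
    void commit() {
        unacknowledged.clear();
    }

    // Return unacknowledged messages to the head of the queue, in original order.
    void rollback() {
        for (int i = unacknowledged.size() - 1; i >= 0; i--) {
            queue.addFirst(unacknowledged.get(i));
        }
        unacknowledged.clear();
    }

    public static void main(String[] args) {
        ToyTxConsumer session =
            new ToyTxConsumer(Arrays.asList("part one", "part two", "final"));
        System.out.println("received: " + session.receive());
        session.rollback();
        System.out.println("redelivered: " + session.receive());
    }
}
```

In this model, a rollback after receiving "part one" causes the next receive() to return "part one" again, while a rollback issued after commit redelivers nothing.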
Copyright © 2000-2003, Novell, Inc. All rights reserved.