Transacted sessions are used when an application needs to group certain messages together. For example, an application may need to acknowledge the receipt of one message and send a response to another queue in a single transaction. There are two sample programs for this section: TxSender and TxReceiver.
A transactional sender differs from a regular, non-transacted sender in two ways:
When creating the session, the first parameter of the createQueueSession method is set to true to indicate a transacted session.
Sending and discarding messages are controlled using the session's commit and rollback methods, respectively. Messages are not sent until the session is committed.
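The commit and rollback behavior described above can be pictured with a small in-memory model. This is only a sketch of the semantics, not the JMS API and not how any particular JMS server is implemented; the class TxProducerModel and all of its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a transacted producer.
// NOT the JMS API: every name here is hypothetical and exists only
// to illustrate "nothing is sent until commit".
class TxProducerModel {
    // Messages that consumers could see.
    private final List<String> queue = new ArrayList<>();
    // Messages passed to send() but not yet committed.
    private final List<String> pending = new ArrayList<>();

    // In a transacted session, send() only stages the message.
    void send(String text) {
        pending.add(text);
    }

    // commit() publishes every staged message as one unit of work.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // rollback() discards every staged message.
    void rollback() {
        pending.clear();
    }

    // Returns a copy of the messages currently visible on the queue.
    List<String> visibleMessages() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        TxProducerModel session = new TxProducerModel();
        session.send("part one");
        session.send("part two");
        System.out.println("before commit:  " + session.visibleMessages());
        session.commit();
        System.out.println("after commit:   " + session.visibleMessages());
        session.send("discarded");
        session.rollback();
        System.out.println("after rollback: " + session.visibleMessages());
    }
}
```

In this model the queue stays empty until commit is called, and a message staged before rollback never becomes visible.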
Below is the full source for the TxSender class:
package txSession;

import javax.naming.InitialContext;

import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.DeliveryMode;
import javax.jms.QueueSession;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class TxSender
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
            lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(true,
            Session.DUPS_OK_ACKNOWLEDGE);

        // create a queue sender
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        TextMessage part1 = queueSession.createTextMessage("part one");
        TextMessage part2 = queueSession.createTextMessage("part two");
        TextMessage part3 = queueSession.createTextMessage("part three");
        TextMessage last = queueSession.createTextMessage("final");

        queueSender.send(part1);
        queueSender.send(part2);
        queueSender.send(part3);
        queueSender.send(last);

        // commit the send (actually put messages on queue)
        queueSession.commit();

        System.out.println("TxSender committed 4 messages");

        // close the queue connection
        queueConn.close();
    }
}
As is always the case for message producers, the session's acknowledgement mode is not used. Here the delivery mode is set to NON_PERSISTENT, which means that the four messages are not persistent, but they are still put onto the target queue in one unit of work.
In the Novell exteNd Messaging Platform's JMS server, persistent messages in a transactional message producer are entered into the database in one database transaction. This guarantees the all-or-nothing semantics of a transacted session. When commit succeeds, all messages are guaranteed to be delivered.
A transactional receiver is similar to a non-transacted receiver, with the following two exceptions:
As was the case for the sender, the first parameter of the createQueueSession method is set to true to indicate a transacted session. The second argument is ignored.
Acknowledgement of messages is controlled using the session's commit and rollback methods.
Transactional message acknowledgement works similarly to CLIENT_ACKNOWLEDGE in that all messages received since the last call to commit are acknowledged each time commit is invoked. However, unlike CLIENT_ACKNOWLEDGE, committing a transacted session affects all of its producers and consumers.
Rolling back a transacted session is similar to calling the session's recover method in a session that uses CLIENT_ACKNOWLEDGE. All messages that were received since the last call to commit will be redelivered to the session's consumers. In addition, rollback discards all messages sent by the session's producers, as described above.
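To make the combined effect of commit and rollback on a session's consumers and producers concrete, the following is a minimal in-memory sketch. It models only the semantics described above and is not the JMS API; TxSessionModel and its methods are hypothetical names chosen for this illustration, and the model assumes a single consumer and a single producer per session.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of one transacted session that both receives and sends.
// NOT the JMS API: every name here is hypothetical.
class TxSessionModel {
    private final Deque<String> inbound;                        // queue being consumed
    private final List<String> outbound = new ArrayList<>();    // queue being produced to
    private final Deque<String> received = new ArrayDeque<>();  // received, not yet committed
    private final List<String> staged = new ArrayList<>();      // sent, not yet committed

    TxSessionModel(List<String> initialMessages) {
        inbound = new ArrayDeque<>(initialMessages);
    }

    // Returns the next message, or null if the inbound queue is empty.
    String receive() {
        String msg = inbound.pollFirst();
        if (msg != null) {
            received.addLast(msg);
        }
        return msg;
    }

    // Stages a message; it is not visible until commit().
    void send(String text) {
        staged.add(text);
    }

    // Acknowledges everything received and publishes everything sent.
    void commit() {
        received.clear();
        outbound.addAll(staged);
        staged.clear();
    }

    // Puts received messages back at the head of the queue in their
    // original order and discards staged sends.
    void rollback() {
        while (!received.isEmpty()) {
            inbound.addFirst(received.pollLast());
        }
        staged.clear();
    }

    // Returns a copy of the messages visible on the outbound queue.
    List<String> outboundMessages() {
        return new ArrayList<>(outbound);
    }

    public static void main(String[] args) {
        TxSessionModel session = new TxSessionModel(Arrays.asList("request 1", "request 2"));

        String request = session.receive();
        session.send("reply to " + request);
        session.rollback();                  // request is redelivered, reply is discarded
        System.out.println("after rollback: " + session.outboundMessages());

        request = session.receive();         // the same request again
        session.send("reply to " + request);
        session.commit();                    // acknowledge and publish together
        System.out.println("after commit:   " + session.outboundMessages());
    }
}
```

A real JMS provider additionally marks redelivered messages through the JMSRedelivered message header, which this model leaves out.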
Below is the source for the TxReceiver class:
package txSession;

import javax.naming.InitialContext;

import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class TxReceiver
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
            lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(true,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue receiver
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);

        // start the connection
        queueConn.start();

        while (true)
        {
            TextMessage msg = (TextMessage) queueReceiver.receive();
            System.out.println(msg.getText());
            if (msg.getText().equals("final"))
            {
                queueSession.commit();  // acknowledge all
                break;
            }
        }

        // close the queue connection
        queueConn.close();
    }
}
The TxReceiver performs the familiar initial steps to start receiving messages. The loop keeps receiving messages until the message with the text "final" arrives on the queue. At that point the session is committed, which acknowledges all four messages and signals that the consumer has received the entire unit of work.
Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.