Domain Unification Example

Version 1.1 of the Java Message Service (JMS) specification introduces a unification of the topic (Pub/Sub) and queue (point-to-point) domains at the interface level. The most important side-effect of the domain unification is that topic clients and queue clients can use the same session, e.g. messages consumed from a queue can be published to a topic (and vice versa) in the same local transaction. This example shows how to do exactly that:

package unified;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.ConnectionFactory;
                                                                           
public class Unified
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue and topic objects
    |   Destination queue = (Destination) ctx.lookup("queue/queue0");
    |   Destination topic = (Destination) ctx.lookup("topic/topic0");
    |                                                                      
    |   // lookup a connection factory
    |   ConnectionFactory factory = (ConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a connection
    |   Connection connection = factory.createConnection();
    |                                                                      
    |   // create a transacted session
    |   Session session = connection.createSession(true, -1);
    |                                                                      
    |   // create a (queue) message producer
    |   MessageProducer sender = session.createProducer(queue);
    |                                                                      
    |   // create a (topic) message producer
    |   MessageProducer publisher = session.createProducer(topic);
    |                                                                      
    |   // create a (queue) message consumer
    |   MessageConsumer receiver = session.createConsumer(queue);
    |                                                                      
    |   // create a (topic) message consumer
    |   MessageConsumer subscriber = session.createConsumer(topic);
    |                                                                      
    |   // start the connection
    |   connection.start();
    |                                                                      
    |   // send a few messages to the queue
    |   sender.send(session.createTextMessage("part one"));
    |   sender.send(session.createTextMessage("part two"));
    |   sender.send(session.createTextMessage("part three"));
    |   sender.send(session.createTextMessage("final"));
    |                                                                      
    |   // commit the session (actually put messages on queue)
    |   session.commit();
    |                                                                      
    |   // consume messages from queue and produce to topic in one transaction
    |   TextMessage m1 = (TextMessage) receiver.receive();
    |   TextMessage m2 = (TextMessage) receiver.receive();
    |   TextMessage m3 = (TextMessage) receiver.receive();
    |   TextMessage m4 = (TextMessage) receiver.receive();
    |                                                                      
    |   publisher.send(m1);
    |   publisher.send(m2);
    |   publisher.send(m3);
    |   publisher.send(m4);
    |                                                                      
    |   // commit receive and send
    |   session.commit();
    |                                                                      
    |   // check messages came to topic
    |   TextMessage tm1 = (TextMessage) subscriber.receive();
    |   TextMessage tm2 = (TextMessage) subscriber.receive();
    |   TextMessage tm3 = (TextMessage) subscriber.receive();
    |   TextMessage tm4 = (TextMessage) subscriber.receive();
    |                                                                      
    |   System.out.println(tm1.getText() + ", " + tm2.getText() + ", " +
    |       tm3.getText() + ", " + tm4.getText());
    |                                                                      
    |   // commit receive of topic messages
    |   session.commit();
    |                                                                      
    |   // close connection
    |   connection.close();
    }
}

As can be seen, the message producers and consumers are treated in a generic fashion without casting them to TopicPublisher's, QueueReceiver's, etc. Since the session is generic, it can be used to coordinate the work on both the receiver and publisher.



Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.