1 Domain Unification Example

Version 1.1 of the Java Message Service (JMS) specification introduces a unification of the topic (Pub/Sub) and queue (point-to-point) domains at the interface level. The most important side-effect of the domain unification is that topic clients and queue clients can use the same session, e.g. messages consumed from a queue can be published to a topic (and vice versa) in the same local transaction. This example shows how to do exactly that:
package unified;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.ConnectionFactory;
                                                                           
public class Unified
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue and topic objects
    |   Destination queue = (Destination) ctx.lookup("queue/queue0");
    |   Destination topic = (Destination) ctx.lookup("topic/topic0");
    |                                                                      
    |   // lookup a connection factory
    |   ConnectionFactory factory = (ConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a connection
    |   Connection connection = factory.createConnection();
    |                                                                      
    |   // create a transacted session
    |   Session session = connection.createSession(true, -1);
    |                                                                      
    |   // create a (queue) message producer
    |   MessageProducer sender = session.createProducer(queue);
    |                                                                      
    |   // create a (topic) message producer
    |   MessageProducer publisher = session.createProducer(topic);
    |                                                                      
    |   // create a (queue) message consumer
    |   MessageConsumer receiver = session.createConsumer(queue);
    |                                                                      
    |   // create a (topic) message consumer
    |   MessageConsumer subscriber = session.createConsumer(topic);
    |                                                                      
    |   // start the connection
    |   connection.start();
    |                                                                      
    |   // send a few messages to the queue
    |   sender.send(session.createTextMessage("part one"));
    |   sender.send(session.createTextMessage("part two"));
    |   sender.send(session.createTextMessage("part three"));
    |   sender.send(session.createTextMessage("final"));
    |                                                                      
    |   // commit the session (actually put messages on queue)
    |   session.commit();
    |                                                                      
    |   // consume messages from queue and produce to topic in one transaction
    |   TextMessage m1 = (TextMessage) receiver.receive();
    |   TextMessage m2 = (TextMessage) receiver.receive();
    |   TextMessage m3 = (TextMessage) receiver.receive();
    |   TextMessage m4 = (TextMessage) receiver.receive();
    |                                                                      
    |   publisher.send(m1);
    |   publisher.send(m2);
    |   publisher.send(m3);
    |   publisher.send(m4);
    |                                                                      
    |   // commit receive and send
    |   session.commit();
    |                                                                      
    |   // check messages came to topic
    |   TextMessage tm1 = (TextMessage) subscriber.receive();
    |   TextMessage tm2 = (TextMessage) subscriber.receive();
    |   TextMessage tm3 = (TextMessage) subscriber.receive();
    |   TextMessage tm4 = (TextMessage) subscriber.receive();
    |                                                                      
    |   System.out.println(tm1.getText() + ", " + tm2.getText() + ", " +
    |       tm3.getText() + ", " + tm4.getText());
    |                                                                      
    |   // commit receive of topic messages
    |   session.commit();
    |                                                                      
    |   // close connection
    |   connection.close();
    }
}
As can be seen, the message producers and consumers are treated in a generic fashion without casting them to TopicPublisher's, QueueReceiver's, etc. Since the session is generic, it can be used to coordinate the work on both the receiver and publisher.

Back to top


Copyright © 2000-2003, Novell, Inc. All rights reserved.