Router Sample Program

This section describes a simple router, which can send messages from a queue residing in one JMS server to a queue with the same name in a different JMS server.

The second part of the example consists of a transactional router, which uses the XAResource objects of the two sessions to group the consuming and producing of messages into a single unit of work.

There are four sample programs for this section:

Queue Sender

Queue Receiver

Message Router

Transactional Message Router

1 Queue Sender

The queue sender is a small convenience program that simply sends a text message to a queue in some JMS server. The full source code for the send application is shown below:
package router;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Send
{
    private final static String _usage =
        "usage: Send broker queue message\n\n" +
        "   example: Send localhost:53506 queue0 hello\n";
                                                                           
    public static void main(String[] args) throws Exception
    {
        if (args.length != 3)
        {
            System.out.println(_usage);
            System.exit(0);
        }

        String broker = "iiop://" + args[0];

        // get the initial context
        InitialContext ctx = new InitialContext();

        // look up the queue object
        Queue queue = (Queue) ctx.lookup(broker + "/queue/" + args[1]);

        // look up the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
            lookup(broker + "/queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.DUPS_OK_ACKNOWLEDGE);

        // create a queue sender
        QueueSender queueSender = queueSession.createSender(queue);

        // send the text message
        queueSender.send(queueSession.createTextMessage(args[2]));

        // close connection
        queueConn.close();
    }
}
This code is very similar to other point-to-point examples, with the exception that we pass a fully qualified lookup string to the initial context. This ensures that we get a reference to a queue running in a specific JMS server on a specific port.
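
The lookup strings used above follow a simple pattern: the iiop:// scheme, the broker's host and port, and then /queue/ followed by either the queue name or connectionFactory. As a minimal sketch, the string building could be collected in a small helper. The class and method names below (LookupNames, broker, queueName, factoryName) are hypothetical and are not part of the JMS server's API:

```java
/** Hypothetical helper that builds the fully qualified lookup strings used in this section. */
final class LookupNames
{
    private LookupNames() { }

    /** Returns the broker prefix, for example "iiop://localhost:53506". */
    static String broker(String hostAndPort)
    {
        return "iiop://" + hostAndPort;
    }

    /** Returns the lookup string for a queue in the given broker. */
    static String queueName(String hostAndPort, String queue)
    {
        return broker(hostAndPort) + "/queue/" + queue;
    }

    /** Returns the lookup string for the queue connection factory of the given broker. */
    static String factoryName(String hostAndPort)
    {
        return broker(hostAndPort) + "/queue/connectionFactory";
    }

    public static void main(String[] args)
    {
        System.out.println(queueName("localhost:53506", "queue0"));
        System.out.println(factoryName("localhost:53506"));
    }
}
```

With such a helper, the lookups in the Send class would pass LookupNames.queueName(args[0], args[1]) and LookupNames.factoryName(args[0]) to the initial context.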

2 Queue Receiver

The queue receiver is also a small convenience program that receives a message from a queue, which resides in a particular JMS server. The full source code for the Receive class is shown below:
package router;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Receive
{
    private final static String _usage =
        "usage: Receive broker queue\n\n" +
        "   example: Receive localhost:53506 queue0\n";
                                                                           
    public static void main(String[] args) throws Exception
    {
        if (args.length != 2)
        {
            System.out.println(_usage);
            System.exit(0);
        }

        String broker = "iiop://" + args[0];

        // get the initial context
        InitialContext ctx = new InitialContext();

        // look up the queue object
        Queue queue = (Queue) ctx.lookup(broker + "/queue/" + args[1]);

        // look up the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
            lookup(broker + "/queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // start the connection
        queueConn.start();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue receiver
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);

        // try to receive a message, waiting at most two seconds
        Message msg = queueReceiver.receive(2000);

        // print a message
        System.out.println(msg != null ? msg.toString() : "no message");

        // close connection
        queueConn.close();
    }
}
Again, this code is similar to other point-to-point examples. As mentioned above, we use a fully qualified lookup string in the initial context to get a queue in a specific JMS server instance.

3 Message Router

A message router consumes messages from one destination and puts them onto a different destination. This example routes messages from one queue to another. The queues can be running in different JMS servers:
package router;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Router
{
    private final static String _usage =
        "usage: Router source target queue\n\n" +
        "   example: Router localhost:53506 localhost:53507 queue0\n";
                                                                           
    public static void main(String[] args) throws Exception
    {
        if (args.length != 3)
        {
            System.out.println(_usage);
            System.exit(0);
        }

        String source = "iiop://" + args[0];
        String target = "iiop://" + args[1];
        String queue  = args[2];

        // get the initial context
        InitialContext ctx = new InitialContext();

        // look up the queue objects
        Queue sourceQueue = (Queue) ctx.lookup(source + "/queue/" + queue);
        Queue targetQueue = (Queue) ctx.lookup(target + "/queue/" + queue);

        // look up the queue connection factories
        QueueConnectionFactory sourceFactory = (QueueConnectionFactory) ctx.
            lookup(source + "/queue/connectionFactory");
        QueueConnectionFactory targetFactory = (QueueConnectionFactory) ctx.
            lookup(target + "/queue/connectionFactory");

        QueueReceiver queueReceiver = getReceiver(sourceFactory, sourceQueue);
        QueueSender   queueSender   = getSender(targetFactory, targetQueue);

        System.out.println("routing from " + args[0] + "/" + queue +
            " to " + args[1] + "/" + queue);

        // forward messages until the program is stopped
        while (true)
        {
            queueSender.send(queueReceiver.receive());
            System.out.print(".");
        }
    }

    private static QueueSender getSender(QueueConnectionFactory factory,
        Queue queue) throws JMSException
    {
        // create a queue connection
        QueueConnection queueConn = factory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.DUPS_OK_ACKNOWLEDGE);

        // create a queue sender
        return queueSession.createSender(queue);
    }

    private static QueueReceiver getReceiver(QueueConnectionFactory factory,
        Queue queue) throws JMSException
    {
        // create a queue connection
        QueueConnection queueConn = factory.createQueueConnection();

        // start the flow of messages
        queueConn.start();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue receiver
        return queueSession.createReceiver(queue);
    }
}
The router uses the same fully qualified JNDI lookup strings described above to locate queues in particular servers. Once the receiver has been established for the source queue and the sender has been established for the target queue, we simply enter a loop that forwards the messages.

4 Transactional Message Router

The router above works fine as long as no errors occur. But messages can be lost if either one of the JMS servers crashes, or if the router itself crashes when a message has not been fully forwarded.
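
The loss window can be illustrated with a toy model. The sketch below does not use JMS at all: two in-memory deques stand in for the source and target queues, and removing an element stands in for an auto-acknowledged receive. All names (LossWindowDemo, forwardOne, messageLostOnCrash) are hypothetical, and the simulated crash is just an exception thrown between the two steps:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not JMS) of the window in which the simple router can lose a message. */
final class LossWindowDemo
{
    /**
     * Forwards one message the way the simple router does: the receive removes
     * the message from the source, and only afterwards is it added to the target.
     */
    static void forwardOne(Deque<String> source, Deque<String> target,
        boolean crashBeforeSend)
    {
        // stands in for an auto-acknowledged receive
        String msg = source.removeFirst();

        if (crashBeforeSend)
        {
            throw new IllegalStateException("simulated crash before send");
        }

        // stands in for the send to the target queue
        target.addLast(msg);
    }

    /** Returns true if the message is in neither queue after the simulated crash. */
    static boolean messageLostOnCrash()
    {
        Deque<String> source = new ArrayDeque<>();
        Deque<String> target = new ArrayDeque<>();
        source.addLast("hello");

        try
        {
            forwardOne(source, target, true);
        }
        catch (IllegalStateException e)
        {
            // the router "died" between receive and send
        }

        return source.isEmpty() && target.isEmpty();
    }

    public static void main(String[] args)
    {
        System.out.println("message lost: " + messageLostOnCrash());
    }
}
```

In this model the message has already left the source when the failure happens and never reaches the target, which is the situation the rest of this section sets out to avoid.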

Using a different acknowledge mode and implementing a router that writes some appropriate check point into a file or database can reduce the window for errors but not fully eliminate it.

Note that we cannot use a transacted session here because the two queues could be in different servers. A session can span message producers and consumers only within a single server.

In such situations, a distributed transaction is necessary to eliminate the possibility of losing or duplicating messages. A distributed transaction ensures that the two operations happen as a single unit of work.

As mentioned in the introduction to this example, the JMS server does not ship with a transaction manager that can coordinate resources from different systems.

The example below therefore "manually" drives the transaction by calling methods on the XAResource object. Below is the source code for the transactional version of the router:

package router;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.XAQueueSession;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
                                                                           
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAResource;
                                                                           
public class XARouter
{
    private final static String _usage =
        "usage: XARouter source target queue\n\n" +
        "   example: XARouter localhost:53506 localhost:53507 queue0\n";
                                                                           
    private Queue _sourceQueue, _targetQueue;
    private XAQueueConnectionFactory _sourceFactory, _targetFactory;
    private XAQueueSession _sourceSession, _targetSession;
    private QueueReceiver _queueReceiver;
    private QueueSender _queueSender;
                                                                           
    public static void main(String[] args) throws Exception
    {
        if (args.length != 3)
        {
            System.out.println(_usage);
            System.exit(0);
        }

        String source = "iiop://" + args[0];
        String target = "iiop://" + args[1];
        String queue  = args[2];

        XARouter router = new XARouter(source, target, queue);

        System.out.println("transactional routing from " + args[0] + "/" +
            queue + " to " + args[1] + "/" + queue);

        router.route();
    }

    public XARouter(String source, String target, String queue)
        throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // look up the queue objects
        _sourceQueue = (Queue) ctx.lookup(source + "/queue/" + queue);
        _targetQueue = (Queue) ctx.lookup(target + "/queue/" + queue);

        // look up the XA queue connection factories
        _sourceFactory = (XAQueueConnectionFactory)
            ctx.lookup(source + "/queue/xaConnectionFactory");
        _targetFactory = (XAQueueConnectionFactory)
            ctx.lookup(target + "/queue/xaConnectionFactory");

        createReceiver();
        createSender();
    }
                                                                           
    private void route() throws Exception
    {
        XAResource xares1 = _sourceSession.getXAResource();
        XAResource xares2 = _targetSession.getXAResource();

        int index = 0;

        while (true)
        {
            Xid xid = new XidImpl(index++);

            // associate both resources with the transaction
            xares1.start(xid, XAResource.TMNOFLAGS);
            xares2.start(xid, XAResource.TMJOIN);

            // receive and send as part of the transaction
            _queueSender.send(_queueReceiver.receive());

            // end the work on both resources; the flag passed to end
            // must be TMSUCCESS, TMFAIL, or TMSUSPEND
            xares1.end(xid, XAResource.TMSUCCESS);
            xares2.end(xid, XAResource.TMSUCCESS);

            // phase one: prepare both resources
            xares1.prepare(xid);
            xares2.prepare(xid);

            // phase two: commit both resources
            xares1.commit(xid, false);
            xares2.commit(xid, false);

            System.out.print(".");
        }
    }
                                                                           
    private void createSender() throws JMSException
    {
        // create an XA queue connection to the target server
        XAQueueConnection queueConn =
            _targetFactory.createXAQueueConnection();

        // create an XA queue session
        _targetSession = queueConn.createXAQueueSession();

        // get the session from XA session
        QueueSession queueSession = _targetSession.getQueueSession();

        // create a queue sender
        _queueSender = queueSession.createSender(_targetQueue);
    }

    private void createReceiver() throws JMSException
    {
        // create an XA queue connection to the source server
        XAQueueConnection queueConn =
            _sourceFactory.createXAQueueConnection();

        // start the flow of messages
        queueConn.start();

        // create an XA queue session
        _sourceSession = queueConn.createXAQueueSession();

        // get the session from XA session
        QueueSession queueSession = _sourceSession.getQueueSession();

        // create a queue receiver
        _queueReceiver = queueSession.createReceiver(_sourceQueue);
    }
                                                                           
    static class XidImpl implements Xid
    {
        private byte _branch[] = new byte[64];
        private byte _global[] = new byte[64];

        public XidImpl(int id)
        {
            // store the id in the last four bytes of the branch qualifier
            _branch[60] = (byte) ((id >>> 24) & 0xFF);
            _branch[61] = (byte) ((id >>> 16) & 0xFF);
            _branch[62] = (byte) ((id >>>  8) & 0xFF);
            _branch[63] = (byte) ( id         & 0xFF);
        }

        public byte[] getGlobalTransactionId() { return _global; }

        public byte[] getBranchQualifier() { return _branch; }

        public int getFormatId() { return 0; }
    }
}
The router performs approximately the same setup as the Router above, creating a receiver for the source queue and a sender for the target queue. Note that we use XA connection factories, connections, and sessions.

The route method is where the two-phase commit protocol is manually driven for the resource objects associated with the two XA sessions. Once the commit method succeeds on both resources, the message has been routed successfully.
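
The route method ignores the values returned by prepare and does not roll back when a prepare fails. As a hedged sketch of how the completion step could check the votes, the following uses only the standard javax.transaction.xa interfaces. It is not the sample's own code: the names TwoPhaseCommit, complete, trace, and RecordingResource are hypothetical, and RecordingResource is an in-memory fake that only records calls so that the sketch runs without a JMS server:

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class TwoPhaseCommit
{
    /** Ends and prepares both resources; commits if no prepare fails, else rolls back. */
    static boolean complete(XAResource first, XAResource second, Xid xid)
        throws XAException
    {
        // disassociate the work from both resources
        first.end(xid, XAResource.TMSUCCESS);
        second.end(xid, XAResource.TMSUCCESS);

        int vote1;
        int vote2;
        try
        {
            // phase one: collect the votes
            vote1 = first.prepare(xid);
            vote2 = second.prepare(xid);
        }
        catch (XAException e)
        {
            rollbackQuietly(first, xid);
            rollbackQuietly(second, xid);
            return false;
        }

        // phase two: a resource that voted XA_RDONLY has nothing to commit
        if (vote1 == XAResource.XA_OK) first.commit(xid, false);
        if (vote2 == XAResource.XA_OK) second.commit(xid, false);
        return true;
    }

    private static void rollbackQuietly(XAResource res, Xid xid)
    {
        try { res.rollback(xid); }
        catch (XAException ignored) { /* the branch may already be gone */ }
    }

    /** Runs complete() against two fakes and returns the calls each one saw. */
    static String trace(boolean secondFailsPrepare)
    {
        RecordingResource a = new RecordingResource(false);
        RecordingResource b = new RecordingResource(secondFailsPrepare);
        try
        {
            complete(a, b, null); // the fakes ignore the Xid
        }
        catch (XAException e)
        {
            throw new IllegalStateException(e);
        }
        return a.calls + " " + b.calls;
    }

    public static void main(String[] args)
    {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}

/** In-memory fake that records the XA calls it receives. */
final class RecordingResource implements XAResource
{
    final List<String> calls = new ArrayList<>();
    private final boolean failPrepare;

    RecordingResource(boolean failPrepare) { this.failPrepare = failPrepare; }

    public void start(Xid xid, int flags) { calls.add("start"); }
    public void end(Xid xid, int flags) { calls.add("end"); }
    public int prepare(Xid xid) throws XAException
    {
        calls.add("prepare");
        if (failPrepare) throw new XAException(XAException.XA_RBROLLBACK);
        return XA_OK;
    }
    public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    public void rollback(Xid xid) { calls.add("rollback"); }
    public void forget(Xid xid) { }
    public Xid[] recover(int flag) { return new Xid[0]; }
    public boolean isSameRM(XAResource other) { return other == this; }
    public int getTransactionTimeout() { return 0; }
    public boolean setTransactionTimeout(int seconds) { return false; }
}
```

In the XARouter, a call such as TwoPhaseCommit.complete(xares1, xares2, xid) could take the place of the end, prepare, and commit calls in the loop. Failures during the commit phase are deliberately left unhandled here, since dealing with them requires recovery.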

The example above also provides a dummy implementation of the Xid interface, which encapsulates an identifier for a distributed transaction. The Xid is typically supplied by the transaction manager.
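
The XidImpl class varies only the branch qualifier and leaves the global transaction id filled with zeros. A transaction manager would normally hand out identifiers whose global part differs from one transaction to the next. As a minimal sketch of that idea, assuming a simple counter is sufficient within a single router process, the following class encodes a transaction number into the global transaction id and a branch number into the branch qualifier. The class name SimpleXid and the format id value are hypothetical choices made for this illustration:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.transaction.xa.Xid;

/** Hypothetical Xid whose global transaction id differs for every transaction number. */
final class SimpleXid implements Xid
{
    // arbitrary value chosen for this sketch
    private static final int FORMAT_ID = 4711;

    private final byte[] global;
    private final byte[] branch;

    SimpleXid(long transactionNumber, int branchNumber)
    {
        // 8 bytes for the global id and 4 bytes for the branch qualifier,
        // both well below the 64-byte limits defined by the Xid interface
        global = ByteBuffer.allocate(Long.BYTES).putLong(transactionNumber).array();
        branch = ByteBuffer.allocate(Integer.BYTES).putInt(branchNumber).array();
    }

    public int getFormatId() { return FORMAT_ID; }

    // return copies so that callers cannot modify the identifier
    public byte[] getGlobalTransactionId() { return global.clone(); }

    public byte[] getBranchQualifier() { return branch.clone(); }

    public static void main(String[] args)
    {
        SimpleXid xid = new SimpleXid(1L, 0);
        System.out.println(Arrays.toString(xid.getGlobalTransactionId()));
        System.out.println(Arrays.toString(xid.getBranchQualifier()));
    }
}
```

A counter that restarts at zero each time the program starts is not unique across restarts, so this remains a sketch and not a substitute for the identifiers a transaction manager generates.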

While transactional, this example does not support recovery. If the program crashes after successfully invoking the prepare method but before completing both commit methods, the transaction has to be recovered. This is the job of a transaction manager.
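
To give an idea of what recovery involves, the sketch below uses the standard XAResource.recover method to ask a resource for transactions that were prepared but never completed, and then commits or rolls back each one. It is an illustration under assumptions, not a replacement for a transaction manager: whether a particular JMS server's XAResource supports recover in this way is an assumption, the commitWasDecided predicate stands in for a durable decision log that this example does not have, and InDoubtResource is an in-memory fake used only to make the sketch runnable. All class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class RecoverySketch
{
    /**
     * Completes every in-doubt transaction reported by the resource: commit
     * if the (hypothetical) decision log says so, otherwise roll back.
     * Returns the number of transactions resolved.
     */
    static int resolveInDoubt(XAResource res, Predicate<Xid> commitWasDecided)
        throws XAException
    {
        // a single call that both starts and ends the recovery scan
        Xid[] inDoubt = res.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);

        for (Xid xid : inDoubt)
        {
            if (commitWasDecided.test(xid)) res.commit(xid, false);
            else res.rollback(xid);
        }
        return inDoubt.length;
    }

    /** Minimal Xid for the demo; only object identity matters here. */
    static Xid xid(final int id)
    {
        return new Xid()
        {
            public int getFormatId() { return 1; }
            public byte[] getGlobalTransactionId() { return new byte[] { (byte) id }; }
            public byte[] getBranchQualifier() { return new byte[] { 0 }; }
        };
    }

    /** Prepares two transactions on a fake resource, then resolves them. */
    static String demo()
    {
        InDoubtResource res = new InDoubtResource();
        Xid decided = xid(1);
        Xid undecided = xid(2);
        res.prepare(decided);
        res.prepare(undecided);
        try
        {
            resolveInDoubt(res, x -> x == decided);
        }
        catch (XAException e)
        {
            throw new IllegalStateException(e);
        }
        return "committed=" + res.committed.size()
            + " rolledBack=" + res.rolledBack.size()
            + " remaining=" + res.prepared.size();
    }

    public static void main(String[] args)
    {
        System.out.println(demo());
    }
}

/** In-memory fake that remembers prepared transactions until they are completed. */
final class InDoubtResource implements XAResource
{
    final List<Xid> prepared = new ArrayList<>();
    final List<Xid> committed = new ArrayList<>();
    final List<Xid> rolledBack = new ArrayList<>();

    public int prepare(Xid xid) { prepared.add(xid); return XA_OK; }
    public Xid[] recover(int flag) { return prepared.toArray(new Xid[0]); }
    public void commit(Xid xid, boolean onePhase) { prepared.remove(xid); committed.add(xid); }
    public void rollback(Xid xid) { prepared.remove(xid); rolledBack.add(xid); }
    public void start(Xid xid, int flags) { }
    public void end(Xid xid, int flags) { }
    public void forget(Xid xid) { }
    public boolean isSameRM(XAResource other) { return other == this; }
    public int getTransactionTimeout() { return 0; }
    public boolean setTransactionTimeout(int seconds) { return false; }
}
```

The hard part that this sketch leaves out is the decision log itself: the outcome of each transaction has to be recorded durably before the first commit is issued, so that the same decision can be applied to every resource after a crash.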


Copyright © 2000-2003, Novell, Inc. All rights reserved.