Distributed Queue Example

This example shows how to interact with a JMS queue, which is distributed across two brokers. Clustered queues are like regular queues in the sense that incoming messages are distributed in round-robin fashion to all connected queue receivers. Messages are not automatically forwarded between queues in the cluster.

As described in the replicated topic example, clients must use connection factories that are configured for multiple JMS server connections. These can be looked up using JNDI or by creating cluster-aware connection factories using the Novell exteNd Messaging Platform's API's and set cluster information using the JMQServerInfo data structure.

Sender applications requires very little configuration to take advantage of clustering. JMS will automatically and transparantly re-connect the sender to any available server in the cluster if one server fails. Since queues are bound to a specific broker, sender applications will want to specify the broker name as part of the queue name.

As was the case with topic subscribers, queue receivers must sending message acknowledgements to a particular JMS server. Since queue receiver must deal with queue failures in an application specific manner, JMS only supports connect time fault-tolerance for consumer applications. However, queue receivers can connect to a queue using any available broker.

When connecting to a queue on a particular broker, the following string is used:

   queueName@brokerName

The examples in this section are not different from previous point to point examples since the behavior is illustrated by running the examples. Below is the source code for a regular queue sender that sends messages to any of two servers in a cluster. The queue name is derived from the command line.

package distQueue;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.DeliveryMode;
import javax.jms.QueueSession;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Sender
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue object
    |   Queue queue = (Queue) ctx.lookup("queue://" + args[0]);
    |                                                                      
    |   // lookup the queue connection factory
    |   QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a queue connection
    |   QueueConnection queueConn = connFactory.createQueueConnection();
    |                                                                      
    |   // create a queue session
    |   QueueSession queueSession = queueConn.createQueueSession(false,
    |       Session.DUPS_OK_ACKNOWLEDGE);
    |                                                                      
    |   // create a queue sender
    |   QueueSender queueSender = queueSession.createSender(queue);
    |   queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    |                                                                      
    |   // create a simple message to say "Hello"
    |   TextMessage message = queueSession.createTextMessage();
    |   message.setText(new java.util.Date().toString());
    |                                                                      
    |   // send the message
    |   queueSender.send(message);
    |                                                                      
    |   // print what we did
    |   System.out.println("sent: " + message.getText());
    |                                                                      
    |   // close the queue connection
    |   queueConn.close();
    }
}

As can be seen from the Sender class, the JNDI lookup string contains two host/port pairs to allow fail-over. Note that the client applications must also be started with cluster information in the ORBDefaultInitRef, as illustrated in the ANT build script.

The Receiver class also uses the same JNDI lookup string to ensure connection time fail-over. Again, the queue name from which to receive is taken as a command line option:

package distQueue;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Receiver
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue object
    |   Queue queue = (Queue) ctx.lookup("queue://" + args[0]);
    |                                                                      
    |   // lookup the queue connection factory
    |   QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a queue connection
    |   QueueConnection queueConn = connFactory.createQueueConnection();
    |                                                                      
    |   // create a queue session
    |   QueueSession queueSession = queueConn.createQueueSession(false,
    |       Session.AUTO_ACKNOWLEDGE);
    |                                                                      
    |   // create a queue receiver
    |   QueueReceiver queueReceiver = queueSession.createReceiver(queue);
    |                                                                      
    |   // start the connection
    |   queueConn.start();
    |                                                                      
    |   // receive a message
    |   TextMessage message = (TextMessage) queueReceiver.receive();
    |                                                                      
    |   // print the message
    |   System.out.println("received: " + message.getText());
    |                                                                      
    |   // close the queue connection
    |   queueConn.close();
    }
}

Assume that two brokers are running on the local host in a cluster on ports 53506 and 53507, with names B1 and B2, respectively. The table below provides a summary of what happens when sending or receiving messages to different queues with varying availability of the two servers. The sender and receiver are assumed to be run with the flag ORBDefaultInitRef=iioploc://localhost:53506,localhost:53507.

Command B1 B2 Description
Sender queue0
up
up
Message is sent to queue0 on B1 (the broker where the sender is connected to). This is the usual behavior in a non-clustered environment.
Receiver queue0
up
up
Message is received from queue0 on B1 (the broker where the receiver connects to). This is again the usual behavior in a non-clustered environment.
Sender queue0
up
down
Message is sent to queue0 on B1. The fact that B2 is down does not affect the sender.
Receiver queue0
up
down
Message is received from queue0 on B1.
Sender queue0
down
up
Message is sent to queue0 on B2. Note that the message is not sent to B1 since the default (local) broker is always used when no broker name is specified as part of the queue name.
Receiver queue0
down
up
Message is received from queue0 on B2.
Sender queue0@B1
down
up
Message is sent to queue0 on B1 via B2. B2 will route the message to B1 when it re-starts. The message can not be received from queue0 on B2.
Receiver queue0@B1
up
up
Message is received from queue0 on B1 via B2. B2 effectively sets up a proxy receiver on behalf of the client application.

The queue semantics should be very intuitive because it is the same as in a single broker environment. A messages is sent to and received from a queue on a particular broker but since it is a cluster, the message can be routed via other (available) brokers in the cluster.



Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.