1 Topic Replication Example

The examples in this directory show how to take advantage of the JMS cluster capability. They show a publisher and a subscriber that use a topic replicated on two clustered servers. The publisher can send messages to any available server. In this example, the subscriber will fail-over to any available server if it loses its connection.

To benefit from fault-tolerance, clients must use connection factories that are configured for multiple JMS server connections. This is typically done using a JNDI lookup string which contains multiple host/port pairs. Since topics are available on any server in a cluster, no special lookup or configuration is required for topics.
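
As a rough illustration of such a lookup string, the sketch below assembles one from a list of host/port pairs. The ClusterLookup class and its build method are hypothetical helpers, not part of jBroker MQ or JMS; the "topic://host:port,host:port/" layout is taken from the example classes in this directory.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper (not a jBroker MQ class) that builds a multi-server lookup prefix. */
final class ClusterLookup {

    /**
       Join host:port pairs into a prefix such as "topic://hostA:1111,hostB:2222/".
       @param scheme lookup scheme, for example "topic"
       @param hostPorts one "host:port" entry per clustered server
     */
    static String build(String scheme, List<String> hostPorts) {
        if (hostPorts == null || hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one host:port pair is required");
        }
        StringJoiner joiner = new StringJoiner(",", scheme + "://", "/");
        for (String hostPort : hostPorts) {
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got " + hostPort);
            }
            // parseInt rejects non-numeric ports with a NumberFormatException
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in " + hostPort);
            }
            joiner.add(hostPort);
        }
        return joiner.toString();
    }
}
```

With the two servers used in this example, ClusterLookup.build("topic", Arrays.asList("localhost:53506", "localhost:53507")) yields "topic://localhost:53506,localhost:53507/", to which the factory name ("connectionFactory" in these examples) is appended before the JNDI lookup.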

Sender applications require very little configuration to take advantage of clustering. JMS will automatically and transparently re-connect the producer to any available server in the cluster if one server fails. Note that if a connection has any sessions with a consumer, fail-over is not transparent; it only works for producer-only clients.

Since consumer applications are sending acknowledgements to a particular JMS server, fault-tolerance is limited to connect time. This means that client applications must take action and decide what to do when a server failure is detected. This can be done either when the receive method fails, or when the connection's exception listener is notified about a lost connection.
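
The first option, reacting when the receive method fails, can be sketched roughly as follows. To keep the sketch self-contained it does not use the JMS API: Source and Connector are hypothetical stand-ins for a connected message consumer and for the connect step (JNDI lookup, connection, session, consumer), and ReconnectingReceiver and maxAttempts are names invented for illustration.

```java
/** Hypothetical wrapper that re-connects when a receive call fails. */
final class ReconnectingReceiver {

    /** Stand-in for a consumer bound to one server; receive() throws when the server is lost. */
    interface Source {
        String receive() throws Exception;
    }

    /** Stand-in for the connect step, expected to pick any available server. */
    interface Connector {
        Source connect() throws Exception;
    }

    private final Connector connector;
    private final int maxAttempts;
    private Source current; // null until connected, and again after a failure

    ReconnectingReceiver(Connector connector, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.connector = connector;
        this.maxAttempts = maxAttempts;
    }

    /** Receive one message, re-connecting up to maxAttempts times in total. */
    String receive() throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (current == null) {
                    current = connector.connect();
                }
                return current.receive();
            } catch (Exception e) {
                last = e;
                current = null; // drop the broken source so the next pass re-connects
            }
        }
        throw last; // every attempt failed; report the most recent error
    }
}
```

This sketch treats every exception as a lost connection, which is a simplification; a real client would probably re-connect only for connection failures and give up after a bounded number of attempts, as above.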

Below is the source code for a topic publisher that sends messages to either of two servers in a cluster. If one server fails, the JMS client runtime will automatically re-connect the sender to a different server and continue sending messages. No intervention is required by the publisher.

package cluster;
                                                                           
import java.util.Date;
                                                                           
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
                                                                           
import javax.naming.InitialContext;
                                                                           
/**
   This program shows how to write a fault-tolerant topic publisher
   that connects to any server in a jBroker MQ cluster. If a connection
   fails, jBroker MQ automatically fails the publisher over to another
   available server.
 */
                                                                           
public class Publisher
{
    TopicSession _session;
    TopicPublisher _publisher;
                                                                           
    String lookup = "topic://localhost:53506,localhost:53507/";
                                                                           
    /**
       Main program - create and connect a topic publisher.
       @param args command line parameters
       @exception Exception if connecting to server fails
     */
    public static void main(String[] args) throws Exception
    {
        Publisher pub = new Publisher();
        pub.connect();
        while (true) {
            Thread.sleep(1000);
            pub.publish();
        }
    }
                                                                           
    /**
       Connect a topic publisher.
       @exception Exception if connecting to server fails
     */
    public void connect() throws Exception
    {
        InitialContext ctx = new InitialContext();
        Topic topic = (Topic) ctx.lookup("topic://topic0");
        TopicConnectionFactory fac = (TopicConnectionFactory) ctx.
            lookup(lookup + "connectionFactory");
        TopicConnection conn = fac.createTopicConnection();
        _session = conn.createTopicSession(false,
            Session.AUTO_ACKNOWLEDGE);
        _publisher = _session.createPublisher(topic);
    }   
                                                                           
    /**
       Publish a TextMessage to topic
       @exception JMSException if publish failed
     */
    public void publish() throws JMSException
    {
        String text = new Date().toString();
        Message msg = _session.createTextMessage(text);
        _publisher.publish(msg);
        System.out.println(msg);
    }
}
As can be seen from the Publisher class, the JNDI lookup string contains two host/port pairs to allow fail-over. Note that the client applications must also be started with cluster information in the ORBDefaultInitRef, as illustrated in the ANT build script.

The Subscriber class uses the same JNDI lookup string to ensure connect-time fail-over. Not all consumer applications can fail-over, because doing so opens a window where messages can be lost or duplicated. Consumer applications that are less sensitive to lost or duplicated messages can fail-over manually, as shown below:

package cluster;
                                                                           
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicSubscriber;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.TopicConnectionFactory;
                                                                           
import javax.naming.InitialContext;
                                                                           
import com.sssw.jms.api.JMQConnectionLostException;
                                                                           
/**
   This program shows how to write a fault-tolerant topic subscriber
   that connects to any server in a jBroker MQ cluster. If a connection
   fails, the subscriber re-issues the connect command to connect to
   any available server.
 */
                                                                           
public class Subscriber implements MessageListener, ExceptionListener
{
    String lookup = "topic://localhost:53506,localhost:53507/";
                                                                           
    /**
       Main program - create and connect a topic subscriber.
       @param args command line parameters
       @exception Exception if connecting to server fails
     */
    public static void main(String[] args) throws Exception
    {
        Subscriber sub = new Subscriber();
        sub.connect();
        System.out.println("subscriber ready to process messages ...");
        while (true) {
            synchronized (sub) { sub.wait(); }
        }
    }
                                                                           
    /**
       Connect a topic subscriber.
       @exception Exception if connecting to server fails
     */
    public void connect() throws Exception
    {
        InitialContext ctx = new InitialContext();
        Topic topic = (Topic) ctx.lookup("topic://topic0");
        TopicConnectionFactory fac = (TopicConnectionFactory) ctx.
            lookup(lookup + "connectionFactory");
        TopicConnection conn = fac.createTopicConnection();
        conn.start();
        TopicSession session = conn.createTopicSession(false,
            Session.AUTO_ACKNOWLEDGE);
        conn.setExceptionListener(this);
        TopicSubscriber subscriber = session.createSubscriber(topic);
        subscriber.setMessageListener(this);
    }   
                                                                           
    /**
       Receive message asynchronously.
       @param message incoming JMS message
     */
    public void onMessage(Message message)
    {
        System.out.println(message);
    }
                                                                           
    /**
       Intercept lost connection exception.
       @param ex a JMSException
     */
    public void onException(JMSException ex)
    {
        if (ex instanceof JMQConnectionLostException)
        {
            try {
                connect();
                System.out.println("connected to other server ...");
            } catch (Exception exc) {
                System.out.println("failed to connect: " + exc);
                System.exit(1);
            }
        }
        else
        {
            ex.printStackTrace(); // other error handling ...
        }
    }
}
Note that if a subscriber simply re-connects to another available server when a server crash is detected, it may lose messages. This happens because the subscriber is effectively disconnected for a short time, and since topics do not retain messages, any messages arriving at the topic during this interval will be dropped.
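
Fail-over can also deliver a message more than once. A consumer that wants to tolerate this could remember recently seen message IDs and skip repeats, along the lines of the sketch below. RecentMessageIds is a hypothetical helper, not part of jBroker MQ, and it assumes that a duplicate carries the same ID as the original (for example the value returned by Message.getJMSMessageID()), which this document does not confirm for clustered servers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that remembers the most recent message IDs, up to a fixed capacity. */
final class RecentMessageIds {

    private final Map<String, Boolean> seen;

    RecentMessageIds(final int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        // Insertion-ordered map that evicts its oldest entry once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /**
       Record a message ID.
       @return true if the ID was not in the window, false if it is a repeat
     */
    synchronized boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

In onMessage, a subscriber might call firstTime(message.getJMSMessageID()) and process the message only when the result is true. The window is bounded, so a duplicate that arrives after its ID has been evicted would still be processed twice.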

Durable subscribers should in general not fail-over, since messages buffered on behalf of the subscriber are local to the broker where the subscription is active. In this respect, a durable subscriber behaves similarly to a clustered queue. If a durable subscription is active on two brokers, it will be treated as two separate durable subscribers.

The Subscriber uses the ExceptionListener's onException callback to get notified about lost connections, which it detects by checking for JMQConnectionLostException. When this exception occurs, the subscriber simply re-connects using the same JNDI lookup string.



Copyright © 2000-2003, Novell, Inc. All rights reserved.