To use persistent messages in pub/sub messaging, you need to use durable subscribers. Durable subscribers are registered with the JMS server under a persistent name, so that the server remembers their subscriptions even if a crash occurs.
Durable subscribers can be disconnected at times. While a durable subscriber is disconnected, the server still collects messages on its behalf. When the subscriber re-connects to the JMS server, it receives all messages sent to the topic since the last time it was connected (subject to message expiry).
Since durable subscribers continue to consume server resources while disconnected, it is important for applications to manage durable subscribers properly. If an application "forgets" about a durable subscriber, the messages retained for it keep accumulating, which slows down other subscribers and ultimately exhausts server resources.
There are four sample programs for this application:
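The retention behavior described above can be pictured with a small in-memory model. This is only a sketch for illustration: the class ToyDurableTopic and all of its methods are hypothetical names, it does not use the JMS API, and it ignores message expiry and persistence. It shows that a subscription which is never drained keeps holding every message published to the topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of one topic; NOT the JMS API. All names are hypothetical.
final class ToyDurableTopic {
    // Undelivered messages, kept separately for each durable subscription name.
    private final Map<String, Deque<String>> backlog = new LinkedHashMap<>();

    // Register a durable subscription; from now on messages are retained for it.
    void subscribe(String name) {
        backlog.putIfAbsent(name, new ArrayDeque<>());
    }

    // Every registered subscription gets its own copy, connected or not.
    void publish(String message) {
        for (Deque<String> queue : backlog.values()) {
            queue.addLast(message);
        }
    }

    // A re-connecting subscriber receives everything collected on its behalf.
    List<String> reconnectAndDrain(String name) {
        Deque<String> queue = backlog.get(name);
        if (queue == null) {
            throw new IllegalArgumentException("unknown subscription: " + name);
        }
        List<String> delivered = new ArrayList<>(queue);
        queue.clear();
        return delivered;
    }

    // Number of messages still held for a subscription.
    int retained(String name) {
        Deque<String> queue = backlog.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        ToyDurableTopic topic = new ToyDurableTopic();
        topic.subscribe("active");
        topic.subscribe("forgotten");
        for (int i = 0; i < 3; i++) {
            topic.publish("msg-" + i);
        }
        System.out.println(topic.reconnectAndDrain("active")); // [msg-0, msg-1, msg-2]
        System.out.println(topic.retained("active"));          // 0
        System.out.println(topic.retained("forgotten"));       // 3
    }
}
```

In this model the backlog of the "forgotten" subscription grows with every publish until something drains or removes it, which is the resource problem the paragraph above warns about.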
1 Create Initial Subscription
For durable subscribers there is a notion of an "initial subscription", which is the point in time at which a durable subscriber connects to the server for the first time. Although a durable subscriber always connects the same way, the first connection marks the point from which the server remembers the subscription.
After the initial subscription, JMS stores all messages sent to the topic for future delivery to the durable subscriber. While transient messages still do not survive a server crash, this facility provides guaranteed delivery of persistent messages to durable subscribers.
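As a rough illustration of the two points above (retention starts at the initial subscription, and only persistent messages outlive a crash), here is a minimal sketch. The class ToySubscriptionStore, its methods, and the boolean persistent flag are assumptions made up for this example; a real JMS server implements this internally and the sketch does not use the JMS API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single durable subscription; NOT the JMS API. All names are hypothetical.
final class ToySubscriptionStore {
    // One retained message together with its delivery mode.
    private static final class Stored {
        final String body;
        final boolean persistent;

        Stored(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private boolean subscribed = false;
    private final List<Stored> retained = new ArrayList<>();

    // The initial subscription marks the point from which messages are retained.
    void createInitialSubscription() {
        subscribed = true;
    }

    // Messages published before the initial subscription are not retained.
    void publish(String body, boolean persistent) {
        if (subscribed) {
            retained.add(new Stored(body, persistent));
        }
    }

    // A crash loses transient messages; persistent ones are kept.
    void simulateCrash() {
        retained.removeIf(m -> !m.persistent);
    }

    // Deliver and forget everything retained so far.
    List<String> deliverAll() {
        List<String> out = new ArrayList<>();
        for (Stored m : retained) {
            out.add(m.body);
        }
        retained.clear();
        return out;
    }

    public static void main(String[] args) {
        ToySubscriptionStore store = new ToySubscriptionStore();
        store.publish("before-subscription", true);
        store.createInitialSubscription();
        store.publish("persistent-1", true);
        store.publish("transient-1", false);
        store.simulateCrash();
        System.out.println(store.deliverAll()); // [persistent-1]
    }
}
```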
The sample program below shows how to create an initial subscription:

    package durable;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;

    public class Subscribe {
        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory)
                ctx.lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();
            topicConn.setClientID("durable");

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber;
            topicSubscriber = topicSession.createDurableSubscriber(topic, "mySub");

            System.out.println("created durable subscriber mySub");

            // close the topic connection
            topicConn.close();
        }
    }

Compared to a regular, non-durable subscriber, durable subscribers must perform two initial steps:
- Set the ClientID on the connection. The ClientID is part of the durable subscriber's name. You must set the ClientID on the connection before creating any sessions.
- Use the createDurableSubscriber factory method of the topic session. This method requires a second parameter with the name of the durable subscriber.
2 Publish Messages
Publishing messages to durable subscribers is done in exactly the same way as to non-durable subscribers. Please consult the other pub/sub example for a description of the publisher application.

    package durable;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSession;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;

    public class Publish {
        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory)
                ctx.lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a topic publisher
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            // create a simple message
            Message message = topicSession.createMessage();

            // get the number of messages to publish (default 100)
            int numMsgs = (args.length > 0) ? Integer.parseInt(args[0]) : 100;

            // publish the messages, numbering each one in its "id" property
            for (int i = 0; i < numMsgs; i++) {
                message.setIntProperty("id", i);
                topicPublisher.publish(message);
            }

            System.out.println("published " + numMsgs + " on " + topic);

            // close the topic connection
            topicConn.close();
        }
    }

The above publisher simply takes an optional command line argument to indicate the number of messages to be sent. This number of messages is published to the topic, each with a header property "id" set to the number of the message. By default, 100 messages are published.
3 Consume Messages
The message consumer is similar to the non-durable subscriber. However, as described above, a durable subscriber must set the connection ClientID and use the createDurableSubscriber method to register the subscriber. If the subscription already exists, the subscriber is simply re-connected.
Below is the source code for consuming messages with a durable subscriber:

    package durable;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;

    public class Consume {
        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory)
                ctx.lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();
            topicConn.setClientID("durable");

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber;
            topicSubscriber = topicSession.createDurableSubscriber(topic, "mySub");

            // start the connection
            topicConn.start();

            // get the number of messages to receive (default 100)
            int numMsgs = (args.length > 0) ? Integer.parseInt(args[0]) : 100;

            int firstID = 0, lastID = 0;

            // receive the messages, remembering the first and last "id"
            for (int i = 0; i < numMsgs; i++) {
                Message message = topicSubscriber.receive();

                if (i == 0) firstID = message.getIntProperty("id");

                if (i == numMsgs - 1) lastID = message.getIntProperty("id");
            }

            System.out.println("consumed messages " + firstID + "-" + lastID);

            // close the topic connection
            topicConn.close();
        }
    }

By looking at the "id" header property of incoming messages, the durable subscriber prints out the range of messages received. Since the subscriber uses AUTO_ACKNOWLEDGE, messages are automatically acknowledged by the session just before the receive method returns.
It is not possible for two durable subscribers with the same name to be active at the same time. The createDurableSubscriber method will throw a JMSException if another durable subscriber is already active with the same connection ClientID and subscriber name.
4 Unsubscribe Durable Subscriber
A durable subscriber is un-subscribed using the topic session's unsubscribe method. As mentioned above, it is important that applications always clean up and un-subscribe their durable subscribers to avoid exhausting system resources.
Below is the source code for un-subscribing a durable subscriber:

    package durable;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.TopicSession;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;

    public class Unsubscribe {
        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory)
                ctx.lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();
            topicConn.setClientID("durable");

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // unsubscribe the durable topic subscriber
            topicSession.unsubscribe("mySub");

            System.out.println("unsubscribed mySub");

            // close the topic connection
            topicConn.close();
        }
    }

When un-subscribing a durable subscriber, the ClientID must be the same as the one used when the subscriber was initially created. Note that any session can be used to un-subscribe a durable subscriber, as long as its connection has the correct ClientID.
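The naming rules in this example (a subscription is identified by ClientID plus subscriber name, only one subscriber per identity may be active, and un-subscribing needs the same ClientID) can be sketched with a toy registry. Everything here is hypothetical: ToySubscriptionRegistry and its methods are invented for illustration, it is not the JMS API, and IllegalStateException merely stands in for the JMSException a real server would raise.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy registry of durable subscriptions; NOT the JMS API. All names are hypothetical.
final class ToySubscriptionRegistry {
    // Key is (ClientID, subscriber name); value is true while a subscriber is active.
    private final Map<List<String>, Boolean> subscriptions = new HashMap<>();

    // Creates the subscription on first use and re-activates it afterwards.
    void connect(String clientId, String name) {
        List<String> key = List.of(clientId, name);
        if (Boolean.TRUE.equals(subscriptions.get(key))) {
            throw new IllegalStateException("already active: " + key);
        }
        subscriptions.put(key, true);
    }

    // Marks the subscriber inactive; the subscription itself stays registered.
    void disconnect(String clientId, String name) {
        subscriptions.computeIfPresent(List.of(clientId, name), (k, active) -> false);
    }

    // Removes the subscription; returns false if ClientID and name match nothing.
    boolean unsubscribe(String clientId, String name) {
        return subscriptions.remove(List.of(clientId, name)) != null;
    }

    public static void main(String[] args) {
        ToySubscriptionRegistry registry = new ToySubscriptionRegistry();
        registry.connect("durable", "mySub");
        try {
            registry.connect("durable", "mySub");
        } catch (IllegalStateException e) {
            System.out.println("second active subscriber rejected");
        }
        registry.disconnect("durable", "mySub");
        System.out.println(registry.unsubscribe("other", "mySub"));   // false
        System.out.println(registry.unsubscribe("durable", "mySub")); // true
    }
}
```

In the sketch, an un-subscribe issued under a different ClientID leaves the subscription registered, which is why a cleanup program must use the same ClientID as the program that created the subscriber.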
Copyright © 2000-2003, Novell, Inc. All rights reserved.