The browser applications allow managers to inspect the messages on a queue or associated with a durable subscriber. Although the P2P model supports a QueueBrowser class, this only allows client applications to look at queue messages. It is not possible to consume (acknowledge) messages with a queue browser. The Pub/Sub model does not support a browser object.
The Novell exteNd Messaging Platform's JMS supports a special acknowledge mode, which allows regular queue receivers and topic subscribers to acknowledge (consume) individual messages. This allows client applications to exploit a very fine-grained acknowledgement model. This acknowledge mode can, for instance, be used to create browser applications that remove individual messages from a destination.
There are two examples in this section, one for queues and one for durable subscribers. Note that a "topic browser" for transient topic subscribers is less meaningful because transient subscribers only get messages received by the topic after the subscription was made. Both browsers use a common base class, which sets up a table with incoming messages:
As can be seen from thepackage browser; import java.util.Vector; import java.util.TreeMap; import java.util.Enumeration; import java.awt.Dimension; import java.awt.BorderLayout; import java.awt.event.MouseEvent; import java.awt.event.ActionEvent; import java.awt.event.MouseAdapter; import java.awt.event.ActionListener; import javax.swing.JFrame; import javax.swing.JTable; import javax.swing.JMenuItem; import javax.swing.JPopupMenu; import javax.swing.JScrollPane; import javax.swing.JOptionPane; import javax.swing.ListSelectionModel; import javax.swing.table.DefaultTableModel; import javax.jms.Queue; import javax.jms.Message; import javax.jms.MapMessage; import javax.jms.Destination; import javax.jms.TextMessage; import javax.jms.ObjectMessage; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.StreamMessage; import javax.jms.MessageListener; import com.sssw.jms.core.JMQMessage; /** This class ilustrates how to use the individual message acknowledge mode to browse a destination and allow managers to remove (acknowledge) individual messages from the destination. The messages on the destination are displayed in a Swing table. @see QueueManager @see TopicManager */ public abstract class Manager implements MessageListener { protected Destination _dest; protected DefaultTableModel _model; protected JFrame _frame; protected JTable _table; protected JScrollPane _scroll; protected TreeMap _msgs = new TreeMap(); /** Setup the main table to contain incoming messages. 
*/ protected void gui() { | Vector cols = new Vector(6); | cols.addElement("Number"); | cols.addElement("Type"); | cols.addElement("Priority"); | cols.addElement("Expiry"); | cols.addElement("Timestamp"); | cols.addElement("Mode"); | | _model = new DefaultTableModel(new Vector(), cols) { | | public boolean isCellEditable(int row, int col) { | | | return false; | | } | }; | | _table = new JTable(_model); | int mode = ListSelectionModel.MULTIPLE_INTERVAL_SELECTION; | _table.setSelectionMode(mode); | | _table.addMouseListener(new MouseAdapter() { | | public void mousePressed(MouseEvent me) { | | | if (me.isPopupTrigger() && _table.getSelectedRowCount() > 0) { | | | | JPopupMenu pop = new JPopupMenu(); | | | | if (_table.getSelectedRowCount() == 1) { | | | | | JMenuItem props = new JMenuItem("Properties..."); | | | | | props.addActionListener(new ActionListener() { | | | | | | public void actionPerformed(ActionEvent ev) { | | | | | | | int n = _table.getSelectedRow(); | | | | | | | Vector data = _model.getDataVector(); | | | | | | | Vector row = (Vector) data.elementAt(n); | | | | | | | try { | | | | | | | | showProps((Long)row.elementAt(0)); | | | | | | | } catch (JMSException ex) { | | | | | | | | JOptionPane.showMessageDialog(_table, | | | | | | | | ex.getMessage(), "Error", | | | | | | | | JOptionPane.ERROR_MESSAGE); | | | | | | | } | | | | | | } | | | | | }); | | | | | pop.add(props); | | | | } | | | | JMenuItem ack = new JMenuItem("Acknowledge..."); | | | | ack.addActionListener(new ActionListener() { | | | | | public void actionPerformed(ActionEvent ev) { | | | | | | int[] n = _table.getSelectedRows(); | | | | | | Vector data = _model.getDataVector(); | | | | | | Long[] msgNo = new Long[n.length]; | | | | | | for (int i = 0; i < n.length; i++) { | | | | | | | Vector row = (Vector) data.elementAt(n[i]); | | | | | | | msgNo[i] = (Long) row.elementAt(0); | | | | | | } | | | | | | try { | | | | | | | ackMessages(msgNo); | | | | | | | for (int i = 0; i < n.length; 
i++) | | | | | | | _model.removeRow(n[n.length - 1 - i]); | | | | | | } catch (JMSException ex) { | | | | | | | JOptionPane.showMessageDialog(_table, | | | | | | | ex.getMessage(), "Error", | | | | | | | JOptionPane.ERROR_MESSAGE); | | | | | | } | | | | | } | | | | }); | | | | pop.add(ack); | | | | pop.pack(); | | | | pop.show(_table, me.getX(), me.getY()); | | | | pop.setVisible(true); | | | } | | } | }); | | _scroll = new JScrollPane(_table); | | _frame = new JFrame(_dest.toString()); | _frame.getContentPane().setLayout(new BorderLayout()); | _frame.getContentPane().add(_scroll); | _frame.setSize(new Dimension(400,300)); | _frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); | _frame.setVisible(true); } /** Show table with message properties. @param msgNo the message number @exception JMSException if properties could not be retrieved */ private void showProps(Long msgNo) throws JMSException { | Vector cols = new Vector(3); | cols.addElement("Name"); | cols.addElement("Type"); | cols.addElement("Value"); | DefaultTableModel model = new DefaultTableModel(new Vector(), cols) { | | public boolean isCellEditable(int row, int col) { | | | return false; | | } | }; | JTable table = new JTable(model); | Message msg = (Message) _msgs.get(msgNo); | Enumeration e = msg.getPropertyNames(); | while (e.hasMoreElements()) { | | Vector row = new Vector(3); | | String prop = (String) e.nextElement(); | | Object value = msg.getObjectProperty(prop); | | row.addElement(prop); | | row.addElement(objType(value)); | | row.addElement(value); | | model.addRow(row); | } | | JOptionPane.showMessageDialog(_table, table, | "Properties for message " + msgNo, | JOptionPane.INFORMATION_MESSAGE); } /** Acknowledge an array of messages. 
@param msgNo array of message numbers @exception JMSException if acknowledge failed */ private void ackMessages(Long[] msgNo) throws JMSException { | String msg = "Really acknowledge " + msgNo.length + " messages?"; | int ok = JOptionPane.showOptionDialog(_table, msg, "Confirm", | JOptionPane.OK_CANCEL_OPTION, JOptionPane.QUESTION_MESSAGE, | null, null, null); | if (ok == JOptionPane.OK_OPTION) | { | | for (int i = 0; i < msgNo.length; i++) { | | | Message message = (Message) _msgs.get(msgNo[i]); | | | message.acknowledge(); | | | _msgs.remove(msgNo[i]); | | } | } } /** Add incoming message to the table. @param message message that recently arrived on queue */ public void onMessage(Message message) { | try | { | | Vector row = new Vector(6); | | Long msgNo = new Long(((JMQMessage)message).getMessageNumber()); | | _msgs.put(msgNo, message); | | row.addElement(msgNo); | | row.addElement(typeStr(message)); | | row.addElement(new Integer(message.getJMSPriority())); | | row.addElement(new Long(message.getJMSExpiration())); | | row.addElement(new Long(message.getJMSTimestamp())); | | row.addElement(modeStr(message.getJMSDeliveryMode())); | | _model.addRow(row); | } | catch (JMSException ex) | { | | System.err.println("error processing: " + message); | } } /** Utility to get type string for message. @param message a JMS message @return string with message type */ private static String typeStr(Message message) { | if (message instanceof BytesMessage) return "bytes"; | else if (message instanceof MapMessage) return "map"; | else if (message instanceof ObjectMessage) return "object"; | else if (message instanceof StreamMessage) return "stream"; | else if (message instanceof TextMessage) return "text"; | else return "plain"; } /** Utility to get property type string for message. 
@param obj a property from a JMS message @return string with property type */ private static String objType(Object obj) { | if (obj instanceof Boolean) return "boolean"; | else if (obj instanceof Byte) return "byte"; | else if (obj instanceof Short) return "short"; | else if (obj instanceof Integer) return "int"; | else if (obj instanceof Long) return "long"; | else if (obj instanceof Float) return "float"; | else if (obj instanceof Double) return "double"; | else if (obj instanceof String) return "String"; | else if (obj instanceof byte[]) return "byte[]"; | else throw new RuntimeException("unknown type: " + obj.getClass()); } /** Utility to get string for message delivery mode. @param mode the JMS delivery mode @return "transient" or "persistent" */ private static String modeStr(int mode) { | switch (mode) | { | | case DeliveryMode.PERSISTENT: return "persistent"; | | case DeliveryMode.NON_PERSISTENT: return "transient"; | | default: throw new RuntimeException("illegal mode: " + mode); | } } }Manager
class, messages are acknowledged using the acknowledge method of the Message interface, which is the same as when using the CLIENT_ACKNOWLEDGE mode. The Manager simply uses a JTable and stores incoming messages in a vector-based table model.
The QueueManager class extends the Manager class and adds functionality to connect to a queue specified on the command line. When constructing the session, the INDIVIDUAL_ACKNOWLEDGE
mode is used, which is defined in the JMQSession interface.Thepackage browser; import java.rmi.RemoteException; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.jms.Queue; import javax.jms.Session; import javax.jms.QueueSession; import javax.jms.JMSException; import javax.jms.QueueReceiver; import javax.jms.MessageListener; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import com.sssw.jms.api.JMQSession; import com.sssw.jms.api.JMQDestination; import com.sssw.jms.api.JMQQueueConnection; import com.sssw.jms.api.admin.JMQDestinationAdmin; public class QueueManager extends Manager { /** Driver for QueueManager. @param args Command line arguments with queue */ public static void main(String[] args) { | if (args.length != 1) { | | System.out.println("usage: QueueManager <queue>"); | | try { | | | listQueues(); | | } catch (Exception ex) { | | | System.err.println("unable to list queues: " + ex.getMessage()); | | } | | System.exit(0); | } | | QueueManager mgr = new QueueManager(); | | try { | | mgr.connect(args[0]); | } catch (Exception ex) { | | String msg = ex.getMessage(); | | if (msg == null || msg.length() == 0) msg = ex.toString(); | | System.err.println(msg); | | System.exit(0); | } | | mgr.gui(); } /** Print a list of available queues to console. 
@exception JMSException if JMS failed to created connection @exception NamingException if a JNDI error occurred @exception RemoteException if remote object unavailable */ public static void listQueues() throws JMSException, NamingException, RemoteException { | InitialContext ctx = new InitialContext(); | QueueConnectionFactory factory = (QueueConnectionFactory) | ctx.lookup("queue/connectionFactory"); | JMQQueueConnection conn = (JMQQueueConnection) | factory.createQueueConnection(); | JMQDestinationAdmin admin = conn.getDestinationAdmin(); | String[] queues = admin.getDestinations(JMQDestination.QUEUE); | if (queues.length > 0) | System.out.println("where queue is one of:"); | for (int i = 0; i < queues.length; i++) | System.out.println(" " + queues[i]); } /** Connect a receiver to queue. @param queue name of queue to connect to @exception JMSException if JMS failed to connect receiver @exception NamingException if a JNDI error occurred */ private void connect(String queue) throws JMSException, NamingException { | InitialContext ctx = new InitialContext(); | QueueConnectionFactory factory = (QueueConnectionFactory) | ctx.lookup("queue/connectionFactory"); | _dest = (Queue) ctx.lookup("queue/" + queue); | QueueConnection conn = factory.createQueueConnection(); | QueueSession session = conn.createQueueSession(false, | JMQSession.INDIVIDUAL_ACKNOWLEDGE); | QueueReceiver receiver = session.createReceiver((Queue)_dest); | receiver.setMessageListener(this); | conn.start(); } }TopicManager
class again extends the base Manager
class and adds functionality to connect a durable subscriber, which name and client identifier are specified on the command line along with the topic name. Note that if the subscription did not exist prior to running the topic manager, only new messages arriving on the topic will be displayed in the table.

package browser;

import java.rmi.RemoteException;

import javax.naming.InitialContext;
import javax.naming.NamingException;

import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.TopicSession;
import javax.jms.JMSException;
import javax.jms.TopicSubscriber;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import com.sssw.jms.api.JMQSession;
import com.sssw.jms.api.JMQDestination;
import com.sssw.jms.api.JMQTopicConnection;
import com.sssw.jms.api.admin.JMQSubscriberInfo;
import com.sssw.jms.api.admin.JMQDestinationAdmin;

/**
   Browser for a durable topic subscription: connects a durable subscriber
   in the individual acknowledge mode and shows the received messages in
   the table provided by Manager.
 */
public class TopicManager extends Manager
{
    // Connection created by connect(); started once the GUI exists.
    private TopicConnection _conn;

    /**
       Driver for TopicManager. Exits with a non-zero status on a usage
       error or if the connection could not be set up.

       @param args Command line arguments with topic and subscriber
     */
    public static void main(String[] args)
    {
        if (args.length != 2) {
            System.out.print("usage: TopicManager <topic>");
            System.out.println(" <subscriber>[#clientId]");
            try {
                listTopics();
            } catch (Exception ex) {
                System.err.println("unable to list topics: " + ex.getMessage());
            }
            System.exit(1);
        }

        TopicManager mgr = new TopicManager();

        try {
            mgr.connect(args[0], args[1]);
            mgr.gui();
            mgr._frame.setTitle(mgr._dest.toString() + " - " + args[1]);
            // Delivery is started only after the table has been built,
            // so that no message arrives before it can be displayed.
            mgr._conn.start();
        } catch (Exception ex) {
            String msg = ex.getMessage();
            if (msg == null || msg.length() == 0) msg = ex.toString();
            System.err.println(msg);
            System.exit(1);
        }
    }

    /**
       Print a list of available topics and their subscriptions to
       console. The connection used for the lookup is closed before
       returning.

       @exception JMSException if JMS failed to created connection
       @exception NamingException if a JNDI error occurred
       @exception RemoteException if remote object unavailable
     */
    public static void listTopics()
        throws JMSException, NamingException, RemoteException
    {
        InitialContext ctx = new InitialContext();
        TopicConnectionFactory factory = (TopicConnectionFactory)
            ctx.lookup("topic/connectionFactory");
        TopicConnection conn = factory.createTopicConnection();
        try {
            JMQDestinationAdmin admin =
                ((JMQTopicConnection) conn).getDestinationAdmin();
            String[] topics = admin.getDestinations(JMQDestination.TOPIC);
            if (topics.length > 0)
                System.out.println("where topic is one of:");
            for (int i = 0; i < topics.length; i++) {
                JMQSubscriberInfo[] info =
                    admin.getDurableSubscribers(topics[i]);
                for (int j = 0; j < info.length; j++)
                    System.out.println(" " + info[j].getName() + "#" +
                        info[j].getClientId());
                System.out.println(" " + topics[i]);
            }
        } finally {
            conn.close();
        }
    }

    /**
       Connect a durable subscriber to topic. The connection is created
       but not started; the caller starts it when it is ready to receive.

       @param topic name of topic to connect to
       @param name subscriber name, optionally followed by "#" and the
                   client identifier; the client identifier defaults to
                   "browser" when none is given
       @exception JMSException if JMS failed to connect subscriber
       @exception NamingException if a JNDI error occurred
     */
    private void connect(String topic, String name)
        throws JMSException, NamingException
    {
        InitialContext ctx = new InitialContext();
        TopicConnectionFactory factory = (TopicConnectionFactory)
            ctx.lookup("topic/connectionFactory");
        _dest = (Topic) ctx.lookup("topic/" + topic);
        _conn = factory.createTopicConnection();

        // determine subscriber name and client id
        String clientId = "browser";
        int index = name.indexOf("#");
        if (index != -1) {
            clientId = name.substring(index + 1);
            name = name.substring(0, index);
        }
        _conn.setClientID(clientId);

        TopicSession session = _conn.createTopicSession(false,
            JMQSession.INDIVIDUAL_ACKNOWLEDGE);
        TopicSubscriber subscriber =
            session.createDurableSubscriber((Topic) _dest, name);
        subscriber.setMessageListener(this);
    }
}
Copyright © 2000-2003, Novell, Inc. All rights reserved. |