Monday, August 27, 2012

Dynamic queue creation with WMQ

Today's task is dynamically creating persistant queues in WMQ(IBM's Websphere MQ) from within Java code. Out of all of the message brokers that I have researched, WMQ definetely was the hardest to figure out how to create queues from code.  Although, on the positive side, at least it is possible.  

In order to invoke the broker from Java, you must use WMQ's API which they call PCF (Programming Command Format).  PCF allows you to do a wide range of admin tasks such as viewing current queues and creating new queues. In order to perform a specific action, you must create a PCFMessage and add all of the needed parameters to it (You also need to make sure that you add them in the correct order!?). A list of all of the required and optional parameters can be found here. After setting all of the parameters, all you need to do is create a PCFMessageAgent and send the parameters. Done. It seems easy now, but most of my time was spent fishing through IBM documentation looking for the correct syntax and parameter names.

If you are interested in other functionality provided by PCF, there are samples included with the installation of the WMQ client found here.  After installation, by default the samples are location at: C:\Program Files (x86)\IBM\WebSphere MQ\tools\pcf\samples
   public void createQueue(String queueName) throws Exception {
      PCFMessageAgent agent = null;

      // Create the PCF message type for the create queue.
      // NOTE: The parameters must be added in a specific order or an exception (3015) will be thrown.
      // The required order is Name, type, then all optional parameters
      PCFMessage pcfCmd = new PCFMessage(MQConstants.MQCMD_CREATE_Q);

      // Queue name - Mandatory.
      pcfCmd.addParameter(MQConstants.MQCA_Q_NAME, queueName);

      // Queue Type - Optional.
      pcfCmd.addParameter(MQConstants.MQIA_Q_TYPE, MQConstants.MQQT_LOCAL);

      // Add description.
      pcfCmd.addParameter(MQConstants.MQCA_Q_DESC, "Queue created by PCF test");
      //set the max queue depth
      pcfCmd.addParameter(MQConstants.MQIA_MAX_Q_DEPTH, 999999);
      //Make queue persistant
      pcfCmd.addParameter(MQConstants.MQIA_DEF_PERSISTENCE, MQConstants.MQPER_PERSISTENT);

      //backout queue
      pcfCmd.addParameter(MQConstants.MQCA_BACKOUT_REQ_Q_NAME, "INVALID." + queueName);
      //harden backout count-determines if backout count is saved after restart
      try {
          agent = new PCFMessageAgent(BROKER_HOST, BROKER_PORT, DEFAULT_CHANNEL_NAME);
          //An exception will be thrown if the queue already exists
          System.out.println("Successfull created queue: " + queueName);
      catch (PCFException pcfe) {
       System.out.println("Failed to create queue");
       if (pcfe.reasonCode == MQConstants.MQRCCF_OBJECT_ALREADY_EXISTS) {
        System.out.println("The queue " + queueName + " already exists on the queue manager.");
       // Disconnect the agent.

Monday, July 16, 2012

Dynamic messaging with Qpid and Spring Integration

One of the key functions of a message broker that I would like to be able to do is to dynamically send messages to queues at runtime. This means that I don’t want to have to create queues ahead of time within the broker and then alter my application’s configuration to connect to the new queue.  What I want to do is have the application determine when it needs to create a new queue (either by customer interaction, database values, or some of arbitrary logic), and then have new messages sent to it.

With RabbitMQ, if you send a message to a queue that doesn’t exist, it creates the queue for you and then adds the message to it. Qpid, on the other hand, doesn’t work like this.  If you try to send a message to a non-existent queue, you’ll get an error.

While fishing through Qpid documentation I read that if a consumer is created for a queue that doesn’t exist, then the broker will create the queue.  This is better than nothing, but still not really what I want.

Then while testing a few different settings, I realized that from within Java, you can specify arguments (similar to the arguments you would use if you were to create a queue from the command line) when you specify the message destination.  This changes everything.  Now I can create durable queues on the fly, which is just what I wanted.

Qpid isn’t directly supported by Spring Integration in the same way that RabbitMQ is, but since it can be accessed using JMS, you can use the existing JMS classes within SI do to send and receive messages.This can be done by creating a service activator that has a DynamicJmsTemplate which will send the messages.  

The JmsTemplate will need a connection factory.  The Qpid examples use a JNDI connection factory, but I would rather have my connection defined in my config and not have to worry about JNDI.  For this, I used the AMQConnectionFactory provided in the qpid client library.  I’m not sure if this is the best class to use, but for now it will work.



At this point all you will need to do is to implement a sendMessage method within the handler class and use the JmsTemplate to send the message.  But wait, how do we guarantee that the queue will be created if it doesn’t exist when we send the message?  The trick here is specify the create arguments with the destination.

String destinationName = queueName + "; {create: always, node:{type:queue,durable:True}}";
this.jmsTemplate.convertAndSend(destinationName, objectToSend);

And there you have it. You can now create queues on the fly without having to worry about errors.  Just make sure you create a consumer for the queue you just created, otherwise your queues will fill up pretty quick.

Message Broker Questions/Concerns

Since the last post, I have narrowed my queue search down to two brokers; Qpid and RabbitMQ.  The following is a list of questions that focus on the management aspect of each broker.  What I have found is that management doesn’t seem to be a major focus of either broker, but both do have some support built in.  RabbitMQ suggests users develop plugins to meet their specific needs, but since the plugins need to be written in Erlang, it would require learning a new language (which I’m all for, it will just take a little time).  The management console that I refer to for Qpid is the Red Hat Enterprise MRG Management Console. Ideally I would have liked to be able to control everything from a single GUI, but that doesn’t seem to be the case with either broker.
  1. How are the brokers managed?
·        [Qpid] Web UI and command line
·        [RabbitMQ] Web UI, command line, and REST API
  1. Does the broker support replaying messages in queue ‘A’ through queue ‘B’ ?
·        [QPID] Messages can be moved through the web management console (although I received errors when I tried to do it).
·        [RabbitMQ] No
  1. Does the broker support purging of queues?
·        [Qpid] Messages can be purged through the web management console (although I received errors when I tried to do it).
·        [RabbitMQ] Messages can be purged through the web management UI
  1. What type of error monitoring does the broker support? Email notifications? System alerts?
·        [Qpid] Possibly
·        [RabbitMQ] Possibly
·        If yes, how are they setup?
·        [Qpid] From the command line it is possible to use qpid-printevents. This could be used within a script and send an email if a specific problem arises, such as the server goes down. Alternatively, we could create a script/application that monitors specific queues and sends email notifications if a specific queue reasons a predefined threshold.
·        [RabbitMQ] There are a few third party plugins that could be used to monitor the system. Alternatively, we could do something similar to the Qpid approach.
  1. What happens if the queues exceed a certain threshold? Exceeds given memory?
·        [Qpid] Application connection is closed by broker.
·        [RabbitMQ] The message broker stops accepting messages.
  1. How to start/stop/restart brokers?
·        [Qpid] server command line
·        [RabbitMQ] server command line
  1. Where are the logs located?
·        [Qpid] /var/log/messages/
·        [RabbitMQ] /var/log/rabbitmq/
  1. Where are configuration files located?
·        [Qpid] /etc/qpid/qpidc.conf
·        [RabbitMQ] /etc/rabbitmq/rabbitmq.config NOTE: the file is not created by default, but can be created manually if needed.
  1. How do we add additional queues? Delete queues?
·        [Qpid] Can be done through web management UI
·        [RabbitMQ] Can be done through web management UI
  1. Are the queued messages visible?
·        [Qpid] No
·        [RabbitMQ] The first X number of messages are visible from the web UI.
  1. Does the broker provide a way to view broker/system statistics?
·        [Qpid] Provides a plug-in that allows users to view the server statistics from the web UI. The web UI also displays some statistics about messages queued/dequeued, number of messages, size of messages, and the number of consumers.
·        [RabbitMQ] The web UI provides visibility to the same information Qpid provides plus users can view IP address of the open producer/consumer connections.
  1. What kind of clients (i.e. languages/runtimes) are supported/available?
·        [Qpid] Java, C++, .NET, Python, Ruby
·        [RabbitMQ] Java, C/C++, web, .NET, Python, Ruby, and many others. Complete list can be found here.
  1. What language is the broker written in?
·        [Qpid] Two implementations, Java and C++. The RHN version is C++
·        [RabbitMQ] Erlang
  1. How do you add additional users?
·        [Qpid] server command line
·        [RabbitMQ] web UI
  1. Can queues be paused?
·        [Qpid] No
·        [RabbitMQ] No
  1. Can test messages be added from the management console?
·        [Qpid] No
·        [RabbitMQ] Yes, through the web UI
  1. How are the queue maximums set?
·        [Qpid] there are four fields that need to be set: file-count(nbr files in queue's persistence journal), file-size (file size in pages), max-queue-size, max-queue-count. The broker appears to go by the smallest value set. ex: if the max-queue-count is set to 200k messages, but the file-count is set to its default (8) and that size can't hold 200k messages, the limit is what the file-count can hold (10k messages).
·        [RabbitMQ] Maximums cannot be set. RabbitMQ states that it shouldn't be needed because if the queue depth starts to grow exceptionally large, it writes the messages to disk so they will not be lost.
  1. Can users view the maximum number of messages for a queue?
·        [Qpid] not from web UI. The queue file sizes can be viewed from the command line
·        [RabbitMQ] N/A
  1. Can messages be routed between life-cycles?
·        [Qpid] Supported by using qpid-route
·        [RabbitMQ] Supported by using the Shovel plugin.
  1. Can queues be setup to route failed messages to an invalid (dead-letter) queue?
·        [Qpid] Yes, but only from the command line
·        [RabbitMQ] Yes, from the web UI and from the command line