The Curious Dev

Various programming sidetracks and shiny-object detours

Messaging With RabbitMQ

Over the last few weeks, since I finished reading RabbitMQ in Action, I have played about with RabbitMQ a little … and I like it!

The whole integration / middleware / messaging space is wide and varied, but for this post I’m just referring to messaging with RabbitMQ.

What is RabbitMQ?

RabbitMQ is a mature open source project (with commercial support) and has been around for quite a few years. Rabbit implements the AMQP format and there is also support via plugins for the STOMP and MQTT with some tinkering done to utilise the (perhaps) more pervasive JMS. The vast platform and language compatibility that is available points to wide adoption.

Messaging is often at the core of an integration / middleware solution but can also be very useful for decoupling various modules of a system.

This post is just a brief intro to one particular form of messaging with RabbitMQ, using a “fanout” exchange whereby messages are routed to an exchange/broker which will then broadcast the message to all connections to that exchange (in our case, just one) with the corresponding routing key.

A similar and perhaps more powerful function would be Publisher/Subscriber “PubSub” using a Topic that potentially has multiple Subscribers that “listen” on a topic or many topics for specific message types (as configured in the Subscriber’s connection to the exchange in the routing key). A scenario that might be useful to have multiple subscribers to a stream of data would be HR data in a company where various information for all new employees is published to a topic and various applications consume that data in numerous ways. For example, an Incident Reporting System might keep track of all employees’ contact details or a Payroll System would get updated bank account information. In a future post I might go deeper into this.

There doesn’t have to just be one publisher either, you might have several systems providing information into an exchange, weather station sensors perhaps, that is then subscribed to by many other systems.

RabbitMQ can be configured in many ways to suit your project needs, be they performance considerations, reliability, guaranteed delivery etc. I recommend getting RabbitMQ in Action to further explore these options.

Installing RabbitMQ

  • get the Erlang runtime from here and install
  • then, get the RabbitMQ server from here and install
  • if you’re using Groovy, you can use this @Grape/@Grab to sort out the RabbitMQ client dependencies:
@Grapes(
    @Grab(group='com.rabbitmq', module='amqp-client', version='3.0.1')
)

If you’re using Java, you will likely need to manually download the client library from here, unless you’re using Maven or equivalent.

Creating a Producer

I’ve written a pretty simple little groovy class that generates a payload of raw text from one of my earlier scripts, as described in my post here: Simple Data Collectors.

Essentially, it just calls the temperature script and publishes that provided temperature to “temperatureExchange” which is configured to be a “fanout” exchange. I’ve thrown in a random sleep (1-30secs) to mix it up a bit.

//WeatherProducer.groovy
@Grapes(
    @Grab(group='com.rabbitmq', module='amqp-client', version='3.0.1')
)
import com.rabbitmq.client.*

class WeatherProducer {
    ConnectionFactory factory = null
    Connection conn = null
    String exchangeName = "WeatherExchange"
    String routingKey = "weather"
    String queueName = "WeatherQueue"
    Expando bomSite = null
        
    def start() {
        Channel channel = getNewChannel()
        channel.exchangeDeclare(exchangeName, "fanout", true)
        channel.queueDeclare(queueName, true, false, false, null)
        def extractTemperatures = new ExtractTemperatures()

        while (true) {
            int sleepTime = Math.random()*30000
            
            String myData = extractTemperatures.processBOM(bomSite)
            
            channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, myData.getBytes())

            println "Sleeping for ${sleepTime}ms"
            sleep(sleepTime)
        }

        channel.close()
        conn.close()
    }
    
    Channel getNewChannel() {
        if (factory == null || conn == null) {
            factory = new ConnectionFactory()
            factory.setUsername("guest")
            factory.setPassword("guest")
            factory.setVirtualHost("/")
            factory.setHost("localhost")
            factory.setPort(5672)
            conn = factory.newConnection()
        }
        
        return conn.createChannel()
    }
}

Weather Producer

Creating a Subscriber

//WeatherConsumer.groovy
@Grapes(
    @Grab(group='com.rabbitmq', module='amqp-client', version='3.0.1')
)
import com.rabbitmq.client.*

class WeatherConsumer {
    ConnectionFactory factory = null
    Connection conn = null
    String exchangeName = "WeatherExchange"
    String routingKey = "weather"
    String queueName = "WeatherQueue"
    
    def execute() {
        Channel channel = getNewChannel()
        channel.queueBind(queueName, exchangeName, "#");
        println " Queue: ${queueName} "

        boolean noAck = false;
        def consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, noAck, consumer);
        boolean running = true

        while(running) {
            QueueingConsumer.Delivery delivery;
            try {
                delivery = consumer.nextDelivery();
                println new String(delivery.body) + " - " + System.currentTimeMillis()
            } catch (InterruptedException ie) {
                //we don't really care .. do we?
                println ie.getMessage()
                running = false
            }
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
    
    Channel getNewChannel() {
        if (factory == null || conn == null) {
            factory = new ConnectionFactory()
            factory.setUsername("guest")
            factory.setPassword("guest")
            factory.setVirtualHost("/")
            factory.setHost("localhost")
            factory.setPort(5672)
            conn = factory.newConnection()
        }
        
        return conn.createChannel()
    }
}

Weather Consumer

Running It

I’ve written a very simple script file RunWeatherMessager.groovy that takes one argument and simply starts up a Producer or a Consumer with a ‘pub’ or ‘sub’. Like this:

c:\>groovy RunWeatherMessager.groovy pub

Just open two consoles and run a Producer and a Consumer.

//RunWeatherMessager.groovy
if (args[0] == "pub") {
    println "Starting Weather producer"
    def pub = new WeatherProducer()
    pub.start()
}
else if (args[0] == "sub") {
    println "Starting Weather consumer"
    def sub = new WeatherConsumer()
    sub.execute()
}
else {
    println "===ERROR==="
    println "Only valid options are 'pub' or 'sub'."
}

Code

The above code examples are here

Comments

Included file 'facebook_like.html' not found in _includes directory