HareDo

About

I'll bet that most people don't realize that when they use RabbitMQ they are actually using the core software that runs big-freaking telecom systems (OTP – Open Telecom Platform). That in itself is reason enough to consider RabbitMQ.

But RabbitMQ uses an elegant but somewhat elaborate model called AMQP 0-9-1. Notice the 0-9-1. This is important. Versions later than this are crap. 0-9-1 is a work of pure genius and art which was later utterly mauled by committees. Fortunately for you, RabbitMQ is and most likely always will be AMQP 0-9-1. Hurray!

But the bottom line is this: You might be one of those pragmatic people who doesn't want to have to know everything about AMQP. You just want that fancy telecom thing-a-majigger software to pump massive volumes of your messages from one place to another using Ruby and give you 5 9's doing it.

If that's true for you then you've come to the right place.

HareDo is an easy-to-use framework for creating peer-to-peer applications in Ruby via RabbitMQ. Built atop the Bunny AMQP client, it uses a small subset of RabbitMQ's capabilities to provide an intuitive client/server framework for implementing network services and simple peer-to-peer applications. It is specifically designed for those who are not familiar with the ins and outs of the AMQP protocol and employs RabbitMQ's default AMQP exchange to implement direct peer-to-peer communication.

This framework's design targets a common case within web applications and other miscellaneous contexts (logging, event handling, data retrieval, etc.), when you just need to send a request somewhere and get a reply back. In this case specifically:

In summary, this gem employs a small, practical subset of the vast array of features available within RabbitMQ so you can whip up network services quickly without having to master all aspects of RabbitMQ or the AMQP protocol.

How It Works

The easiest way to explain how it works is by example. The following program illustrates creating a network service and a client to talk to it. The service simply takes a number from the message headers, adds one to it and sends the result back.

In this example, both client and service run simultaneously in the same script. However, services can run in separate scripts just as easily (more on this below).

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'
$queue       = 'HareDo'

class Service < HareDo::Peer

  def initialize(name)
    super
    connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
  end

  def serve(msg)
    #dump_message msg
    data = msg.headers['i'].to_i + 1
    reply(msg, :data => data.to_s)
  end

end

service = Service.new($queue)
service.listen(:queue => $queue, :blocking => false)

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

1.upto(10) do |i|
  id = client.send($queue, :headers => { :i => i })
  msg = client.receive(id)
      
  puts msg.data.to_i == i + 1
end

# Very important -- you will leave queues open on RabbitMQ if you don't do this
service.disconnect()
client.disconnect()

The service takes a single argument – the name. This is the name of the AMQP queue used to send messages to the service. A queue is like an email address, phone number or an IP address – it's just a unique identifier. In this case, it's the identifier you use to direct messages to the service. The listen() method then causes the service to listen on the queue.

The client sends 10 messages to the service, each containing monotonically increasing integer values. It waits for the response (using a blocking timeout) after each call and checks the results.

The client is under no obligation to wait for a response. This is just how we've coded the service in this example – to send something back. The interaction between the two forms an ad-hoc protocol. Hence the point of this project – quick and easy network protocols with minimal code (thanks to RabbitMQ and Bunny).

Clients

There are three basic use-cases the client addresses: event dispatch, send/receive and remote procedure call.

Event dispatch is simple – you don't send back a reply from the service, nor do you expect a response on the client-side. Take a simple logging service for example. Here is the server code:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'

class Service < HareDo::Peer

  def initialize(name)
    super
    connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
  end

  def listen()
    super :queue => @name
  end

  def serve(msg)
    # Log the message somewhere
  end

end

service = Service.new('logger')
service.listen()
service.disconnect()

The client code is as follows:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

client.send('logger', :data => 'Backup script failed')
client.disconnect()

You can run both in the same script by setting the service to non-blocking (passing :blocking => false to the listen() method). Services can run in either blocking or non-blocking mode, enabling you to run them either in the same script as clients (non-blocking), or in separate scripts where they idle and wait for incoming messages (blocking). By default, services run in blocking mode, as it is assumed that they will be run in their own independent scripts/processes. Here is the above example in a single script where the service runs in non-blocking mode:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'

class Service < HareDo::Peer

  def initialize(name)
    super
    connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
  end

  def listen()
    super :queue => @name, :blocking => false
  end

  def serve(msg)
    # Log the data somewhere
  end

end

service = Service.new('logger')
service.listen()

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
client.send('logger', :data => 'Backup script failed')

service.disconnect()
client.disconnect()

Send/Receive

The send/receive pattern is illustrated in the initial example – the client sends out ten integers and waits for the response to each. Each time it calls send() it gets a correlation ID as the return value. It passes this ID into receive(), which then looks for a return message with that correlation ID. This is how you ensure you get back the message you are looking for (as opposed to some other message that happened to arrive). While the responses come back in the correct order in this example, there is no guarantee that this will happen every time. This is why the correlation ID is important.
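To see why the ID matters, here is a minimal sketch in plain Ruby with no broker involved. The Reply struct and the match_replies helper are made up for illustration and are not part of HareDo; the point is only that replies are paired with requests by ID, not by arrival order.

```ruby
require 'securerandom'

# Hypothetical stand-in for a reply; not HareDo's own message class.
Reply = Struct.new(:correlation_id, :data)

# Pair each pending request with its reply by ID, whatever order the
# replies arrived in. `pending` maps the ID returned by send() to the
# integer that was sent.
def match_replies(pending, arrived)
  by_id = arrived.to_h { |reply| [reply.correlation_id, reply] }
  pending.map { |id, sent| [sent, by_id.fetch(id).data.to_i] }
end

# Three requests with made-up IDs.
pending = (1..3).to_h { |i| [SecureRandom.uuid, i] }

# The service adds one to each value; here the replies arrive reversed.
arrived = pending.map { |id, i| Reply.new(id, (i + 1).to_s) }.reverse

match_replies(pending, arrived).each do |sent, received|
  puts "#{sent} -> #{received}"
end
```

Even though the replies are processed in reverse, each request still lines up with its own answer.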

Remote Procedure Call

This is the purpose of the Client::call() method. It assigns a specific ID to the outgoing message and waits for a response back containing that message ID. So call() acts like a synchronous function call. It sends out a request and blocks waiting for a specific reply. Here is an example:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'

class Service < HareDo::Peer

  def initialize(name)
    super
    connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
  end

  def serve(msg)
    reply(msg, :data => 'Reply')
  end
end

service = Service.new('rpc')
service.listen(:queue => service.name, :blocking => false)

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
response = client.call('rpc', :data => 'jujifruit')

client.disconnect()
service.disconnect()

The call() method will block waiting for the response up to client.timeout seconds (floating point value). The default is 1 second. You can change this to whatever value you like. If a timeout occurs, call() will return nil.

The call() method is just a convenience method that wraps send() and receive(). Internally, send() always assigns a unique message ID to each outgoing message. It passes this ID back as the return value. You can use this ID to fetch specific messages from receive().

Normally receive() just returns the oldest message to arrive. Internally it keeps a queue of messages as they are received. When you call it with no arguments, it just pops the oldest message off the queue (if there is one), otherwise it blocks waiting for a message to come in until the timeout interval expires. However, if you pass an ID to receive(), it will look specifically for a message with that ID. If none is found, again, it will block for the timeout interval waiting for it to come in. If none does, it returns nil.
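The behavior described above can be sketched in plain Ruby as follows. This is not HareDo's implementation; the Mailbox and Message names are hypothetical and the code only models the described semantics (oldest-first with no ID, lookup by ID otherwise, nil on timeout).

```ruby
require 'monitor'

# Hypothetical message type for illustration only.
Message = Struct.new(:id, :data)

class Mailbox
  def initialize(timeout = 1.0)
    @timeout  = timeout        # seconds, may be fractional
    @messages = []             # oldest first
    @lock     = Monitor.new
    @arrived  = @lock.new_cond
  end

  # Called by the delivery side when a message comes in.
  def deliver(msg)
    @lock.synchronize do
      @messages << msg
      @arrived.broadcast
    end
  end

  # With no ID, return the oldest message; with an ID, return only the
  # message carrying it. Returns nil once the timeout expires.
  def receive(id = nil)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @lock.synchronize do
      loop do
        index = if id
                  @messages.index { |m| m.id == id }
                elsif !@messages.empty?
                  0
                end
        return @messages.delete_at(index) if index

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        @arrived.wait(remaining)
      end
    end
  end
end
```

A call()-style helper would then amount to sending a message and calling receive() with the ID that send() returned.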

If call() times out, subsequent calls will fail to get the response even if it comes in later, as it will be discarded. This keeps dilatory messages from filling up the receive queue. Once you've given up on a message, it makes no sense to keep it around should it arrive sometime in the future.
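One way such a discard rule could work is sketched below. The ReplyFilter class is hypothetical and says nothing about how HareDo does it internally; it just shows the idea of remembering which IDs have been given up on and dropping their replies on arrival.

```ruby
require 'set'

# Hypothetical helper: remembers abandoned request IDs and drops
# their replies if they ever show up.
class ReplyFilter
  attr_reader :inbox

  def initialize
    @abandoned = Set.new
    @inbox     = []
  end

  # Called when a request has timed out.
  def give_up(id)
    @abandoned << id
  end

  # Returns true if the reply was queued, false if it was discarded.
  def deliver(id, data)
    return false if @abandoned.delete?(id)

    @inbox << [id, data]
    true
  end
end
```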

The Service Side

Notice that the service uses the Service::reply() method rather than the send() method to return a response. This method takes the original request message, extracts the reply-to address along with the message ID and addresses a message back to the client. It also sets the correlation_id to the message ID. This correlates the reply and lets the client know that the message coming in is specifically a reply to a given message. This is needed because another peer might send the client a message which happens to have the same message ID. The client would mistake that message for the reply from the service and that would mess things up fast. Therefore the correlation_id value lets the client know that the message is a reply to a previous request, and the client can then use the message ID to correlate the two.
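As a rough sketch of that addressing logic, assuming nothing about HareDo's internals: the Envelope struct and build_reply helper below are hypothetical, while reply_to, message_id and correlation_id are the standard AMQP 0-9-1 message property names.

```ruby
# Hypothetical message envelope carrying the AMQP properties involved.
Envelope = Struct.new(:to, :reply_to, :message_id, :correlation_id, :data,
                      keyword_init: true)

# Address the reply to whoever asked, and tag it with the request's
# message ID so the client can tell it is a reply to that request.
def build_reply(request, data)
  Envelope.new(
    to:             request.reply_to,
    correlation_id: request.message_id,
    data:           data
  )
end

request = Envelope.new(to: 'rpc', reply_to: 'client-queue',
                       message_id: 'abc-123', data: 'jujifruit')
reply = build_reply(request, 'Reply')
puts "#{reply.to} <- #{reply.data} (re: #{reply.correlation_id})"
```

An unsolicited message from another peer would have no correlation_id, so the client has no reason to treat it as an answer.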

Services

Most of what needs to be said about services has been covered. However, there is one important feature RabbitMQ accords you which we need to address.

Say you want to scale your service beyond a single thread or process and distribute it across multiple machines, perhaps even on different networks around the world. You can enable this by changing a single queue attribute. The only thing you need to do differently is redefine one method – Service::createQueue(). The following updates the above example and makes it available to run in this way:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'

class Service < HareDo::Peer

  def initialize(name)
    super
    connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
  end

  def createQueue()
    return @channel.queue(@queue_name, :auto_delete => true)
  end   

  def serve(msg)
    reply(msg, :data => 'Reply')
  end
end

service = Service.new('rpc')
service.listen(:queue => service.name)
service.disconnect()

The only thing we did is remove the exclusive attribute, which is set by default in the HareDo::Service base class. By making the queue non-exclusive, multiple service instances can now connect to it and process requests. RabbitMQ will automatically distribute messages equally across all the running services, dividing up the load. You can now scale your service to as many machines and processes as you like.
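RabbitMQ's default behavior with several consumers on one queue is round-robin dispatch. The following toy simulation (plain Ruby, no broker; the distribute helper and worker names are invented for illustration) shows what that looks like for six messages and three service instances.

```ruby
# Hand each message to the next worker in turn, the way a broker
# dispatching round-robin would.
def distribute(messages, workers)
  assignments = Hash.new { |hash, worker| hash[worker] = [] }
  messages.each_with_index do |msg, i|
    assignments[workers[i % workers.size]] << msg
  end
  assignments
end

distribute((1..6).to_a, %w[worker-a worker-b worker-c]).each do |worker, msgs|
  puts "#{worker}: #{msgs.inspect}"
end
```

Each worker ends up with the same number of messages, which is the load-sharing effect described above.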

Peers

In case you haven't noticed by now, there is no distinction in code between a service and client. Everything is implemented in the same class: HareDo::Peer. So the only difference between a client and service is how you use the Peer class. To create custom services, you just derive from Peer and implement a serve() method. Or, you can get away with not even doing that – you could just write a plugin.

Plugins

The library includes a simple plugin architecture which is included in the Peer class. This allows message processing to be delegated to external modules written outside the library.

For example, I could write a simple echo plugin like this:

require 'haredo/plugin'

class Plugin < HareDo::Plugin

  UUID = 'echo'

  def initialize(service, config, env)
    super UUID, service, config, env
  end

  def process(msg)
    @peer.reply(msg, :data=>msg.data)
  end

end # class Plugin

Plugins are loaded in anonymous modules at runtime, each having its own private namespace. The Peer class then looks inside the anonymous module and extracts the Plugin class defined within, instantiates it, and adds it to the set of loaded modules.
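The general Ruby technique looks something like the sketch below. It is an illustration of loading code into an anonymous module, not a copy of what Peer does; the load_plugin helper is hypothetical and the plugin source is inlined as a string instead of being read from a file.

```ruby
# Evaluate plugin source inside a fresh anonymous module, then pull
# out the Plugin class it defined and instantiate it.
def load_plugin(source)
  sandbox = Module.new
  sandbox.module_eval(source)            # `class Plugin` lands in sandbox
  sandbox.const_get(:Plugin, false).new  # look only inside the sandbox
end

echo_source = <<~RUBY
  class Plugin
    UUID = 'echo'
    def process(data)
      data
    end
  end
RUBY

other_source = echo_source.sub("'echo'", "'other'")

first  = load_plugin(echo_source)
second = load_plugin(other_source)

# Both files define a class called Plugin, yet they do not collide.
puts first.class::UUID
puts second.class::UUID
```

Because each file gets its own module, every plugin can call its class Plugin without stepping on any other plugin or on the top-level namespace.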

Plugins are identified by their UUID. To send a message to a plugin, you include the UUID of the plugin in the message headers.

Each plugin is passed a reference to the peer that uses them on construction. This is stored in the @peer member. This is the message gateway that your plugin uses to send/reply with.

Now I can implement my service and call it as follows:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'
$queue       = 'myservice'

service = HareDo::Peer.new($queue)
service.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
service.plugins.load('echo')
service.listen(:queue => service.name, :blocking => false)

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
response = client.call($queue, :headers=>{:uuid=>'echo'}, :data=>'jujifruit')

client.disconnect()
service.disconnect()

In this case, I didn't even need to implement a custom service. I just loaded my plugin. By default the base class implementation of HareDo::Peer::serve() passes all messages into the plugin system. If no plugin is found to service the message, it is simply dropped.
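Conceptually, the dispatch step is a lookup by UUID. The sketch below models it in plain Ruby; PluginRegistry is a hypothetical name and the real plugin system works with Plugin objects instead of blocks.

```ruby
# Hypothetical registry: routes a message to the handler whose UUID
# appears in the message headers.
class PluginRegistry
  def initialize
    @plugins = {}
  end

  def register(uuid, &handler)
    @plugins[uuid] = handler
  end

  # Returns the handler's result, or nil when no plugin claims the
  # message (the message is simply dropped).
  def process(headers, data)
    handler = @plugins[headers[:uuid]]
    handler ? handler.call(data) : nil
  end
end

registry = PluginRegistry.new
registry.register('echo') { |data| data }

puts registry.process({ :uuid => 'echo' }, 'jujifruit').inspect
puts registry.process({ :uuid => 'nobody-home' }, 'jujifruit').inspect
```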

By default the Peer::plugins::load() method iterates over each entry in the Ruby load path ($:) and looks within the haredo/plugins directory of each entry. This relative directory is given by the @module_path_prefix member of Peer, which you can modify to suit your needs. Therefore, if you stored your plugins in /opt/share/haredo/plugins and you wanted to load from there, you could do it a couple of different ways:

Add /opt/share to your Ruby path:

$:.unshift '/opt/share'
service.plugins.load('echo')

Add /opt/share/haredo/plugins to your Ruby path and clear the service's @module_path_prefix:

$:.unshift '/opt/share/haredo/plugins'

# Remove the module prefix.
service.module_path_prefix = ''

service.plugins.load('echo')

In this case, every module in the Ruby path that has the name echo.rb will be a candidate. The first file to match will be loaded as the module. You can see that in this case, namespace collisions are a very real danger. That's why the @module_path_prefix member exists – there is a very low probability that there will be another module with a given name located in the Ruby path which has a relative directory and name of haredo/plugins/echo.rb.
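The search just described can be sketched like this. The find_plugin helper is hypothetical and only mirrors the lookup rule (first match wins, prefix optional); it is not the library's loader.

```ruby
# Return the first file named <name>.rb found under <dir>/<prefix>
# for each directory in the load path, or nil if there is none.
def find_plugin(name, load_path, prefix = 'haredo/plugins')
  load_path.each do |dir|
    parts     = [dir, prefix, "#{name}.rb"].reject(&:empty?)
    candidate = File.join(*parts)
    return candidate if File.file?(candidate)
  end
  nil
end

# With the prefix, only <dir>/haredo/plugins/echo.rb can match.
puts find_plugin('echo', $LOAD_PATH).inspect

# With an empty prefix, any echo.rb directly on the load path matches.
puts find_plugin('echo', $LOAD_PATH, '').inspect
```

The second call is the one exposed to collisions, since any unrelated echo.rb earlier in the load path would win.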

Configuration

You can pass configuration data into a plugin when you load it using the Peer::plugins.loadConfig() method. This takes a Hash whose keys are the names of the plugins to load and whose values are Hashes passed into each plugin as its configuration. The contents of the configuration are completely up to you as the plugin author.

For example:

#!/usr/bin/env ruby

require 'haredo/peer'

$mq_host     = 'localhost'
$mq_username = 'guest'
$mq_password = 'guest'
$queue       = 'myservice'

service = HareDo::Peer.new($queue)
service.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

# Load modules from configuration
config = {
   # module name  => module configuration
   'example' => { 'x' => '1' },
   'echo'    => { 'y' => '2' }
}

service.plugins.loadConfig(config)
service.listen(:queue => $queue, :blocking => false)

client = HareDo::Peer.new()
client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
response = client.call($queue, :headers=>{:uuid=>'example'}, :data => 'jujifruit')

service.disconnect()
client.disconnect()

By using this method to load plugins, you can store all your plugin configuration in a YAML file and load it at runtime to load/configure all of your plugins for your service(s).
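A minimal sketch of the YAML side, using only Ruby's standard library. The plugin_config_from_yaml helper and the inline YAML are illustrative assumptions; in practice the text would come from a file of your choosing.

```ruby
require 'yaml'

# Turn YAML text into the Hash shape loadConfig() expects:
# plugin name => configuration Hash.
def plugin_config_from_yaml(text)
  config = YAML.safe_load(text) || {}
  unless config.is_a?(Hash)
    raise ArgumentError, 'expected a mapping of plugin name to settings'
  end

  # A plugin listed with no settings gets an empty configuration.
  config.transform_values { |settings| settings || {} }
end

yaml = <<~YAML
  example:
    x: '1'
  echo:
    y: '2'
YAML

config = plugin_config_from_yaml(yaml)
puts config.inspect

# The result could then be handed to the service:
#   service.plugins.loadConfig(config)
```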

There is nothing stopping you from using plugins within a client context as well. If you get a message in from a peer, you can simply check for a UUID in the message headers and pass it to @plugins.process() if you so choose.

The plugin architecture is a powerful way to implement and add an endless variety of services with minimal code.

A complete example of using plugins is illustrated in the basic.rb unit test with the source distribution.

Daemons

Say you have a service written and now you want an easy way to start/stop/manage it. The haredo command-line utility is included just for that. You can add your service to a simple configuration file, run haredo start and your service is online. The default configuration file is in /etc/haredo/haredo.conf and looks something like this:

---
system:
  broker:
    host: localhost
    user: guest
    password: guest
    port: '5672'
    ssl:
      enable: false
      port: '5671'
      tls_cert: '/etc/haredo/ssl/cert.pem'
      tls_key: '/etc/haredo/ssl/key.pem'
      tls_ca: '/etc/haredo/ssl/cacert.pem'
daemon:
  queue: 
    name: 'jujifruit'
    properties:
      exclusive: false
      auto_delete: true
  key: 'b8d96b66-8cae-4e16-b5ec-d607483b061e'
  modules:
    - vaquero/service/config
  services:
    haredo:
      path_prefix: haredo/plugins
      plugins:
        echo:
          juji: fruit
    myplugins:
      path_prefix: mylib/plugins
      plugins:
        log:
          db:
            host: localhost
            db: vaquero
            user: root

Notice the SSL section. You have the option of connecting with or without SSL. For SSL, you must include the SSL section and specifically set enable to true. You must also provide the full path to the client cert, key and CA cert you generated for the RabbitMQ broker. See www.rabbitmq.com/ssl.html and rubybunny.info/articles/tls.html for more information on using SSL.
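For reference, here is a rough sketch of how the broker section could be translated into Bunny connection options. How the daemon actually does this is not shown here, so the broker_options helper is an assumption; the option names (:tls, :tls_cert, :tls_key, :tls_ca_certificates) are the ones Bunny documents for TLS connections.

```ruby
# Build a Bunny-style options Hash from the `broker` section of the
# configuration file. Hypothetical helper, for illustration only.
def broker_options(broker)
  ssl     = broker['ssl'] || {}
  options = {
    host:     broker['host'],
    user:     broker['user'],
    password: broker['password']
  }

  if ssl['enable']
    options.merge(
      port:                Integer(ssl['port']),
      tls:                 true,
      tls_cert:            ssl['tls_cert'],
      tls_key:             ssl['tls_key'],
      tls_ca_certificates: [ssl['tls_ca']]
    )
  else
    options.merge(port: Integer(broker['port']))
  end
end

broker = {
  'host' => 'localhost', 'user' => 'guest', 'password' => 'guest',
  'port' => '5672',
  'ssl'  => { 'enable' => false, 'port' => '5671' }
}
puts broker_options(broker).inspect

# With the bunny gem installed, the Hash could be passed on as:
#   Bunny.new(broker_options(broker)).start
```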

The Listen Queue

The daemon creates a queue using the queue entries. The name is given by the name attribute and the other optional queue properties are given by the properties attribute. These will be applied to the daemon's listen queue.

Services

Services are sets of plugins that are loaded from different places. Usually these are delineated by the need to change the path_prefix value to load a given set of modules. At load time, the daemon iterates over each entry in services, modifies the path_prefix according to the given value and then attempts to load all the plugins listed. It prints the results for each plugin to standard error. If a plugin fails to load, it will also log this in syslog.
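To make the iteration concrete, the sketch below walks a services Hash shaped like the one in the configuration file and works out which relative path and configuration each plugin would be loaded with. The plugin_load_plan helper is hypothetical; it only computes the plan and does not load anything.

```ruby
# For every service entry, pair each plugin with the relative path it
# would be searched under and the configuration it would receive.
def plugin_load_plan(services)
  services.flat_map do |_service, spec|
    (spec['plugins'] || {}).map do |plugin, config|
      { path: File.join(spec['path_prefix'], plugin), config: config || {} }
    end
  end
end

services = {
  'haredo'    => { 'path_prefix' => 'haredo/plugins',
                   'plugins'     => { 'echo' => { 'juji' => 'fruit' } } },
  'myplugins' => { 'path_prefix' => 'mylib/plugins',
                   'plugins'     => { 'log' => { 'db' => { 'host' => 'localhost' } } } }
}

plugin_load_plan(services).each do |entry|
  warn "would load #{entry[:path]} with #{entry[:config].inspect}"
end
```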

Modules

Modules are additional modules you want the daemon to load. This is done before services are processed. This is for when you want to modify the daemon's code in some way (adding or overriding methods), or include some custom initialization. For example, say I have a module vaquero/service/config that adds database connection functionality to the HareDo::Service::Config mixin as follows:

require 'jw/dbi'

module HareDo
module Service

module Config 

  # Connect to the Database
  #
  # @return Return a client instance if connection was successful, nil otherwise
  def connectDB(params)

    vars = { 
      host: params['host'],
      db: params['db'], 
      username: params['user'],
      password: params['password'],
      driver: 'QPSQL',
    }

    if params.has_key? 'port'
      vars[:port] = params['port']
    end

    db = JW::DBI::Database.new()

    if db.open(vars) == false
      raise "Failed to connect: #{db.error()}"
    end

    return db
  end

end # module Config

end # module Service
end # module HareDo

This uses my custom database abstraction extensions which I want to add to HareDo::Daemon. To incorporate it, I simply add it to the modules section of the config file. The daemon will load this on startup, the module will add the custom method, and now my plugins defined in the services section of the configuration file will have it available to them on service load. For example:

require 'haredo/plugin'

class Plugin < HareDo::Plugin

  UUID = 'log'

  attr_reader :db, :logrecord

  def initialize(peer, config)
    super UUID, peer, config 

    @db = @peer.connectDB(config['db'])
  end

  def finalize()
    super

    @db.close()
  end

  .
  .
  .
end

The peer member is a reference to the Daemon instance which has been modified to include the connectDB() method.

This gives you an easy way to customize the daemon without having to derive a new class and instantiate it (and thus having to create a new binary and/or modify the command-line utility).
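The Ruby mechanism behind this is simply reopening a module that a class already includes. Here is a self-contained sketch with stand-in names (MixinSketch, its Config and Daemon, and connect_db are all invented for the demonstration and are not HareDo classes).

```ruby
module MixinSketch
  # Stand-in for the daemon's configuration mixin.
  module Config
    def queue_name
      'jujifruit'
    end
  end

  # Stand-in for the daemon class, which includes the mixin.
  class Daemon
    include Config
  end
end

# Created before the extension module is loaded.
daemon = MixinSketch::Daemon.new

# What a file listed under `modules` might do: reopen the mixin and
# add a method to it.
module MixinSketch
  module Config
    def connect_db(params)
      "connected to #{params['db']} on #{params['host']}"
    end
  end
end

# The existing instance picks up the new method without subclassing.
puts daemon.connect_db('host' => 'localhost', 'db' => 'vaquero')
```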

Command Line

The command line utility is simple. It includes start, stop and status commands. It also has a help (-h) switch, as does each command. You can check the status using the haredo status command, which detects whether the daemon is running. If it is, it sends it a message and gets back basic information in JSON, including loaded plugins:

bash $ haredo status
{"time":"2013-10-01 12:12:02 -0500","pid":"25290","plugins":["echo","dbi","log","status"]}
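Since the output is plain JSON, it is easy to consume from a script. A small sketch follows, assuming the fields shown in the sample output above; the parse_status helper is hypothetical.

```ruby
require 'json'

# Pull the process ID and plugin list out of a status line.
def parse_status(line)
  status = JSON.parse(line)
  { pid: Integer(status['pid']), plugins: status['plugins'] }
end

sample = '{"time":"2013-10-01 12:12:02 -0500","pid":"25290",' \
         '"plugins":["echo","dbi","log","status"]}'

info = parse_status(sample)
puts "daemon #{info[:pid]} has #{info[:plugins].size} plugins loaded"
```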

There is also a subcommand called config. This is used to test and set values in the configuration file for the broker and (optionally) a database. It exists mainly as a utility for installers. You can test a broker connection as follows:

bash $ PRBPASSWORD=guest haredo config broker -t -u guest -s localhost
{:user=>"guest", :password=>nil, :port=>nil, :vhost=>nil, :host=>"localhost", :ssl=>false}
succeeded

The RabbitMQ user password is passed in via an environment variable.

Most likely you will never need to use this as it was designed specifically for use in the Debian installer.

Installation

You can install directly from the command line via RubyGems as follows:

gem install haredo

Building from Source

This project uses CMake. To build the gem:

cmake .
make gem

The resultant gem will be generated in the pkg directory.

To build on Debian/Ubuntu:

fakeroot debian/rules clean
dpkg-buildpackage -b -uc -us

The resulting binary package will be created in the parent directory.

License

Copyright information is located in the COPYING file. The software license is located in the LICENSE file.