HareDo
About
I'll bet that most people don't realize that when they use RabbitMQ they are actually using the core software that runs big-freaking telecom systems (OTP – Open Telecom Platform). That in itself is reason enough to consider RabbitMQ.
But RabbitMQ uses an elegant but somewhat elaborate model called AMQP 0-9-1. Notice the 0-9-1. This is important. Versions later than this are crap. 0-9-1 is a work of pure genius and art which was later utterly mauled by committees. Fortunately for you, RabbitMQ is and most likely always will be AMQP 0-9-1. Hurray!
But the bottom line is this: you might be one of those pragmatic people who doesn't want to have to know everything about AMQP. You just want that fancy telecom thing-a-majigger software to pump massive volumes of your messages from one place to another using Ruby and give you five nines doing it.
If that's true for you then you've come to the right place.
HareDo is an easy-to-use framework for creating peer-to-peer applications in Ruby via RabbitMQ. Built atop the Bunny AMQP client, it uses a small subset of RabbitMQ's capabilities to provide an intuitive client/server framework for implementing network services and simple peer-to-peer applications. It is specifically designed for those who are not familiar with the ins and outs of the AMQP protocol and employs RabbitMQ's default AMQP exchange to implement direct peer-to-peer communication.
This framework's design targets a common case within web applications and other miscellaneous contexts (logging, event handling, data retrieval, etc.), when you just need to send a request somewhere and get a reply back. In this case specifically:
- Clients send messages asynchronously (to one or more peers) and can receive synchronously (blocking with a configurable timeout).
- Services run indefinitely either within the same process (in separate threads), or in separate processes.
In summary, this gem employs a small, practical and useful subset of the vast array of features available within RabbitMQ to make it easy to whip up network services quickly without having to master all aspects of RabbitMQ or the AMQP protocol.
How It Works
The easiest way to explain how it works is by example. The following program illustrates creating a network service and a client to talk to it. The service simply takes a number from the message headers, adds one to it and sends the result back.
In this example, both client and service run simultaneously in the same script. However, services can run in separate scripts just as easily (more on this below).
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'
    $queue       = 'HareDo'

    class Service < HareDo::Peer
      def initialize(name)
        super
        connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
      end

      def serve(msg)
        #dump_message msg
        data = msg.headers['i'].to_i + 1
        reply(msg, :data => data.to_s)
      end
    end

    service = Service.new($queue)
    service.listen(:queue => $queue, :blocking => false)

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

    1.upto(10) do |i|
      id = client.send($queue, :headers => { :i => i })
      msg = client.receive(id)
      puts msg.data.to_i == i + 1
    end

    # Very important -- you will leave queues open on RabbitMQ if you don't do this
    service.disconnect()
    client.disconnect()
The service takes a single argument: its name. This name is the name of the AMQP queue used to send messages to the service. A queue is like an email address, phone number or IP address: it's just a unique identifier. In this case, it is the identifier you use to direct messages to the service. The listen() method then causes the service to listen on the queue.
The client sends 10 messages to the service, each containing monotonically increasing integer values. It waits for the response (using a blocking timeout) after each call and checks the results.
The client is under no obligation to wait for a response. This is just how we've coded the service in this example: to send something back. The interaction between the two forms an ad-hoc protocol. Hence the point of this project: quick and easy network protocols with minimal code (thanks to RabbitMQ and Bunny).
Clients
There are three basic use-cases the client addresses:
- Event Dispatch: Client sends one or more messages expecting no response. This is the case when you want to log events: you don't really care about getting a response back. You just want to send out the event and be done with it.
- Send/Receive: Client sends one or more messages expecting a response and processes those responses in no particular order. That is, there is no need to correlate a response with its request; it can be processed in its own right. This could be like loading a set of images or documents from a remote service. When the document arrives, you don't really care about the request used to obtain it. All you care about is the document itself. In this case, a client could send out 10 messages asynchronously and then wait in a loop for each to come back.
- Remote Procedure Call (RPC): Client sends one or more messages expecting a response and needs to process those responses in a specific order. That is, it needs to correlate each response to its associated request. This is like calling a function: you need the specific return value for the specific function you called.
Event dispatch is simple – you don't send back a reply from the service, nor do you expect a response on the client-side. Take a simple logging service for example. Here is the server code:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'

    class Service < HareDo::Peer
      def initialize(name)
        super
        connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
      end

      def listen()
        super :queue => @name
      end

      def serve(msg)
        # Log the message somewhere
      end
    end

    # Keep a reference to the service so it can be disconnected afterwards
    service = Service.new('logger')
    service.listen()
    service.disconnect()
The client code is as follows:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
    client.send('logger', :data => 'Backup script failed')
    client.disconnect()
You can run both in the same script by setting the service code to non-blocking (passing :blocking => false to the listen() method). Services can run in either blocking or non-blocking mode, enabling you to run them either in the same script as clients (non-blocking), or in separate scripts where they idle and wait for incoming messages (blocking). By default, services run in blocking mode, as it is assumed that they will be run in their own independent scripts/processes. Here is the above example in a single script where the service runs in non-blocking mode:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'

    class Service < HareDo::Peer
      def initialize(name)
        super
        connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
      end

      def listen()
        super :queue => @name, :blocking => false
      end

      def serve(msg)
        # Log the data somewhere
      end
    end

    # Keep a reference to the service so it can be disconnected afterwards
    service = Service.new('logger')
    service.listen()

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
    client.send('logger', :data => 'Backup script failed')

    service.disconnect()
    client.disconnect()
Send/Receive
The send/receive pattern is illustrated in the initial example: the client sends out ten integers and waits for each response. Each time it calls send() it gets a correlation ID as the return value. It passes this ID into receive(), which then looks for a return message with that correlation ID. This is how you ensure you get back the message you are looking for (as opposed to some other message that happened to arrive). While the responses come back in the correct order in this example, there is no guarantee that this will happen every time. This is why the correlation ID is important.
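To see why the correlation ID matters when replies arrive out of order, here is a minimal in-memory sketch with no broker involved. The `FakeClient` class and its `send_request` and `deliver` methods are hypothetical stand-ins invented for illustration, not HareDo's implementation; only the idea of a send that returns an ID and a receive that matches on it comes from the text above.

```ruby
require 'securerandom'

# Hypothetical stand-in for a client; NOT HareDo's actual implementation.
class FakeClient
  def initialize
    @inbox = [] # replies in arrival order
  end

  # Pretend to send a request and return its correlation ID.
  def send_request(_payload)
    SecureRandom.uuid
  end

  # Simulate a reply arriving from the network.
  def deliver(correlation_id, data)
    @inbox << { id: correlation_id, data: data }
  end

  # Return the reply matching the given ID, or nil if none has arrived.
  def receive(correlation_id)
    index = @inbox.index { |m| m[:id] == correlation_id }
    index && @inbox.delete_at(index)
  end
end

client = FakeClient.new
first  = client.send_request(1)
second = client.send_request(2)

# Replies arrive in the opposite order from the requests.
client.deliver(second, 'reply to second')
client.deliver(first, 'reply to first')

puts client.receive(first)[:data]
puts client.receive(second)[:data]
```

Even though the second reply arrives first, each `receive` call returns the reply that belongs to the ID it was given.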
Remote Procedure Call
This is the purpose of the client's call() method. It assigns a specific ID to the outgoing message and waits for a response back containing that message ID. So call() acts like a synchronous function call. It sends out a request and blocks waiting for a specific reply. Here is an example:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'

    class Service < HareDo::Peer
      def initialize(name)
        super
        connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
      end

      def serve(msg)
        reply(msg, :data => 'Reply')
      end
    end

    service = Service.new('rpc')
    service.listen(:queue => service.name, :blocking => false)

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

    # Blocks until the reply arrives or the timeout expires
    response = client.call('rpc', :data => 'jujifruit')

    client.disconnect()
    service.disconnect()
The call() method will block waiting for the response for up to client.timeout seconds (a floating point value). The default is 1 second. You can change this to whatever value you like. If a timeout occurs, call() will return nil.
The call() method is just a convenience method that wraps send() and receive(). Internally, send() always assigns a unique message ID to each outgoing message. It passes this ID back as the return value. You can use this ID to fetch specific messages from receive().
Normally receive() just returns the oldest message to arrive. Internally it keeps a queue of messages as they are received. When you call it with no arguments, it just pops the oldest message off the queue (if there is one); otherwise it blocks waiting for a message to come in until the timeout interval expires. However, if you pass an ID to receive(), it will look specifically for a message with that ID. If none is found, again, it will block for the timeout interval waiting for it to come in. If none does, it returns nil.
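The receive behaviour described above (oldest message by default, a specific message when given an ID, nil after a timeout) can be sketched with a mutex and a condition variable. The `Mailbox` class below is a hypothetical illustration and makes no claim about how HareDo stores messages internally.

```ruby
# Hypothetical mailbox illustrating the receive() semantics described above.
class Mailbox
  def initialize(timeout = 1.0)
    @timeout  = timeout
    @messages = []
    @lock     = Mutex.new
    @arrived  = ConditionVariable.new
  end

  # Called when a message arrives (for example from a consumer thread).
  def push(id, data)
    @lock.synchronize do
      @messages << { id: id, data: data }
      @arrived.broadcast
    end
  end

  # With no ID, pop the oldest message; with an ID, pop that message.
  # Blocks for up to @timeout seconds, then returns nil.
  def receive(id = nil)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @lock.synchronize do
      loop do
        index = if id
                  @messages.index { |m| m[:id] == id }
                elsif !@messages.empty?
                  0
                end
        return @messages.delete_at(index) if index

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        @arrived.wait(@lock, remaining)
      end
    end
  end
end

box = Mailbox.new(0.2)
box.push('a', 'first')
box.push('b', 'second')

p box.receive('b')[:data]  # the message with ID 'b'
p box.receive[:data]       # the oldest remaining message
p box.receive('missing')   # nil once the timeout expires
```

The deadline is computed once per call, so repeated wake-ups for unrelated messages do not extend the total wait.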
If call() times out, subsequent calls will fail to get the response, even if it comes in later, as it will be discarded. This keeps dilatory messages from filling up the receive queue. Once you've given up on a message, it makes no sense to keep it around should it arrive sometime in the future.
The Service Side
Notice that the service uses the Service::reply() method rather than the send() method to return a response. This method takes the original request message, extracts the to address along with the message ID and addresses a message back to the client. It also sets the correlation_id to the message ID. This correlates the reply and lets the client know that the message coming in is specifically a reply to a given message. This is needed because another peer might send the client a message that happens to have the same message ID. The client would mistake that message for the reply from the service, and that would mess things up fast. Therefore the correlation_id value lets the client know that the message is a reply to a previous request, and the client can then use the message ID to correlate the two.
Services
Most of what needs to be said about services has been covered. However, there is one important feature RabbitMQ accords you which we need to address.
Say you want to scale your service from just a single thread or process and distribute it across multiple machines which are perhaps even on different networks around the world. You can enable this by modifying a single field. The only thing you need to do differently is redefine one method: Service::createQueue(). The following updates the above example and makes it available to run in this way:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'

    class Service < HareDo::Peer
      def initialize(name)
        super
        connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
      end

      # Create the queue without the exclusive attribute
      def createQueue()
        return @channel.queue(@queue_name, :auto_delete => true)
      end

      def serve(msg)
        reply(msg, :data => 'Reply')
      end
    end

    service = Service.new('rpc')
    service.listen(:queue => service.name)
    service.disconnect()
The only thing we did is remove the exclusive attribute, which is set by default in the HareDo::Service base class. By making the queue non-exclusive, multiple service instances can now connect to it and process requests. RabbitMQ will automatically distribute messages across all the running services (round-robin by default), dividing up the load. You can now scale your service to as many machines and processes as you like.
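As a rough illustration of how a shared, non-exclusive queue divides work, the following sketch simulates round-robin dispatch in plain Ruby. The `round_robin` helper and the service names are hypothetical; a real broker also takes acknowledgements and prefetch settings into account, which this sketch ignores.

```ruby
# Simulates round-robin dispatch from one shared queue to several consumers.
# Purely illustrative: no broker is involved.
def round_robin(messages, consumers)
  assignments = Hash.new { |hash, key| hash[key] = [] }
  messages.each_with_index do |message, i|
    assignments[consumers[i % consumers.size]] << message
  end
  assignments
end

result = round_robin((1..6).to_a, %w[service-a service-b service-c])
result.each do |consumer, messages|
  puts "#{consumer}: #{messages.join(', ')}"
end
```

With six messages and three consumers, each consumer ends up with two messages, taken in turn.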
Peers
In case you haven't noticed by now, there is no distinction in code between a service and a client. Everything is implemented in the same class: HareDo::Peer. So the only difference between a client and a service is how you use the Peer class. To create custom services, you just derive from Peer and implement a serve() method. Or, you can get away with not even doing that: you could just write a plugin.
Plugins
The library includes a simple plugin architecture which is built into the Peer class. This allows message processing to be delegated to external modules written outside the library.
For example, I could write a simple echo plugin like this:
    require 'haredo/plugin'

    class Plugin < HareDo::Plugin
      UUID = 'echo'

      def initialize(service, config, env)
        super UUID, service, config, env
      end

      def process(msg)
        @peer.reply(msg, :data=>msg.data)
      end
    end # class Plugin
Plugins are loaded in anonymous modules at runtime, each having its own private namespace. The Peer class then looks inside the anonymous module and extracts the Plugin class defined within, instantiates it, and adds it to the set of loaded modules.
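One way to get this kind of isolation in plain Ruby is to evaluate the plugin source inside a fresh `Module.new` and then fetch the `Plugin` constant from it. The sketch below is an assumption about the general technique, not a copy of HareDo's loader; `load_plugin_class` is a hypothetical helper, and the plugin classes here are simplified so they do not need `HareDo::Plugin`.

```ruby
# Evaluate plugin source inside an anonymous module and return its Plugin class.
# Hypothetical helper; HareDo's own loader may work differently.
def load_plugin_class(source)
  container = Module.new
  container.module_eval(source)
  container.const_get(:Plugin, false)
end

echo_source = <<~RUBY
  class Plugin
    UUID = 'echo'
    def process(data)
      data
    end
  end
RUBY

upcase_source = <<~RUBY
  class Plugin
    UUID = 'upcase'
    def process(data)
      data.upcase
    end
  end
RUBY

# Both sources define a class called Plugin, yet they do not collide.
plugins = {}
[echo_source, upcase_source].each do |source|
  klass = load_plugin_class(source)
  plugins[klass::UUID] = klass.new
end

puts plugins['echo'].process('jujifruit')
puts plugins['upcase'].process('jujifruit')
```

Because each source is evaluated in its own module, every plugin file can use the same class name without polluting the top-level namespace.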
Plugins are identified by their UUID. To send a message to a plugin, you include the UUID of the plugin in the message headers.
Each plugin is passed a reference to the peer that uses it on construction. This is stored in the @peer member. This is the message gateway that your plugin uses to send or reply.
Now I can implement my service and call it as follows:
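    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'
    $queue       = 'myservice'

    service = HareDo::Peer.new($queue)
    # Connect explicitly, as the services in the earlier examples do
    service.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)
    service.plugins.load('echo')
    service.listen(:queue => service.name, :blocking => false)

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

    # The uuid header routes the message to the echo plugin
    response = client.call($queue, :headers=>{:uuid=>'echo'}, :data=>'jujifruit')

    client.disconnect()
    service.disconnect()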
In this case, I didn't even need to implement a custom service. I just loaded my plugin. By default the base class implementation of HareDo::Peer::serve() passes all messages into the plugin system. If no plugin is found to service the message, it is simply dropped.
By default the Peer::plugins::load() method iterates over each entry in the Ruby load path ($:) and looks within the haredo/plugins directory of each entry. This relative directory is given by the @module_path_prefix member of Peer, which you can modify to suit your needs. Therefore, if you stored your plugins in /opt/share/haredo/plugins and you wanted to load from there, you could do it a couple of different ways:
Add /opt/share to your Ruby path:
    $:.unshift '/opt/share'
    service.plugins.load('echo')
Add /opt/share/haredo/plugins to your Ruby path and clear the service's @module_path_prefix:
    $:.unshift '/opt/share/haredo/plugins'

    # Remove the module prefix.
    service.module_path_prefix = ''
    service.plugins.load('echo')
In this case, every module in the Ruby path that has the name echo.rb will be a candidate. The first file to match will be loaded as the module. You can see that in this case, namespace collisions are a very real danger. That's why the @module_path_prefix member exists: there is a very low probability that there will be another module with a given name located in the Ruby path which has a relative directory and name of haredo/plugins/echo.rb.
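The search described above can be sketched as a small lookup over a list of directories. The `find_plugin` helper is hypothetical and only mirrors the behaviour the text describes (first match wins, with an optional relative prefix); it uses a temporary directory so that it runs anywhere.

```ruby
require 'tmpdir'
require 'fileutils'

# Return the first "<entry>/<prefix>/<name>.rb" found in load_path, or nil.
# Hypothetical helper mirroring the lookup described in the text.
def find_plugin(name, load_path, prefix = 'haredo/plugins')
  load_path.each do |entry|
    parts = [entry, prefix, "#{name}.rb"].reject(&:empty?)
    candidate = File.join(*parts)
    return candidate if File.file?(candidate)
  end
  nil
end

Dir.mktmpdir do |root|
  plugin_dir = File.join(root, 'haredo', 'plugins')
  FileUtils.mkdir_p(plugin_dir)
  File.write(File.join(plugin_dir, 'echo.rb'), "# plugin source\n")

  # With the default prefix, the parent directory goes on the search path.
  puts find_plugin('echo', [root])
  # With an empty prefix, the plugin directory itself goes on the search path.
  puts find_plugin('echo', [plugin_dir], '')
end
```

Both calls resolve to the same file; the difference is only in how much of the path comes from the search path entry and how much from the prefix.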
Configuration
You can pass configuration data into a plugin when you load it using the Peer::plugins.loadConfig() method. This takes a Hash whose keys are the names of the plugins to load and whose values are Hashes passed into each plugin as its configuration. The contents of the configuration are completely up to you as the plugin author.
For example:
    #!/usr/bin/env ruby

    require 'haredo/peer'

    $mq_host     = 'localhost'
    $mq_username = 'guest'
    $mq_password = 'guest'
    $queue       = 'myservice'

    service = HareDo::Peer.new($queue)
    # Connect explicitly, as the services in the earlier examples do
    service.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

    # Load modules from configuration
    config = {
      # module name => module configuration
      'example' => { 'x' => '1' },
      'echo'    => { 'y' => '2' }
    }

    service.plugins.loadConfig(config)
    service.listen(:queue => $queue, :blocking => false)

    client = HareDo::Peer.new()
    client.connect(:user=>$mq_username, :password=>$mq_password, :host=>$mq_host)

    response = client.call($queue, :headers=>{:uuid=>'example'}, :data => 'jujifruit')

    service.disconnect()
    client.disconnect()
By using this method to load plugins, you can store all your plugin configuration in a YAML file and load it at runtime to load/configure all of your plugins for your service(s).
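As a small sketch of that idea, the snippet below parses a YAML document into the Hash shape described above. The YAML content simply mirrors the inline configuration from the previous example; in a real script it would come from a file of your choosing, and the commented-out `loadConfig` call assumes a connected `service` as in the earlier examples.

```ruby
require 'yaml'

# In practice this would be read from a file, e.g. File.read('plugins.yml')
# (the file name is a hypothetical example).
yaml = <<~YAML
  example:
    x: '1'
  echo:
    y: '2'
YAML

config = YAML.safe_load(yaml)

config.each do |plugin_name, plugin_config|
  puts "#{plugin_name}: #{plugin_config.keys.join(', ')}"
end

# The resulting Hash maps plugin names to per-plugin configuration Hashes:
#   service.plugins.loadConfig(config)
```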
There is nothing stopping you from using plugins within a client context as well. If you get a message in from a peer, you can simply check for a UUID in the message headers and pass it to @plugins.process() if you so choose.
The plugin architecture is a powerful way to implement and add an endless variety of services with minimal code.
A complete example of using plugins is illustrated in the basic.rb unit test in the source distribution.
Daemons
Say you have a service written and now you want an easy way to start/stop/manage it. The haredo command-line utility is included just for that. You can add your service to a simple configuration file, run haredo start and your service is online. The default configuration file is in /etc/haredo/haredo.conf and looks something like this:
    ---
    system:
      broker:
        host: localhost
        user: guest
        password: guest
        port: '5672'
        ssl:
          enable: false
          port: '5671'
          tls_cert: '/etc/haredo/ssl/cert.pem'
          tls_key: '/etc/haredo/ssl/key.pem'
          tls_ca: '/etc/haredo/ssl/cacert.pem'
    daemon:
      queue:
        name: 'jujifruit'
        properties:
          exclusive: false
          auto_delete: true
      key: 'b8d96b66-8cae-4e16-b5ec-d607483b061e'
      modules:
        - vaquero/service/config
      services:
        haredo:
          path_prefix: haredo/plugins
          plugins:
            echo:
              juji: fruit
        myplugins:
          path_prefix: mylib/plugins
          plugins:
            log:
              db:
                host: localhost
                db: vaquero
                user: root
Notice the SSL section. You have the option of connecting with or without SSL. For SSL, you must include the SSL section and specifically set enable to true. You must also provide the full path to the client cert, key and CA cert you generated for the RabbitMQ broker. See www.rabbitmq.com/ssl.html and rubybunny.info/articles/tls.html for more information on using SSL.
The Listen Queue
The daemon creates a queue using the queue entries. The name is given by the name attribute and the other optional queue properties are given by the properties attributes. These will be applied to the daemon's listen queue.
Services
Services are sets of plugins that are loaded from different places. Usually these are delineated by the need to change the path_prefix value to load a given set of modules. At load time, the daemon iterates over each entry in services, modifies the path_prefix according to the given value and then attempts to load all the plugins listed. It prints the results for each plugin to standard error. If a plugin fails to load, it will also log this in syslog.
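The iteration described above can be sketched as follows. The YAML fragment reuses the services entries from the sample configuration, and `plugin_load_plan` is a hypothetical helper that only computes which relative paths would be tried and with which configuration; it does not load anything and is not the daemon's actual code.

```ruby
require 'yaml'

# Fragment mirroring the services entries of the sample configuration.
services = YAML.safe_load(<<~YAML)
  haredo:
    path_prefix: haredo/plugins
    plugins:
      echo:
        juji: fruit
  myplugins:
    path_prefix: mylib/plugins
    plugins:
      log:
        db:
          host: localhost
YAML

# Build the list of [relative path, config] pairs that would be loaded.
# Hypothetical helper for illustration only.
def plugin_load_plan(services)
  services.flat_map do |_service_name, service|
    service.fetch('plugins', {}).map do |plugin_name, plugin_config|
      [File.join(service['path_prefix'], "#{plugin_name}.rb"), plugin_config]
    end
  end
end

plugin_load_plan(services).each do |path, config|
  # Report to standard error, as the text says the daemon does.
  warn "would load #{path} (config keys: #{config.keys.join(', ')})"
end
```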
Modules
Modules are additional modules you want the daemon to load. This is done before services are processed. This is for when you want to modify the daemon's code in some way (adding or overriding methods) or include some custom initialization. For example, say I have a module vaquero/service/config that adds database connection functionality to the HareDo::Service::Config mixin as follows:
    require 'jw/dbi'

    module HareDo
      module Service
        module Config

          # Connect to the Database
          #
          # @return Return a client instance if connection was successful,
          #         raises otherwise
          def connectDB(params)
            vars = {
              host:     params['host'],
              db:       params['db'],
              username: params['user'],
              password: params['password'],
              driver:   'QPSQL',
            }

            if params.has_key? 'port'
              vars[:port] = params['port']
            end

            db = JW::DBI::Database.new()

            if db.open(vars) == false
              raise "Failed to connect: #{db.error()}"
            end

            return db
          end

        end # module Config
      end # module Service
    end # module HareDo
This uses my custom database abstraction extensions which I want to add to HareDo::Daemon. To incorporate it, I simply add it to the modules section of the config file. The daemon will load this on startup, the module will add the custom method, and now my plugins defined in the services section of the configuration file will have it available to them on service load. For example:
    require 'haredo/plugin'

    class Plugin < HareDo::Plugin
      UUID = 'log'

      attr_reader :db, :logrecord

      def initialize(peer, config)
        super UUID, peer, config
        @db = @peer.connectDB(config['db'])
      end

      def finalize()
        super
        @db.close()
      end

      . . .
    end
The peer member is a reference to the Daemon instance, which has been modified to include the connectDB() method.
This gives you an easy way to customize the daemon without having to derive a new class and instantiate it (and thus having to create a new binary and/or modify the command-line utility).
Command Line
The command line utility is simple. It includes start, stop and status commands. It also has a help (-h) switch, as does each command. So you can check the status using the haredo status command, which detects if the daemon is running. If it is, it sends it a message and gets back basic information in JSON, including loaded plugins:
    $ haredo status
    {"time":"2013-10-01 12:12:02 -0500","pid":"25290","plugins":["echo","dbi","log","status"]}
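Because the status output is JSON, it is easy to consume from a script. The sketch below parses the sample output shown above; in a real script the string would presumably come from running the command (for instance with backticks), which is an assumption about how you would wire it up.

```ruby
require 'json'

# Sample output copied from the status example above.
output = '{"time":"2013-10-01 12:12:02 -0500","pid":"25290",' \
         '"plugins":["echo","dbi","log","status"]}'

status = JSON.parse(output)

puts "pid: #{status['pid']}"
puts "plugins: #{status['plugins'].join(', ')}"
```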
There is also a subcommand called config. This is used to test and set values in the configuration file for the broker and (optionally) a database. It exists mainly as a utility for installers. You can test a broker connection as follows:
    $ PRBPASSWORD=guest haredo config broker -t -u guest -s localhost
    {:user=>"guest", :password=>nil, :port=>nil, :vhost=>nil, :host=>"localhost", :ssl=>false}
    succeeded
The RabbitMQ user password is passed in via an environment variable (PRBPASSWORD in the example above).
Most likely you will never need to use this as it was designed specifically for use in the Debian installer.
Installation
You can install directly from the command line via RubyGems as follows:
    gem install haredo
Building from Source
This project uses CMake. To build the gem:
    cmake .
    make gem
The resultant gem will be generated in the pkg directory.
To build on Debian/Ubuntu:
    fakeroot debian/rules clean
    dpkg-buildpackage -b -uc -us
The resulting binary package will be created in the parent directory.
License
Copyright information is located in the COPYING file. The software license is located in the LICENSE file.