class HareDo::Peer

This is an abstract base class used for both Client and Service.

Attributes

channel[RW]
exchange[RW]
mid[R]
name[R]
plugins[R]
queue[R]
sleep_interval[RW]
timeout[RW]
trace[RW]

Public Class Methods

new(name=nil) click to toggle source
# File src/lib/haredo/peer.rb, line 169
# Prepares the peer's in-memory state. Nothing here talks to RabbitMQ; the
# connection is made later by connect().
#
# @param name Optional name for this peer, stored in @name.
def initialize(name=nil)
  # Running counter used as the message identifier of outgoing messages
  @mid = 0

  # Client attributes: how long receive() waits, how long it sleeps between
  # polls, and whether messages are dumped for tracing
  @timeout, @sleep_interval, @trace = 1.0, 0.001, false

  # Responses keyed by message id
  @receive_queue = {}

  # Additional queues created through listen(:queue => ...)
  @listen_queues = []

  # The number of messages to prefetch from Rabbit
  @prefetch = 10

  # The plugin manager receives this peer as its owner. @name is assigned
  # afterwards, exactly as before, so the manager's constructor sees the
  # same state it always did.
  @plugins = Plugins::Manager.new(self)

  @name = name
end

Public Instance Methods

call(to, args) click to toggle source

Sends a message and waits for response

@param to The to address @param :data Message data @param :headers Message headers @param :from Message from address @param :properties Message properties

@return Returns the response message if successful, nil otherwise. Will block until @timeout seconds elapse. Sleeps in busywait. Sleep interval is given by @sleep_interval.

# File src/lib/haredo/peer.rb, line 341
# Sends a message and then waits for the response carrying the same id.
#
# @param to   The to address
# @param args Hash of message options. It is handed unchanged to send(),
#             which reads :data, :from, :headers and :properties.
#
# @return Whatever receive() returns for the id of the message just sent.
def call(to, args)
  # Get message id of sent message
  id = send(to, args)

  # Put blank entry in queue to indicate we are expecting a response with that
  # message id.
  @receive_queue[id] = nil

  receive(id)
end
connect(args) click to toggle source

Connect to RabbitMQ

@param user The RabbitMQ username (default guest) @param password The RabbitMQ password (default guest) @param host The RabbitMQ host (default localhost) @param port The RabbitMQ port (default 5672) @param vhost The RabbitMQ vhost (default /) @param ssl Use SSL (default false) @return Returns true if connection was successful, false otherwise.

# File src/lib/haredo/peer.rb, line 199
# Connect to RabbitMQ, open a channel, and declare this peer's own
# server-named queue on it.
#
# @param args Hash of connection options:
#   :user      RabbitMQ username (default 'guest')
#   :password  RabbitMQ password (default 'guest')
#   :host      RabbitMQ host (default 'localhost')
#   :port      RabbitMQ port (default '5672')
#   :vhost     RabbitMQ vhost; '/' is treated the same as the default ''
#   :queue     Hash passed as the queue's :arguments (default {})
#   :exclusive Whether the queue is exclusive (default true)
#   :ssl       Hash with string keys 'enable', 'port', 'tls_cert',
#              'tls_key' and 'tls_ca'. TLS is used only when 'enable' is
#              exactly true; the port then becomes ssl['port'] or '5671'.
#
# The default exchange is always used; an :exchange entry in args has no
# effect.
#
# @return Returns true once the queue has been declared.
#   NOTE(review): no failure is rescued here, so errors from Bunny
#   presumably surface as exceptions rather than a false return.
def connect(args)

  # Default queue arguments (none). The commented keys are examples only.
  queue_args = {
    #'x-expires'     => 30000
    #'x-message-ttl' => 1000
  }

  user        = args[:user]     || 'guest'
  password    = args[:password] || 'guest'
  host        = args[:host]     || 'localhost'
  port        = args[:port]     || '5672'
  vhost       = args[:vhost]    || ''
  queue_props = args[:queue]    || queue_args
  ssl         = args[:ssl]      || {}

  # `args[:exclusive] || true` could never evaluate to false, so an explicit
  # :exclusive => false was silently ignored. Only a missing/nil value
  # falls back to the default now.
  exclusive = args[:exclusive].nil? ? true : args[:exclusive]

  tls_cert     = ssl['tls_cert']
  tls_key      = ssl['tls_key']
  tls_ca_certs = ssl['tls_ca'] ? [ssl['tls_ca']] : nil

  use_ssl = ssl.size > 0 && ssl['enable'] == true
  if use_ssl
    port = ssl['port'] || '5671'
  end

  # The vhost is appended verbatim to the URI, so the root vhost is written
  # as an empty path.
  vhost = '' if vhost == '/'

  if use_ssl
    @cnx = Bunny.new( "amqps://#{user}:#{password}@#{host}:#{port}#{vhost}",
                      :log_file => '/dev/null',
                      :tls_cert => tls_cert,
                      :tls_key => tls_key,
                      :tls_ca_certificates => tls_ca_certs)
  else
    @cnx = Bunny.new("amqp://#{user}:#{password}@#{host}:#{port}#{vhost}")
  end

  @cnx.start()

  @channel = @cnx.create_channel()

  @channel.prefetch(@prefetch)

  # An empty name asks the broker to generate one
  @queue = @channel.queue( '',
                           :auto_delete => true,
                           :exclusive   => exclusive,
                           :arguments   => queue_props )

  @exchange = @channel.default_exchange()

  return true
end
createQueue(args={}) click to toggle source

Defines the queue this service will listen on. Assumes a single-instance service and therefore declares the queue as exclusive by default.

# File src/lib/haredo/peer.rb, line 434
# Declares a named queue on the current channel.
#
# @param args Hash of options:
#   :queue       Name of the queue to declare
#   :properties  Hash passed as the queue's :arguments (default {})
#   :auto_delete Whether the queue is auto-deleted (default true)
#   :exclusive   Whether the queue is exclusive (default true)
#
# @return The object returned by @channel.queue.
def createQueue(args={})

  # Default queue arguments (none). The commented keys are examples only.
  queue_args = {
    #'x-expires'     => 30000
    #'x-message-ttl' => 1000
  }

  queue_name  = args[:queue]
  queue_props = args[:properties] || queue_args

  # Both flags default to true but may be switched off explicitly.
  # Previously :auto_delete was read and then ignored (true was hard-coded),
  # and :properties was never handed to the channel.
  auto_delete = args.has_key?(:auto_delete) ? args[:auto_delete] : true
  exclusive   = args.has_key?(:exclusive)   ? args[:exclusive]   : true

  return @channel.queue( queue_name,
                         :auto_delete => auto_delete,
                         :exclusive   => exclusive,
                         :arguments   => queue_props )
end
disconnect() click to toggle source

Disconnect from RabbitMQ

# File src/lib/haredo/peer.rb, line 268
# Disconnect from RabbitMQ.
#
# Deletes this peer's own queue and every queue created through
# listen(:queue => ...), closes the connection if there is one, and shuts
# the plugin manager down.
#
# @return The result of @plugins.shutdown().
def disconnect()

  if @queue != nil
    @queue.delete()
    @queue = nil
  end

  @listen_queues.each do |listen_queue|
    listen_queue.delete()
  end

  # Must stay an Array: listen() appends to it with <<. It used to be reset
  # to a Hash here, which broke listen(:queue => ...) after a disconnect.
  @listen_queues = []

  if @cnx != nil
    @cnx.close()
    @cnx = nil
  end

  @plugins.shutdown()
end
listen(args={}) click to toggle source

Causes the service to listen for incoming messages.

@param :blocking If this is set to true, will go into indefinite blocking loop processing incoming messages.

@returns Returns nil if non-blocking. Never returns if blocking.

# File src/lib/haredo/peer.rb, line 462
# Subscribes to a queue and feeds every delivered message to serve().
#
# Without :queue the peer's own @queue is used. With :queue a queue is
# created through createQueue(args), remembered in @listen_queues, and
# bound to args[:exchange] when that key is present.
#
# Each delivery is acknowledged before it is served. When serve() returns
# false the process exits with status 0.
#
# @param args Hash of options:
#   :queue     Name of a dedicated queue to create and listen on
#   :exchange  Exchange to bind that queue to
#   :blocking  Passed to subscribe as :block (default false)
#   Any other keys are seen by createQueue() as well.
#
# @return The result of the queue's subscribe call.
def listen(args={})

  target = @queue

  if args.key?(:queue)
    target = createQueue(args)
    @listen_queues << target

    target.bind(args[:exchange]) if args.key?(:exchange)
  end

  blocking = args[:blocking] || false

  # Syslog is opened once per process; $syslog records that it was done
  if $syslog.nil?
    Syslog.open( "haredo #{@name}", Syslog::LOG_PID,
                 Syslog::LOG_DAEMON | Syslog::LOG_LOCAL7 )

    $syslog = true
  end

  Syslog.notice('listen()')

  target.subscribe(:block => blocking, :manual_ack => true) do |info, props, data|
    @channel.acknowledge(info.delivery_tag, false)

    exit 0 if serve(RabbitMQ::Message.new(info, props, data)) == false
  end
end
receive(mid=nil) click to toggle source
# File src/lib/haredo/peer.rb, line 358
# Waits for the message whose correlation id equals mid.
#
# A response already buffered in @receive_queue under mid is returned
# straight away. Otherwise @queue is polled, sleeping @sleep_interval
# between polls, until a message with a matching correlation id arrives or
# more than @timeout seconds have passed.
#
# Messages that do not match are buffered under their 'id' header, but only
# while mid itself has an entry in @receive_queue (call() creates one);
# otherwise they are dropped.
#
# NOTE(review): when mid is nil no message can match, so everything popped
# during the wait falls under the rule above and the method returns nil
# after the timeout. Confirm whether that is intended.
#
# @param mid Message id to wait for
#
# @return The matching message, or nil on timeout.
def receive(mid=nil)

  # Serve a response that arrived during an earlier wait
  unless mid.nil?
    buffered = @receive_queue[mid]

    unless buffered.nil?
      @receive_queue.delete(mid)
      return buffered
    end
  end

  # Monotonic clock: the timeout must not be affected by wall-clock changes
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    delivery_info, properties, payload = @queue.pop()

    unless delivery_info.nil?
      msg = RabbitMQ::Message.new(delivery_info, properties, payload)

      dump_message(msg, 'receive') if @trace == true

      if !mid.nil? && properties[:correlation_id].to_i == mid.to_i
        # Drop the placeholder registered by call(). Without this every
        # successful call left a stale entry behind and the hash grew
        # without bound.
        @receive_queue.delete(mid)
        return msg
      end

      # Only add to receive queue if we are expecting it
      if @receive_queue.has_key?(mid)
        @receive_queue[msg.headers['id']] = msg
      end
    end

    if (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) > @timeout
      # Delete entry from receive queue
      @receive_queue.delete(mid)

      return nil
    end

    sleep @sleep_interval
  end
end
reply(msg, args) click to toggle source

You should use this method to reply back to a peer. It sets the reply header which tells the remote that this message is a response (as opposed to a message originating from another source which just happens to have the same message_id).

# File src/lib/haredo/peer.rb, line 413
# Sends a response to the peer that sent msg.
#
# The response goes to msg.properties.reply_to. The original message id is
# copied into the :id header and the correlation_id property, and the
# :reply header is set to 1 to mark the message as a response.
#
# @param msg  The message being answered; its properties must respond to
#             message_id and reply_to.
# @param args Hash of options:
#   :data    Response payload (default '')
#   :headers Extra headers for the response (default {}). The caller's hash
#            is not modified; :reply and :id are added to a copy.
#
# @return The result of send(), i.e. the id of the response message.
def reply(msg, args)

  data = args[:data] || ''

  id = msg.properties.message_id.to_i
  to = msg.properties.reply_to

  # Set the reply flag to indicate that this is a response to a message
  # sent. merge builds a new hash, so the caller's headers stay untouched
  # (and may even be frozen).
  headers = (args[:headers] || {}).merge(:reply => 1, :id => id)

  properties = { :correlation_id => id }

  send(to, :headers => headers, :properties => properties, :data => data)
end
send(to, args) click to toggle source

Sends a message. Adds the @mid as message_id in message properties. @param to The to address @param :data Message data @param :headers Message headers @param :from Message from address @param :properties Message properties

@return Returns the message ID of sent message

# File src/lib/haredo/peer.rb, line 300
# Publishes a message on @exchange.
#
# The current @mid is written into the message_id property and @mid is
# then incremented. The reply_to property is set to :from when given,
# otherwise to this peer's queue name when a queue exists.
#
# @param to   The to address, used as the routing key
# @param args Hash of options:
#   :data       Message data (default '')
#   :headers    Message headers (default {})
#   :from       Message from address, used as reply_to
#   :properties Message properties (default {}). The caller's hash is not
#               modified; routing details are added to a copy.
#
# @return Returns the message ID of sent message
def send(to, args)

  data    = args[:data]    || ''
  from    = args[:from]
  headers = args[:headers] || {}

  # Work on a copy so the hash the caller passed in is left as it was
  # (it may be shared between calls, or frozen).
  properties = (args[:properties] || {}).dup

  properties[:routing_key] = to
  properties[:headers]     = headers
  properties[:message_id]  = @mid

  rc = @mid
  @mid += 1

  if from.nil?
    properties[:reply_to] = @queue.name if @queue
  else
    properties[:reply_to] = from
  end

  @exchange.publish(data, properties)

  if @trace == true
    dump_message(RabbitMQ::Message.new(headers, properties, data), 'send()')
  end

  return rc
end
serve(msg) click to toggle source
# File src/lib/haredo/peer.rb, line 496
# Hands an incoming message to the plugin manager.
#
# @param msg The message to process
#
# @return Whatever @plugins.process returns; listen() exits the process
#         when that value is false.
def serve(msg)
  outcome = @plugins.process(msg)
  outcome
end