class Splash::Transports::Rabbitmq::Client

publish / get Mode RabbitMQ Client

Public Class Methods

new() click to toggle source

Constructor initialize a Bunny Client

# File lib/splash/transports/rabbitmq.rb, line 52
def initialize
  @config = get_config.transports
  host = @config[:rabbitmq][:host]
  port = @config[:rabbitmq][:port]
  vhost = (@config[:rabbitmq][:vhost])? @config[:rabbitmq][:vhost] : '/'
  passwd = (@config[:rabbitmq][:passwd])? @config[:rabbitmq][:passwd] : 'guest'
  user = (@config[:rabbitmq][:user])? @config[:rabbitmq][:user] : 'guest'
  conf  = { :host => host, :vhost => vhost, :user => user, :password => passwd, :port => port.to_i}

  begin
    @connection = Bunny.new conf
    @connection.start
    @channel = @connection.create_channel
  rescue Bunny::Exception
    splash_exit  case: :service_dependence_missing, more: "RabbitMQ Transport not available."
  end
end

Public Instance Methods

ack(ack) click to toggle source

ack a specific message for manual ack with a delivery tag to a queue @param [String] ack @return [Boolean]

# File lib/splash/transports/rabbitmq.rb, line 88
def ack(ack)
  return @channel.acknowledge(ack, false)
end
close() click to toggle source

close the RabbitMQ connection

# File lib/splash/transports/rabbitmq.rb, line 126
def close
  @connection.close
end
execute(order) click to toggle source

send an execution order message (verb+payload) via RabbitMQ to an slash input queue @param [Hash] order @return [Void] unserialized Void object from YAML

# File lib/splash/transports/rabbitmq.rb, line 96
def execute(order)
  queue = order[:return_to]
  lock = Mutex.new
  res = nil
  condition = ConditionVariable.new
  get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload|
    res = YAML::load(payload)
    lock.synchronize { condition.signal }
  end
  get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}."
  get_default_client.publish queue: order[:queue], message: order.to_yaml
  lock.synchronize { condition.wait(lock) }
  return res
end
get(options ={}) click to toggle source

Get a message from a RabbitMQ queue @param [Hash] options @option options [String] :queue the name of the queue to query @option options [String] :manual_ack flag to inhibit ack @return [Hash] Payload + ack tag if :manual_ack

# File lib/splash/transports/rabbitmq.rb, line 116
def get(options ={})
  queue = @channel.queue(options[:queue])
  opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
  delivery_info, properties, payload = queue.pop
  res = {:message => payload}
  res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
  return res
end
publish(options ={}) click to toggle source

publish to a queue @param [Hash] options @option options [String] :queue the name of the queue to purge @option options [String] :message the message to send

# File lib/splash/transports/rabbitmq.rb, line 81
def publish(options ={})
  return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
purge(options) click to toggle source

purge a queue @param [Hash] options @option options [String] :queue the name of the queue to purge

# File lib/splash/transports/rabbitmq.rb, line 73
def purge(options)
  @channel.queue(options[:queue]).purge
end