class RFlow::Components::AMQP::Subscriber

Component that subscribes to an AMQP topic and, for each message received on that topic, issues one +RFlow::Message+ of type +RFlow::Message::Data::AMQP::Message+ and one of type +RFlow::Message::Data::Raw+.

By default, the component uses an exclusive, non-durable, auto-deleting, randomly named queue, bound with an empty-string pattern, to subscribe to topic messages.

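Accepts config parameters (the keys read by +configure!+ and +run!+): +server+, +port+, +vhost+, +username+, +password+, +reconnect_interval+, +binding_pattern+, +queue_name+, +queue_durable+, +queue_passive+, +queue_exclusive+ and +queue_auto_delete+.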

Constants

DEFAULT_CONFIG

Default config.

Attributes

config[RW]

@!visibility private

queue_config[RW]

@!visibility private

Public Instance Methods

configure!(config) click to toggle source

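Called by RFlow at startup. Merges the supplied config over +DEFAULT_CONFIG+, normalizes the numeric and boolean values, and derives the queue options. The return value is not meant to be used.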

# File lib/rflow/components/amqp/subscriber.rb, line 102
# Merges the supplied settings over +DEFAULT_CONFIG+, normalizes them, and
# derives the option hash used when the queue is created.
#
# 'port' and 'reconnect_interval' are converted with +to_i+; the four
# 'queue_*' flags are converted with +to_boolean+, which raises
# ArgumentError for values it cannot coerce.
#
# @param config [Hash] string-keyed settings overriding +DEFAULT_CONFIG+
# @return [void]
def configure!(config)
  @config = DEFAULT_CONFIG.merge(config)

  %w[port reconnect_interval].each do |numeric_key|
    @config[numeric_key] = @config[numeric_key].to_i
  end

  %w[durable passive exclusive auto_delete].each do |flag|
    flag_key = "queue_#{flag}"
    @config[flag_key] = to_boolean(@config[flag_key])
  end

  # Collect every "queue_*" setting under a symbol key named after the
  # part following the prefix, giving a Hash that can be handed directly
  # to the underlying AMQP gem methods.
  options = {}
  @config.each_pair do |key, value|
    suffix = key.to_s[/queue_(.*)/, 1]
    options[suffix.to_sym] = value if suffix
  end
  @queue_config = options
end
run!() click to toggle source

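Called by RFlow at startup. Connects to the AMQP server, binds the configured queue to the topic exchange, and forwards each received message to the output ports. The return value is not meant to be used.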

# File lib/rflow/components/amqp/subscriber.rb, line 122
# Entry point invoked by RFlow at startup. Opens the AMQP connection and,
# inside the gem's callbacks, creates a channel, takes the channel's topic
# exchange, and creates the queue described by +@config+ / +@queue_config+.
# Every message delivered to the subscription is forwarded to +amqp_port+
# and +raw_port+ and then acknowledged.
#
# When the connection reports a disconnection, a log message is sent to
# +log_port+ and +run!+ is scheduled again after
# +@config['reconnect_interval']+ seconds.
#
# @return [void]
def run!
  ::AMQP.connect(host: @config['server'], port: @config['port'], vhost: @config['vhost'],
                 username: @config['username'], password: @config['password']) do |connection|
    @amqp_connection = connection
    RFlow.logger.info 'Connected to AMQP server...'

    connection.on_disconnection do
      RFlow.logger.error 'AMQP disconnected. Reconnecting...'

      notice = RFlow::Message.new('RFlow::Message::Data::Log')
      notice.data.timestamp = Integer(Time.now.to_f * 1000) # ms since epoch
      notice.data.level = 'INFO'
      notice.data.text = "AMQP disconnected. Reconnecting in #{@config['reconnect_interval']} seconds."
      log_port.send_message(notice)

      # Start the whole connect/subscribe sequence over after the interval.
      EventMachine::Timer.new(@config['reconnect_interval']) { run! }
    end

    ::AMQP::Channel.new(@amqp_connection) do |channel|
      @amqp_channel = channel
      channel.auto_recovery = true
      @amqp_exchange = @amqp_channel.topic

      ::AMQP::Queue.new(@amqp_channel, @config['queue_name'], @queue_config) do |queue|
        @amqp_queue = queue
        bound_queue = @amqp_queue.bind(@amqp_exchange, routing_key: @config['binding_pattern'])

        bound_queue.subscribe(ack: true) do |header, payload|
          RFlow.logger.debug { "#{name}: AMQP message received" }

          # One provenance record, shared by both outgoing messages.
          processing_event = RFlow::Message::ProcessingEvent.new(uuid, Time.now.utc)
          processing_event.completed_at = Time.now.utc

          amqp_message = RFlow::Message.new('RFlow::Message::Data::AMQP::Message')
          header.to_hash.each { |field, value| amqp_message.data.header[field.to_s] = value }
          amqp_message.data.payload = payload
          amqp_message.provenance << processing_event
          amqp_port.send_message(amqp_message)

          # TODO: Optimize out if not connected
          raw_message = RFlow::Message.new('RFlow::Message::Data::Raw')
          raw_message.data.raw = payload
          raw_message.provenance << processing_event
          raw_port.send_message(raw_message)

          # Acknowledge only after both messages have been handed off.
          header.ack
        end
      end
    end
  end
end
to_boolean(string) click to toggle source

@!visibility private

# File lib/rflow/components/amqp/subscriber.rb, line 172
# Coerces a configuration value to a strict +true+ or +false+.
#
# Recognized inputs are the booleans themselves, the strings '1' and '0',
# and the words "true" / "false" in any letter case. Anything else is
# rejected instead of being guessed at.
#
# @param string [String, Boolean] the value to coerce
# @return [Boolean]
# @raise [ArgumentError] if the value is not one of the recognized forms
# @!visibility private
def to_boolean(string)
  # \A and \z anchor the whole value. The previous patterns used ^/$ (which
  # match per line) and left "false" unanchored at the end, so inputs such
  # as "falsey" or "true\njunk" were silently coerced.
  case string
  when true, '1', /\Atrue\z/i then true
  when false, '0', /\Afalse\z/i then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end