class RFlow::Components::AMQP::Subscriber
Component that subscribes to an AMQP topic and issues +RFlow::Message+s of type {RFlow::Message::Data::AMQP::Message} and RFlow::Message::Data::Raw when it receives messages on the AMQP topic.
By default, it uses an exclusive, non-durable, auto-deleting, randomly named queue to subscribe to topic messages, bound with an empty-string pattern.
Accepts config parameters:
- server - server address
- port - server port
- username - server username
- password - server password
- vhost - vhost
- reconnect_interval - how long to wait (in seconds) when disconnected before reconnecting
- queue_name - AMQP topic name to monitor
- queue_passive - queue_passive parameter to AMQP
- queue_durable - queue_durable parameter to AMQP
- queue_exclusive - queue_exclusive parameter to AMQP
- queue_auto_delete - queue_auto_delete parameter to AMQP
- binding_pattern - AMQP routing key
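As a rough illustration of the parameters above, a configuration hash might look like the following. Every value is a hypothetical placeholder (the actual contents of DEFAULT_CONFIG are not shown on this page). The values are written as strings on the assumption that the component coerces them itself, as configure! does for the port, the reconnect interval, and the four queue flags. The empty queue_name is likewise an assumption about how the "randomly named" default is obtained.

```ruby
# Hypothetical subscriber configuration; every value is an illustrative
# assumption, not the component's actual defaults.
subscriber_config = {
  'server'             => 'localhost',
  'port'               => '5672',
  'username'           => 'guest',
  'password'           => 'guest',
  'vhost'              => '/',
  'reconnect_interval' => '15',
  'queue_name'         => '',      # assumed: empty name lets the broker pick one
  'queue_passive'      => 'false',
  'queue_durable'      => 'false',
  'queue_exclusive'    => 'true',
  'queue_auto_delete'  => 'true',
  'binding_pattern'    => ''
}

# The four queue_* flags are the ones configure! coerces to booleans.
queue_flags = subscriber_config.keys.grep(/\Aqueue_(passive|durable|exclusive|auto_delete)\z/)
puts queue_flags.inspect
```

The flag values here mirror the exclusive, non-durable, auto-deleting default described above.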
Constants
- DEFAULT_CONFIG
Default config.
Public Instance Methods
RFlow-called method at startup. @return [void]
    # File lib/rflow/components/amqp/subscriber.rb, line 102
    def configure!(config)
      @config = DEFAULT_CONFIG.merge config
      @config['port'] = @config['port'].to_i
      @config['reconnect_interval'] = @config['reconnect_interval'].to_i

      ['durable', 'passive', 'exclusive', 'auto_delete'].each do |opt|
        @config["queue_#{opt}"] = to_boolean(@config["queue_#{opt}"])
      end

      # Convert the queue parameters into AMQP-friendly sym-keyed
      # Hash that can be passed directly to underlying AMQP gem
      # methods
      @queue_config = @config.each_with_object({}) do |(key, value), result|
        md = /queue_(.*)/.match(key.to_s)
        result[md[1].to_sym] = value unless md.nil?
      end
    end
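To make the queue-option extraction in configure! easier to follow, here is a minimal standalone sketch of the same each_with_object step. The config hash is a made-up stand-in for the merged configuration (its values are assumptions); only the extraction logic is taken from the listing.

```ruby
# Stand-in for the merged configuration; values are illustrative assumptions.
config = {
  'server'            => 'localhost',
  'port'              => '5672',
  'queue_name'        => 'events',
  'queue_passive'     => false,
  'queue_durable'     => false,
  'queue_exclusive'   => true,
  'queue_auto_delete' => true,
  'binding_pattern'   => 'logs.#'
}

# Same extraction as in configure!: keep keys matching queue_*,
# drop the prefix, and symbolize the remainder.
queue_config = config.each_with_object({}) do |(key, value), result|
  md = /queue_(.*)/.match(key.to_s)
  result[md[1].to_sym] = value unless md.nil?
end

p config['port'].to_i # string port coerced to an Integer, as configure! does
p queue_config
```

Because queue_name also matches the pattern, the resulting hash carries a :name entry alongside :passive, :durable, :exclusive, and :auto_delete. Keys such as binding_pattern are left out.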
RFlow-called method at startup. @return [void]
    # File lib/rflow/components/amqp/subscriber.rb, line 122
    def run!
      ::AMQP.connect(:host => @config['server'], :port => @config['port'],
                     :vhost => @config['vhost'], :username => @config['username'],
                     :password => @config['password']) do |conn|
        @amqp_connection = conn
        RFlow.logger.info 'Connected to AMQP server...'

        conn.on_disconnection do
          RFlow.logger.error 'AMQP disconnected. Reconnecting...'
          log_port.send_message(RFlow::Message.new('RFlow::Message::Data::Log').tap do |m|
            m.data.timestamp = Integer(Time.now.to_f * 1000) # ms since epoch
            m.data.level = 'INFO'
            m.data.text = "AMQP disconnected. Reconnecting in #{@config['reconnect_interval']} seconds."
          end)

          EventMachine::Timer.new(@config['reconnect_interval']) { run! }
        end

        ::AMQP::Channel.new(@amqp_connection) do |channel|
          @amqp_channel = channel
          channel.auto_recovery = true
          @amqp_exchange = @amqp_channel.topic

          ::AMQP::Queue.new(@amqp_channel, @config['queue_name'], @queue_config) do |queue|
            @amqp_queue = queue
            @amqp_queue.bind(@amqp_exchange, :routing_key => @config['binding_pattern']).subscribe(:ack => true) do |header, payload|
              RFlow.logger.debug { "#{name}: AMQP message received" }
              processing_event = RFlow::Message::ProcessingEvent.new(uuid, Time.now.utc).tap do |e|
                e.completed_at = Time.now.utc
              end

              amqp_port.send_message(RFlow::Message.new('RFlow::Message::Data::AMQP::Message').tap do |m|
                header.to_hash.each {|k,v| m.data.header[k.to_s] = v }
                m.data.payload = payload
                m.provenance << processing_event
              end)

              # TODO: Optimize out if not connected
              raw_port.send_message(RFlow::Message.new('RFlow::Message::Data::Raw').tap do |m|
                m.data.raw = payload
                m.provenance << processing_event
              end)

              header.ack
            end
          end
        end
      end
    end
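The per-message handling inside the subscribe block can be sketched without a broker. The snippet below is only an approximation of the flow: Event, Envelope, RecordingPort, and fan_out are hypothetical stand-ins invented for illustration, not RFlow or amqp gem classes. It shows the three steps visible in the listing: build one processing event, send an AMQP-style message and a raw message that both reference it, then acknowledge.

```ruby
# Hypothetical stand-ins for RFlow ports and messages; none of these
# names exist in RFlow or the amqp gem.
Event = Struct.new(:component_uuid, :started_at, :completed_at)
Envelope = Struct.new(:type, :data, :provenance)

class RecordingPort
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Records the message instead of delivering it anywhere.
  def send_message(message)
    @sent << message
  end
end

# Mirrors the shape of the subscribe block: one event, two messages, then ack.
def fan_out(header, payload, amqp_port, raw_port, acks)
  now = Time.now.utc
  event = Event.new('hypothetical-uuid', now, now)

  # Header keys are stringified, as the listing does with k.to_s.
  string_header = header.each_with_object({}) { |(k, v), h| h[k.to_s] = v }

  amqp_port.send_message(
    Envelope.new('AMQP::Message', { 'header' => string_header, 'payload' => payload }, [event])
  )
  raw_port.send_message(Envelope.new('Raw', { 'raw' => payload }, [event]))

  acks << :ack # acknowledge only after both sends
end

amqp_port = RecordingPort.new
raw_port = RecordingPort.new
acks = []
fan_out({ :content_type => 'text/plain' }, 'hello', amqp_port, raw_port, acks)
```

Both messages share the same event object, which matches how the listing appends a single processing_event to each message's provenance.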
@!visibility private
    # File lib/rflow/components/amqp/subscriber.rb, line 172
    def to_boolean(string)
      case string
      when /^true$/i, '1', true; true
      when /^false/i, '0', false; false
      else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
      end
    end
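A quick way to see how this coercion behaves is to run the helper on a few inputs. The snippet below copies the method from the listing (using then in place of the semicolons) so it runs on its own; the sample inputs are arbitrary.

```ruby
# Copy of the helper from the listing, so the snippet is self-contained.
def to_boolean(string)
  case string
  when /^true$/i, '1', true then true
  when /^false/i, '0', false then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end

p to_boolean('TRUE')      # case-insensitive match on "true"
p to_boolean('0')         # the string '0' maps to false
p to_boolean(true)        # real booleans pass through
p to_boolean('falsehood') # also false: /^false/i has no trailing anchor

begin
  to_boolean('yes')
rescue ArgumentError => e
  puts e.message
end
```

Note the asymmetry between the two patterns: "true" must match the whole line, while any string beginning with "false" is accepted. Whether that is intentional is not stated in the source.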