module Evrone::Common::AMQP::Consumer::Subscribe
Public Instance Methods
subscribe()
click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 6
#
# Opens the session and, inside a channel, declares the exchange and the
# queue, binds the queue to the exchange and then enters the subscription
# loop. Binding and the loop both run inside the :subscribe callbacks,
# which receive the exchange, the queue and the consumer id.
#
# "shutdown" is logged once the callbacks block has returned.
def subscribe
  session.open
  session.with_channel do
    exchange = declare_exchange
    queue    = declare_queue

    # Built lazily so the names are read at each log call, not cached.
    binding_label = -> { "#{queue.name}:#{exchange.name}" }

    run_callbacks(:subscribe, exchange: exchange, queue: queue, name: consumer_id) do
      debug "subscribing to #{binding_label.call} using #{bind_options.inspect}"
      queue.bind(exchange, bind_options)
      debug "successfuly subscribed to #{binding_label.call}"

      subscription_loop queue
    end

    debug "shutdown"
  end
end
Private Instance Methods
deserialize_message(properties, payload)
click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 58
#
# Decodes a raw payload by handing it to Common::AMQP::Formatter.unpack
# together with the message's :content_type property and this consumer's
# +model+.
#
# properties - message properties; only +properties[:content_type]+ is read.
# payload    - the raw message body as received.
#
# Returns whatever Formatter.unpack returns.
def deserialize_message(properties, payload)
  formatter = Common::AMQP::Formatter
  formatter.unpack(properties[:content_type], model, payload)
end
run_instance(delivery_info, properties, payload)
click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 47
#
# Deserializes the payload, then, inside the :recieve callbacks, builds a
# fresh consumer instance, attaches the message metadata to it and calls
# #perform with the deserialized payload.
#
# delivery_info - delivery metadata, assigned to the instance as-is.
# properties    - message properties, assigned to the instance and used to
#                 pick the deserializer.
# payload       - the raw message body.
#
# Returns the value of run_callbacks (presumably the block's result, i.e.
# what #perform returned -- confirm against the run_callbacks definition).
def run_instance(delivery_info, properties, payload)
  message = deserialize_message(properties, payload)

  run_callbacks(:recieve, payload: message, name: consumer_id) do
    worker = new
    worker.properties    = properties
    worker.delivery_info = delivery_info
    worker.perform(message)
  end
end
subscription_loop(q)
click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 27
#
# Polls the queue until shutdown is requested. Each iteration pops one
# message: when a payload is present it is handed to run_instance,
# otherwise the loop sleeps for +config.pool_timeout+ before polling again.
#
# The loop ends when +shutdown?+ becomes true (checked before every pop)
# or when run_instance returns :shutdown.
#
# q - the queue to poll; it must respond to #pop(ack: ...) and yield a
#     [delivery_info, properties, payload] triple.
def subscription_loop(q)
  loop do
    break if shutdown?

    delivery_info, properties, payload = q.pop(ack: ack)

    # Nothing was popped: back off before the next poll.
    unless payload
      sleep config.pool_timeout
      next
    end

    debug "recieve ##{delivery_info.delivery_tag.to_i} #{payload.inspect}"
    outcome = run_instance delivery_info, properties, payload
    debug "done ##{delivery_info.delivery_tag.to_i}"

    break if outcome == :shutdown
  end
end