class Lagomorph::Subscriber
Public Class Methods
new(worker_class)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 6
#
# Remembers the class used to handle incoming requests.
#
# @param worker_class [Class] stored as-is in @worker_class; it is later
#   instantiated by process_request with the parsed method name and params.
def initialize(worker_class)
  @worker_class = worker_class
end
Public Instance Methods
subscribe(queue, channel, opts={})
click to toggle source
# File lib/lagomorph/subscriber.rb, line 10
#
# Registers a consumer block on +queue+ that handles each delivery as a
# request/response exchange.
#
# The caller's +opts+ are merged with durable: true, manual_ack: true and
# block: false; those three keys always win over values supplied in +opts+.
#
# For every delivery the block, in this order:
# 1. turns the payload into a response via process_request,
# 2. acknowledges the delivery on +channel+ using its delivery tag,
# 3. publishes the response via publish_response.
#
# @param queue   object responding to #subscribe(opts) and yielding
#   (metadata, payload) to the block
# @param channel object responding to #ack(delivery_tag)
# @param opts [Hash] extra subscription options
# @return whatever queue.subscribe returns
def subscribe(queue, channel, opts={})
  forced_opts = { durable: true, manual_ack: true, block: false }

  queue.subscribe(opts.merge(forced_opts)) do |metadata, payload|
    reply = process_request(payload)
    # The delivery is acknowledged before the reply is published.
    channel.ack(metadata.delivery_tag)
    publish_response(channel, metadata, reply)
  end
end
Private Instance Methods
build_error(error)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 46
#
# Delegates to a fresh JsonParser to build the error response.
#
# @param error the value forwarded to JsonParser#build_error
#   (process_request passes the exception's message)
# @return whatever JsonParser#build_error returns
def build_error(error)
  parser = JsonParser.new
  parser.build_error(error)
end
build_response(result)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 42
#
# Delegates to a fresh JsonParser to build the success response.
#
# @param result the value forwarded to JsonParser#build_response
#   (process_request passes the worker's return value)
# @return whatever JsonParser#build_response returns
def build_response(result)
  parser = JsonParser.new
  parser.build_response(result)
end
parse_request(payload)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 38
#
# Delegates to a fresh JsonParser to decode an incoming payload.
#
# @param payload the raw request forwarded to JsonParser#parse_request
# @return whatever JsonParser#parse_request returns; process_request
#   destructures it into a method name and a params list
def parse_request(payload)
  parser = JsonParser.new
  parser.parse_request(payload)
end
process_request(request)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 24
#
# Runs one request through the worker class and returns the response.
#
# The request is parsed into a method name and params, a new @worker_class
# instance is created with the method name followed by the splatted params,
# and the result of its #work call is wrapped by build_response.
#
# Any StandardError raised while parsing, working or building the response
# is rescued and turned into an error response carrying only the
# exception's message.
#
# @param request the raw payload handed to parse_request
# @return the value of build_response on success, or of build_error on failure
def process_request(request)
  rpc_method, rpc_params = parse_request(request)
  worker = @worker_class.new(rpc_method, *rpc_params)
  build_response(worker.work)
rescue => error
  build_error(error.message)
end
publish_response(channel, metadata, payload)
click to toggle source
# File lib/lagomorph/subscriber.rb, line 32
#
# Publishes +payload+ on the channel's default exchange, addressed back to
# the requester.
#
# The routing key is the request's reply_to and the request's
# correlation_id is echoed back unchanged.
#
# @param channel  object responding to #default_exchange
# @param metadata object responding to #reply_to and #correlation_id
# @param payload  the response body to publish
# @return whatever the exchange's #publish returns
def publish_response(channel, metadata, payload)
  exchange = channel.default_exchange
  reply_properties = {
    routing_key: metadata.reply_to,
    correlation_id: metadata.correlation_id
  }
  exchange.publish(payload, **reply_properties)
end