class Lagomorph::Subscriber

Public Class Methods

new(worker_class)
# File lib/lagomorph/subscriber.rb, line 6
# Remembers the worker class this subscriber dispatches requests to.
#
# worker_class - a class; process_request instantiates it as
#                worker_class.new(method, *params) and calls #work on the
#                resulting object.
def initialize(worker_class)
  @worker_class = worker_class
end

Public Instance Methods

subscribe(queue, channel, opts={})
# File lib/lagomorph/subscriber.rb, line 10
# Registers a consumer on +queue+ that handles each delivery as a request and
# publishes the outcome back through +channel+.
#
# queue   - object responding to #subscribe(options) { |metadata, payload| }
# channel - object responding to #ack(delivery_tag); also handed to
#           publish_response
# opts    - extra subscription options. The keys :durable, :manual_ack and
#           :block are always overridden by the values fixed below.
#
# Returns whatever queue.subscribe returns.
def subscribe(queue, channel, opts={})
  # These win over anything the caller supplied under the same keys.
  enforced = { durable: true, manual_ack: true, block: false }
  settings = opts.merge(enforced)

  queue.subscribe(settings) do |delivery, body|
    reply = process_request(body)
    # The delivery is acknowledged before the reply is published.
    channel.ack(delivery.delivery_tag)
    publish_response(channel, delivery, reply)
  end
end

Private Instance Methods

build_error(error)
# File lib/lagomorph/subscriber.rb, line 46
# Delegates to a freshly created JsonParser to turn +error+ into an error
# payload.
#
# error - the value to report (process_request passes an exception message)
#
# Returns whatever JsonParser#build_error returns.
def build_error(error)
  parser = JsonParser.new
  parser.build_error(error)
end
build_response(result)
# File lib/lagomorph/subscriber.rb, line 42
# Delegates to a freshly created JsonParser to turn +result+ into a response
# payload.
#
# result - the value produced by the worker
#
# Returns whatever JsonParser#build_response returns.
def build_response(result)
  parser = JsonParser.new
  parser.build_response(result)
end
parse_request(payload)
# File lib/lagomorph/subscriber.rb, line 38
# Delegates to a freshly created JsonParser to decode an incoming request.
#
# payload - the raw request body
#
# Returns whatever JsonParser#parse_request returns; process_request
# destructures that value into a method name and its parameters.
def parse_request(payload)
  parser = JsonParser.new
  parser.parse_request(payload)
end
process_request(request)
# File lib/lagomorph/subscriber.rb, line 24
# Runs one request through the worker class and wraps the outcome.
#
# The request is decoded with parse_request into a method name and its
# parameters, a worker is built as @worker_class.new(name, *params), and the
# value returned by its #work is passed to build_response.
#
# Any StandardError raised along the way (while parsing, constructing the
# worker, working, or building the response) is caught, and only its message
# is passed to build_error.
#
# request - the raw request payload
#
# Returns the value of build_response on success, or of build_error on failure.
def process_request(request)
  method_name, arguments = parse_request(request)
  worker = @worker_class.new(method_name, *arguments)
  build_response(worker.work)
rescue StandardError => failure
  build_error(failure.message)
end
publish_response(channel, metadata, payload)
# File lib/lagomorph/subscriber.rb, line 32
# Publishes +payload+ on the channel's default exchange, addressed back to the
# requester.
#
# channel  - object responding to #default_exchange
# metadata - object responding to #reply_to (used as the routing key) and
#            #correlation_id (copied onto the published message)
# payload  - the response body to publish
#
# Returns whatever the exchange's #publish returns.
def publish_response(channel, metadata, payload)
  exchange = channel.default_exchange
  properties = {
    routing_key:    metadata.reply_to,
    correlation_id: metadata.correlation_id
  }
  exchange.publish(payload, **properties)
end