class Mercury

Async IO often involves CPS (continuation-passing style) code, where the continuation (a.k.a. “callback”) is passed to a function to be invoked at a later time. CPS can result in deep lexical nesting, making code difficult to read. This monad hides the CPS plumbing, which allows code to be written in a flat style with no visible continuation passing. It essentially wraps a CPS proc and provides methods for composing such procs. See codon.com/refactoring-ruby-with-monads.

This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different Mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate the behavior of server disconnections, broken sockets, etc.

Constants

ORIGINAL_TAG_HEADER
REPUBLISH_COUNT_HEADER

Attributes

amqp[R]
channel[R]
logger[R]

Public Class Methods

open(logger: Logger.new(STDOUT), **kws, &k) click to toggle source
# File lib/mercury/mercury.rb, line 14
# Constructs a Mercury instance, forwarding every option and the
# continuation to +new+. The instance itself is not returned.
#
# @param logger [Logger] logger handed to the constructor (a STDOUT logger by default)
# @param kws [Hash] any other keyword options, forwarded unchanged to +new+
# @yield the continuation, forwarded unchanged to +new+
# @return [nil] always nil
def self.open(logger: Logger.new(STDOUT), **kws, &k)
  options = kws.merge(logger: logger)
  new(**options, &k)
  nil
end
publish_opts(tag, headers) click to toggle source
# File lib/mercury/mercury.rb, line 97
# Builds the option hash used when publishing a message.
#
# @param tag [String] becomes the :routing_key
# @param headers [Hash] becomes the :headers
# @return [Hash] options with :routing_key, :persistent (always true) and :headers
def self.publish_opts(tag, headers)
  {
    routing_key: tag,
    persistent: true,
    headers: headers
  }
end

Private Class Methods

guard_public(is_closed, k, initializing: false) click to toggle source
# File lib/mercury/mercury.rb, line 345
# Precondition check for public operations.
#
# @param is_closed [Boolean] whether the instance is considered closed
# @param k [Proc, nil] the continuation supplied by the caller
# @param initializing [Boolean] when true, the closed check is skipped
# @raise [RuntimeError] if the instance is closed (and not initializing),
#   or if no continuation was given
# @return [nil]
def self.guard_public(is_closed, k, initializing: false)
  defunct = is_closed && !initializing
  raise 'This mercury instance is defunct. Either it was purposely closed or an error occurred.' if defunct
  raise 'A continuation block is required but none was provided.' unless k
end
increment_republish_count(msg) click to toggle source

@param msg [Mercury::ReceivedMessage]
@return [Hash] the headers with republish count incremented

# File lib/mercury/mercury.rb, line 337
# Returns a copy of the message headers whose REPUBLISH_COUNT_HEADER entry
# is the message's republish count plus one. The message's own headers are
# left untouched because Hash#merge builds a new hash.
#
# @param msg [Mercury::ReceivedMessage]
# @return [Hash] the headers with republish count incremented
def self.increment_republish_count(msg)
  current_headers = msg.headers
  next_count = msg.republish_count + 1
  current_headers.merge(REPUBLISH_COUNT_HEADER => next_count)
end
new(host: 'localhost', port: 5672, vhost: '/', username: 'guest', password: 'guest', parallelism: 1, on_error: nil, wait_for_publisher_confirms: true, logger:, &k) click to toggle source
# File lib/mercury/mercury.rb, line 30
# Connects to the AMQP server, opens a channel on the connection and, once
# setup has finished, invokes the continuation with this instance.
#
# @param host [String] forwarded to AMQP.connect
# @param port [Integer] forwarded to AMQP.connect
# @param vhost [String] forwarded to AMQP.connect
# @param username [String] forwarded to AMQP.connect
# @param password [String] forwarded to AMQP.connect
# @param parallelism [Integer] used as the channel's prefetch setting
# @param on_error [#call, nil] stored for use by the error handlers
# @param wait_for_publisher_confirms [Boolean] when true, publisher confirms
#   are enabled before the continuation runs
# @param logger [Logger] stored for error reporting
# @yield [mercury] called with this instance when the channel is ready
# @raise [RuntimeError] if no continuation block is given
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error
  # Single place that hands the ready instance to the caller.
  ready = proc { k.call(self) }
  connection_opts = {
    host: host,
    port: port,
    vhost: vhost,
    username: username,
    password: password,
    on_tcp_connection_failure: server_down_error_handler
  }
  AMQP.connect(**connection_opts) do |connection|
    @amqp = connection
    install_lost_connection_error_handler
    AMQP::Channel.new(connection, prefetch: parallelism) do |opened_channel|
      @channel = opened_channel
      install_channel_error_handler
      if wait_for_publisher_confirms
        enable_publisher_confirms(&ready)
      else
        ready.call
      end
    end
  end
end
source_opts() click to toggle source
# File lib/mercury/mercury.rb, line 314
# Options used when declaring a source exchange.
#
# @return [Hash] durable, and not auto-deleted
def self.source_opts
  {
    durable: true,
    auto_delete: false
  }
end

Public Instance Methods

close(&k) click to toggle source
# File lib/mercury/mercury.rb, line 19
# Closes the AMQP connection if one is open, then invokes the continuation.
#
# When no connection is held, the continuation is scheduled on the next
# EventMachine tick instead of being called synchronously.
#
# The continuation is optional. Calling +close+ without a block used to fail
# with a NoMethodError inside the close callback, i.e. only after the
# connection had already been shut down.
#
# @yield called with no arguments once the instance is closed
def close(&k)
  if @amqp
    @amqp.close do
      # Clearing @amqp is what makes guard_public treat the instance as closed.
      @amqp = nil
      k.call if k
    end
  elsif k
    EM.next_tick(&k)
  end
end
delete_source(source_name, &k) click to toggle source
# File lib/mercury/mercury.rb, line 140
# Deletes the named source exchange, then invokes the continuation with no
# arguments.
#
# @param source_name [String] name of the source to delete
# @yield called once the delete has completed
def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) do |source|
    source.delete { k.call }
  end
end
delete_work_queue(worker_group, &k) click to toggle source
# File lib/mercury/mercury.rb, line 149
# Deletes the work queue belonging to a worker group, then invokes the
# continuation with no arguments.
#
# @param worker_group [String] used as the queue name
# @yield called once the delete has completed
def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    work_queue.delete { k.call }
  end
end
publish(source_name, msg, tag: '', headers: {}, &k) click to toggle source
# File lib/mercury/mercury.rb, line 62
# Publishes a message to the named source.
#
# @param source_name [String] name of the source to publish to
# @param msg [Object] message, serialized by publish_internal
# @param tag [String] routing tag (empty by default)
# @param headers [Hash] message headers (empty by default)
# @yield continuation, forwarded to publish_internal
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # Redeclaring the exchange on every publish is fine because the amqp gem
  # caches exchange objects.
  with_source(source_name) { |source| publish_internal(source, msg, tag, headers, &k) }
end
queue_exists?(queue_name, &k) click to toggle source
# File lib/mercury/mercury.rb, line 167
# Checks whether a queue exists by declaring it passively through
# existence_check.
#
# @param queue_name [String] queue to look for
# @yield [exists] result delivered by existence_check; true when the passive
#   declaration succeeds
def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |probe_channel, &report|
    probe_channel.queue(queue_name, passive: true) { report.call(true) }
  end
end
republish(msg, &k) click to toggle source

Places a copy of the message at the back of the queue, then acks the original message.

# File lib/mercury/mercury.rb, line 73
# Publishes a copy of a received work-queue message to the default exchange,
# routed by the work queue's name, then acks the original and invokes the
# continuation.
#
# The copy's headers carry an incremented republish count plus the original
# tag under ORIGINAL_TAG_HEADER.
#
# @param msg [Mercury::ReceivedMessage] message to republish
# @yield called with no arguments after the original has been acked
# @raise [RuntimeError] if the message did not come from a work queue, was
#   received by a different mercury instance, or already had an action taken
def republish(msg, &k)
  guard_public(k)
  unless msg.work_queue_name
    raise 'Only messages from a work queue can be republished'
  end
  unless msg.mercury_instance == self
    raise 'A message can only be republished by the mercury instance that received it'
  end
  if msg.action_taken
    raise "This message was already #{msg.action_taken}ed"
  end
  republish_headers = Mercury.increment_republish_count(msg).merge(ORIGINAL_TAG_HEADER => msg.tag)
  target_exchange = @channel.default_exchange
  publish_internal(target_exchange, msg.content, msg.work_queue_name, republish_headers) do
    msg.ack
    k.call
  end
end
source_exists?(source_name, &k) click to toggle source
# File lib/mercury/mercury.rb, line 158
# Checks whether a source exists by declaring it passively through
# existence_check.
#
# @param source_name [String] source to look for
# @yield [exists] result delivered by existence_check; true when the passive
#   declaration succeeds
def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |probe_channel, &report|
    with_source_no_cache(probe_channel, source_name, passive: true) { report.call(true) }
  end
end
start_listener(source_name, handler, tag_filter: nil, &k) click to toggle source
# File lib/mercury/mercury.rb, line 101
# Subscribes a handler to a listener queue bound to the named source, then
# invokes the continuation. The subscription is made with ack: false.
#
# @param source_name [String] source to listen to
# @param handler [#call] called with a received message for each delivery
# @param tag_filter [String, nil] binding filter passed to with_listener_queue
# @yield called with no arguments once the subscription has been set up
def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |source|
    with_listener_queue(source, tag_filter) do |listener_queue|
      listener_queue.subscribe(ack: false) do |metadata, payload|
        received = make_received_message(payload, metadata)
        handler.call(received)
      end
      k.call
    end
  end
end
start_queue_worker(worker_group, handler, &k) click to toggle source

Production code should not call this. Only test/tool code should call this, and only if you're sure the worker queue already exists and is bound to the source.

# File lib/mercury/mercury.rb, line 126
# Subscribes a worker directly to a worker group's queue, without declaring
# or binding any source, then invokes the continuation.
#
# @param worker_group [String] used as the queue name
# @param handler [#call] passed to subscribe_worker
# @yield called with no arguments once the worker is subscribed
def start_queue_worker(worker_group, handler, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    subscribe_worker(work_queue, handler)
    k.call
  end
end
start_worker(worker_group, source_name, handler, tag_filter: nil, &k) click to toggle source
# File lib/mercury/mercury.rb, line 113
# Sets up the worker group's queue for the named source, subscribes the
# handler as a worker, then invokes the continuation.
#
# @param worker_group [String] passed to with_work_queue
# @param source_name [String] source the work queue is set up against
# @param handler [#call] passed to subscribe_worker
# @param tag_filter [String, nil] binding filter passed to with_work_queue
# @yield called with no arguments once the worker is subscribed
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |source|
    with_work_queue(worker_group, source, tag_filter) do |work_queue|
      subscribe_worker(work_queue, handler)
      k.call
    end
  end
end

Private Instance Methods

bind_queue(exchange, queue_name, tag_filter, opts, &k) click to toggle source
# File lib/mercury/mercury.rb, line 326
# Declares a queue on the channel and binds it to an exchange, then invokes
# the continuation with the queue.
#
# @param exchange [Object] exchange to bind to
# @param queue_name [String] name of the queue to declare
# @param tag_filter [String, nil] routing key for the binding; '#' is used
#   when nil
# @param opts [Hash] options for the queue declaration
# @yield [queue] called once the binding has completed
def bind_queue(exchange, queue_name, tag_filter, opts, &k)
  routing_key = tag_filter || '#'
  @channel.queue(queue_name, opts) do |declared_queue|
    declared_queue.bind(exchange, routing_key: routing_key) { k.call(declared_queue) }
  end
end
dispatch_publisher_confirm(basic_ack) click to toggle source
# File lib/mercury/mercury.rb, line 218
# Runs and removes the stored confirm handlers covered by a publisher-confirm
# ack.
#
# When the ack is flagged +multiple+, every pending handler whose tag is at
# or below the ack's delivery tag is run, in ascending tag order. Otherwise
# only the handler for the ack's own delivery tag is run.
#
# @param basic_ack [#delivery_tag, #multiple] the ack received from the server
# @return [Array] the delivery tags whose handlers were run
def dispatch_publisher_confirm(basic_ack)
  tags = [basic_ack.delivery_tag]
  if basic_ack.multiple
    # Sorting only makes the dispatch order deterministic.
    tags = @confirm_handlers.keys.select { |pending| pending <= basic_ack.delivery_tag }.sort
  end
  tags.each { |confirmed| @confirm_handlers.delete(confirmed).call }
end
enable_publisher_confirms(&k) click to toggle source

In AMQP, queue consumers ack requests after handling them. Unacked messages are automatically returned to the queue, guaranteeing they are eventually handled. Services often ack one request while publishing related messages. Ideally, these operations would be transactional. If the ack succeeds but the publish does not, the line of processing is abandoned, resulting in processing getting “stuck”. The best we can do in AMQP is to use “publisher confirms” to confirm that the publish succeeded before acking the originating request. Since the ack can still fail in this scenario, the system should employ idempotent design, which makes request redelivery harmless.

See www.rabbitmq.com/confirms.html and rubyamqp.info/articles/durability/.

# File lib/mercury/mercury.rb, line 190
# Puts the channel into publisher-confirm mode and installs the ack/nack
# callbacks, then invokes the continuation.
#
# Pending confirmations are tracked in @confirm_handlers, keyed by delivery
# tag; @last_published_delivery_tag is reset to 0 once confirm mode is active.
#
# Both callbacks raise a RuntimeError: the ack callback when the acked
# delivery tag has no pending handler, the nack callback on every nack.
#
# @yield called with no arguments once the callbacks are installed
def enable_publisher_confirms(&k)
  @confirm_handlers = {}
  @channel.confirm_select do
    @last_published_delivery_tag = 0
    @channel.on_ack do |basic_ack|
      tag = basic_ack.delivery_tag
      # Hash#key? is core Ruby: no dependency on ActiveSupport's
      # Array#exclude? and no intermediate keys array to scan.
      unless @confirm_handlers.key?(tag)
        raise "Got an unexpected publish confirmation ACK for delivery-tag: #{tag}. Was expecting one of: #{@confirm_handlers.keys.inspect}"
      end
      dispatch_publisher_confirm(basic_ack)
    end
    @channel.on_nack do |basic_nack|
      raise "Delivery failed for message with delivery-tag: #{basic_nack.delivery_tag}"
    end
    k.call
  end
end
existence_check(k, &check) click to toggle source
# File lib/mercury/mercury.rb, line 234
# Runs an existence probe on a fresh, throwaway channel.
#
# The probe block receives the channel plus a reporting block. When the probe
# reports a result, the channel is closed and the continuation is called with
# that result. A channel error with reply code 404 calls the continuation
# with false; any other channel error goes to handle_channel_error.
#
# @param k [#call] continuation receiving the boolean result
# @yield [channel, &report] the probe to run on the temporary channel
def existence_check(k, &check)
  AMQP::Channel.new(@amqp) do |probe_channel|
    probe_channel.on_error do |_, info|
      not_found = info.reply_code == 404
      if not_found
        # The request failed because the target does not exist.
        k.call(false)
      else
        # Failed for some other reason.
        handle_channel_error(probe_channel, info)
      end
    end
    check.call(probe_channel) do |result|
      probe_channel.close { k.call(result) }
    end
  end
end
expect_publisher_confirm(k) click to toggle source
# File lib/mercury/mercury.rb, line 212
# Registers a continuation to run when the next published message is
# confirmed.
#
# Advances @last_published_delivery_tag by one and stores the continuation in
# @confirm_handlers under the new tag.
#
# @param k [#call] continuation to run on confirmation
# @return [Integer] the delivery tag the continuation was stored under
def expect_publisher_confirm(k)
  @last_published_delivery_tag += 1
  tag = @last_published_delivery_tag
  @confirm_handlers[tag] = k
  tag
end
guard_public(k, initializing: false) click to toggle source
# File lib/mercury/mercury.rb, line 341
# Instance-level precondition check. Delegates to Mercury.guard_public and
# treats the instance as closed whenever @amqp is nil.
#
# @param k [Proc, nil] the continuation supplied by the caller
# @param initializing [Boolean] forwarded to Mercury.guard_public
def guard_public(k, initializing: false)
  closed = @amqp.nil?
  Mercury.guard_public(closed, k, initializing: initializing)
end
handle_channel_error(_ch, info) click to toggle source
# File lib/mercury/mercury.rb, line 265
# Channel error callback. Builds an error message from the reply code and
# reply text, then immediately runs the handler make_error_handler returns
# for it.
#
# @param _ch [Object] the channel that failed (unused)
# @param info [#reply_code, #reply_text] error details
def handle_channel_error(_ch, info)
  message = "An error occurred: #{info.reply_code} - #{info.reply_text}"
  make_error_handler(message).call
end
install_channel_error_handler() click to toggle source
# File lib/mercury/mercury.rb, line 261
# Registers handle_channel_error as the error callback of @channel.
def install_channel_error_handler
  error_callback = method(:handle_channel_error)
  @channel.on_error(&error_callback)
end
install_lost_connection_error_handler() click to toggle source
# File lib/mercury/mercury.rb, line 257
# Registers an error handler on @amqp that runs when the TCP connection is
# lost.
def install_lost_connection_error_handler
  connection = @amqp
  lost_connection_handler = make_error_handler('Lost connection to AMQP server. Exiting.')
  connection.on_tcp_connection_loss(&lost_connection_handler)
end
make_error_handler(msg) click to toggle source
# File lib/mercury/mercury.rb, line 269
# Builds a proc that reports a fatal error.
#
# When run, the proc does nothing if an exception is already propagating.
# Otherwise it logs the message, closes this instance, and then either calls
# @on_error with the message (when it responds to +call+) or raises the
# message as a RuntimeError.
#
# @param msg [String] the error message
# @return [Proc] the error handler
def make_error_handler(msg)
  proc do
    # Never interfere with an exception that is already in flight. This is
    # essential: some EventMachine versions (notably 1.2.0.1) fail to clean up
    # if an error is raised during the `ensure` phase of EventMachine::run,
    # which zombifies subsequent reactors. AMQP connection failure handlers
    # are invoked from that `ensure`.
    next if $!

    @logger.error(msg)
    close do
      raise msg unless @on_error.respond_to?(:call)
      @on_error.call(msg)
    end
  end
end
make_received_message(payload, metadata, work_queue_name: nil) click to toggle source
# File lib/mercury/mercury.rb, line 230
# Wraps a raw delivery in a ReceivedMessage tied to this instance.
#
# @param payload [String] raw payload, deserialized with +read+
# @param metadata [Object] delivery metadata, passed through unchanged
# @param work_queue_name [String, nil] the work queue the delivery came from,
#   if any
# @return [ReceivedMessage]
def make_received_message(payload, metadata, work_queue_name: nil)
  content = read(payload)
  ReceivedMessage.new(content, metadata, self, work_queue_name: work_queue_name)
end
publish_internal(exchange, msg, tag, headers, &k) click to toggle source
# File lib/mercury/mercury.rb, line 85
# Serializes a message and publishes it to an exchange.
#
# With publisher confirms enabled, the continuation is registered through
# expect_publisher_confirm and the publish call gets no block. Otherwise the
# continuation is handed straight to the exchange's publish call.
#
# @param exchange [#publish] target exchange
# @param msg [Object] message, serialized with +write+
# @param tag [String] routing tag
# @param headers [Hash] message headers
# @yield the continuation to run once the publish is done
def publish_internal(exchange, msg, tag, headers, &k)
  payload = write(msg)
  pub_opts = Mercury.publish_opts(tag, headers)
  unless publisher_confirms_enabled
    return exchange.publish(payload, **pub_opts, &k)
  end

  expect_publisher_confirm(k)
  exchange.publish(payload, **pub_opts)
end
publisher_confirms_enabled() click to toggle source
# File lib/mercury/mercury.rb, line 208
# Whether publisher confirms are on, judged by @confirm_handlers being a
# Hash.
#
# @return [Boolean]
def publisher_confirms_enabled
  @confirm_handlers.kind_of?(Hash)
end
read(bytes) click to toggle source
# File lib/mercury/mercury.rb, line 294
# Deserializes a raw payload with a fresh WireSerializer.
#
# @param bytes [String] raw payload
# @return [Object] whatever WireSerializer#read returns
def read(bytes)
  serializer = WireSerializer.new
  serializer.read(bytes)
end
server_down_error_handler() click to toggle source
# File lib/mercury/mercury.rb, line 253
# Builds the error handler used when the connection to the AMQP server
# cannot be established.
#
# @return [Proc] the handler produced by make_error_handler
def server_down_error_handler
  message = 'Failed to establish connection to AMQP server. Exiting.'
  make_error_handler(message)
end
subscribe_worker(queue, handler) click to toggle source
# File lib/mercury/mercury.rb, line 134
        # Subscribes a handler to a work queue with ack: true. Each delivery
        # is wrapped in a received message tagged with the queue's name
        # before being passed to the handler.
        #
        # @param queue [#subscribe, #name] the work queue
        # @param handler [#call] called with each received message
        def subscribe_worker(queue, handler)
          queue.subscribe(ack: true) do |metadata, payload|
            received = make_received_message(payload, metadata, work_queue_name: queue.name)
            handler.call(received)
          end
        end
with_listener_queue(source_exchange, tag_filter, &k) click to toggle source
# File lib/mercury/mercury.rb, line 322
# Binds a listener queue to a source exchange. The queue has an empty name
# and is declared exclusive, auto-deleted and non-durable.
#
# @param source_exchange [Object] exchange to bind to
# @param tag_filter [String, nil] binding filter passed to bind_queue
# @yield [queue] continuation, forwarded to bind_queue
def with_listener_queue(source_exchange, tag_filter, &k)
  listener_opts = { exclusive: true, auto_delete: true, durable: false }
  bind_queue(source_exchange, '', tag_filter, listener_opts, &k)
end
with_source(source_name, &k) click to toggle source
# File lib/mercury/mercury.rb, line 298
# Declares the named source on this instance's channel with the standard
# source options and passes the resulting exchange to the continuation.
#
# @param source_name [String] name of the source
# @yield [exchange] called with the exchange only
def with_source(source_name, &k)
  with_source_no_cache(@channel, source_name, Mercury.source_opts) { |source| k.call(source) }
end
with_source_no_cache(channel, source_name, opts, &k) click to toggle source
# File lib/mercury/mercury.rb, line 304
# Declares a topic exchange on the given channel and forwards everything the
# declaration yields to the continuation.
#
# @param channel [#topic] channel to declare on
# @param source_name [String] exchange name
# @param opts [Hash] declaration options
# @yield [*args] whatever the channel's +topic+ call yields
def with_source_no_cache(channel, source_name, opts, &k)
  channel.topic(source_name, opts) { |*yielded| k.call(*yielded) }
end
with_work_queue(worker_group, source_exchange, tag_filter, &k) click to toggle source
# File lib/mercury/mercury.rb, line 310
# Binds a worker group's queue to a source exchange using the standard work
# queue options.
#
# @param worker_group [String] used as the queue name
# @param source_exchange [Object] exchange to bind to
# @param tag_filter [String, nil] binding filter passed to bind_queue
# @yield [queue] continuation, forwarded to bind_queue
def with_work_queue(worker_group, source_exchange, tag_filter, &k)
  queue_opts = work_queue_opts
  bind_queue(source_exchange, worker_group, tag_filter, queue_opts, &k)
end
work_queue_opts() click to toggle source
# File lib/mercury/mercury.rb, line 318
# Options used when declaring a work queue.
#
# @return [Hash] durable, and not auto-deleted
def work_queue_opts
  {
    durable: true,
    auto_delete: false
  }
end
write(msg) click to toggle source
# File lib/mercury/mercury.rb, line 290
# Serializes a message with a fresh WireSerializer.
#
# @param msg [Object] message to serialize
# @return [Object] whatever WireSerializer#write returns
def write(msg)
  serializer = WireSerializer.new
  serializer.write(msg)
end