class Mercury
Async IO often involves CPS (continuation-passing style) code, where the continuation (a.k.a. “callback”) is passed to a function to be invoked at a later time. CPS can result in deep lexical nesting, making code difficult to read. This monad hides the CPS plumbing, which allows code to be written in a flat style with no visible continuation passing. It basically wraps a CPS proc with methods for composing such procs. See codon.com/refactoring-ruby-with-monads
This class simulates Mercury without using the AMQP gem. It can be useful for unit testing code that uses Mercury. The domain concept allows different mercury instances to hit different virtual servers; this should rarely be needed. This class cannot simulate the behavior of server disconnections, broken sockets, etc.
Constants
- ORIGINAL_TAG_HEADER
- REPUBLISH_COUNT_HEADER
Attributes
Public Class Methods
# File lib/mercury/mercury.rb, line 14
# Builds a new instance, forwarding the logger, every other keyword option and
# the continuation block to the constructor.
#
# @param logger [Logger] defaults to a logger writing to STDOUT
# @param kws [Hash] remaining keyword options, handed to `new` untouched
# @return [nil] always nil; the constructed instance is not returned
def self.open(logger: Logger.new(STDOUT), **kws, &k)
  constructor_opts = kws.merge(logger: logger)
  new(**constructor_opts, &k)
  nil
end
# File lib/mercury/mercury.rb, line 97
# Assembles the option hash used when publishing a message.
#
# @param tag [String] used as the routing key
# @param headers [Hash] message headers, passed through as-is
# @return [Hash] options with :routing_key, :persistent (always true) and :headers
def self.publish_opts(tag, headers)
  {
    routing_key: tag,
    persistent: true,
    headers: headers
  }
end
Private Class Methods
# File lib/mercury/mercury.rb, line 345
# Precondition check: refuses to proceed on a closed instance (unless it is
# still initializing) or when no continuation was supplied.
#
# @param is_closed [Boolean] whether the instance is considered closed
# @param k [Proc, nil] the continuation supplied by the caller
# @param initializing [Boolean] when true, the closed check is skipped
# @raise [RuntimeError] if the instance is closed and not initializing
# @raise [RuntimeError] if k is nil/false
# @return [nil]
def self.guard_public(is_closed, k, initializing: false)
  defunct = is_closed && !initializing
  raise 'This mercury instance is defunct. Either it was purposely closed or an error occurred.' if defunct
  raise 'A continuation block is required but none was provided.' unless k
end
@param msg [Mercury::ReceivedMessage] @return [Hash] the headers with republish count incremented
# File lib/mercury/mercury.rb, line 337
# Returns a copy of the message headers in which the republish-count header
# is set to the message's current republish count plus one.
#
# @param msg [Mercury::ReceivedMessage]
# @return [Hash] merged headers; msg.headers itself is not modified by merge
def self.increment_republish_count(msg)
  next_count = msg.republish_count + 1
  msg.headers.merge(REPUBLISH_COUNT_HEADER => next_count)
end
# File lib/mercury/mercury.rb, line 30
# Connects to the AMQP server, opens a channel and finally yields this
# instance to the continuation.
#
# @param host [String]
# @param port [Integer]
# @param vhost [String]
# @param username [String]
# @param password [String]
# @param parallelism [Integer] passed to the channel as its prefetch value
# @param on_error [#call, nil] stored for use by the error handlers
# @param wait_for_publisher_confirms [Boolean] when true, publisher confirms
#   are enabled before the continuation is invoked
# @param logger [Logger] required
# @yieldparam mercury [Mercury] this instance, once the channel is ready
def initialize(host: 'localhost',
               port: 5672,
               vhost: '/',
               username: 'guest',
               password: 'guest',
               parallelism: 1,
               on_error: nil,
               wait_for_publisher_confirms: true,
               logger:,
               &k)
  guard_public(k, initializing: true)
  @logger = logger
  @on_error = on_error

  connection_settings = {
    host: host,
    port: port,
    vhost: vhost,
    username: username,
    password: password,
    on_tcp_connection_failure: server_down_error_handler
  }
  # Hands the fully set-up instance to the caller's continuation.
  ready = proc { k.call(self) }

  AMQP.connect(**connection_settings) do |amqp|
    @amqp = amqp
    install_lost_connection_error_handler
    AMQP::Channel.new(amqp, prefetch: parallelism) do |channel|
      @channel = channel
      install_channel_error_handler
      if wait_for_publisher_confirms
        enable_publisher_confirms(&ready)
      else
        ready.call
      end
    end
  end
end
# File lib/mercury/mercury.rb, line 314
# Options used when declaring a source.
#
# @return [Hash] a fresh hash: durable, not auto-deleted
def self.source_opts
  {
    durable: true,
    auto_delete: false
  }
end
Public Instance Methods
# File lib/mercury/mercury.rb, line 19
# Closes the AMQP connection, if there is one, then invokes the continuation.
#
# With a connection present, @amqp is cleared inside the close callback,
# before the continuation runs. Without a connection, the continuation is
# deferred via EM.next_tick.
#
# The continuation is optional: calling `close` without a block simply closes
# the connection (previously this raised NoMethodError on `nil.call`, or an
# error from EM.next_tick when no block was given).
#
# @yield invoked with no arguments once closing is complete
def close(&k)
  if @amqp
    @amqp.close do
      @amqp = nil
      k.call if k
    end
  elsif k
    EM.next_tick(&k)
  end
end
# File lib/mercury/mercury.rb, line 140
# Deletes the named source, then invokes the continuation with no arguments.
#
# @param source_name [String]
# @yield called once the delete callback fires
def delete_source(source_name, &k)
  guard_public(k)
  with_source(source_name) do |exchange|
    exchange.delete { k.call }
  end
end
# File lib/mercury/mercury.rb, line 149
# Deletes the work queue named after the worker group, then invokes the
# continuation with no arguments.
#
# @param worker_group [String] used as the queue name
# @yield called once the delete callback fires
def delete_work_queue(worker_group, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    work_queue.delete { k.call }
  end
end
# File lib/mercury/mercury.rb, line 62
# Publishes a message to the named source.
#
# @param source_name [String]
# @param msg [Object] the message to publish
# @param tag [String] routing tag, empty by default
# @param headers [Hash] message headers, empty by default
# @yield the continuation, forwarded to the internal publish step
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  # Redeclaring the exchange on every publish is fine: the amqp gem
  # caches exchange objects.
  with_source(source_name) do |source_exchange|
    publish_internal(source_exchange, msg, tag, headers, &k)
  end
end
# File lib/mercury/mercury.rb, line 167
# Checks for a queue by declaring it passively through the existence check
# helper; the result is delivered to the continuation.
#
# @param queue_name [String]
# @yieldparam exists [Boolean]
def queue_exists?(queue_name, &k)
  guard_public(k)
  existence_check(k) do |probe_channel, &report|
    probe_channel.queue(queue_name, passive: true) { report.call(true) }
  end
end
Places a copy of the message at the back of the queue, then acks the original message.
# File lib/mercury/mercury.rb, line 73
# Publishes a copy of the message to its work queue via the default exchange,
# then acks the original and invokes the continuation.
#
# The copy carries the incremented republish count and the original tag as
# headers.
#
# @param msg [Mercury::ReceivedMessage]
# @raise [RuntimeError] if the message has no work queue name
# @raise [RuntimeError] if the message was received by a different instance
# @raise [RuntimeError] if an action was already taken on the message
# @yield called with no arguments after the ack
def republish(msg, &k)
  guard_public(k)
  raise 'Only messages from a work queue can be republished' unless msg.work_queue_name
  raise 'A message can only be republished by the mercury instance that received it' unless msg.mercury_instance == self
  raise "This message was already #{msg.action_taken}ed" if msg.action_taken

  republish_headers = Mercury.increment_republish_count(msg)
  republish_headers = republish_headers.merge(ORIGINAL_TAG_HEADER => msg.tag)
  publish_internal(@channel.default_exchange, msg.content, msg.work_queue_name, republish_headers) do
    msg.ack
    k.call
  end
end
# File lib/mercury/mercury.rb, line 158
# Checks for a source by declaring it passively through the existence check
# helper; the result is delivered to the continuation.
#
# @param source_name [String]
# @yieldparam exists [Boolean]
def source_exists?(source_name, &k)
  guard_public(k)
  existence_check(k) do |probe_channel, &report|
    with_source_no_cache(probe_channel, source_name, passive: true) { report.call(true) }
  end
end
# File lib/mercury/mercury.rb, line 101
# Subscribes a handler to the named source through a listener queue, without
# acks (`ack: false`), then invokes the continuation.
#
# @param source_name [String]
# @param handler [#call] receives a received-message object per delivery
# @param tag_filter [String, nil] forwarded to the listener queue binding
# @yield called with no arguments right after subscribing
def start_listener(source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |source_exchange|
    with_listener_queue(source_exchange, tag_filter) do |listener_queue|
      listener_queue.subscribe(ack: false) do |metadata, payload|
        received = make_received_message(payload, metadata)
        handler.call(received)
      end
      k.call
    end
  end
end
Production code should not call this. Only test/tool code should call this, and only if you're sure the worker queue already exists and is bound to the source.
# File lib/mercury/mercury.rb, line 126
# Subscribes a worker directly to the queue named after the worker group
# (no source is declared or bound here), then invokes the continuation.
#
# @param worker_group [String] used as the queue name
# @param handler [#call]
# @yield called with no arguments right after subscribing
def start_queue_worker(worker_group, handler, &k)
  guard_public(k)
  @channel.queue(worker_group, work_queue_opts) do |work_queue|
    subscribe_worker(work_queue, handler)
    k.call
  end
end
# File lib/mercury/mercury.rb, line 113
# Binds the worker group's queue to the named source, subscribes the handler
# as a worker, then invokes the continuation.
#
# @param worker_group [String]
# @param source_name [String]
# @param handler [#call]
# @param tag_filter [String, nil] forwarded to the work queue binding
# @yield called with no arguments right after subscribing
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  guard_public(k)
  with_source(source_name) do |source_exchange|
    with_work_queue(worker_group, source_exchange, tag_filter) do |work_queue|
      subscribe_worker(work_queue, handler)
      k.call
    end
  end
end
Private Instance Methods
# File lib/mercury/mercury.rb, line 326
# Declares a queue on the channel, binds it to the exchange and yields the
# queue to the continuation once the bind callback fires.
#
# @param exchange [Object] exchange to bind to
# @param queue_name [String]
# @param tag_filter [String, nil] routing key for the binding; '#' when nil
# @param opts [Hash] queue declaration options
# @yieldparam queue [Object] the declared and bound queue
def bind_queue(exchange, queue_name, tag_filter, opts, &k)
  routing_key = tag_filter || '#'
  @channel.queue(queue_name, opts) do |declared_queue|
    declared_queue.bind(exchange, routing_key: routing_key) { k.call(declared_queue) }
  end
end
# File lib/mercury/mercury.rb, line 218
# Runs and removes the confirm handlers covered by an ack.
#
# When the ack is flagged `multiple`, every registered tag up to and including
# the ack's delivery tag is confirmed; otherwise only the ack's own tag.
#
# @param basic_ack [#delivery_tag, #multiple]
def dispatch_publisher_confirm(basic_ack)
  due_tags =
    if basic_ack.multiple
      # sorted only so handlers fire in a deterministic order
      @confirm_handlers.keys.select { |pending| pending <= basic_ack.delivery_tag }.sort
    else
      [basic_ack.delivery_tag]
    end

  due_tags.each do |due|
    handler = @confirm_handlers.delete(due)
    handler.call
  end
end
In AMQP, queue consumers ack requests after handling them. Unacked messages are automatically returned to the queue, guaranteeing they are eventually handled. Services often ack one request while publishing related messages. Ideally, these operations would be transactional. If the ack succeeds but the publish does not, the line of processing is abandoned, resulting in processing getting “stuck”. The best we can do in AMQP is to use “publisher confirms” to confirm that the publish succeeded before acking the originating request. Since the ack can still fail in this scenario, the system should employ idempotent design, which makes request redelivery harmless.
See www.rabbitmq.com/confirms.html and rubyamqp.info/articles/durability/
# File lib/mercury/mercury.rb, line 190
# Switches the channel into confirm mode and installs ack/nack callbacks,
# then invokes the continuation.
#
# State set up here: @confirm_handlers (delivery tag => continuation) and
# @last_published_delivery_tag (starts at 0).
#
# The pending-tag lookup uses core Hash#key? rather than `keys.exclude?`,
# which is not a core Ruby method (it comes from ActiveSupport) and scans an
# intermediate array.
#
# @yield called with no arguments once confirm mode is set up
# @raise [RuntimeError] (from the ack callback) on an ack for a tag that has
#   no registered handler
# @raise [RuntimeError] (from the nack callback) on any nack
def enable_publisher_confirms(&k)
  @confirm_handlers = {}
  @channel.confirm_select do
    @last_published_delivery_tag = 0
    @channel.on_ack do |basic_ack|
      tag = basic_ack.delivery_tag
      unless @confirm_handlers.key?(tag)
        raise "Got an unexpected publish confirmation ACK for delivery-tag: #{tag}. Was expecting one of: #{@confirm_handlers.keys.inspect}"
      end
      dispatch_publisher_confirm(basic_ack)
    end
    @channel.on_nack do |basic_nack|
      raise "Delivery failed for message with delivery-tag: #{basic_nack.delivery_tag}"
    end
    k.call
  end
end
# File lib/mercury/mercury.rb, line 234
# Runs an existence probe on a separate, newly opened channel.
#
# A channel error with reply code 404 is reported to `k` as false; any other
# channel error goes to the general channel error handler. When the probe
# reports a result, the probe channel is closed and the result passed to `k`.
#
# @param k [Proc] receives the boolean outcome
# @yieldparam ch [AMQP::Channel] the probe channel
# @yieldparam ret [Proc] block the probe calls with its result
def existence_check(k, &check)
  AMQP::Channel.new(@amqp) do |probe|
    probe.on_error do |_, info|
      missing = info.reply_code == 404
      if missing
        # the request failed because the thing does not exist
        k.call(false)
      else
        # failed for some other reason
        handle_channel_error(probe, info)
      end
    end

    check.call(probe) do |result|
      probe.close { k.call(result) }
    end
  end
end
# File lib/mercury/mercury.rb, line 212
# Advances the delivery-tag counter by one and registers `k` as the confirm
# handler for the new tag.
#
# @param k [Proc] continuation to store under the new tag
# @return [Integer] the new delivery tag
def expect_publisher_confirm(k)
  @last_published_delivery_tag += 1
  next_tag = @last_published_delivery_tag
  @confirm_handlers[next_tag] = k
  next_tag
end
# File lib/mercury/mercury.rb, line 341
# Instance-level wrapper around Mercury.guard_public; the instance counts as
# closed when @amqp is nil.
#
# @param k [Proc, nil] the continuation supplied by the caller
# @param initializing [Boolean] forwarded unchanged
def guard_public(k, initializing: false)
  closed = @amqp.nil?
  Mercury.guard_public(closed, k, initializing: initializing)
end
# File lib/mercury/mercury.rb, line 265
# Builds an error handler for a channel error and invokes it immediately.
#
# @param _ch [Object] unused
# @param info [#reply_code, #reply_text] details included in the message
def handle_channel_error(_ch, info)
  description = "An error occurred: #{info.reply_code} - #{info.reply_text}"
  make_error_handler(description).call
end
# File lib/mercury/mercury.rb, line 261
# Registers #handle_channel_error as the error callback of the main channel.
def install_channel_error_handler
  error_callback = method(:handle_channel_error)
  @channel.on_error(&error_callback)
end
# File lib/mercury/mercury.rb, line 257
# Registers an error handler that fires when the TCP connection is lost.
def install_lost_connection_error_handler
  on_loss = make_error_handler('Lost connection to AMQP server. Exiting.')
  @amqp.on_tcp_connection_loss(&on_loss)
end
# File lib/mercury/mercury.rb, line 269
# Builds a proc that logs the message, closes this instance and then either
# calls @on_error with the message (when it responds to :call) or raises it.
#
# @param msg [String] the error description
# @return [Proc] the handler; it does nothing if an exception is in flight
def make_error_handler(msg)
  proc do
    # Stay out of the way when an error is already being raised.
    # This is essential: some versions of EventMachine (notably 1.2.0.1)
    # fail to clean up properly if an error is raised during the `ensure`
    # clean-up phase (in EventMachine::run), which zombifies subsequent
    # reactors. (AMQP connection failure handlers are invoked from
    # EventMachine's `ensure`.)
    next if $!

    @logger.error(msg)
    close do
      raise msg unless @on_error.respond_to?(:call)

      @on_error.call(msg)
    end
  end
end
# File lib/mercury/mercury.rb, line 230
# Wraps a raw delivery in a ReceivedMessage, deserializing the payload first.
#
# @param payload [String] raw bytes as delivered
# @param metadata [Object] delivery metadata, passed through
# @param work_queue_name [String, nil] passed through to the message
# @return [ReceivedMessage]
def make_received_message(payload, metadata, work_queue_name: nil)
  content = read(payload)
  ReceivedMessage.new(content, metadata, self, work_queue_name: work_queue_name)
end
# File lib/mercury/mercury.rb, line 85
# Serializes and publishes a message on the given exchange.
#
# With publisher confirms enabled, `k` is registered as the confirm handler
# before publishing; otherwise `k` is handed to the publish call itself.
#
# @param exchange [Object] the exchange to publish on
# @param msg [Object] message to serialize
# @param tag [String] routing tag
# @param headers [Hash] message headers
def publish_internal(exchange, msg, tag, headers, &k)
  payload = write(msg)
  pub_opts = Mercury.publish_opts(tag, headers)
  return exchange.publish(payload, **pub_opts, &k) unless publisher_confirms_enabled

  expect_publisher_confirm(k)
  exchange.publish(payload, **pub_opts)
end
# File lib/mercury/mercury.rb, line 208
# Tells whether publisher confirms have been set up, i.e. whether
# @confirm_handlers holds a Hash.
#
# @return [Boolean]
def publisher_confirms_enabled
  handlers = @confirm_handlers
  handlers.kind_of?(Hash)
end
# File lib/mercury/mercury.rb, line 294
# Deserializes raw bytes using a fresh WireSerializer.
#
# @param bytes [String]
# @return [Object] whatever WireSerializer#read returns
def read(bytes)
  serializer = WireSerializer.new
  serializer.read(bytes)
end
# File lib/mercury/mercury.rb, line 253
# Builds the error handler used when the initial connection cannot be made.
#
# @return [Proc]
def server_down_error_handler
  failure_message = 'Failed to establish connection to AMQP server. Exiting.'
  make_error_handler(failure_message)
end
# File lib/mercury/mercury.rb, line 134
# Subscribes the handler to the queue with acks enabled (`ack: true`); each
# delivery is wrapped in a received message tagged with the queue's name.
#
# @param queue [Object] the queue to consume from
# @param handler [#call]
def subscribe_worker(queue, handler)
  queue.subscribe(ack: true) do |metadata, payload|
    received = make_received_message(payload, metadata, work_queue_name: queue.name)
    handler.call(received)
  end
end
# File lib/mercury/mercury.rb, line 322
# Binds a queue with an empty name to the source exchange, declared as
# exclusive, auto-deleted and non-durable, and yields it via bind_queue.
#
# @param source_exchange [Object]
# @param tag_filter [String, nil]
# @yieldparam queue [Object] the bound queue
def with_listener_queue(source_exchange, tag_filter, &k)
  listener_queue_opts = { exclusive: true, auto_delete: true, durable: false }
  bind_queue(source_exchange, '', tag_filter, listener_queue_opts, &k)
end
# File lib/mercury/mercury.rb, line 298
# Declares the named source on the main channel with the standard source
# options and yields the resulting exchange.
#
# @param source_name [String]
# @yieldparam exchange [Object]
def with_source(source_name, &k)
  declare_opts = Mercury.source_opts
  with_source_no_cache(@channel, source_name, declare_opts) { |exchange| k.call(exchange) }
end
# File lib/mercury/mercury.rb, line 304
# Declares a topic exchange on the given channel and forwards whatever the
# declaration callback receives to the continuation.
#
# @param channel [#topic]
# @param source_name [String]
# @param opts [Hash] declaration options
# @yield all arguments the `topic` callback was invoked with
def with_source_no_cache(channel, source_name, opts, &k)
  channel.topic(source_name, opts) { |*callback_args| k.call(*callback_args) }
end
# File lib/mercury/mercury.rb, line 310
# Binds the queue named after the worker group to the source exchange using
# the standard work queue options, and yields it via bind_queue.
#
# @param worker_group [String] used as the queue name
# @param source_exchange [Object]
# @param tag_filter [String, nil]
# @yieldparam queue [Object] the bound queue
def with_work_queue(worker_group, source_exchange, tag_filter, &k)
  queue_opts = work_queue_opts
  bind_queue(source_exchange, worker_group, tag_filter, queue_opts, &k)
end
# File lib/mercury/mercury.rb, line 318
# Options used when declaring a work queue.
#
# @return [Hash] a fresh hash: durable, not auto-deleted
def work_queue_opts
  {
    durable: true,
    auto_delete: false
  }
end
# File lib/mercury/mercury.rb, line 290
# Serializes a message using a fresh WireSerializer.
#
# @param msg [Object]
# @return [Object] whatever WireSerializer#write returns
def write(msg)
  serializer = WireSerializer.new
  serializer.write(msg)
end