class Mercury::Fake

Public Class Methods

domains() click to toggle source
# File lib/mercury/fake.rb, line 37
# Returns the class-level registry of fake domains, building it on first use.
# Looking up a key that is not present yet stores a fresh Domain under that
# key and returns it, so later lookups of the same key yield the same object.
def self.domains
  return @domains if @domains

  @domains = Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end
install(rspec_context, domain=:default) click to toggle source
# File lib/mercury/fake.rb, line 20
# Stubs Mercury.open inside the given context so that it hands a
# Mercury::Fake to the caller's block instead of opening a real connection.
#
# rspec_context - object whose instance_exec scope provides `allow` and
#                 `receive` (presumably an RSpec example; confirm with callers).
# domain        - key passed to Mercury::Fake.new, along with whatever keyword
#                 arguments the stubbed Mercury.open receives.
def self.install(rspec_context, domain=:default)
  rspec_context.instance_exec do
    allow(Mercury).to receive(:open) do |**open_options, &on_open|
      # EM.next_tick is required to emulate the real Mercury.open
      EM.next_tick do
        fake = Mercury::Fake.new(domain, **open_options)
        on_open.call(fake)
      end
    end
  end
end
new(domain=:default, **kws) click to toggle source
# File lib/mercury/fake.rb, line 28
# Binds the new fake to the domain registered under +domain+ and records the
# requested parallelism (1 when not given). Any other keyword argument is
# accepted but unused; a warning listing the unused keys is written to $stderr.
def initialize(domain=:default, **kws)
  @domain = Fake.domains[domain]
  @parallelism = kws.fetch(:parallelism, 1)

  unsupported = kws.keys.reject { |key| key == :parallelism }
  return if unsupported.empty?

  $stderr.puts "Warning: Mercury::Fake::new is ignoring keyword arguments: #{unsupported.join(', ')}"
end

Public Instance Methods

close(&k) click to toggle source
# File lib/mercury/fake.rb, line 41
# Marks this fake as closed and then reports completion through +k+ (via
# ret). Unlike the other public operations, it does not call guard_public
# first.
def close(&k)
  @closed = true
  ret(k)
end
delete_source(source_name, &k) click to toggle source
# File lib/mercury/fake.rb, line 77
# Removes every queue in this fake's domain whose source equals
# +source_name+, then reports completion through +k+ (via ret).
def delete_source(source_name, &k)
  guard_public(k)
  queues.reject! { |_name, queue| queue.source == source_name }
  ret(k)
end
delete_work_queue(worker_group, &k) click to toggle source
# File lib/mercury/fake.rb, line 83
# Removes every queue in this fake's domain whose worker equals
# +worker_group+, then reports completion through +k+ (via ret).
def delete_work_queue(worker_group, &k)
  guard_public(k)
  queues.reject! { |_name, queue| queue.worker == worker_group }
  ret(k)
end
publish(source_name, msg, tag: '', headers: {}, &k) click to toggle source
# File lib/mercury/fake.rb, line 46
# Delivers +msg+ to every queue that reports binds?(source_name, tag).
# The set of matching queues is determined before anything is enqueued.
# Each queue receives its own roundtrip copy of the message, the tag, and
# the headers with stringified keys. Completion is reported through +k+.
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  bound_queues = queues.values.select { |queue| queue.binds?(source_name, tag) }
  bound_queues.each do |queue|
    queue.enqueue(roundtrip(msg), tag, headers.stringify_keys)
  end
  ret(k)
end
queue_exists?(worker, &k) click to toggle source
# File lib/mercury/fake.rb, line 95
# Reports through +k+ (via ret) whether any queue in this fake's domain has a
# worker equal to +worker+.
def queue_exists?(worker, &k)
  guard_public(k)
  found = queues.values.any? { |queue| queue.worker == worker }
  ret(k, found)
end
republish(msg, &k) click to toggle source
# File lib/mercury/fake.rb, line 52
# Acks +msg+ and enqueues a roundtrip copy of its content on the first queue
# whose worker equals msg.work_queue_name, keeping the message's tag and using
# the headers returned by Mercury.increment_republish_count(msg). Completion
# is reported through +k+ (via ret).
# NOTE(review): no queue with that worker means the enqueue call is made on
# nil; confirm whether callers can hit that case.
def republish(msg, &k)
  guard_public(k)
  msg.ack
  target = queues.values.find { |queue| queue.worker == msg.work_queue_name }
  content = roundtrip(msg.content)
  tag = msg.tag
  headers = Mercury.increment_republish_count(msg)
  target.enqueue(content, tag, headers)
  ret(k)
end
source_exists?(source, &k) click to toggle source
# File lib/mercury/fake.rb, line 89
# Reports through +k+ (via ret) whether +source+ is known: either some queue
# in this fake's domain has it as its source, or it is one of the "amq."
# prefixed names listed below.
def source_exists?(source, &k)
  guard_public(k)
  reserved = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map { |suffix| "amq.#{suffix}" }
  declared = queues.values.map(&:source)
  known = declared.include?(source) || reserved.include?(source)
  ret(k, known)
end
start_listener(source_name, handler, tag_filter: nil, &k) click to toggle source
# File lib/mercury/fake.rb, line 60
# Starts a listener on +source_name+ by delegating to
# start_worker_or_listener without a worker group. +tag_filter+ is passed
# through unchanged (nil when omitted) and +k+ is forwarded as the block.
def start_listener(source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(source_name, handler, tag_filter, &k)
end
start_worker(worker_group, source_name, handler, tag_filter: nil, &k) click to toggle source
# File lib/mercury/fake.rb, line 64
# Starts a worker for +worker_group+ on +source_name+ by delegating to
# start_worker_or_listener with the worker group supplied. +tag_filter+ is
# passed through unchanged (nil when omitted) and +k+ is forwarded as the block.
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(source_name, handler, tag_filter, worker_group, &k)
end

Private Instance Methods

ensure_queue(source, tag_filter, worker) click to toggle source
# File lib/mercury/fake.rb, line 115
# Returns the queue registered for (source, tag_filter, worker), creating and
# registering it when it does not exist yet.
#
# A nil +worker+ produces a queue that does not require acks and is named
# with a freshly generated UUID, so each such call registers a new queue.
# A non-nil +worker+ produces (or reuses) a queue that requires acks.
def ensure_queue(source, tag_filter, worker)
  require_ack = !worker.nil?
  worker = SecureRandom.uuid unless worker

  name = unique_queue_name(source, tag_filter, worker)
  return queues[name] if queues.key?(name)

  queues[name] = Queue.new(source, tag_filter, worker, require_ack)
end
guard_public(k, initializing: false) click to toggle source
# File lib/mercury/fake.rb, line 127
# Delegates to Mercury.guard_public, passing this instance's @closed flag,
# the caller's callback +k+, and the +initializing+ option.
# NOTE(review): what Mercury.guard_public enforces is defined outside this
# file view; presumably it rejects use of a closed instance. Confirm there.
def guard_public(k, initializing: false)
  Mercury.guard_public(@closed, k, initializing: initializing)
end
queues() click to toggle source
# File lib/mercury/fake.rb, line 102
# Returns the queue collection of the domain this fake was created with.
# Other methods here use it as a Hash (values, delete_if, fetch, []=).
def queues
  @domain.queues
end
ret(k, value=nil) click to toggle source
# File lib/mercury/fake.rb, line 106
# Hands +value+ to the callback +k+ on EM.next_tick rather than calling it
# immediately. Does nothing and returns nil when no callback was given.
def ret(k, value=nil)
  return unless k

  EM.next_tick { k.call(value) }
end
roundtrip(msg) click to toggle source
# File lib/mercury/fake.rb, line 110
# Writes +msg+ with a new WireSerializer and reads the result back with the
# same serializer, returning what was read.
def roundtrip(msg)
  serializer = WireSerializer.new
  encoded = serializer.write(msg)
  serializer.read(encoded)
end
start_worker_or_listener(source_name, handler, tag_filter, worker_group=nil, &k) click to toggle source
# File lib/mercury/fake.rb, line 68
# Shared implementation behind start_worker and start_listener.
#
# Obtains the queue for (source_name, tag_filter, worker_group) through
# ensure_queue, using '#' when no tag filter was given. It then reports
# completion through +k+ and finally attaches a Subscriber built from
# +handler+ and this instance's parallelism.
def start_worker_or_listener(source_name, handler, tag_filter, worker_group=nil, &k)
  guard_public(k)
  effective_filter = tag_filter || '#'
  queue = ensure_queue(source_name, effective_filter, worker_group)

  # it's important we show the "start" operation finishing before delivery starts (in add_subscriber)
  ret(k)

  subscriber = Subscriber.new(handler, @parallelism)
  queue.add_subscriber(subscriber)
end
unique_queue_name(source, tag_filter, worker) click to toggle source
# File lib/mercury/fake.rb, line 123
def unique_queue_name(source, tag_filter, worker)
  [source, tag_filter, worker].join('^')
end