class Mercury::Fake
Public Class Methods
domains()
click to toggle source
# File lib/mercury/fake.rb, line 37
# Returns the class-level registry of domains, creating it on first use.
# Looking up a key that is not yet present builds a fresh Domain, stores it
# under that key and returns it.
def self.domains
  @domains ||= Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end
install(rspec_context, domain=:default)
click to toggle source
# File lib/mercury/fake.rb, line 20
# Stubs Mercury.open inside the given RSpec context so that the block passed
# to Mercury.open is called with a Mercury::Fake bound to +domain+.
#
# rspec_context - object on which the stub is set up via instance_exec
#                 (it must respond to +allow+ / +receive+).
# domain        - key identifying the fake domain (defaults to :default).
def self.install(rspec_context, domain=:default)
  rspec_context.instance_exec do
    allow(Mercury).to receive(:open) do |**open_opts, &on_open|
      # EM.next_tick is required to emulate the real Mercury.open
      EM.next_tick do
        fake = Mercury::Fake.new(domain, **open_opts)
        on_open.call(fake)
      end
    end
  end
end
new(domain=:default, **kws)
click to toggle source
# File lib/mercury/fake.rb, line 28
# Builds a fake attached to the domain registered under +domain+.
#
# domain - key into Fake.domains (defaults to :default).
# kws    - keyword options; only :parallelism (default 1) is honoured.
#          Any other keyword is reported on $stderr and otherwise ignored.
def initialize(domain=:default, **kws)
  @domain = Fake.domains[domain]
  @parallelism = kws.fetch(:parallelism, 1)

  unsupported = kws.keys.reject { |key| key == :parallelism }
  return if unsupported.empty?

  $stderr.puts "Warning: Mercury::Fake::new is ignoring keyword arguments: #{unsupported.join(', ')}"
end
Public Instance Methods
close(&k)
click to toggle source
# File lib/mercury/fake.rb, line 41
# Marks this fake as closed, then hands control to the optional
# continuation +k+ through +ret+.
def close(&k)
  @closed = true
  ret(k)
end
delete_source(source_name, &k)
click to toggle source
# File lib/mercury/fake.rb, line 77
# Removes every queue whose +source+ equals +source_name+, then invokes the
# optional continuation +k+ through +ret+.
def delete_source(source_name, &k)
  guard_public(k)
  queues.delete_if do |_name, queue|
    queue.source == source_name
  end
  ret(k)
end
delete_work_queue(worker_group, &k)
click to toggle source
# File lib/mercury/fake.rb, line 83
# Removes every queue whose +worker+ equals +worker_group+, then invokes the
# optional continuation +k+ through +ret+.
def delete_work_queue(worker_group, &k)
  guard_public(k)
  queues.delete_if do |_name, queue|
    queue.worker == worker_group
  end
  ret(k)
end
publish(source_name, msg, tag: '', headers: {}, &k)
click to toggle source
# File lib/mercury/fake.rb, line 46
# Delivers +msg+ to every queue that reports binds?(source_name, tag).
#
# Each matching queue receives its own round-tripped copy of the message,
# the tag, and the headers after stringify_keys.
# NOTE(review): stringify_keys is not core Ruby; presumably supplied by
# ActiveSupport - confirm it is loaded wherever this fake is used.
#
# source_name - name of the source being published to.
# msg         - message payload.
# tag         - routing tag (defaults to '').
# headers     - header hash (defaults to {}).
# k           - optional continuation, invoked through +ret+.
def publish(source_name, msg, tag: '', headers: {}, &k)
  guard_public(k)
  bound_queues = queues.values.select { |queue| queue.binds?(source_name, tag) }
  bound_queues.each do |queue|
    queue.enqueue(roundtrip(msg), tag, headers.stringify_keys)
  end
  ret(k)
end
queue_exists?(worker, &k)
click to toggle source
# File lib/mercury/fake.rb, line 95
# Passes to the continuation +k+ (through +ret+) whether any queue's
# +worker+ equals the given +worker+.
def queue_exists?(worker, &k)
  guard_public(k)
  known_workers = queues.values.map(&:worker)
  ret(k, known_workers.include?(worker))
end
republish(msg, &k)
click to toggle source
# File lib/mercury/fake.rb, line 52
# Acknowledges +msg+ and enqueues a round-tripped copy of its content on the
# first queue whose +worker+ equals msg.work_queue_name. The copy keeps the
# original tag and is enqueued with the value returned by
# Mercury.increment_republish_count(msg). Finally invokes the optional
# continuation +k+ through +ret+.
#
# NOTE(review): if no queue matches, +detect+ yields nil and the enqueue
# call raises NoMethodError - confirm whether callers can reach that state.
def republish(msg, &k)
  guard_public(k)
  msg.ack
  target = queues.values.detect { |queue| queue.worker == msg.work_queue_name }
  target.enqueue(
    roundtrip(msg.content),
    msg.tag,
    Mercury.increment_republish_count(msg)
  )
  ret(k)
end
source_exists?(source, &k)
click to toggle source
# File lib/mercury/fake.rb, line 89
# Passes to the continuation +k+ (through +ret+) whether +source+ is either
# the source of some existing queue or one of the "amq."-prefixed names
# listed below.
def source_exists?(source, &k)
  guard_public(k)
  built_in_sources = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map do |suffix|
    "amq.#{suffix}"
  end
  queue_sources = queues.values.map(&:source)
  known_sources = queue_sources + built_in_sources
  ret(k, known_sources.include?(source))
end
start_listener(source_name, handler, tag_filter: nil, &k)
click to toggle source
# File lib/mercury/fake.rb, line 60
# Starts a listener on +source_name+ by delegating to
# start_worker_or_listener without a worker group.
#
# source_name - source to listen to.
# handler     - object handed to the subscriber that is created.
# tag_filter  - optional tag filter (nil is forwarded unchanged).
# k           - optional continuation forwarded to the delegate.
def start_listener(source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(
    source_name,
    handler,
    tag_filter,
    &k
  )
end
start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
click to toggle source
# File lib/mercury/fake.rb, line 64
# Starts a worker for +worker_group+ on +source_name+ by delegating to
# start_worker_or_listener with the worker group supplied.
#
# worker_group - name of the worker group.
# source_name  - source to consume from.
# handler      - object handed to the subscriber that is created.
# tag_filter   - optional tag filter (nil is forwarded unchanged).
# k            - optional continuation forwarded to the delegate.
def start_worker(worker_group, source_name, handler, tag_filter: nil, &k)
  start_worker_or_listener(
    source_name,
    handler,
    tag_filter,
    worker_group,
    &k
  )
end
Private Instance Methods
ensure_queue(source, tag_filter, worker)
click to toggle source
# File lib/mercury/fake.rb, line 115
# Returns the queue registered for (source, tag_filter, worker), creating
# and storing it first when it does not exist yet.
#
# The queue is created with require_ack set to true only when a non-nil
# +worker+ was given; when +worker+ is nil or false a random UUID is
# substituted for it.
def ensure_queue(source, tag_filter, worker)
  require_ack = !worker.nil?
  worker ||= SecureRandom.uuid
  queue_name = unique_queue_name(source, tag_filter, worker)
  queues.fetch(queue_name) do |missing_name|
    queues[missing_name] = Queue.new(source, tag_filter, worker, require_ack)
  end
end
guard_public(k, initializing: false)
click to toggle source
# File lib/mercury/fake.rb, line 127
# Forwards this fake's closed flag, the continuation +k+ and the
# +initializing+ option to Mercury.guard_public.
# NOTE(review): presumably that helper rejects calls made after close -
# confirm against Mercury.guard_public.
def guard_public(k, initializing: false)
  Mercury.guard_public(
    @closed,
    k,
    initializing: initializing
  )
end
queues()
click to toggle source
# File lib/mercury/fake.rb, line 102
# The queue collection of the domain this fake is attached to.
def queues
  domain = @domain
  domain.queues
end
ret(k, value=nil)
click to toggle source
# File lib/mercury/fake.rb, line 106
# Schedules the continuation +k+ to be called with +value+ via
# EM.next_tick. Does nothing (and returns nil) when no continuation was
# supplied.
def ret(k, value=nil)
  return unless k

  EM.next_tick do
    k.call(value)
  end
end
roundtrip(msg)
click to toggle source
# File lib/mercury/fake.rb, line 110
# Writes +msg+ with a fresh WireSerializer and reads the result back,
# returning whatever the serializer's +read+ produces.
def roundtrip(msg)
  serializer = WireSerializer.new
  wire_form = serializer.write(msg)
  serializer.read(wire_form)
end
start_worker_or_listener(source_name, handler, tag_filter, worker_group=nil, &k)
click to toggle source
# File lib/mercury/fake.rb, line 68
# Shared implementation behind start_listener and start_worker.
#
# Ensures a queue exists for the source / tag filter / worker group
# (a missing tag filter becomes '#'), signals completion through +ret+, and
# then attaches a new Subscriber built from +handler+ and @parallelism.
def start_worker_or_listener(source_name, handler, tag_filter, worker_group=nil, &k)
  guard_public(k)
  tag_filter ||= '#'
  queue = ensure_queue(source_name, tag_filter, worker_group)
  # it's important we show the "start" operation finishing before delivery starts (in add_subscriber)
  ret(k)
  subscriber = Subscriber.new(handler, @parallelism)
  queue.add_subscriber(subscriber)
end
unique_queue_name(source, tag_filter, worker)
click to toggle source
# File lib/mercury/fake.rb, line 123
# Builds the key under which a queue is stored: the source, tag filter and
# worker joined with '^'.
def unique_queue_name(source, tag_filter, worker)
  name_parts = [source, tag_filter, worker]
  name_parts.join('^')
end