module Mercury::TestUtils
Public Instance Methods
amq_filter(xs)
click to toggle source
# File lib/mercury/test_utils.rb, line 85 def amq_filter(xs) xs.reject{|x| x.start_with?('amq.')} end
cps_benchmark(label, &block)
click to toggle source
# File lib/mercury/test_utils.rb, line 77 def cps_benchmark(label, &block) seql do let(:time) { lift { Time.now } } and_then { block.call } and_lift { puts "#{label} : #{(Time.now - time) * 1000} ms" } end end
delete_sources_and_queues_cps(source_names, queue_names)
click to toggle source
# File lib/mercury/test_utils.rb, line 47 def delete_sources_and_queues_cps(source_names, queue_names) # We must create a new mercury. The AMQP gem doesn't let you redeclare # a construct with the same instance you deleted it with. Mercury::Monadic.open.and_then do |m| Cps.inject(amq_filter(source_names)) { |s| m.delete_source(s) }. inject(amq_filter(queue_names)) { |q| m.delete_work_queue(q) }. and_then { m.close } end end
done()
click to toggle source
# File lib/mercury/test_utils.rb, line 20 def done EM.stop end
em(timeout_seconds: 3) { || ... }
click to toggle source
# File lib/mercury/test_utils.rb, line 9 def em(timeout_seconds: 3) EM.run do EM.add_timer(in_debug_mode? ? 999999 : timeout_seconds) { raise 'EM spec timed out' } yield end end
em_wait_until(pred, &k)
click to toggle source
# File lib/mercury/test_utils.rb, line 24 def em_wait_until(pred, &k) try_again = proc do if pred.call k.call else EM.add_timer(1.0 / 50, try_again) end end try_again.call end
in_debug_mode?()
click to toggle source
# File lib/mercury/test_utils.rb, line 16 def in_debug_mode? ENV['RUBYLIB'] =~ /ruby-debug-ide/ # http://stackoverflow.com/questions/22039807/determine-if-a-program-is-running-in-debug-mode end
read_all_messages(worker: , source:, tag:, seconds_to_wait: 0.1)
click to toggle source
# File lib/mercury/test_utils.rb, line 57 def read_all_messages(worker: , source:, tag:, seconds_to_wait: 0.1) msgs = [] last_received_time = Time.now msg_handler = ->(msg) do msgs << msg msg.ack last_received_time = Time.now end EM.run do Cps.seql do let(:m) { Mercury::Monadic.open } and_then { m.start_worker(worker, source, msg_handler, tag_filter: tag) } and_then { wait_until { (Time.now - last_received_time).to_f > seconds_to_wait } } and_then { m.close } and_lift { EM.stop } end.run end msgs end
wait_for(seconds)
click to toggle source
# File lib/mercury/test_utils.rb, line 41 def wait_for(seconds) cps do |&k| EM.add_timer(seconds, &k) end end
wait_until(&pred)
click to toggle source
# File lib/mercury/test_utils.rb, line 35 def wait_until(&pred) cps do |&k| em_wait_until(pred, &k) end end