module Mercury::TestUtils

Public Instance Methods

amq_filter(xs) click to toggle source
# File lib/mercury/test_utils.rb, line 85
# Returns the entries of +xs+ whose names do not begin with "amq.".
# (Presumably these are broker-reserved AMQP names -- confirm.)
#
# @param xs [Enumerable<String>] names to filter
# @return a new collection without the "amq."-prefixed entries; +xs+ itself
#   is left untouched
def amq_filter(xs)
  xs.select { |name| !name.start_with?('amq.') }
end
cps_benchmark(label, &block) click to toggle source
# File lib/mercury/test_utils.rb, line 77
# Builds a +seql+ sequence that records a start timestamp, runs +block+, and
# then prints "<label> : <elapsed> ms" to stdout.
#
# @param label [String] text printed in front of the elapsed time
# @param block [Proc] invoked with no arguments inside +and_then+
#   (presumably it returns the CPS computation being timed -- confirm)
# @return whatever the +seql+ block evaluates to
def cps_benchmark(label, &block)
  seql do
    # Use the monotonic clock rather than Time.now: wall-clock time can jump
    # (NTP sync, manual adjustment), which would skew or even negate the
    # measured duration.
    let(:time) { lift { Process.clock_gettime(Process::CLOCK_MONOTONIC) } }
    and_then { block.call }
    and_lift do
      elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - time
      puts "#{label} : #{elapsed_seconds * 1000} ms"
    end
  end
end
delete_sources_and_queues_cps(source_names, queue_names) click to toggle source
# File lib/mercury/test_utils.rb, line 47
# Opens a fresh Mercury::Monadic instance, deletes the given sources and then
# the given work queues (skipping any name that +amq_filter+ rejects), and
# finally closes the instance.
#
# @param source_names [Enumerable<String>] sources to delete
# @param queue_names [Enumerable<String>] work queues to delete
# @return the result of +Mercury::Monadic.open.and_then+
def delete_sources_and_queues_cps(source_names, queue_names)
  # We must create a new mercury. The AMQP gem doesn't let you redeclare
  # a construct with the same instance you deleted it with.
  Mercury::Monadic.open.and_then do |mercury|
    sources_deleted = Cps.inject(amq_filter(source_names)) do |source|
      mercury.delete_source(source)
    end
    queues_deleted = sources_deleted.inject(amq_filter(queue_names)) do |queue|
      mercury.delete_work_queue(queue)
    end
    queues_deleted.and_then { mercury.close }
  end
end
done() click to toggle source
# File lib/mercury/test_utils.rb, line 20
# Calls EM.stop. Presumably invoked by a spec at the end of an +em+ block to
# shut the reactor down so that EM.run returns -- confirm against callers.
def done
  EM.stop
end
em(timeout_seconds: 3) { || ... } click to toggle source
# File lib/mercury/test_utils.rb, line 9
# Runs the given block inside EM.run with a watchdog timer that raises
# 'EM spec timed out' when it fires.
#
# @param timeout_seconds [Numeric] watchdog delay; replaced by 999999 when
#   +in_debug_mode?+ is truthy
# @yield the body to execute once the watchdog has been registered
def em(timeout_seconds: 3)
  EM.run do
    watchdog_delay = in_debug_mode? ? 999999 : timeout_seconds
    EM.add_timer(watchdog_delay) do
      raise 'EM spec timed out'
    end
    yield
  end
end
em_wait_until(pred, &k) click to toggle source
# File lib/mercury/test_utils.rb, line 24
# Calls +k+ as soon as +pred.call+ is truthy. The predicate is checked
# immediately; while it is falsy, another check is scheduled through
# EM.add_timer 1/50 of a second later.
#
# @param pred [#call] condition to poll, invoked with no arguments
# @param k [Proc] continuation invoked with no arguments once +pred+ holds
def em_wait_until(pred, &k)
  retry_delay = 1.0 / 50
  poll = proc do
    next k.call if pred.call

    # Not ready yet: re-arm the timer with this same proc.
    EM.add_timer(retry_delay, poll)
  end
  poll.call
end
in_debug_mode?() click to toggle source
# File lib/mercury/test_utils.rb, line 16
# Reports whether the RUBYLIB environment variable mentions "ruby-debug-ide",
# which is taken as a sign that the process was launched under that debugger.
# See http://stackoverflow.com/questions/22039807/determine-if-a-program-is-running-in-debug-mode
#
# @return [Boolean] true when RUBYLIB contains "ruby-debug-ide"; false
#   otherwise, including when RUBYLIB is unset
def in_debug_mode?
  # A plain substring test returns a strict boolean and, unlike `=~`, neither
  # allocates MatchData nor overwrites the `$~` global.
  ENV.fetch('RUBYLIB', '').include?('ruby-debug-ide')
end
read_all_messages(worker: , source:, tag:, seconds_to_wait: 0.1) click to toggle source
# File lib/mercury/test_utils.rb, line 57
# Runs an EM reactor that opens a Mercury::Monadic instance, starts a worker
# whose handler collects and acks every message it is given, waits until no
# message has arrived for more than +seconds_to_wait+ seconds, then closes
# the instance and stops the reactor.
#
# @param worker passed through to +start_worker+
# @param source passed through to +start_worker+
# @param tag passed to +start_worker+ as +tag_filter:+
# @param seconds_to_wait [Numeric] idle time after which reading stops
# @return [Array] the messages handed to the handler, in arrival order
def read_all_messages(worker:, source:, tag:, seconds_to_wait: 0.1)
  received = []
  last_arrival = Time.now

  on_message = lambda do |msg|
    received << msg
    msg.ack
    last_arrival = Time.now # restart the idle countdown
  end

  # Truthy once the gap since the last message exceeds the allowed idle time.
  gone_quiet = proc { (Time.now - last_arrival).to_f > seconds_to_wait }

  EM.run do
    Cps.seql do
      let(:m) { Mercury::Monadic.open }
      and_then { m.start_worker(worker, source, on_message, tag_filter: tag) }
      and_then { wait_until(&gone_quiet) }
      and_then { m.close }
      and_lift { EM.stop }
    end.run
  end

  received
end
wait_for(seconds) click to toggle source
# File lib/mercury/test_utils.rb, line 41
# Wraps an EM timer in a +cps+ computation: the continuation handed to the
# +cps+ block is registered with EM.add_timer to run after +seconds+.
#
# @param seconds [Numeric] delay passed to EM.add_timer
# @return whatever +cps+ returns
def wait_for(seconds)
  cps { |&continuation| EM.add_timer(seconds, &continuation) }
end
wait_until(&pred) click to toggle source
# File lib/mercury/test_utils.rb, line 35
# Wraps +em_wait_until+ in a +cps+ computation: the continuation handed to
# the +cps+ block is passed to +em_wait_until+ together with +pred+.
#
# @param pred [Proc] condition block forwarded to +em_wait_until+
# @return whatever +cps+ returns
def wait_until(&pred)
  cps { |&continuation| em_wait_until(pred, &continuation) }
end