class Vx::Consumer::Subscriber
Attributes
queue_name[RW]
vx_consumer_name[RW]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/vx/consumer/subscriber.rb, line 12 def initialize(*args) super(*args) @lock = Mutex.new end
Public Instance Methods
call(*args)
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 49 def call(*args) in_progress do @on_delivery.call(*args) if @on_delivery sleep 0 end end
cancel()
click to toggle source
Calls superclass method
# File lib/vx/consumer/subscriber.rb, line 56 def cancel instrument('cancel_consumer', consumer: vx_consumer_name, channel: channel.id) unless closed? super channel.close unless closed? end end
closed?()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 64 def closed? channel.closed? end
graceful_shutdown()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 17 def graceful_shutdown instrument('try_graceful_shutdown_consumer', consumer: vx_consumer_name) in_progress do cancel instrument('graceful_shutdown_consumer', consumer: vx_consumer_name) end end
in_progress() { || ... }
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 39 def in_progress @lock.synchronize do yield end end
join()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 68 def join channel.work_pool.join end
running?()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 45 def running? @lock.locked? end
try_graceful_shutdown()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 25 def try_graceful_shutdown if @lock.try_lock begin instrument('graceful_shutdown_consumer', consumer: vx_consumer_name) cancel ensure @lock.unlock end true else false end end
wait_shutdown()
click to toggle source
# File lib/vx/consumer/subscriber.rb, line 72 def wait_shutdown Thread.new do Thread.current.abort_on_exception = true Consumer.wait_shutdown cancel end end