class Rpush::Daemon::DispatcherLoop

Constants

STOP

Attributes

dispatch_count[R]
started_at[R]

Public Class Methods

new(queue, dispatcher) click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 11
# Builds a dispatcher loop around the given collaborators.
#
# @param queue the object payloads are later popped from and pushed to
# @param dispatcher the object that receives +dispatch+ and +cleanup+ calls
def initialize(queue, dispatcher)
  @queue, @dispatcher = queue, dispatcher
  # Nothing has been dispatched yet.
  @dispatch_count = 0
end

Public Instance Methods

start() click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 21
# Records the start time and spawns the worker thread.
#
# The thread repeatedly pops a payload off the queue:
# * ordinary payloads are handed to +dispatch+;
# * a stop payload addressed to this loop ends the loop;
# * a stop payload addressed to a different loop is pushed back onto the
#   queue, after which this thread yields and sleeps for 0.1 seconds.
#
# Once the loop has ended, the store connection is released.
#
# @return [Thread] the newly created thread (also kept in @thread)
def start
  @started_at = Time.now

  @thread = Thread.new do
    loop do
      payload = @queue.pop

      # Common case first: a regular payload is dispatched straight away.
      unless stop_payload?(payload)
        dispatch(payload)
        next
      end

      break if should_stop?(payload)

      # The stop signal belongs to another dispatcher loop: put it back and
      # step aside so that loop gets a chance to pick it up.
      @queue.push(payload)
      Thread.pass
      sleep 0.1
    end

    Rpush::Daemon.store.release_connection
  end
end
stop() click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 43
# Shuts the loop down.
#
# When a worker thread exists, a stop payload tagged with this object's
# +object_id+ is pushed onto the queue and the thread is joined. The
# dispatcher's +cleanup+ is then called whether or not a thread existed.
# Any StandardError raised along the way is passed to +log_error+ and
# +reflect+ instead of propagating. @thread is always reset to nil.
def stop
  worker = @thread
  if worker
    @queue.push([STOP, object_id])
    worker.join
  end

  @dispatcher.cleanup
rescue StandardError => error
  log_error(error)
  reflect(:error, error)
ensure
  @thread = nil
end
thread_status() click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 17
# Reports the state of the worker thread.
#
# @return the value of Thread#status for the worker thread, or the string
#   'not started' when no thread is currently held in @thread
def thread_status
  return 'not started' unless @thread

  @thread.status
end

Private Instance Methods

dispatch(payload) click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 64
# Counts the payload and forwards it to the dispatcher.
#
# The counter is incremented before the dispatcher is called, so a payload
# whose dispatch fails is still counted. A StandardError is passed to
# +log_error+ and +reflect+ rather than being re-raised.
#
# @param payload the item handed to the dispatcher
def dispatch(payload)
  begin
    @dispatch_count = @dispatch_count + 1
    @dispatcher.dispatch(payload)
  rescue StandardError => error
    log_error(error)
    reflect(:error, error)
  end
end
should_stop?(payload) click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 60
# Tells whether a stop payload is addressed to this loop.
#
# @param payload a payload whose last element is compared with this
#   object's +object_id+
# @return the result of that comparison
def should_stop?(payload)
  target_id = payload.last
  target_id == object_id
end
stop_payload?(payload) click to toggle source
# File lib/rpush/daemon/dispatcher_loop.rb, line 56
# Tells whether a payload is a stop signal.
#
# @param payload any object popped from the queue
# @return false unless the payload is an Array; otherwise the result of
#   comparing its first element with STOP
def stop_payload?(payload)
  return false unless payload.is_a?(Array)

  payload.first == STOP
end