class Skein::Client::Worker

Attributes

operations[R]

Properties ===========================================================

queue_name[R]

Public Class Methods

new(queue_name, exchange_name: nil, connection: nil, context: nil, concurrency: nil, durable: nil, auto_delete: false, routing_key: nil, ident: nil) click to toggle source

Instance Methods =====================================================

Calls superclass method Skein::Connected::new
# File lib/skein/client/worker.rb, line 21
# Builds a worker bound to +queue_name+ and starts its subscriber threads.
#
# connection, context and ident are forwarded to the superclass. When
# +durable+ is nil the queue is treated as durable only if its name contains
# at least one non-whitespace character. +concurrency+ is converted with
# +to_i+ and decides how many subscriber threads are started (one when not
# given). Errors raised by the +after_initialize+ hook are discarded.
def initialize(queue_name, exchange_name: nil, connection: nil, context: nil, concurrency: nil, durable: nil, auto_delete: false, routing_key: nil, ident: nil)
  super(connection: connection, context: context, ident: ident)

  @operations = [ ]
  @queue_name = queue_name.dup.freeze
  @exchange_name = exchange_name
  @routing_key = routing_key
  @auto_delete = auto_delete

  @durable =
    if (durable.nil?)
      # No explicit preference: only queues with a non-blank name are durable.
      @queue_name.match(/\S/) ? true : false
    else
      durable ? true : false
    end

  thread_count = concurrency ? concurrency.to_i : 1

  thread_count.times do |index|
    with_channel_in_thread(name: format('worker-%d', index)) do |channel, meta|
      establish_subscriber!(channel, meta)
    end
  end

  begin
    self.after_initialize
  rescue StandardError
    # A failing hook must not prevent the worker from being constructed.
    nil
  end
end

Public Instance Methods

after_close() click to toggle source

Declared in derived classes. Will be called immediately after the worker has been closed down.

# File lib/skein/client/worker.rb, line 82
# No-op hook for subclasses to override; #close calls it as its last step.
def after_close; end
after_exception(e) click to toggle source

Declared in derived classes. Will be called immediately after an exception has occurred when processing a request. Any exceptions generated in this method call are suppressed and ignored to avoid being caught in a loop.

# File lib/skein/client/worker.rb, line 72
# No-op hook for subclasses to override. It receives the exception (+e+)
# rescued while a request was being processed; the call site discards any
# error raised from within this hook.
def after_exception(e); end
after_execution(method_name) click to toggle source

Define in derived classes. Will be called immediately after executing the worker method.

# File lib/skein/client/worker.rb, line 60
# No-op hook for subclasses to override; meant to run immediately after the
# worker method named +method_name+ has been executed.
def after_execution(method_name); end
after_initialize() click to toggle source

Define in derived classes to implement any desired customization to be performed after initialization.

# File lib/skein/client/worker.rb, line 45
# No-op hook for subclasses to override; #initialize calls it as its last
# step and ignores any error it raises.
def after_initialize; end
after_request() click to toggle source

Define in derived classes. Will be called immediately after handling an RPC call even if an error has occurred.

# File lib/skein/client/worker.rb, line 65
# No-op hook for subclasses to override; run once a request has been
# handled, whether or not an error was raised.
def after_request; end
async?() click to toggle source
# File lib/skein/client/worker.rb, line 122
# Reports whether this worker relies on async callback-style delegation.
# The base implementation answers false; subclasses that need async
# behaviour should override this method to return true.
def async?
  false
end
before_close() click to toggle source

Declared in derived classes. Will be called immediately before the worker is closed down.

# File lib/skein/client/worker.rb, line 77
# No-op hook for subclasses to override; #close calls it before doing
# anything else.
def before_close; end
before_execution(method_name) click to toggle source

Define in derived classes. Will be called immediately prior to executing the worker method.

# File lib/skein/client/worker.rb, line 55
# No-op hook for subclasses to override; meant to run immediately before the
# worker method named +method_name+ is executed.
def before_execution(method_name); end
before_request() click to toggle source

Define in derived classes. Will be called immediately after a request is received but before any processing occurs.

# File lib/skein/client/worker.rb, line 50
# No-op hook for subclasses to override; run when a request has been
# received, before it is passed to the handler.
def before_request; end
close(delete_queue: false) click to toggle source
Calls superclass method Skein::Connected#close
# File lib/skein/client/worker.rb, line 85
# Shuts the worker down.
#
# Runs the +before_close+ hook, then for every tracked operation asks the
# subscriber to shut down gracefully (when it supports that) and terminates
# and joins the thread. When +delete_queue+ is true the queue is deleted via
# a temporary channel. Finally the superclass +close+ is invoked, followed by
# the +after_close+ hook.
#
# delete_queue - when true, delete the queue this worker was consuming from.
#
# Any error raised while deleting the queue propagates to the caller, but the
# temporary channel is always closed first.
def close(delete_queue: false)
  self.before_close

  @operations.each do |meta|
    subscriber = meta[:subscriber]

    if (subscriber.respond_to?(:gracefully_shut_down))
      subscriber.gracefully_shut_down
    end

    thread = meta[:thread]

    thread.respond_to?(:terminate!) ? thread.terminate! : thread.kill
    thread.join
  end

  if (delete_queue)
    # The connection may have been terminated, so reconnect and delete
    # the queue if necessary.
    channel = @connection.create_channel

    begin
      # Declare the queue with the same attributes used by establish_queue!.
      # A declaration whose attributes differ from the existing queue can be
      # refused by the broker, which would make the delete fail.
      channel.queue(
        @queue_name,
        durable: @durable,
        auto_delete: @auto_delete
      ).delete
    ensure
      # Never leak the temporary channel, even if declare/delete raised.
      channel.close
    end
  end

  super()

  self.after_close
end
join() click to toggle source
# File lib/skein/client/worker.rb, line 116
# Blocks until the thread of every tracked operation has finished.
# Returns the list of operations.
def join
  @operations.each { |operation| operation[:thread].join }
end
reject!() click to toggle source

Signal that the current operation should be abandoned and not retried.

# File lib/skein/client/worker.rb, line 129
# Aborts the current operation by raising RejectMessage, signalling that the
# message should be abandoned and not retried.
def reject!
  raise(RejectMessage)
end
retry!() click to toggle source

Signal that the current operation should be abandoned and retried later.

# File lib/skein/client/worker.rb, line 134
# Aborts the current operation by raising RetryMessage, signalling that the
# message should be abandoned now and retried later.
def retry!
  raise(RetryMessage)
end

Protected Instance Methods

establish_queue!(channel) click to toggle source
# File lib/skein/client/worker.rb, line 164
# Declares this worker's queue on +channel+ using the configured durable and
# auto_delete settings. When a non-blank exchange name is configured, a
# durable direct exchange is declared as well and the queue is bound to it,
# using the routing key if one was given and the queue name otherwise.
#
# Returns the queue object.
def establish_queue!(channel)
  declared = channel.queue(@queue_name, durable: @durable, auto_delete: @auto_delete)

  declared.tap do |queue|
    # Nothing to bind when no (or only a blank) exchange name was supplied.
    next unless (@exchange_name&.match(/\S/))

    binding_key = @routing_key || @queue_name
    direct_exchange = channel.direct(@exchange_name, durable: true)

    queue.bind(direct_exchange, routing_key: binding_key)
  end
end
establish_subscriber!(channel, meta) click to toggle source
# File lib/skein/client/worker.rb, line 180
# Declares the queue on +channel+ and subscribes to it through
# Skein::Adapter, storing the returned subscriber in meta[:subscriber].
#
# For every delivery the subscription block receives the payload, the
# delivery tag and the reply-to address. The payload is passed to the handler
# together with meta[:metrics] and meta[:state]; when the handler yields a
# reply the message is acknowledged and, if a reply-to address is present,
# the reply is published on the channel's default exchange.
#
# Setting the SKEIN_DEBUG_JSON environment variable echoes request and reply
# payloads to $stdout for deliveries that carry a reply-to address.
def establish_subscriber!(channel, meta)
  queue = self.establish_queue!(channel)

  meta[:subscriber] = Skein::Adapter.subscribe(queue) do |payload, delivery_tag, reply_to|
    if (ENV['SKEIN_DEBUG_JSON'] and reply_to)
      $stdout.puts('%s -> %s' % [ reply_to, payload ])
    end

    self.context.trap do
      # Hook failures are deliberately ignored so they cannot block handling.
      self.before_request rescue nil

      handler.handle(payload, meta[:metrics], meta[:state]) do |reply_json|
        if (ENV['SKEIN_DEBUG_JSON'] and reply_to)
          $stdout.puts('%s <- %s' % [ reply_to, reply_json ])
        end

        # Secondary (inner) trap required since some handlers are async
        self.context.trap do
          # NOTE: begin...end necessary for rescue in Ruby versions below 2.5
          #       (rescue directly inside do...end blocks arrived in 2.5).
          begin
            # NOTE(review): the second argument is presumably the AMQP
            #               "multiple" flag -- confirm against the adapter.
            channel.acknowledge(delivery_tag, true)

            if (reply_to)
              channel.default_exchange.publish(
                reply_json,
                routing_key: reply_to,
                content_type: 'application/json'
              )
            end

          # NOTE(review): these rescue clauses only cover exceptions raised
          #               inside this begin block (acknowledge/publish).
          #               Whether RejectMessage/RetryMessage raised by worker
          #               methods reach here depends on Skein::Handler --
          #               confirm.
          rescue RejectMessage
            # Reject the message
            channel.reject(delivery_tag, false)
          rescue RetryMessage
            # Reject and requeue the message
            channel.reject(delivery_tag, true)
          rescue => e
            # Notify the hook (ignoring its own failures), then re-raise.
            self.after_exception(e) rescue nil
            raise e
          ensure
            # Runs on success and on failure alike; hook errors are ignored.
            self.after_request rescue nil
          end
        end
      end
    end
  end
end
handler() click to toggle source
# File lib/skein/client/worker.rb, line 258
# Returns the Skein::Handler built for this worker, creating it on first use
# and reusing the same instance afterwards.
def handler
  return @handler if (@handler)

  @handler = Skein::Handler.for(self)
end
in_thread() { || ... } click to toggle source
# File lib/skein/client/worker.rb, line 154
# Runs the given block in a new thread and records that thread in
# @operations as { thread: ... }. Returns the operations list.
def in_thread
  spawned = Thread.new do
    # NOTE: This assigns the process-wide Thread.abort_on_exception flag.
    Thread.abort_on_exception = true

    yield
  end

  @operations << { thread: spawned }
end
metrics_tracker() click to toggle source
# File lib/skein/client/worker.rb, line 147
# Builds a fresh metrics hash whose missing keys read as 0. It starts out
# with :time set to 0.0 and :errors set to its own zero-defaulting hash.
def metrics_tracker
  metrics = Hash.new(0)

  metrics[:time] = 0.0
  metrics[:errors] = Hash.new(0)

  metrics
end
state_tracker() click to toggle source
# File lib/skein/client/worker.rb, line 139
# Builds a fresh state hash with the :method, :started and :finished keys,
# all initially nil.
def state_tracker
  %i[ method started finished ].each_with_object({ }) do |field, state|
    state[field] = nil
  end
end
with_channel_in_thread(recover: true, name: nil) { |channel, meta| ... } click to toggle source
# File lib/skein/client/worker.rb, line 228
# Starts a thread that creates a channel and yields it, along with a meta
# hash, to the given block.
#
# The meta hash is appended to @operations and carries :metrics and :state
# (fresh trackers), :thread (the spawned thread) and :channel (the channel
# most recently created inside the thread).
#
# recover - when truthy, the thread's block is restarted (a new channel is
#           created and the given block is yielded to again) each time the
#           given block returns normally.
# name    - assigned as the name of the spawned thread.
#
# Returns the spawned Thread.
def with_channel_in_thread(recover: true, name: nil)
  meta = {
    metrics: metrics_tracker,
    state: state_tracker
  }

  @operations << meta

  meta[:thread] = Thread.new do
    # NOTE: This assigns the process-wide Thread.abort_on_exception flag.
    Thread.abort_on_exception = true
    Thread.current.name = name

    begin
      channel = meta[:channel] = self.create_channel

      yield(channel, meta)

      channel.close rescue nil

      # `redo` re-runs this whole thread block from the top, so every pass
      # gets a newly created channel.
      redo if (recover)

    ensure
      # NOTE: The `.close` call may fail for a variety of reasons, but the
      #       important thing here is an attempt is made, regardless of
      #       outcome.
      channel&.close rescue nil
    end
  end
end