class Legion::Extensions::Actors::Subscription

Public Class Methods

new()
# File lib/legion/extensions/actors/subscription.rb, line 8
def initialize
  require class_path
  @queue = queue.new
  async.subscribe
end

Public Instance Methods

action(payload)
# File lib/legion/extensions/actors/subscription.rb, line 19
def action(payload)
  Legion::Logging.warn "Payload from default action was #{payload}"
end
cancel()
# File lib/legion/extensions/actors/subscription.rb, line 14
def cancel
  Legion::Logging.debug "Closing subscription to #{@queue.name}"
  @queue.close
end
subscribe(manual_ack = true)
# File lib/legion/extensions/actors/subscription.rb, line 23
def subscribe(manual_ack = true)
  require 'legion/extensions/tasker/runners/task_updater'
  @queue.subscribe(manual_ack: manual_ack) do |delivery_info, _metadata, payload|
    begin
      message = Legion::JSON.load(payload)
      Legion::Runner::Runner.new(runner_class, runner_method, message)
      @queue.acknowledge(delivery_info.delivery_tag) if manual_ack
    rescue StandardError => ex
      Legion::Logging.error(ex.message)
      Legion::Logging.error(ex.backtrace)
      @queue.reject(delivery_info.delivery_tag) if manual_ack
    end
  end
end