class Legion::Extensions::Actors::Subscription
Public Class Methods
new()
click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 8
#
# Sets up the subscription actor.
#
# Requires the file named by +class_path+, instantiates the class returned by
# +queue+ and stores it in @queue, then invokes #subscribe through +async+.
# NOTE(review): +class_path+, +queue+ and +async+ are defined outside this
# view; +async+ presumably dispatches the call asynchronously -- confirm.
def initialize
  require(class_path)
  @queue = queue.new
  async.subscribe
end
Public Instance Methods
action(payload)
click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 19
#
# Default action: logs the received payload at warn level.
#
# payload - the value to report; it is interpolated into the log message.
#
# Returns whatever Legion::Logging.warn returns.
def action(payload)
  Legion::Logging.warn("Payload from default action was #{payload}")
end
cancel()
click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 14
#
# Logs a debug message naming the queue held in @queue, then closes it.
#
# Returns whatever @queue.close returns.
def cancel
  Legion::Logging.debug("Closing subscription to #{@queue.name}")
  @queue.close
end
subscribe(manual_ack = true)
click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 23
#
# Subscribes to @queue and hands every delivered payload to a runner.
#
# manual_ack - when truthy (the default), each delivery is explicitly
#              acknowledged after the runner has been constructed, or
#              rejected if a StandardError was raised while handling it.
#              The flag is also forwarded to @queue.subscribe.
#
# For each delivery the payload is parsed with Legion::JSON.load and passed,
# together with +runner_class+ and +runner_method+, to
# Legion::Runner::Runner.new. Any StandardError is logged (message, then
# backtrace) instead of propagating out of the block.
#
# Returns whatever @queue.subscribe returns.
def subscribe(manual_ack = true)
  # Loaded here rather than at file load time, as in the original source.
  require 'legion/extensions/tasker/runners/task_updater'

  @queue.subscribe(manual_ack: manual_ack) do |delivery_info, _properties, payload|
    begin
      decoded = Legion::JSON.load(payload)
      Legion::Runner::Runner.new(runner_class, runner_method, decoded)
      if manual_ack
        @queue.acknowledge(delivery_info.delivery_tag)
      end
    rescue StandardError => error
      Legion::Logging.error(error.message)
      Legion::Logging.error(error.backtrace)
      # NOTE(review): this also runs when the acknowledge call above is what
      # raised -- confirm that rejecting the same delivery tag is safe then.
      if manual_ack
        @queue.reject(delivery_info.delivery_tag)
      end
    end
  end
end