class Legion::Extensions::Actors::Subscription

Public Class Methods

new(**_options) click to toggle source
Calls superclass method
# File lib/legion/extensions/actors/subscription.rb, line 12
# Prepares the subscription actor: runs the parent initializer, instantiates
# the queue class returned by #queue and, when +prefetch+ is defined, applies
# that value to the queue's channel.
#
# Any StandardError raised during setup is logged at fatal level (message
# first, then backtrace) and is not re-raised.
#
# @param _options [Hash] accepted but not used by this method
def initialize(**_options)
  super()
  @queue = queue.new
  if defined?(prefetch)
    @queue.channel.prefetch(prefetch)
  end
rescue StandardError => error
  [error.message, error.backtrace].each { |detail| log.fatal detail }
end

Public Instance Methods

block() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 48
# Blocking flag for this actor; this implementation returns +false+.
#
# NOTE(review): presumably meant to say whether subscribing should block the
# calling thread, but #subscribe passes a literal `block: false` to the queue
# instead of this value — confirm how this hook is meant to be used.
#
# @return [Boolean] always false here
def block
  false
end
cancel() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 39
# Stops consuming and closes the queue's channel.
#
# Returns early when there is nothing to shut down: no queue (#initialize
# logs and swallows setup errors, which can leave @queue unset) or a channel
# that is not active. The consumer is cancelled only when one exists, so
# calling this before #subscribe has assigned @consumer does not raise
# NoMethodError and the channel is still closed.
#
# @return [true] always
def cancel
  return true unless @queue&.channel&.active

  log.debug "Closing subscription to #{@queue.name}"
  # @consumer is only assigned by #subscribe; it may still be nil here.
  @consumer&.cancel
  @queue.channel.close
  true
end
consumers() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 52
# Consumer count for this actor; this implementation returns 1.
#
# NOTE(review): presumably the number of consumers the caller should start
# for this actor — no method shown alongside this one reads it, so verify
# against whatever boots the actor.
#
# @return [Integer] always 1 here
def consumers
  1
end
create_queue() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 21
# Defines the actor's queue class (a fresh subclass of
# Legion::Transport::Queue) under +queues+, then binds a new instance of the
# class named by #queue_string to a new default exchange twice: first with
# the bare actor name, then with the "<lex_name>.<actor_name>.#" routing key.
#
# @return [Object] whatever the second +bind+ call returns
def create_queue
  queues.const_set(actor_const, Class.new(Legion::Transport::Queue))

  exchange = default_exchange.new
  actor_queue = Kernel.const_get(queue_string).new

  actor_queue.bind(exchange, routing_key: actor_name)
  actor_queue.bind(exchange, routing_key: format('%s.%s.#', lex_name, actor_name))
end
delay_start() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 60
# Delay applied before subscribing: #subscribe passes this value to +sleep+
# before it starts consuming. This implementation returns 0 (no delay).
#
# @return [Integer] always 0 here
def delay_start
  0
end
find_function(message = {}) click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 96
# Resolves which function should handle a message.
#
# Resolution order:
# 1. the first of +runner_function+, +function+, +action+ that the actor
#    class defines directly (inherited definitions are not considered);
# 2. the message's own +:function+ entry, when the key is present;
# 3. the value of +function+.
#
# @param message [Hash] processed message; only the +:function+ key is read
# @return [Object] the resolved function identifier
def find_function(message = {})
  own_methods = actor_class.instance_methods(false)
  override = %i[runner_function function action].find { |name| own_methods.include?(name) }
  return send(override) if override

  message.key?(:function) ? message[:function] : function
end
include_metadata_in_message?() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 64
# Controls whether #process_message enriches the message hash with delivery
# metadata: when this is truthy the metadata headers (keys symbolized) are
# merged in and +:routing_key+ is set. This implementation returns +true+.
#
# @return [Boolean] always true here
def include_metadata_in_message?
  true
end
manual_ack() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 56
# Acknowledgement mode flag. #subscribe forwards this value as the
# +manual_ack:+ option and, when it is truthy, acknowledges each delivery
# after handling it and rejects it when handling raises. This implementation
# returns +true+.
#
# @return [Boolean] always true here
def manual_ack
  true
end
process_message(message, metadata, delivery_info) click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 68
# Turns a raw delivery into the message hash handed to the runner.
#
# Steps:
# 1. Decrypt the body when the content encoding is 'encrypted/cs' or
#    'encrypted/pk'; otherwise use it unchanged.
# 2. Parse it with Legion::JSON when the content type is 'application/json';
#    otherwise wrap it as { value: body }.
# 3. When #include_metadata_in_message? is truthy, merge the headers (keys
#    symbolized; header values win over body keys) and set :routing_key.
# 4. Derive :timestamp from :timestamp_in_ms when only the latter is present,
#    and add a :datetime string whenever :timestamp is present.
#
# @param message [String] raw delivery body
# @param metadata [#content_encoding, #content_type, #headers, #routing_key]
# @param delivery_info [#[]] read for :routing_key unless the transport type
#   is 'march_hare'
# @return [Hash] the processed message
def process_message(message, metadata, delivery_info)
  decoded = case metadata.content_encoding
            when 'encrypted/cs'
              Legion::Crypt.decrypt(message, metadata.headers['iv'])
            when 'encrypted/pk'
              Legion::Crypt.decrypt_from_keypair(metadata.headers[:public_key], message)
            else
              message
            end

  result = metadata.content_type == 'application/json' ? Legion::JSON.load(decoded) : { value: decoded }

  if include_metadata_in_message?
    headers = metadata.headers
    result = result.merge(headers.transform_keys(&:to_sym)) unless headers.nil?

    key_from_metadata = Legion::Transport::TYPE == 'march_hare'
    result[:routing_key] = key_from_metadata ? metadata.routing_key : delivery_info[:routing_key]
  end

  if result.key?(:timestamp_in_ms) && !result.key?(:timestamp)
    result[:timestamp] = (result[:timestamp_in_ms] / 1000).round
  end
  result[:datetime] = Time.at(result[:timestamp].to_i).to_datetime.to_s if result.key?(:timestamp)

  result
end
queue() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 30
# Returns the queue class for this actor, creating it first via
# #create_queue when +queues+ does not define it yet.
#
# The existence check passes +false+ for +inherit+ so only constants defined
# directly on +queues+ count. With the default inherited lookup, an actor
# whose constant name matches a top-level constant (for example "String" or
# "Process") is reported as already defined when +queues+ is a Module;
# #create_queue is then skipped and the scoped lookup below cannot resolve
# the queue class.
#
# @return [Class] the constant named by #queue_string
def queue
  create_queue unless queues.const_defined?(actor_const, false)
  Kernel.const_get queue_string
end
queue_string() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 35
# Fully qualified name of the actor's queue constant, built as
# "<queues>::<actor_const>" and memoized in @queue_string after the first
# call.
#
# @return [String] the constant path used to look up the queue class
def queue_string
  return @queue_string if @queue_string

  @queue_string = format('%s::%s', queues, actor_const)
end
subscribe() click to toggle source
# File lib/legion/extensions/actors/subscription.rb, line 105
# Starts consuming from the actor's queue and stores the consumer in
# @consumer.
#
# After sleeping for #delay_start seconds it subscribes with a consumer tag
# built from the client name, lex name, runner name and current thread id.
# For every delivery the handler:
# 1. builds the message hash with #process_message;
# 2. dispatches it through Legion::Runner.run when #use_runner? is truthy,
#    otherwise sends the resolved function straight to +runner_class+;
# 3. acknowledges the delivery when #manual_ack is truthy;
# 4. calls #cancel when the client settings report :shutting_down.
# A StandardError raised while handling is logged and, with manual acks, the
# delivery is rejected.
#
# @return [Object] the consumer returned by the queue's +subscribe+
def subscribe
  sleep(delay_start)
  consumer_tag = "#{Legion::Settings[:client][:name]}_#{lex_name}_#{runner_name}_#{Thread.current.object_id}"
  # This has to be a real callable. The previous `block { cancel }` invoked
  # the #block instance method, which ignores the given block and returns its
  # flag, so no cancellation callback was ever registered.
  on_cancellation = proc { cancel }

  @consumer = @queue.subscribe(manual_ack: manual_ack, block: false, consumer_tag: consumer_tag, on_cancellation: on_cancellation) do |*rmq_message|
    # The payload is the last handler argument; of what remains, the last is
    # the metadata and the first is the delivery info.
    payload = rmq_message.pop
    metadata = rmq_message.last
    delivery_info = rmq_message.first

    message = process_message(payload, metadata, delivery_info)
    if use_runner?
      Legion::Runner.run(**message,
                         runner_class:  runner_class,
                         function:      find_function(message),
                         check_subtask: check_subtask?,
                         generate_task: generate_task?)
    else
      runner_class.send(find_function(message), **message)
    end
    @queue.acknowledge(delivery_info.delivery_tag) if manual_ack

    cancel if Legion::Settings[:client][:shutting_down]
  rescue StandardError => e
    Legion::Logging.error e.message
    Legion::Logging.error e.backtrace
    Legion::Logging.error message
    Legion::Logging.error function
    @queue.reject(delivery_info.delivery_tag) if manual_ack
  end
end