module Rabbitek::Consumer::ClassMethods

Attributes

opts[RW]
rabbit_options_hash[RW]

Public Instance Methods

perform_async(payload, opts: {}, channel: nil)
# File lib/rabbitek/server/consumer.rb, line 67
# Publishes +payload+ right away through a Publisher built from this
# consumer's rabbit options.
#
# @param payload the message body handed to the publisher
# @param opts [Hash] extra publish options, forwarded to publish_with_publisher
# @param channel an already-open channel to reuse; when omitted, the
#   publisher created here is closed before returning
# @return whatever publish_with_publisher returns
def perform_async(payload, opts: {}, channel: nil)
  settings = rabbit_options_hash
  exchange_type = rabbit_options_hash[:bind_exchange_type]
  publisher = Publisher.new(settings[:bind_exchange], exchange_type: exchange_type, channel: channel)

  publish_with_publisher(publisher, payload, opts)
ensure
  # A caller-supplied channel stays under the caller's control, so the
  # publisher is only closed when none was passed in.
  publisher.close if publisher && !channel
end
perform_at(at_time, payload, opts: {}, channel: nil)
# File lib/rabbitek/server/consumer.rb, line 92
# Schedules +payload+ for a specific moment by turning +at_time+ into a
# delay relative to Time.current and delegating to perform_in.
#
# @param at_time the moment the message should be delivered
# @param payload the message body
# @param opts [Hash] extra publish options, forwarded unchanged
# @param channel an optional channel, forwarded unchanged
# @return whatever perform_in returns
def perform_at(at_time, payload, opts: {}, channel: nil)
  delay = at_time - Time.current
  perform_in(delay, payload, opts: opts, channel: channel)
end
perform_in(time, payload, opts: {}, channel: nil)
# File lib/rabbitek/server/consumer.rb, line 78
# Publishes +payload+ to the retry/delayed exchange derived from
# rabbit_options_hash[:bind_exchange], with a per-message expiration of
# +time+ seconds.
#
# @param time delay in seconds (anything responding to +to_i+); a negative
#   delay, e.g. from perform_at with a moment already in the past, is
#   treated as zero
# @param payload the message body handed to the publisher
# @param opts [Hash] extra publish options; they override the defaults built
#   here, except that +:headers+ is merged with the default headers
# @param channel an already-open channel to reuse; when omitted, the
#   publisher created here is closed before returning
# @return whatever publish_with_publisher returns
def perform_in(time, payload, opts: {}, channel: nil)
  publisher = Publisher.new(
    Utils::RabbitObjectNames.retry_or_delayed_bind_exchange(rabbit_options_hash[:bind_exchange]),
    exchange_type: :direct,
    channel: channel
  )

  defaults = {
    # Clamp to zero: a negative value is not a valid message expiration.
    expiration: [time.to_i, 0].max * 1000, # in milliseconds
    headers: { 'x-dead-letter-routing-key': to_s }
  }

  # Caller options win, but caller-supplied headers are merged into the
  # defaults so the dead-letter routing key is not silently dropped.
  publish_opts = defaults.merge(opts) do |key, default, custom|
    key == :headers && custom.is_a?(Hash) ? default.merge(custom) : custom
  end

  publish_with_publisher(publisher, payload, publish_opts)
ensure
  publisher&.close unless channel
end
publish_with_publisher(publisher, payload, opts)
# File lib/rabbitek/server/consumer.rb, line 96
# Publishes +payload+ through +publisher+, defaulting the routing key to
# this object's +to_s+.
#
# @param publisher an object responding to +publish(payload, options)+
# @param payload the message body
# @param opts [Hash] publish options; a +:routing_key+ entry here replaces
#   the default one
# @return whatever +publisher.publish+ returns
def publish_with_publisher(publisher, payload, opts)
  defaults = { routing_key: to_s }
  publish_opts = defaults.merge(opts)
  publisher.publish(payload, publish_opts)
end
rabbit_options(opts)
# File lib/rabbitek/server/consumer.rb, line 62
# Stores the consumer's configuration: the defaults returned by
# default_rabbit_options overlaid with +opts+ go into rabbit_options_hash,
# and the raw +opts+ go into +opts+.
#
# @param opts [Hash] consumer options; also passed to default_rabbit_options
# @return [Hash] the +opts+ that were passed in
def rabbit_options(opts)
  defaults = default_rabbit_options(opts).with_indifferent_access
  self.rabbit_options_hash = defaults.merge(opts)
  self.opts = opts
end

Private Instance Methods

default_rabbit_options(opts)
# File lib/rabbitek/server/consumer.rb, line 102
# Reads the YAML file at opts[:config_file] and returns the value stored
# under its +parameters+ key.
#
# @param opts [Hash] must provide +:config_file+, the path of the YAML file
# @return the +parameters+ entry of the file, or nil when the key is absent
def default_rabbit_options(opts)
  config_path = opts[:config_file]
  config = YAML.load_file(config_path).with_indifferent_access
  config[:parameters]
end