module Rabbitek::Consumer::ClassMethods
Attributes
opts[RW]
rabbit_options_hash[RW]
Public Instance Methods
perform_async(payload, opts: {}, channel: nil)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 67 def perform_async(payload, opts: {}, channel: nil) publisher = Publisher.new( rabbit_options_hash[:bind_exchange], exchange_type: rabbit_options_hash[:bind_exchange_type], channel: channel ) publish_with_publisher(publisher, payload, opts) ensure publisher&.close unless channel end
perform_at(at_time, payload, opts: {}, channel: nil)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 92 def perform_at(at_time, payload, opts: {}, channel: nil) perform_in(at_time - Time.current, payload, opts: opts, channel: channel) end
perform_in(time, payload, opts: {}, channel: nil)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 78 def perform_in(time, payload, opts: {}, channel: nil) publisher = Publisher.new( Utils::RabbitObjectNames.retry_or_delayed_bind_exchange(rabbit_options_hash[:bind_exchange]), exchange_type: :direct, channel: channel ) publish_with_publisher(publisher, payload, { expiration: time.to_i * 1000, # in milliseconds headers: { 'x-dead-letter-routing-key': to_s } }.merge(opts)) ensure publisher&.close unless channel end
publish_with_publisher(publisher, payload, opts)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 96 def publish_with_publisher(publisher, payload, opts) publisher.publish(payload, { routing_key: to_s }.merge(opts)) end
rabbit_options(opts)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 62 def rabbit_options(opts) self.rabbit_options_hash = default_rabbit_options(opts).with_indifferent_access.merge(opts) self.opts = opts end
Private Instance Methods
default_rabbit_options(opts)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 102 def default_rabbit_options(opts) YAML.load_file(opts[:config_file]).with_indifferent_access[:parameters] end