class Fleck::Consumer

Attributes

actions_map[RW]
configs[RW]
consumers[RW]
initialize_block[RW]
logger[RW]

Public Class Methods

actions(*args) click to toggle source
# File lib/fleck/consumer.rb, line 20
def self.actions(*args)
  args.each do |item|
    case item
    when Hash
      item.each do |k,v|
        self.register_action(k.to_s, v.to_s)
      end
    else
      self.register_action(item.to_s, item.to_s)
    end
  end
end
autostart(subclass) click to toggle source
# File lib/fleck/consumer.rb, line 60
def self.autostart(subclass)
  # Use TracePoint to autostart the consumer when ready
  trace = TracePoint.new(:end) do |tp|
    if tp.self == subclass
      # disable tracing when we reach the end of the subclass
      trace.disable
      # create a new instance of the subclass, in order to start the consumer
      [subclass.configs[:concurrency].to_i, 1].max.times do |i|
        subclass.consumers << subclass.new(i)
      end
    end
  end
  trace.enable
end
configure(opts = {}) click to toggle source
# File lib/fleck/consumer.rb, line 15
def self.configure(opts = {})
  self.configs.merge!(opts)
  logger.debug "Consumer configurations updated."
end
inherited(subclass) click to toggle source
Calls superclass method
# File lib/fleck/consumer.rb, line 8
def self.inherited(subclass)
  super
  init_consumer(subclass)
  autostart(subclass)
  Fleck.register_consumer(subclass)
end
init_consumer(subclass) click to toggle source
# File lib/fleck/consumer.rb, line 48
def self.init_consumer(subclass)
  subclass.logger          = Fleck.logger.clone
  subclass.logger.progname = subclass.to_s

  subclass.logger.debug "Setting defaults for #{subclass.to_s.color(:yellow)} consumer"

  subclass.configs     = Fleck.config.default_options
  subclass.configs[:autostart] = true if subclass.configs[:autostart].nil?
  subclass.actions_map = {}
  subclass.consumers   = []
end
initialize(&block) click to toggle source
# File lib/fleck/consumer.rb, line 38
def self.initialize(&block)
  self.initialize_block = block
end
new(thread_id = nil) click to toggle source
# File lib/fleck/consumer.rb, line 75
def initialize(thread_id = nil)
  @__thread_id    = thread_id
  @__connection   = nil
  @__consumer_tag = nil
  @__request      = nil
  @__response     = nil
  @__lock         = Mutex.new
  @__lounger      = ConditionVariable.new

  @__host          = configs[:host]
  @__port          = configs[:port]
  @__user          = configs[:user]          || 'guest'
  @__pass          = configs[:password]      || configs[:pass]
  @__vhost         = configs[:vhost]         || "/"
  @__exchange_type = configs[:exchange_type] || :direct
  @__exchange_name = configs[:exchange_name] || ""
  @__queue_name    = configs[:queue]
  @__autostart     = configs[:autostart]
  @__prefetch      = (configs[:prefetch]      || 100).to_i
  @__mandatory     = !!configs[:mandatory]

  if self.class.initialize_block
    self.instance_eval(&self.class.initialize_block)
  end

  logger.info "Launching #{self.class.to_s.color(:yellow)} consumer ..."

  start if @__autostart

  at_exit do
    terminate
  end
end
register_action(action, method_name) click to toggle source
# File lib/fleck/consumer.rb, line 33
def self.register_action(action, method_name)
  raise ArgumentError.new("Cannot use `:#{method_name}` method as an action, because it is reserved for Fleck::Consumer internal stuff!") if Fleck::Consumer.instance_methods.include?(method_name.to_s.to_sym)
  self.actions_map[action.to_s] = method_name.to_s
end
start(block: false) click to toggle source
# File lib/fleck/consumer.rb, line 42
def self.start(block: false)
  self.consumers.each do |consumer|
    consumer.start(block: block)
  end
end

Public Instance Methods

actions() click to toggle source
# File lib/fleck/consumer.rb, line 146
def actions
  @actions ||= self.class.actions_map
end
channel() click to toggle source
# File lib/fleck/consumer.rb, line 154
def channel
  return @__channel
end
configs() click to toggle source
# File lib/fleck/consumer.rb, line 142
def configs
  @configs ||= self.class.configs
end
connection() click to toggle source
# File lib/fleck/consumer.rb, line 150
def connection
  return @__connection
end
deprecated!() click to toggle source
# File lib/fleck/consumer.rb, line 193
def deprecated!
  logger.warn("DEPRECATION: the method `#{caller_locations(1,1)[0].label}` is going to be deprecated. Please, consider using a newer version of this method.")
  @__response.deprecated! if @__response
end
exchange() click to toggle source
# File lib/fleck/consumer.rb, line 162
def exchange
  return @__exchange
end
logger() click to toggle source
# File lib/fleck/consumer.rb, line 134
def logger
  return @logger if @logger
  @logger = self.class.logger.clone
  @logger.progname = "#{self.class.name}" + (configs[:concurrency].to_i <= 1 ? "" : "[#{@__thread_id}]")

  @logger
end
on_message(request, response) click to toggle source
# File lib/fleck/consumer.rb, line 116
def on_message(request, response)
  method_name = actions[request.action.to_s]
  if method_name
    self.send(method_name)
  else
    response.not_found
  end
end
pause() click to toggle source
# File lib/fleck/consumer.rb, line 174
def pause
  if subscription
    cancel_ok = subscription.cancel
    @__consumer_tag = cancel_ok.consumer_tag
  end
end
publisher() click to toggle source
# File lib/fleck/consumer.rb, line 166
def publisher
  return @__publisher
end
queue() click to toggle source
# File lib/fleck/consumer.rb, line 158
def queue
  return @__queue
end
request() click to toggle source
# File lib/fleck/consumer.rb, line 185
def request
  @__request
end
response() click to toggle source
# File lib/fleck/consumer.rb, line 189
def response
  @__response
end
resume() click to toggle source
# File lib/fleck/consumer.rb, line 181
def resume
  subscribe!
end
start(block: false) click to toggle source
# File lib/fleck/consumer.rb, line 109
def start(block: false)
  connect!
  create_channel!
  subscribe!
  @__lock.synchronize{ @__lounger.wait(@__lock) } if block
end
subscription() click to toggle source
# File lib/fleck/consumer.rb, line 170
def subscription
  return @__subscription
end
terminate() click to toggle source
# File lib/fleck/consumer.rb, line 125
def terminate
  @__lock.synchronize { @__lounger.signal }
  pause
  unless channel.nil? || channel.closed?
    channel.close
    logger.info "Consumer successfully terminated."
  end
end

Protected Instance Methods

connect!() click to toggle source
# File lib/fleck/consumer.rb, line 200
def connect!
  @__connection = Fleck.connection(host: @__host, port: @__port, user: @__user, pass: @__pass, vhost: @__vhost)
end
create_channel!() click to toggle source
# File lib/fleck/consumer.rb, line 204
def create_channel!
  if @__channel && !@__channel.closed?
    logger.info("Closing the opened channel...")
    @__channel.close
  end

  logger.debug "Creating a new channel for #{self.class.to_s.color(:yellow)} consumer"
  @__channel  = @__connection.create_channel
  @__channel.prefetch(@__prefetch) # consume messages in batches
  @__publisher = Bunny::Exchange.new(@__connection.create_channel, :direct, 'fleck')
  if @__exchange_type == :direct && @__exchange_name == ""
    @__queue = @__channel.queue(@__queue_name, auto_delete: false)
  else
    @__exchange  = Bunny::Exchange.new(@__channel, @__exchange_type, @__exchange_name)
    @__queue = @__channel.queue("", exclusive: true, auto_delete: true).bind(@__exchange, routing_key: @__queue_name)
  end
end
restart!() click to toggle source
# File lib/fleck/consumer.rb, line 279
def restart!
  create_channel!
  subscribe!
end
subscribe!() click to toggle source
# File lib/fleck/consumer.rb, line 222
def subscribe!
  logger.debug "Consuming from queue: #{@__queue_name.color(:green)}"

  options = { manual_ack: true }
  options[:consumer_tag] = @__consumer_tag if @__consumer_tag

  @__subscription = @__queue.subscribe(options) do |delivery_info, metadata, payload|
    started_at = Time.now.to_f
    @__response = Fleck::Consumer::Response.new(metadata.correlation_id)
    begin
      @__request  = Fleck::Consumer::Request.new(metadata, payload, delivery_info)
      if @__request.errors.empty?
        on_message(@__request, @__response)
      else
        @__response.status = @__request.status
        @__response.errors += @__request.errors
      end
    rescue => e
      logger.error e.inspect + "\n" + e.backtrace.join("\n")
      @__response.status = 500
      @__response.errors << 'Internal Server Error'
    end

    if @__response.rejected?
      @__channel.reject(delivery_info.delivery_tag, @__response.requeue?)
    else
      logger.debug "Sending response: #{@__response}"
      if @__channel.closed?
        logger.warn "Channel already closed! The response #{metadata.correlation_id} is going to be dropped."
      else
        @__publisher.publish(@__response.to_json, routing_key: metadata.reply_to, correlation_id: metadata.correlation_id, mandatory: @__mandatory)
        @__channel.ack(delivery_info.delivery_tag)
      end
    end

    exec_time  = ((Time.now.to_f - started_at) * 1000).round(2)
    ex_type    = @__exchange_type.to_s[0].upcase
    ex_name    = @__exchange_name.to_s == "" ? "".inspect : @__exchange_name
    status     = @__response.status
    status     = 406 if @__response.rejected?
    status     = 503 if @__channel.closed?

    message    = "#{@__request.ip} #{metadata[:app_id]} => "
    message   += "(#{@__exchange_name.to_s.inspect}|#{ex_type}|#{@__queue_name}) "
    message   += "##{@__request.id} \"#{@__request.action} /#{@__request.version || 'v1'}\" #{status} "
    message   += "(#{exec_time}ms) #{'DEPRECATED!' if @__response.deprecated?}"

    if status >= 500
      logger.error message
    elsif status >= 400 || @__response.deprecated?
      logger.warn message
    else
      logger.info message
    end
  end
end