class Krakow::Consumer

Consume messages from a server

Consume messages from a server

Attributes

connections[R]
discovery[R]
distribution[R]
queue[R]

Public Class Methods

new(args={}) click to toggle source

@!endgroup

Calls superclass method
# File lib/krakow/consumer.rb, line 41
def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(
    arguments[:connection_options] || {}
  )
  @connections = {}
  @queue = Queue.new(
    current_actor,
    :removal_callback => :remove_message
  )
  @distribution = Distribution::Default.new(
    :max_in_flight => max_in_flight,
    :backoff_interval => backoff_interval,
    :consumer => current_actor
  )
  if(nsqlookupd)
    debug "Connections will be established via lookup #{nsqlookupd.inspect}"
    @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
    discover
  elsif(host && port)
    direct_connect
  else
    abort Error::ConfigurationError.new('No connection information provided!')
  end
end

Public Instance Methods

build_connection(host, port, queue) click to toggle source

Build a new [Krakow::Connection]

@param host [String] remote host @param port [String, Integer] remote port @param queue [Queue] queue for messages @return [Krakow::Connection, nil] new connection or nil

# File lib/krakow/consumer.rb, line 131
def build_connection(host, port, queue)
  begin
    connection = Connection.new(
      :host => host,
      :port => port,
      :queue => queue,
      :topic => topic,
      :channel => channel,
      :notifier => notifier,
      :features => connection_options[:features],
      :features_args => connection_options[:config],
      :callbacks => {
        :handle => {
          :actor => current_actor,
          :method => :process_message
        }
      }
    )
    queue.register_connection(connection)
    connection
  rescue => e
    error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}"
    debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}"
    nil
  end
end
confirm(message_id) click to toggle source

Confirm message has been processed

@param message_id [String, Krakow::FrameType::Message] @return [TrueClass] @raise [KeyError] connection not found

# File lib/krakow/consumer.rb, line 286
def confirm(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    begin
      connection = distribution.in_flight_lookup(message_id)
      connection.transmit(Command::Fin.new(:message_id => message_id))
      distribution.success(connection.identifier)
    rescue => e
      abort e
    end
    true
  rescue KeyError => e
    error "Message confirmation failed: #{e}"
    abort e
  rescue Error::LookupFailed => e
    error "Lookup of message for confirmation failed! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  rescue Error::ConnectionUnavailable => e
    abort e
  rescue Celluloid::DeadActorError
    abort Error::ConnectionUnavailable.new
  ensure
    con = distribution.unregister_message(message_id)
    update_ready!(con) if con
  end
end
Also aliased as: finish
connected?() click to toggle source

@return [TrueClass, FalseClass] currently connected to at least

one nsqd
# File lib/krakow/consumer.rb, line 69
def connected?
  !!connections.values.any? do |con|
    begin
      con.connected?
    rescue Celluloid::DeadActorError
      false
    end
  end
end
connection(key) click to toggle source

Returns [Krakow::Connection] associated to key

@param key [Object] identifier @return [Krakow::Connection] associated connection

# File lib/krakow/consumer.rb, line 98
def connection(key)
  @connections[key]
end
connection_failure(actor, reason) click to toggle source

Remove connection references when connection is terminated

@param actor [Object] terminated actor @param reason [Exception] reason for termination @return [nil]

# File lib/krakow/consumer.rb, line 252
def connection_failure(actor, reason)
  if(reason && key = connections.key(actor))
    warn "Connection failure detected. Removing connection: #{key} - #{reason}"
    connections.delete(key)
    begin
      distribution.remove_connection(key)
    rescue Error::ConnectionUnavailable, Error::ConnectionFailure
      warn 'Caught connection unavailability'
    end
    queue.deregister_connection(key)
    distribution.redistribute!
    direct_connect unless discovery
  end
  nil
end
consumer_cleanup() click to toggle source

Instance destructor

@return [nil]

# File lib/krakow/consumer.rb, line 110
def consumer_cleanup
  debug 'Tearing down consumer'
  if(distribution && distribution.alive?)
    distribution.terminate
  end
  if(queue && queue.alive?)
    queue.terminate
  end
  connections.values.each do |con|
    con.terminate if con.alive?
  end
  info 'Consumer torn down'
  nil
end
direct_connect() click to toggle source

Connect to nsqd instance directly

@return [Connection]

# File lib/krakow/consumer.rb, line 82
def direct_connect
  debug "Connection will be established via direct connection #{host}:#{port}"
  connection = build_connection(host, port, queue)
  if(register(connection))
    info "Registered new connection #{connection}"
    distribution.redistribute!
  else
    abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port}")
  end
  connection
end
discover() click to toggle source

Start the discovery interval lookup

@return [nil]

# File lib/krakow/consumer.rb, line 223
def discover
  init!
  after(discovery_interval + (discovery_jitter * rand)){ discover }
end
finish(message_id)
Alias for: confirm
init!() click to toggle source

Initialize the consumer by starting lookup and adding connections

@return [nil]

# File lib/krakow/consumer.rb, line 201
def init!
  debug 'Running consumer `init!` connection builds'
  found = discovery.lookup(topic)
  debug "Discovery results: #{found.inspect}"
  connection = nil
  found.each do |node|
    debug "Processing discovery result: #{node.inspect}"
    key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
    unless(connections[key])
      connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
      info "Registered new connection #{connection}" if register(connection)
    else
      debug "Discovery result already registered: #{node.inspect}"
    end
  end
  distribution.redistribute! if connection
  nil
end
process_message(message, connection) click to toggle source

Process a given message if required

@param message [Krakow::FrameType] @param connection [Krakow::Connection] @return [Krakow::FrameType] @note If we receive a message that is already in flight, attempt

to scrub message from wait queue. If message is found, retry
distribution registration. If message is not found, assume it
is currently being processed and do not allow new message to
be queued
# File lib/krakow/consumer.rb, line 168
def process_message(message, connection)
  discard = false
  if(message.is_a?(FrameType::Message))
    message.origin = current_actor
    message.connection = connection
    retried = false
    begin
      distribution.register_message(message, connection.identifier)
    rescue KeyError => e
      if(!retried && queue.scrub_duplicate_message(message))
        retried = true
        retry
      else
        error "Received message is currently in flight and not in wait queue. Discarding! (#{message})"
        discard = true
      end
    end
  end
  discard ? nil : message
end
register(connection) click to toggle source

Register connection with distribution

@param connection [Krakow::Connection] @return [TrueClass, FalseClass] true if subscription was successful

# File lib/krakow/consumer.rb, line 232
def register(connection)
  begin
    connection.init!
    connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
    self.link connection
    connections[connection.identifier] = connection
    distribution.add_connection(connection)
    true
  rescue Error::BadResponse => e
    debug "Failed to establish connection: #{e.result ? e.result.error : '<No Response!>'}"
    connection.terminate
    false
  end
end
remove_message(messages) click to toggle source

Remove message

@param messages [Array<FrameType::Message>] @return [NilClass] @note used mainly for queue callback

# File lib/krakow/consumer.rb, line 273
def remove_message(messages)
  [messages].flatten.compact.each do |msg|
    distribution.unregister_message(msg.message_id)
    update_ready!(msg.connection)
  end
  nil
end
requeue(message_id, timeout=0) click to toggle source

Requeue message (generally due to processing failure)

@param message_id [String, Krakow::FrameType::Message] @param timeout [Numeric] @return [TrueClass]

# File lib/krakow/consumer.rb, line 319
def requeue(message_id, timeout=0)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    distribution.unregister_message(message_id)
    connection.transmit(
      Command::Req.new(
        :message_id => message_id,
        :timeout => timeout
      )
    )
    distribution.failure(connection.identifier)
    update_ready!(connection)
  end
  true
end
to_s() click to toggle source

@return [String] stringify object

# File lib/krakow/consumer.rb, line 103
def to_s
  "<#{self.class.name}:#{object_id} T:#{topic} C:#{channel}>"
end
touch(message_id) click to toggle source

Touch message (to extend timeout)

@param message_id [String, Krakow::FrameType::Message] @return [TrueClass]

# File lib/krakow/consumer.rb, line 339
def touch(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    distribution.in_flight_lookup(message_id) do |connection|
      connection.transmit(
        Command::Touch.new(:message_id => message_id)
      )
    end
    true
  rescue Error::LookupFailed => e
    error "Lookup of message for touch failed! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  end
end
update_ready!(connection) click to toggle source

Send RDY for connection based on distribution rules

@param connection [Krakow::Connection] @return [nil]

# File lib/krakow/consumer.rb, line 193
def update_ready!(connection)
  distribution.set_ready_for(connection)
  nil
end