class Krakow::Producer

TCP based producer

Attributes

connection[R]
notifier[R]

Public Class Methods

new(args={}) click to toggle source

@!endgroup

Calls superclass method
# File lib/krakow/producer.rb, line 41
def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}, :options => {}}.merge(
    arguments.fetch(:connection_options, {})
  )
  connect
end

Public Instance Methods

connect() click to toggle source

Establish connection to configured `host` and `port`

@return nil

# File lib/krakow/producer.rb, line 52
def connect
  @connecting = true
  info "Establishing connection to: #{host}:#{port}"
  begin
    con_args = connection_options[:options].dup.tap do |args|
      args[:host] = host
      args[:port] = port
      if(connection_options[:features])
        args[:features] = connection_options[:features]
      end
      if(connection_options[:config])
        args[:features_args] = connection_options[:config]
      end
    end
    @connection = Connection.new(con_args)
    @connection.init!
    self.link @connection
    info "Connection established: #{@connection}"
    nil
  rescue => e
    abort e
  end
  @connecting = false
end
connected?() click to toggle source

@return [TrueClass, FalseClass] currently connected to server

# File lib/krakow/producer.rb, line 83
def connected?
  begin
    !!(!@connecting &&
      connection &&
      connection.alive? &&
      connection.connected?)
  rescue Celluloid::DeadActorError
    false
  end
end
connection_failure(obj, reason) click to toggle source

Process connection failure and attempt reconnection

@return [TrueClass]

# File lib/krakow/producer.rb, line 97
def connection_failure(obj, reason)
  if(obj == connection && !reason.nil?)
    begin
      @connection = nil
      warn "Connection failure detected for #{host}:#{port} - #{reason}"
      obj.terminate if obj.alive?
      connect
    rescue => reason
      warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
      sleep reconnect_interval
      retry
    end
  end
  true
end
producer_cleanup() click to toggle source

Instance destructor @return nil

# File lib/krakow/producer.rb, line 115
def producer_cleanup
  debug 'Tearing down producer'
  if(connection && connection.alive?)
    connection.terminate
  end
  @connection = nil
  info 'Producer torn down'
  nil
end
to_s() click to toggle source

@return [String] stringify object

# File lib/krakow/producer.rb, line 78
def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}} T:#{topic}>"
end
write(*message) click to toggle source

Write message to server

@param message [String] message to write @return [Krakow::FrameType, TrueClass] @note if connection response wait is set to 0, writes will

return a `true` value on completion

@raise [Krakow::Error::ConnectionUnavailable]

# File lib/krakow/producer.rb, line 132
def write(*message)
  if(message.empty?)
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end
  begin
    if(message.size > 1)
      debug 'Multiple message publish'
      connection.transmit(
        Command::Mpub.new(
          :topic_name => topic,
          :messages => message
        )
      )
    else
      debug 'Single message publish'
      connection.transmit(
        Command::Pub.new(
          :message => message.first,
          :topic_name => topic
        )
      )
    end
  rescue Celluloid::Task::TerminatedError
    abort Error::ConnectionUnavailable.new 'Connection is currently unavailable'
  rescue => e
    abort e
  end
end