class Krakow::Producer
TCP-based producer
Attributes
connection[R]
notifier[R]
Public Class Methods
new(args={})
click to toggle source
@!endgroup
Calls superclass method
# File lib/krakow/producer.rb, line 41
# Set up the producer and immediately open its connection.
#
# After the superclass initializer runs, the `:connection_options` entry of
# `arguments` is rebuilt so that the `:features`, `:config` and `:options`
# keys are always present (each defaulting to an empty hash); any
# caller-supplied values for those keys take precedence over the defaults.
#
# @param args [Hash] forwarded unchanged to the superclass initializer
def initialize(args={})
  super
  defaults = {:features => {}, :config => {}, :options => {}}
  supplied = arguments.fetch(:connection_options, {})
  arguments[:connection_options] = defaults.merge(supplied)
  connect
end
Public Instance Methods
connect()
click to toggle source
Establish connection to configured `host` and `port`
@return nil
# File lib/krakow/producer.rb, line 52
# Establish connection to configured `host` and `port`.
#
# Builds the connection arguments from `connection_options`, creates a new
# `Connection`, initializes it and links it to this producer. Any error
# raised while doing so is passed to `abort`.
#
# The `@connecting` flag is set for the duration of the attempt and is
# always cleared afterwards, whether the attempt succeeded or failed, so a
# failed attempt cannot leave the producer permanently flagged as
# "connecting".
#
# @return [nil]
def connect
  @connecting = true
  info "Establishing connection to: #{host}:#{port}"
  begin
    con_args = connection_options[:options].dup.tap do |args|
      args[:host] = host
      args[:port] = port
      if connection_options[:features]
        args[:features] = connection_options[:features]
      end
      if connection_options[:config]
        args[:features_args] = connection_options[:config]
      end
    end
    @connection = Connection.new(con_args)
    @connection.init!
    self.link @connection
    info "Connection established: #{@connection}"
    nil
  rescue => e
    abort e
  ensure
    # Runs on success and on failure; its value is discarded, so the
    # method still evaluates to the `nil` above on success.
    @connecting = false
  end
end
connected?()
click to toggle source
@return [TrueClass, FalseClass] currently connected to server
# File lib/krakow/producer.rb, line 83
# Report whether the producer currently has a usable connection.
#
# True only when no connection attempt is in progress and the current
# connection exists, is alive and reports itself connected. A
# `Celluloid::DeadActorError` raised while checking is treated as
# "not connected".
#
# @return [TrueClass, FalseClass] currently connected to server
def connected?
  return false if @connecting
  conn = connection
  return false unless conn
  !!(conn.alive? && conn.connected?)
rescue Celluloid::DeadActorError
  false
end
connection_failure(obj, reason)
click to toggle source
Process connection failure and attempt reconnection
@return [TrueClass]
# File lib/krakow/producer.rb, line 97
# Process connection failure and attempt reconnection.
#
# Does nothing unless `obj` is the current connection and a non-nil
# `reason` was supplied. Otherwise the current connection is dropped, `obj`
# is terminated if still alive, and `connect` is called. If that raises,
# the method sleeps for `reconnect_interval` and retries, with no upper
# bound on the number of attempts.
#
# @param obj [Object] object reporting the failure
# @param reason [Object, nil] failure reason; nil means no action is taken
# @return [TrueClass]
def connection_failure(obj, reason)
  return true unless obj == connection && !reason.nil?

  begin
    @connection = nil
    warn "Connection failure detected for #{host}:#{port} - #{reason}"
    obj.terminate if obj.alive?
    connect
  rescue => reason
    # The rescued error replaces `reason`, so the warning emitted at the top
    # of the retried block reports the most recent error.
    warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
    sleep reconnect_interval
    retry
  end

  true
end
producer_cleanup()
click to toggle source
Instance destructor
@return nil
# File lib/krakow/producer.rb, line 115
# Tear down the producer.
#
# Terminates the current connection when one exists and is still alive,
# then clears the stored connection reference.
#
# @return [nil]
def producer_cleanup
  debug 'Tearing down producer'
  conn = connection
  conn.terminate if conn && conn.alive?
  @connection = nil
  info 'Producer torn down'
  nil
end
to_s()
click to toggle source
@return [String] stringify object
# File lib/krakow/producer.rb, line 78
# Describe the producer by class name, object id, endpoint and topic.
#
# @return [String] stringify object
def to_s
  identity = "#{self.class.name}:#{object_id}"
  endpoint = "#{host}:#{port}"
  "<#{identity} {#{endpoint}} T:#{topic}>"
end
write(*message)
click to toggle source
Write message to server
@param message [String] message to write
@return [Krakow::FrameType, TrueClass]
@note if connection response wait is set to 0, writes will return a `true` value on completion
@raise [Krakow::Error::ConnectionUnavailable]
# File lib/krakow/producer.rb, line 132
# Write message to server.
#
# A single message is transmitted with `Command::Pub`; two or more messages
# are transmitted together with `Command::Mpub`.
#
# @param message [String] one or more messages to write
# @return [Object] whatever the connection's `transmit` returns
# @raise [ArgumentError] when no message is provided
# @raise [Krakow::Error::ConnectionUnavailable] when there is no current
#   connection, or the transmit task is terminated
def write(*message)
  if message.empty?
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end

  # The connection reference is nil while a reconnect is in progress. Read
  # it once and report that state as ConnectionUnavailable rather than
  # letting a NoMethodError on nil escape. This check is kept outside the
  # begin/rescue below so the abort is not caught and re-wrapped.
  conn = connection
  unless conn
    abort Error::ConnectionUnavailable.new 'Connection is currently unavailable'
  end

  begin
    if message.size > 1
      debug 'Multiple message publish'
      conn.transmit(
        Command::Mpub.new(
          :topic_name => topic,
          :messages => message
        )
      )
    else
      debug 'Single message publish'
      conn.transmit(
        Command::Pub.new(
          :message => message.first,
          :topic_name => topic
        )
      )
    end
  rescue Celluloid::Task::TerminatedError
    abort Error::ConnectionUnavailable.new 'Connection is currently unavailable'
  rescue => e
    abort e
  end
end