class Krakow::Connection
Provides TCP connection to NSQD
Constants
- ENABLEABLE_FEATURES
List of features that may be enabled by the client
- EXCLUSIVE_FEATURES
List of features that may not be enabled together
- FEATURES
Available connection features
Attributes
@return [Hash] current configuration for endpoint
@return [TrueClass, FalseClass]
@return [Ksocket] underlying socket like instance
Public Class Methods
Generate identifier for connection
@param host [String] @param port [String, Integer] @param topic [String] @param channel [String] @return [String]
# File lib/krakow/connection.rb, line 15
# Build the identifier string for a connection.
#
# @param host [String]
# @param port [String, Integer]
# @param topic [String]
# @param channel [String]
# @return [String] the non-nil parts joined with a double underscore
def self.identifier(host, port, topic, channel)
  parts = [host, port, topic, channel].reject(&:nil?)
  parts.join('__')
end
Create new instance
@param args [Hash] @option args [String] :host (required) server host @option args [String, Numeric] :port (required) server port @option args [String] :version @option args [Queue] :queue received message queue @option args [Hash] :callbacks @option args [Queue] :responses received responses queue @option args [Celluloid::Actor] :notifier actor to notify on new message @option args [Hash] :features features to enable @option args [Numeric] :response_wait time to wait for response @option args [Numeric] :response_interval sleep interval for wait loop @option args [Numeric] :error_wait time to wait for error response @option args [TrueClass, FalseClass] :enforce_features fail if features are unavailable @option args [Hash] :feature_args options for connection features
# File lib/krakow/connection.rb, line 95
# Set up a new connection instance.
#
# The arguments are handed to the parent implementation through the bare
# `super` call; afterwards the endpoint settings start out empty and the
# connection is flagged as not running.
#
# @param args [Hash] connection options
def initialize(args = {})
  super
  @running = false
  @endpoint_settings = Hash.new
end
Public Instance Methods
Send authentication request for connection
@return [TrueClass]
# File lib/krakow/connection.rb, line 339
# Send the AUTH command using the configured secret.
#
# Aborts when no secret is available under `feature_args[:auth]`.
# NOTE(review): this method reads `feature_args` while the deflate, snappy
# and tls_v1 methods read `features_args` - confirm which accessor exists.
#
# @return [TrueClass]
def auth_required
  info 'Authentication required for this connection'
  secret = feature_args[:auth]
  if secret
    transmit(Command::Auth.new(:secret => secret))
    # The reply frame is consumed here but its content is not inspected
    receive
    true
  else
    error 'No authentication information provided for connection!'
    abort 'Authentication failure. No authentication secret provided'
  end
end
Execute callback for given type
@overload callback_for(type, arg, connection)
@param type [Symbol] type of callback @param arg [Object] argument for callback (can be multiple) @param connection [Krakow::Connection] current connection
@return [Object] result of callback
# File lib/krakow/connection.rb, line 261
# Run the callback registered for the given type, if there is one.
#
# With no registered callback the arguments are handed straight back: the
# single argument itself, or the whole argument list when several were given.
# A registered callback is invoked on its actor with the arguments followed
# by the current actor. If that actor is not alive an error is logged instead.
#
# @param type [Symbol] type of callback
# @param args [Array<Object>] arguments for the callback
# @return [Object] result of the callback, or the untouched argument(s)
def callback_for(type, *args)
  handler = callbacks[type]
  unless handler
    debug "No connection callback defined for #{type.inspect}"
    return args.size == 1 ? args.first : args
  end

  debug "Processing connection callback for #{type.inspect} (#{handler.inspect})"
  target = handler[:actor]
  if target.alive?
    target.send(handler[:method], *args, current_actor)
  else
    error "Expected actor for callback processing is not alive! (type: `#{type.inspect}`)"
  end
end
@return [TrueClass, FalseClass] underlying socket is connected
# File lib/krakow/connection.rb, line 385
# Report whether a socket is present and alive.
#
# A dead socket actor (Celluloid::DeadActorError) counts as not connected.
#
# @return [TrueClass, FalseClass]
def connected?
  socket ? !!socket.alive? : false
rescue Celluloid::DeadActorError
  false
end
Destructor method for cleanup
@return [nil]
# File lib/krakow/connection.rb, line 170
# Tear the connection down: clear the running flag, terminate the socket
# when it is still connected and drop the reference to it.
#
# @return [nil]
def connection_cleanup
  debug 'Tearing down connection'
  @running = false
  socket.terminate if connected?
  @socket = nil
  info 'Connection torn down'
  nil
end
Enable deflate feature on underlying socket
@return [TrueClass]
# File lib/krakow/connection.rb, line 365
# Wrap the current socket in the deflate IO feature, then read the next
# frame from the server and log it.
#
# NOTE(review): reads `features_args`, whereas auth_required reads
# `feature_args` - confirm which accessor exists.
#
# @return [TrueClass]
def deflate
  debug 'Loading support for deflate compression and converting connection'
  @socket = ConnectionFeatures::Deflate::Io.new(socket, features_args)
  # The frame is read through the freshly wrapped socket
  info "Deflate connection conversion complete. Response: #{receive.inspect}"
  true
end
Handle non-message type Krakow::FrameType
@param message [Krakow::FrameType] received message @return [Krakow::FrameType, nil]
# File lib/krakow/connection.rb, line 236
# Filter a received frame before it reaches the message queue.
#
# Heartbeat responses are answered with a NOP and swallowed. Anything else
# goes through the :handle callback; a non-message result is pushed onto the
# responses queue and swallowed as well.
#
# @param message [Krakow::FrameType] received frame
# @return [Krakow::FrameType, nil] the message frame, or nil when consumed here
def handle(message)
  heartbeat = message.is_a?(FrameType::Response) && message.response == '_heartbeat_'
  if heartbeat
    debug 'Responding to heartbeat'
    transmit Command::Nop.new
    return nil
  end

  result = callback_for(:handle, message)
  return result if result.is_a?(FrameType::Message)

  debug "Captured non-message type response: #{result}"
  responses << result
  nil
end
@return [String] identifier for this connection
# File lib/krakow/connection.rb, line 102
# Identifier for this connection, built by the class-level `identifier`
# from this instance's host, port, topic and channel.
#
# @return [String]
def identifier
  klass = self.class
  klass.identifier(host, port, topic, channel)
end
IDENTIFY with server and negotiate features
@return [TrueClass]
# File lib/krakow/connection.rb, line 307
# Send IDENTIFY to the server and enable the features it accepted.
#
# The IDENTIFY payload is the defaults merged with the configured features.
# When feature negotiation is requested the reply is parsed as JSON into the
# endpoint settings, and each enableable feature the server reports is
# switched on by calling the method of the same name. A requested feature
# the server did not grant aborts when `enforce_features` is set.
#
# @return [TrueClass]
def identify_and_negotiate
  expected_features = identify_defaults.merge(features)
  line = Command::Identify.new(expected_features).to_line
  # Written directly to the socket rather than through #transmit
  socket.put(line)
  response = receive

  unless expected_features[:feature_negotiation]
    @endpoint_settings = {}
    return true
  end

  begin
    @endpoint_settings = MultiJson.load(response.content, :symbolize_keys => true)
    info "Connection settings: #{endpoint_settings.inspect}"
    # Enable things we need to enable
    ENABLEABLE_FEATURES.each do |feature|
      if endpoint_settings[feature]
        send(feature)
      elsif enforce_features && expected_features[feature]
        abort Error::ConnectionFeatureFailure.new("Failed to enable #{feature} feature on connection!")
      end
    end
  rescue MultiJson::LoadError => e
    error "Failed to parse response from Identify request: #{e} - #{response}"
    abort e
  end
  true
end
@return [Hash] default settings for IDENTIFY
# File lib/krakow/connection.rb, line 292
# Default settings for IDENTIFY; built on first use and cached afterwards.
#
# NOTE(review): Socket.gethostbyname is documented as deprecated in current
# Ruby in favour of Addrinfo.getaddrinfo - consider migrating.
#
# @return [Hash]
def identify_defaults
  @identify_defaults ||= {
    :short_id => Socket.gethostname,
    :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
    :user_agent => "krakow/#{Krakow::VERSION}",
    :feature_negotiation => true
  }
end
Initialize the connection
@return [nil]
# File lib/krakow/connection.rb, line 114
# Bring the connection up: connect first, then start the receive loop
# through the `async` proxy.
#
# @return [nil]
def init!
  connect!
  async.process_to_queue!
  return nil
end
Receive messages and place into queue
@return [nil]
# File lib/krakow/connection.rb, line 212
# Receive loop: read frames, filter them through #handle and push the
# surviving messages onto the queue, notifying the notifier when one is set.
#
# Does nothing when the loop is already running. The loop continues until
# `@running` is cleared.
#
# @return [nil]
def process_to_queue!
  return nil if @running

  @running = true
  while @running
    frame = handle(receive)
    unless frame
      debug 'Received `nil` message. Ignoring.'
      next
    end

    debug "Adding message to queue #{frame}"
    queue << frame
    next unless notifier

    # NOTE(review): logged at warn level although this looks routine - confirm intent
    warn "Sending new message notification: #{notifier} - #{frame}"
    notifier.broadcast(frame)
  end
  nil
end
Receive from server
@return [Krakow::FrameType, nil] message or nothing if read was empty @raise [Error::ConnectionUnavailable] socket is closed
# File lib/krakow/connection.rb, line 185
# Read one frame from the server.
#
# Fetches the frame header with `socket.get(8)`, decodes it, then fetches
# the payload using the decoded `:size`. The receiving flag is raised for
# the duration of the read and is always lowered again, even when decoding
# or the payload read raises, so #receiving? never reports a stale true.
#
# @return [Krakow::FrameType, nil] frame, or nil if the header read was empty
def receive
  debug 'Read wait for frame start'
  buf = socket.get(8)
  return nil unless buf

  @receiving = true
  begin
    debug "<<< #{buf.inspect}"
    struct = FrameType.decode(buf)
    debug "Decoded structure: #{struct.inspect}"
    struct[:data] = socket.get(struct[:size])
    debug "<<< #{struct[:data].inspect}"
  ensure
    # Reset the flag on every exit path, including failures
    @receiving = false
  end

  frame = FrameType.build(struct)
  debug "Struct: #{struct.inspect} Frame: #{frame.inspect}"
  frame
end
@return [TrueClass, FalseClass] is connection currently receiving a message
# File lib/krakow/connection.rb, line 205
# Whether a frame is currently being read.
#
# @return [TrueClass, FalseClass] the receiving flag coerced to a boolean
def receiving?
  @receiving ? true : false
end
Enable snappy feature on underlying socket
@return [TrueClass]
# File lib/krakow/connection.rb, line 354
# Wrap the current socket in the snappy frames IO feature, then read the
# next frame from the server and log it.
#
# NOTE(review): reads `features_args`, whereas auth_required reads
# `feature_args` - confirm which accessor exists.
#
# @return [TrueClass]
def snappy
  info 'Loading support for snappy compression and converting connection'
  @socket = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args)
  # The frame is read through the freshly wrapped socket
  info "Snappy connection conversion complete. Response: #{receive.inspect}"
  true
end
Enable TLS feature on underlying socket
@return [TrueClass]
# File lib/krakow/connection.rb, line 376
# Wrap the current socket in the SSL IO feature, then read the next frame
# from the server and log it.
#
# NOTE(review): reads `features_args`, whereas auth_required reads
# `feature_args` - confirm which accessor exists.
#
# @return [TrueClass]
def tls_v1
  info 'Enabling TLS for connection'
  @socket = ConnectionFeatures::Ssl::Io.new(socket, features_args)
  # The frame is read through the freshly wrapped socket
  info "TLS enable complete. Response: #{receive.inspect}"
  true
end
@return [String] stringify object
# File lib/krakow/connection.rb, line 107
# Human readable form: class name, object id and the host:port pair.
#
# @return [String]
def to_s
  format('<%s:%s {%s:%s}>', self.class.name, object_id, host, port)
end
Send message to remote server
@param message [Krakow::Message] message to send @return [TrueClass, Krakow::FrameType] response if expected or true
# File lib/krakow/connection.rb, line 124
# Send a message to the remote server.
#
# The message must respond to `to_line`. When a positive wait time is
# configured for the message type, sending is delegated to
# #transmit_with_response; otherwise the line is written and true returned.
#
# @param message [Krakow::Message] message to send
# @return [TrueClass, Krakow::FrameType] response if expected or true
def transmit(message)
  unless message.respond_to?(:to_line)
    abort TypeError.new("Expecting type `Krakow::FrameType` but received `#{message.class}`")
  end

  output = message.to_line
  wait = wait_time_for(message)
  return transmit_with_response(message, wait) if wait > 0

  debug ">>> #{output}"
  socket.put(output)
  true
end
Sends message and waits for response
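@param message [Krakow::Message] message to send @param wait_time [Numeric] seconds to wait for a response @return [Krakow::FrameType, nil] response, or nil when none arrived for an error-only message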
# File lib/krakow/connection.rb, line 143
# Send a message and poll the responses queue for its reply.
#
# The queue is cleared before sending, then checked once per
# `response_interval` for up to `wait_time`. A final check is made after the
# last sleep so that a reply arriving during that sleep is still collected
# instead of being reported as missing.
#
# @param message [Krakow::Message] message to send
# @param wait_time [Numeric] total time to wait for a response
# @return [Krakow::FrameType, nil] response, or nil when none arrived and the
#   message only expects error responses
def transmit_with_response(message, wait_time)
  responses.clear
  socket.put(message.to_line)
  response = nil
  (wait_time / response_interval).to_i.times do
    response = responses.pop unless responses.empty?
    break if response
    sleep(response_interval)
  end
  # Last chance: pick up a reply that landed while the final sleep ran
  response ||= responses.pop unless responses.empty?

  if response
    message.response = response
    if message.error?(response)
      res = Error::BadResponse.new "Message transmission failed #{message}"
      res.result = response
      abort res
    end
    response
  else
    unless Command.response_for(message) == :error_only
      abort Error::BadResponse::NoResponse.new "No response provided for message #{message}"
    end
  end
end
Returns configured wait time for given message type
@param message [Krakow::Command] @return [Numeric] seconds to wait
# File lib/krakow/connection.rb, line 280
# Look up how long to wait for a reply to the given message.
#
# @param message [Krakow::Command]
# @return [Numeric] `response_wait` when a response is required, `error_wait`
#   when only errors are reported, otherwise 0
def wait_time_for(message)
  expectation = Command.response_for(message)
  case expectation
  when :required then response_wait
  when :error_only then error_wait
  else 0
  end
end
Protected Instance Methods
Connect the underlying socket
@return [nil]
# File lib/krakow/connection.rb, line 398
# Connect the underlying socket and perform the initial handshake.
#
# Any existing live socket is terminated first. A new Ksocket is created and
# linked, the version string (right-justified to 4 characters, upcased) is
# written, and IDENTIFY negotiation is run. The call is skipped when a
# connect is already in progress.
#
# The in-progress flag is cleared in an `ensure` so that a failed or aborted
# handshake does not leave the connection permanently unable to reconnect.
#
# @return [nil]
def connect!
  debug 'Initializing connection'
  unless @connecting
    @connecting = true
    begin
      if socket && socket.alive?
        socket.terminate
        @socket = nil
      end
      @socket = Ksocket.new(:host => host, :port => port)
      link(socket)
      socket.put version.rjust(4).upcase
      identify_and_negotiate
      info 'Connection initialized'
    ensure
      # Always release the guard, even when negotiation raises or aborts
      @connecting = false
    end
  end
  nil
end