class Krakow::Connection

Provides TCP connection to NSQD

Constants

ENABLEABLE_FEATURES

List of features that may be enabled by the client

EXCLUSIVE_FEATURES

List of features that may not be enabled together

FEATURES

Available connection features

Attributes

endpoint_settings[R]

@return [Hash] current configuration for endpoint

running[R]

@return [TrueClass, FalseClass]

socket[R]

@return [Ksocket] underlying socket like instance

Public Class Methods

identifier(host, port, topic, channel) click to toggle source

Generate identifier for connection

@param host [String] @param port [String, Integer] @param topic [String] @param channel [String] @return [String]

# File lib/krakow/connection.rb, line 15
def self.identifier(host, port, topic, channel)
  [host, port, topic, channel].compact.join('__')
end
new(args={}) click to toggle source

Create new instance

@param args [Hash] @option args [String] :host (required) server host @option args [String, Numeric] :port (required) server port @option args [String] :version @option args [Queue] :queue received message queue @option args [Hash] :callbacks @option args [Queue] :responses received responses queue @option args [Celluloid::Actor] :notifier actor to notify on new message @option args [Hash] :features features to enable @option args [Numeric] :response_wait time to wait for response @option args [Numeric] :response_interval sleep interval for wait loop @option args [Numeric] :error_wait time to wait for error response @option args [TrueClass, FalseClass] :enforce_features fail if features are unavailable @option args [Hash] :feature_args options for connection features

Calls superclass method
# File lib/krakow/connection.rb, line 95
def initialize(args={})
  super
  @endpoint_settings = {}
  @running = false
end

Public Instance Methods

auth_required() click to toggle source

Send authentication request for connection

@return [TrueClass]

# File lib/krakow/connection.rb, line 339
def auth_required
  info 'Authentication required for this connection'
  if(feature_args[:auth])
    transmit(Command::Auth.new(:secret => feature_args[:auth]))
    response = receive
    true
  else
    error 'No authentication information provided for connection!'
    abort 'Authentication failure. No authentication secret provided'
  end
end
callback_for(type, *args) click to toggle source

Execute callback for given type

@overload callback_for(type, arg, connection)

@param type [Symbol] type of callback
@param arg [Object] argument for callback (can be multiple)
@param connection [Krakow::Connection] current connection

@return [Object] result of callback

# File lib/krakow/connection.rb, line 261
def callback_for(type, *args)
  callback = callbacks[type]
  if(callback)
    debug "Processing connection callback for #{type.inspect} (#{callback.inspect})"
    if(callback[:actor].alive?)
      callback[:actor].send(callback[:method], *(args + [current_actor]))
    else
      error "Expected actor for callback processing is not alive! (type: `#{type.inspect}`)"
    end
  else
    debug "No connection callback defined for #{type.inspect}"
    args.size == 1 ? args.first : args
  end
end
connected?() click to toggle source

@return [TrueClass, FalseClass] underlying socket is connected

# File lib/krakow/connection.rb, line 385
def connected?
  begin
    !!(socket && socket.alive?)
  rescue Celluloid::DeadActorError
    false
  end
end
connection_cleanup() click to toggle source

Destructor method for cleanup

@return [nil]

# File lib/krakow/connection.rb, line 170
def connection_cleanup
  debug 'Tearing down connection'
  @running = false
  if(connected?)
    socket.terminate
  end
  @socket = nil
  info 'Connection torn down'
  nil
end
deflate() click to toggle source

Enable deflate feature on underlying socket

@return [TrueClass]

# File lib/krakow/connection.rb, line 365
def deflate
  debug 'Loading support for deflate compression and converting connection'
  @socket = ConnectionFeatures::Deflate::Io.new(socket, features_args)
  response = receive
  info "Deflate connection conversion complete. Response: #{response.inspect}"
  true
end
handle(message) click to toggle source

Handle non-message type Krakow::FrameType

@param message [Krakow::FrameType] received message @return [Krakow::FrameType, nil]

# File lib/krakow/connection.rb, line 236
def handle(message)
  # Grab heartbeats upfront
  if(message.is_a?(FrameType::Response) && message.response == '_heartbeat_')
    debug 'Responding to heartbeat'
    transmit Command::Nop.new
    nil
  else
    message = callback_for(:handle, message)
    if(!message.is_a?(FrameType::Message))
      debug "Captured non-message type response: #{message}"
      responses << message
      nil
    else
      message
    end
  end
end
identifier() click to toggle source

@return [String] identifier for this connection

# File lib/krakow/connection.rb, line 102
def identifier
  self.class.identifier(host, port, topic, channel)
end
identify_and_negotiate() click to toggle source

IDENTIFY with server and negotiate features

@return [TrueClass]

# File lib/krakow/connection.rb, line 307
def identify_and_negotiate
  expected_features = identify_defaults.merge(features)
  ident = Command::Identify.new(
    expected_features
  )
  socket.put(ident.to_line)
  response = receive
  if(expected_features[:feature_negotiation])
    begin
      @endpoint_settings = MultiJson.load(response.content, :symbolize_keys => true)
      info "Connection settings: #{endpoint_settings.inspect}"
      # Enable things we need to enable
      ENABLEABLE_FEATURES.each do |key|
        if(endpoint_settings[key])
          send(key)
        elsif(enforce_features && expected_features[key])
          abort Error::ConnectionFeatureFailure.new("Failed to enable #{key} feature on connection!")
        end
      end
    rescue MultiJson::LoadError => e
      error "Failed to parse response from Identify request: #{e} - #{response}"
      abort e
    end
  else
    @endpoint_settings = {}
  end
  true
end
identify_defaults() click to toggle source

@return [Hash] default settings for IDENTIFY

# File lib/krakow/connection.rb, line 292
def identify_defaults
  unless(@identify_defaults)
    @identify_defaults = {
      :short_id => Socket.gethostname,
      :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
      :user_agent => "krakow/#{Krakow::VERSION}",
      :feature_negotiation => true
    }
  end
  @identify_defaults
end
init!() click to toggle source

Initialize the connection

@return [nil]

# File lib/krakow/connection.rb, line 114
def init!
  connect!
  async.process_to_queue!
  nil
end
process_to_queue!() click to toggle source

Receive messages and place into queue

@return [nil]

# File lib/krakow/connection.rb, line 212
def process_to_queue!
  unless(@running)
    @running = true
    while(@running)
      message = handle(receive)
      if(message)
        debug "Adding message to queue #{message}"
        queue << message
        if(notifier)
          warn "Sending new message notification: #{notifier} - #{message}"
          notifier.broadcast(message)
        end
      else
        debug 'Received `nil` message. Ignoring.'
      end
    end
  end
  nil
end
receive() click to toggle source

Receive from server

@return [Krakow::FrameType, nil] message or nothing if read was empty @raise [Error::ConnectionUnavailable] socket is closed

# File lib/krakow/connection.rb, line 185
def receive
  debug 'Read wait for frame start'
  buf = socket.get(8)
  if(buf)
    @receiving = true
    debug "<<< #{buf.inspect}"
    struct = FrameType.decode(buf)
    debug "Decoded structure: #{struct.inspect}"
    struct[:data] = socket.get(struct[:size])
    debug "<<< #{struct[:data].inspect}"
    @receiving = false
    frame = FrameType.build(struct)
    debug "Struct: #{struct.inspect} Frame: #{frame.inspect}"
    frame
  else
    nil
  end
end
receiving?() click to toggle source

@return [TrueClass, FalseClass] is connection currently receiving a message

# File lib/krakow/connection.rb, line 205
def receiving?
  !!@receiving
end
snappy() click to toggle source

Enable snappy feature on underlying socket

@return [TrueClass]

# File lib/krakow/connection.rb, line 354
def snappy
  info 'Loading support for snappy compression and converting connection'
  @socket = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args)
  response = receive
  info "Snappy connection conversion complete. Response: #{response.inspect}"
  true
end
tls_v1() click to toggle source

Enable TLS feature on underlying socket

@return [TrueClass]

# File lib/krakow/connection.rb, line 376
def tls_v1
  info 'Enabling TLS for connection'
  @socket = ConnectionFeatures::Ssl::Io.new(socket, features_args)
  response = receive
  info "TLS enable complete. Response: #{response.inspect}"
  true
end
to_s() click to toggle source

@return [String] stringify object

# File lib/krakow/connection.rb, line 107
def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}}>"
end
transmit(message) click to toggle source

Send message to remote server

@param message [Krakow::Message] message to send @return [TrueClass, Krakow::FrameType] response if expected or true

# File lib/krakow/connection.rb, line 124
def transmit(message)
  unless(message.respond_to?(:to_line))
    abort TypeError.new("Expecting type `Krakow::FrameType` but received `#{message.class}`")
  end
  output = message.to_line
  response_wait = wait_time_for(message)
  if(response_wait > 0)
    transmit_with_response(message, response_wait)
  else
    debug ">>> #{output}"
    socket.put(output)
    true
  end
end
transmit_with_response(message, wait_time) click to toggle source

Sends message and waits for response

@param message [Krakow::Message] message to send @return [Krakow::FrameType] response

# File lib/krakow/connection.rb, line 143
def transmit_with_response(message, wait_time)
  responses.clear
  socket.put(message.to_line)
  response = nil
  (wait_time / response_interval).to_i.times do |i|
    response = responses.pop unless responses.empty?
    break if response
    sleep(response_interval)
  end
  if(response)
    message.response = response
    if(message.error?(response))
      res = Error::BadResponse.new "Message transmission failed #{message}"
      res.result = response
      abort res
    end
    response
  else
    unless(Command.response_for(message) == :error_only)
      abort Error::BadResponse::NoResponse.new "No response provided for message #{message}"
    end
  end
end
wait_time_for(message) click to toggle source

Returns configured wait time for given message type

@param message [Krakow::Command] @return [Numeric] seconds to wait

# File lib/krakow/connection.rb, line 280
def wait_time_for(message)
  case Command.response_for(message)
  when :required
    response_wait
  when :error_only
    error_wait
  else
    0
  end
end

Protected Instance Methods

connect!() click to toggle source

Connect the underlying socket

@return [nil]

# File lib/krakow/connection.rb, line 398
def connect!
  debug 'Initializing connection'
  unless(@connecting)
    @connecting = true
    if(socket && socket.alive?)
      socket.terminate
      @socket = nil
    end
    @socket = Ksocket.new(:host => host, :port => port)
    self.link socket
    socket.put version.rjust(4).upcase
    identify_and_negotiate
    info 'Connection initialized'
    @connecting = false
  end
  nil
end