class Poseidon::Connection

High level internal interface to a remote broker. Provides access to the broker API. @api private

Constants

API_VERSION
REPLICA_ID
REQUEST_CONNECTION_ERRORS

Attributes

host[R]
port[R]

Public Class Methods

new(host, port, client_id, socket_timeout_ms) click to toggle source

Create a new connection

@param [String] host Host to connect to @param [Integer] port Port broker listens on @param [String] client_id Unique across processes?

# File lib/poseidon/connection.rb, line 38
def initialize(host, port, client_id, socket_timeout_ms)
  @host = host
  @port = port

  @client_id = client_id
  @socket_timeout_ms = socket_timeout_ms
end
open(host, port, client_id, socket_timeout_ms) { |connection| ... } click to toggle source

@yieldparam [Connection]

# File lib/poseidon/connection.rb, line 23
def self.open(host, port, client_id, socket_timeout_ms, &block)
  connection = new(host, port, client_id, socket_timeout_ms)

  yield connection
ensure
  connection.close
end

Public Instance Methods

close() click to toggle source

Close broker connection

# File lib/poseidon/connection.rb, line 47
def close
  @socket && @socket.close
end
fetch(max_wait_time, min_bytes, topic_fetches) click to toggle source

Execute a fetch call

@param [Integer] max_wait_time @param [Integer] min_bytes @param [Integer] topic_fetches

# File lib/poseidon/connection.rb, line 76
def fetch(max_wait_time, min_bytes, topic_fetches)
  ensure_connected
  req = FetchRequest.new( request_common(:fetch),
                            REPLICA_ID,
                            max_wait_time,
                            min_bytes,
                            topic_fetches)
  send_request(req)
  read_response(FetchResponse)
end
offset(offset_topic_requests) click to toggle source
# File lib/poseidon/connection.rb, line 87
def offset(offset_topic_requests)
  ensure_connected
  req = OffsetRequest.new(request_common(:offset),
                          REPLICA_ID,
                          offset_topic_requests)
  send_request(req)
  read_response(OffsetResponse).topic_offset_responses
end
produce(required_acks, timeout, messages_for_topics) click to toggle source

Execute a produce call

@param [Integer] required_acks @param [Integer] timeout @param [Array<Protocol::MessagesForTopics>] messages_for_topics Messages to send @return [ProduceResponse]

# File lib/poseidon/connection.rb, line 57
def produce(required_acks, timeout, messages_for_topics)
  ensure_connected
  req = ProduceRequest.new( request_common(:produce),
                            required_acks,
                            timeout,
                            messages_for_topics)
  send_request(req)
  if required_acks != 0
    read_response(ProduceResponse)
  else
    true
  end
end
topic_metadata(topic_names) click to toggle source

Fetch metadata for topic_names

@param [Enumberable<String>] topic_names

A list of topics to retrive metadata for

@return [TopicMetadataResponse] metadata for the topics

# File lib/poseidon/connection.rb, line 101
def topic_metadata(topic_names)
  ensure_connected
  req = MetadataRequest.new( request_common(:metadata),
                             topic_names)
  send_request(req)
  read_response(MetadataResponse)
end

Private Instance Methods

ensure_connected() click to toggle source
# File lib/poseidon/connection.rb, line 110
def ensure_connected
  if @socket.nil? || @socket.closed?
    begin
      @socket = TCPSocket.new(@host, @port)
    rescue SystemCallError => ex
      raise_connection_failed_from_exception(ex)
    end
  end
end
ensure_read_or_timeout(maxlen) click to toggle source
# File lib/poseidon/connection.rb, line 134
def ensure_read_or_timeout(maxlen)
  if IO.select([@socket], nil, nil, @socket_timeout_ms / 1000.0)
     @socket.read(maxlen)
  else
     raise TimeoutException.new
  end
end
ensure_write_or_timeout(data) click to toggle source
# File lib/poseidon/connection.rb, line 151
def ensure_write_or_timeout(data)
  if IO.select(nil, [@socket], nil, @socket_timeout_ms / 1000.0)
    @socket.write(data)
  else
    raise TimeoutException.new
  end
end
next_correlation_id() click to toggle source
# File lib/poseidon/connection.rb, line 168
def next_correlation_id
  @correlation_id ||= 0
  @correlation_id  += 1
end
raise_connection_failed_error(message) click to toggle source
# File lib/poseidon/connection.rb, line 177
def raise_connection_failed_error(message)
  raise ConnectionFailedError, "Failed to connect to #{@host}:#{@port}. #{message}"
end
raise_connection_failed_from_exception(ex) click to toggle source
# File lib/poseidon/connection.rb, line 173
def raise_connection_failed_from_exception(ex)
  raise_connection_failed_error("Initial exception class=#{ex.class} message=#{ex.message}")
end
read_response(response_class) click to toggle source
# File lib/poseidon/connection.rb, line 120
def read_response(response_class)
  r = ensure_read_or_timeout(4)
  if r.nil?
    raise_connection_failed_error("Could not read from socket")
  end
  n = r.unpack("N").first
  s = ensure_read_or_timeout(n)
  buffer = Protocol::ResponseBuffer.new(s)
  response_class.read(buffer)
rescue Errno::ECONNRESET, SocketError, TimeoutException => ex
  @socket = nil
  raise_connection_failed_from_exception(ex)
end
request_common(request_type) click to toggle source
# File lib/poseidon/connection.rb, line 159
def request_common(request_type)
  RequestCommon.new(
    API_KEYS[request_type],
    API_VERSION,
    next_correlation_id,
    @client_id
  )
end
send_request(request) click to toggle source
# File lib/poseidon/connection.rb, line 142
def send_request(request)
  buffer = Protocol::RequestBuffer.new
  request.write(buffer)
  ensure_write_or_timeout([buffer.to_s.bytesize].pack("N") + buffer.to_s)
rescue *REQUEST_CONNECTION_ERRORS => ex
  @socket = nil
  raise_connection_failed_from_exception(ex)
end