class Estore::Connection

Connection owns the TCP socket, formats and sends commands over the socket. It also starts a background thread to read from the TCP socket and handle received packages, dispatching them to the calling app.

Mapping between command names and codes, taken from github.com/EventStore/EventStore/blob/master/src/EventStore.ClientAPI/SystemData/TcpCommand.cs

Constants

COMMANDS

Attributes

host[R]
port[R]

Public Class Methods

command_name(code) click to toggle source
# File lib/estore/connection/protocol.rb, line 81
# Returns the command name registered under the given command code.
#
# The code => name table is produced by reverse_lookup_table the first time
# this method runs and is then cached in @names for later calls.
#
# code - the key to look up in the reversed table.
#
# Raises KeyError (from Hash#fetch) when no command uses that code.
def self.command_name(code)
  (@names ||= reverse_lookup_table).fetch(code)
end
new(host, port) click to toggle source
# File lib/estore/connection.rb, line 11
# Stores the endpoint and builds the helper objects the connection needs.
#
# host - remote host, stored as given.
# port - remote port; coerced with Integer(), so a non-numeric value raises
#        here.
#
# This method does not open the socket itself; the `socket` accessor
# connects on first use.
def initialize(host, port)
  @host = host
  @port = Integer(port)
  @mutex = Mutex.new # guards socket writes (see #write)
  @context = ConnectionContext.new

  # The buffer is handed #on_received_package as its block.
  package_handler = method(:on_received_package)
  @buffer = Buffer.new(&package_handler)
end
reverse_lookup_table() click to toggle source
# File lib/estore/connection/protocol.rb, line 86
# Builds the reverse of COMMANDS: a new Hash mapping each command code back
# to its command name.
#
# Hash#invert gives the same result as folding the pairs into a fresh hash
# by hand (when two names share a code, the later entry wins) without the
# per-entry temporary hashes. COMMANDS itself is not modified.
#
# Returns a new Hash of code => name.
def self.reverse_lookup_table
  COMMANDS.invert
end

Public Instance Methods

close() click to toggle source
# File lib/estore/connection.rb, line 19
# Marks the connection as terminating and closes the TCP socket, if one has
# been opened.
#
# @terminating is set before closing so that the reader loop
# (#process_downstream) does not report the resulting IOError as a
# DisconnectionError.
#
# @socket is checked directly instead of going through the lazy `socket`
# accessor: the accessor connects on first use, so closing a connection that
# was never used would open a new TCP connection (and start a reader thread)
# only to close it again, and could raise CannotConnectError from #close.
def close
  @terminating = true
  @socket.close if @socket
end
write(uuid, command, msg = nil) click to toggle source
# File lib/estore/connection.rb, line 24
# Encodes a command and writes it to the socket.
#
# uuid    - identifier passed through to Package.encode.
# command - command name; must be a key of COMMANDS (KeyError otherwise).
# msg     - optional message; when given, validate! is called on it before
#           anything else happens.
#
# The socket write runs inside @mutex.synchronize. Returns whatever the
# socket's #write returns.
def write(uuid, command, msg = nil)
  if msg
    msg.validate!
  end

  encoded = Package.encode(COMMANDS.fetch(command), uuid, msg)
  @mutex.synchronize { socket.write(encoded.to_s) }
end

Private Instance Methods

connect() click to toggle source
# File lib/estore/connection.rb, line 49
# Opens the TCP socket to @host:@port and starts the background reader
# thread that runs #process_downstream.
#
# Returns the newly opened socket (also stored in @socket).
#
# Raises CannotConnectError, with the endpoint and the underlying exception
# class in the message, when the connection times out, is refused, or the
# host/network is unreachable.
#
# Timeout::Error is named explicitly: the top-level TimeoutError alias has
# been deprecated since Ruby 2.3 and is no longer defined by newer versions
# of the timeout library. Because rescue lists are only evaluated when an
# exception is raised, the old name turns every connection failure into a
# NameError on such Rubies.
#
# The exception is bound with `=> error` so the message does not depend on
# the English library's $ERROR_INFO alias being loaded.
def connect
  @socket = TCPSocket.open(@host, @port)
  Thread.new { process_downstream }
  @socket
rescue Timeout::Error, Errno::ECONNREFUSED, Errno::EHOSTDOWN,
       Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ETIMEDOUT => error
  raise CannotConnectError, "#{@host}:#{@port} (#{error.class})"
end
on_received_package(type, package, uuid, _flags) click to toggle source
# File lib/estore/connection.rb, line 37
# Handles one package received from the server.
#
# type    - command name of the package.
# package - raw payload, handed to Package.decode together with type.
# uuid    - identifier the decoded package is dispatched under.
# _flags  - unused.
#
# A 'HeartbeatRequestCommand' is answered directly by writing a
# 'HeartbeatResponseCommand' under a freshly generated UUID; every other
# package is decoded and dispatched through @context.
def on_received_package(type, package, uuid, _flags)
  heartbeat = type == 'HeartbeatRequestCommand'
  return write(SecureRandom.uuid, 'HeartbeatResponseCommand') if heartbeat

  @context.dispatch(uuid, Package.decode(type, package))
end
process_downstream() click to toggle source
# File lib/estore/connection.rb, line 58
# Reader loop: keeps pulling chunks of up to 4096 bytes from the socket and
# appending them to @buffer until reading fails.
#
# An IOError/EOFError is reported to @context as a DisconnectionError,
# unless @terminating is set, in which case it is ignored. Any other
# StandardError is forwarded to @context.on_error unchanged.
def process_downstream
  loop do
    chunk = socket.sysread(4096)
    @buffer << chunk
  end
rescue EOFError, IOError
  return if @terminating

  @context.on_error(DisconnectionError.new)
rescue StandardError => e
  @context.on_error(e)
end
socket() click to toggle source
# File lib/estore/connection.rb, line 45
# Returns the current socket, connecting first when @socket is not set yet.
def socket
  return @socket if @socket

  connect
end