class Estore::Connection
Connection
owns the TCP socket, and formats and sends commands over it. It also starts a background thread that reads from the TCP socket and handles received packages, dispatching them to the calling app.
Mapping between command names and codes. Taken from github.com/EventStore/EventStore/blob/master/src/EventStore.ClientAPI/SystemData/TcpCommand.cs
Constants
- COMMANDS
Attributes
host[R]
port[R]
Public Class Methods
command_name(code)
click to toggle source
# File lib/estore/connection/protocol.rb, line 81
# Translates a command code back into its command name.
#
# The code => name table is produced by +reverse_lookup_table+ the first
# time this method runs and is then cached in +@names+ for later calls.
#
# @param code the command code to look up
# @return the name stored under +code+ in the reversed table
# @raise [KeyError] when +code+ has no entry in the table
def self.command_name(code)
  lookup = (@names ||= reverse_lookup_table)
  lookup.fetch(code)
end
new(host, port)
click to toggle source
# File lib/estore/connection.rb, line 11
# Prepares the state for a connection to +host+:+port+.
#
# This method itself does not open the TCP socket; it only records the
# endpoint and builds the collaborators used for sending and receiving.
#
# @param host the host to connect to
# @param port the port to connect to; coerced with +Integer()+, so a
#   non-numeric value raises ArgumentError or TypeError
def initialize(host, port)
  @host = host
  @port = Integer(port)
  @context = ConnectionContext.new
  # Every received package is handed to #on_received_package by the buffer.
  @buffer = Buffer.new(&method(:on_received_package))
  @mutex = Mutex.new
  # Declared up front so the lazy +socket+ lookup and the reader's
  # +@terminating+ check never read an uninitialized instance variable.
  @socket = nil
  @terminating = false
end
reverse_lookup_table()
click to toggle source
# File lib/estore/connection/protocol.rb, line 86
# Builds the inverse of COMMANDS: a new Hash whose keys are the values of
# COMMANDS and whose values are the corresponding keys.
#
# When two entries of COMMANDS share a value, the later entry wins.
# COMMANDS itself is not modified.
#
# @return [Hash] the inverted mapping
def self.reverse_lookup_table
  COMMANDS.invert
end
Public Instance Methods
close()
click to toggle source
# File lib/estore/connection.rb, line 19
# Shuts the connection down.
#
# The terminating flag is raised before the socket is closed so that the
# disconnection handling in #process_downstream, which checks
# +@terminating+, does not report this deliberate close as an error.
#
# Closing a connection that never opened a socket is a no-op apart from
# setting the flag.
#
# @return [nil]
def close
  @terminating = true
  # Use @socket directly: the lazy +socket+ helper would open a brand-new
  # TCP connection (and start a reader thread) only to close it again.
  @socket.close if @socket
end
write(uuid, command, msg = nil)
click to toggle source
# File lib/estore/connection.rb, line 24
# Encodes a command as a package frame and writes it to the socket.
#
# @param uuid identifier passed through to Package.encode
# @param command key looked up in COMMANDS to obtain the command code
# @param msg optional message; when given, +validate!+ is called on it
#   before anything is encoded or sent
# @return whatever the socket's +write+ returns
# @raise [KeyError] when +command+ is not a key of COMMANDS
def write(uuid, command, msg = nil)
  msg && msg.validate!
  frame = Package.encode(COMMANDS.fetch(command), uuid, msg)
  # Only the socket write itself is performed while holding the mutex.
  @mutex.synchronize { socket.write(frame.to_s) }
end
Private Instance Methods
connect()
click to toggle source
# File lib/estore/connection.rb, line 49
# Opens the TCP socket to +@host+:+@port+ and starts a thread running
# #process_downstream.
#
# Timeouts and the listed "cannot reach host" errno failures are
# translated into CannotConnectError; the message carries the endpoint and
# the class of the underlying error. Other exceptions propagate unchanged.
#
# Timeout::Error is referenced instead of the top-level TimeoutError alias,
# which no longer exists on Ruby 3.0+. The timeout library must be loaded,
# as it had to be for the alias.
#
# @return the newly opened socket
# @raise [CannotConnectError] when the endpoint cannot be reached
def connect
  @socket = TCPSocket.open(@host, @port)
  Thread.new { process_downstream }
  @socket
rescue Timeout::Error, Errno::ECONNREFUSED, Errno::EHOSTDOWN,
       Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ETIMEDOUT => error
  # A named rescue variable avoids relying on $ERROR_INFO, which is only
  # defined once the English library has been required.
  raise CannotConnectError, "#{@host}:#{@port} (#{error.class})"
end
on_received_package(type, package, uuid, _flags)
click to toggle source
# File lib/estore/connection.rb, line 37
# Handles one package received from the socket.
#
# A 'HeartbeatRequestCommand' is answered immediately with a
# 'HeartbeatResponseCommand'; every other package is decoded and passed to
# the connection context together with its uuid.
#
# NOTE(review): the heartbeat reply is sent under a freshly generated UUID
# rather than the +uuid+ of the request - confirm against the Event Store
# protocol whether the reply is expected to echo the request's id.
#
# @param type the command name of the received package
# @param package the raw package payload handed to Package.decode
# @param uuid the identifier that came with the package
# @param _flags unused
def on_received_package(type, package, uuid, _flags)
  heartbeat = type == 'HeartbeatRequestCommand'
  return write(SecureRandom.uuid, 'HeartbeatResponseCommand') if heartbeat

  @context.dispatch(uuid, Package.decode(type, package))
end
process_downstream()
click to toggle source
# File lib/estore/connection.rb, line 58
# Read loop: repeatedly reads up to 4096 bytes from the socket and appends
# them to the buffer until an exception ends the loop.
#
# An IOError (including EOFError) is reported to the context as a
# DisconnectionError, unless the connection is being terminated on purpose.
# Any other StandardError is forwarded to the context as-is.
def process_downstream
  loop do
    chunk = socket.sysread(4096)
    @buffer << chunk
  end
rescue IOError, EOFError
  # A deliberate shutdown is not worth reporting.
  return if @terminating

  @context.on_error(DisconnectionError.new)
rescue => error
  @context.on_error(error)
end
socket()
click to toggle source
# File lib/estore/connection.rb, line 45
# Returns the current socket, connecting first when there is none yet.
#
# @return the value of +@socket+ when set, otherwise the result of #connect
def socket
  return @socket if @socket

  connect
end