class Serf::Client::Connection

Public Class Methods

new(address, port) click to toggle source

finalizer :shutdown

# File lib/serf/client/connection.rb, line 39
def initialize address, port
  info "connecting to socket #{address} on #{port}"
  connect address, port
  @io = IO.supervise(@socket, Actor.current).actors.first # avoid self
  @callbacks = Callbacks.supervise.actors.first
  @seqid = 0
  @messages = {}
  @requests = {}
  async.receive_response
end

Public Instance Methods

call(method, param=nil, &block) click to toggle source
# File lib/serf/client/connection.rb, line 111
def call(method, param=nil, &block)
  msgid = send_request(method, param)
  @callbacks.add msgid, block if block_given?

  future.wait_for_response msgid
  #::Celluloid::Future.new do
  #  until msg = @messages[msgid]; end
  #  msg
  #end
end
handshake() click to toggle source
# File lib/serf/client/connection.rb, line 50
def handshake
  debug 'handshake'
  send_request(:handshake, Version: 1)
end
process_response(header) { || ... } click to toggle source

Process the response, yielding retrieves next message

# File lib/serf/client/connection.rb, line 72
def process_response header, &block
  msgid = header["Seq"]

  h = @requests[msgid]
  raise "No request for #{header}" if not h

  cmd = h[:header]['Command']
  parts = command cmd
  debug "Processing #{cmd}"

  raise "No such command #{h}" unless parts

  # This is most likely the ACK
  if not h[:ack?]
    if parts.include? :body
      # ACK comes with a response body
      body = yield
      # Could probably clean up old things like events here, anything not a stream
    end
    h[:ack?] = true
  else
    # Alread ACKed -> should be a stream!
    raise "Cannot handle #{h}" unless ['monitor', 'stream', 'query'].include? cmd
    body = yield
  end

  resp = Response.new(header, body)
  received_response msgid, resp
  resp
end
receive_response() click to toggle source
# File lib/serf/client/connection.rb, line 55
def receive_response
  loop do
    # header
    header = receive
    debug "received: #{header}"

    error header unless header['Seq']
    if header["Error"].empty?
      # Keep the :receive contained here
      process_response(header) { r = receive; debug "received more: #{r}"; r }
    else
      error header["Error"]
    end
  end
end
received_response(msgid, resp) click to toggle source
# File lib/serf/client/connection.rb, line 103
def received_response msgid, resp
  debug 'connection#received_response'
  # Tell the call back actor about our new response
  @callbacks.mailbox << resp
  # Let the future we created know about the response
  @messages[msgid] = resp
end
send_request(method, param) click to toggle source
# File lib/serf/client/connection.rb, line 127
def send_request method, param
  debug 'send_request'

  msgid = seqid
  header = { "Command" => method.to_s, "Seq" => msgid }

  # Keep a reference for our response processing
  @requests[msgid] = { header: header, ack?: false }
  # Send to the writer
  @io.mailbox << [header, param]

  msgid
end
seqid() click to toggle source
# File lib/serf/client/connection.rb, line 141
def seqid
  v = @seqid
  @seqid += 1
  v
end
wait_for_response(msgid) click to toggle source
# File lib/serf/client/connection.rb, line 122
def wait_for_response msgid
  until msg = @messages[msgid]; sleep 0.1; end
  msg
end

Private Instance Methods

connect(address, port) click to toggle source
# File lib/serf/client/connection.rb, line 148
def connect address, port
  @socket = Celluloid::IO::TCPSocket.new(address, port)
end