class Volt::MessageBus::PeerConnection

Constants

CONNECT_TIMEOUT

Attributes

peer_server_id[R]

The server id for the connected server

socket[R]

The socket used to talk to the connected server

Public Class Methods

new(socket, ips, port, message_bus, server=false, peer_server_id=nil) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 17
# Stores the connection settings and starts the worker thread that owns
# this connection.
#
# socket         - an already open socket, or nil; when nil the worker thread
#                  calls reconnect! to open one
# ips            - comma separated list of candidate addresses (see connect!)
# port           - port handed to connect!
# message_bus    - the owning message bus; also supplies our own server_id
# server         - truthy when this side waits for the peer's announcement
#                  (see announce)
# peer_server_id - server id of the peer, when it is already known
def initialize(socket, ips, port, message_bus, server=false, peer_server_id=nil)
  @socket = socket
  @ips = ips
  @port = port
  @server = server
  @message_bus = message_bus
  @server_id = message_bus.server_id
  @peer_server_id = peer_server_id

  # Outgoing messages wait here until the worker thread sends them
  @message_queue = SizedQueue.new(500)
  @reconnect_mutex = Mutex.new

  # The encoder handles things like formatting and encryption
  @message_encoder = MessageEncoder.new

  @worker_thread = Thread.new do
    # Without a socket we were created from the active_volt_instances
    # collection, so reconnect! has to set up @socket first.
    next unless @socket || reconnect!

    # announce returns false when the handshake failed or we connected to
    # ourselves; it has already disconnected in that case.
    next unless announce

    # Incoming messages are read on their own thread
    @listen_thread = Thread.new { listen }

    run_worker
  end
end

Public Instance Methods

announce() click to toggle source

Tells the other end of the connection our server_id. If we connected to ourselves, the connection is closed.

# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 54
# Exchanges server ids with the peer. The server side waits for the peer's
# id and then replies with its own; the client side sends first and then
# waits for the reply.
#
# Socket failures during the exchange are treated as a failed handshake.
# The same socket errors that listen and run_worker handle are rescued here,
# so a reset connection disconnects cleanly instead of raising out of the
# worker thread.
#
# Returns true when the handshake succeeded, false when it failed or when
# the peer turned out to be this server (disconnect! is called in both
# failure cases).
def announce
  failed = false
  begin
    if @server
      # Wait for announcement
      @peer_server_id = @message_encoder.receive_message(@socket)
      @message_encoder.send_message(@socket, @server_id)
    else
      # Announce
      @message_encoder.send_message(@socket, @server_id)
      @peer_server_id = @message_encoder.receive_message(@socket)
    end
  rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EPIPE, IOError
    failed = true
  end

  # Make sure we aren't already connected
  @message_bus.remove_duplicate_connections

  # Don't connect to self
  if failed || @peer_server_id == @server_id
    # Close the connection
    disconnect!
    return false
  end

  # Success
  true
end
disconnect!() click to toggle source

Close the socket, kill listener thread, wait for worker thread to send all messages, and remove from message_bus's peer_connections.

# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 86
# Marks the connection as disconnected, asks the worker to quit, closes the
# socket, stops the listener thread, waits for the worker thread and removes
# this connection from the message bus.
#
# disconnect! can be reached from the connection's own threads (listen and
# run_worker both call reconnect!, which calls disconnect!), so it must never
# kill or wait on the thread it is running on:
#
# * killing the current thread would abort this method before
#   remove_peer_connection runs;
# * the worker is the only consumer of the queue, so a blocking push from
#   the worker onto a full queue would never return;
# * the listen thread gets here while holding @reconnect_mutex, and the
#   worker may be waiting on that mutex, so joining the worker from the
#   listen thread could wait forever.
def disconnect!
  @disconnected = true

  current = Thread.current
  on_own_thread = current == @worker_thread || current == @listen_thread

  begin
    # Second argument is SizedQueue's non_block flag.
    @message_queue.push(:QUIT, on_own_thread)
  rescue ThreadError
    # Queue is full. The worker still stops: its next send fails on the
    # closed socket and reconnect! returns false once @disconnected is set.
  end

  begin
    @socket.close
  rescue StandardError
    # Ignore close error, since we may not be connected
  end

  @listen_thread.kill if @listen_thread && @listen_thread != current

  # Wait for the worker to publish all messages
  @worker_thread.join if @worker_thread && !on_own_thread

  @message_bus.remove_peer_connection(self)
end
listen() click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 126
# Reads messages from the socket and hands each one to the message bus.
#
# Reading stops when the encoder returns a falsy value or one of the rescued
# socket errors is raised. After that the method returns if we are
# disconnected or are the server side; otherwise it tries reconnect! and
# resumes reading only when that succeeds.
def listen
  loop do
    begin
      while (incoming = @message_encoder.receive_message(@socket))
        break if @disconnected
        @message_bus.handle_message(incoming)
      end
      # A falsy message means the socket gave us nothing more
    rescue Errno::ECONNREFUSED.superclass::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # Dropped connection; decided below
    end

    # Only a still-connected client side attempts to reconnect
    break if @disconnected || @server

    # Couldn't reconnect, die
    break unless reconnect!
  end
end
publish(message) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 104
# Queues +message+ for delivery; run_worker pops it off and sends it over the
# socket.
def publish(message)
  @message_queue << message
end
run_worker() click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 108
# Sends queued messages over the socket until :QUIT (or a falsy value) is
# popped from the queue.
#
# When a send fails with one of the rescued socket errors the worker tries
# reconnect! and, on success, sends the same message again; when reconnecting
# fails the worker stops. Errno::ECONNRESET is rescued as well, matching what
# listen handles for the same socket, so a reset connection leads to a
# reconnect attempt instead of raising out of the worker thread.
def run_worker
  while (message = @message_queue.pop)
    break if message == :QUIT

    begin
      @message_encoder.send_message(@socket, message)
    rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # 'Error: closed stream' comes in sometimes

      # Unable to reconnect, die
      break unless reconnect!

      retry
    end
  end
end

Private Instance Methods

connect!() click to toggle source

Because servers can have many ips, we try the various ip's until we are able to connect to one.

# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 159
# Opens @socket to the first address in @ips (a comma separated list) that
# accepts a connection on @port within CONNECT_TIMEOUT.
#
# Addresses that fail with one of the rescued connection errors are skipped.
# Raises Errno::ECONNREFUSED when none of the addresses could be reached.
def connect!
  candidates = @ips.split(',')

  candidates.each do |address|
    begin
      @socket = SocketWithTimeout.new(address, @port, CONNECT_TIMEOUT)

      # Connected, stop looking
      return
    rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::ETIMEDOUT, SocketError
      # Unable to connect, try the next address
      next
    end
  end

  raise Errno::ECONNREFUSED
end
reconnect!() click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 176
# Keeps trying to re-open the connection until it succeeds or the peer is no
# longer reported alive.
#
# Returns false immediately when we are disconnected or are the server side.
# Otherwise, under @reconnect_mutex, each round first checks still_alive?
# (calling disconnect! and returning false when it is not), then calls
# connect!; a refused connection or SocketError waits 10 seconds before the
# next round. Returns true once connect! succeeds.
def reconnect!
  # Disconnected connections and server instances never reconnect
  return false if @disconnected || @server

  @reconnect_mutex.synchronize do
    loop do
      # Server is no longer reporting as alive, give up on reconnecting
      unless still_alive?
        # Unable to connect, let peer connection die
        disconnect!
        return false
      end

      begin
        connect!

        # Reconnected
        return true
      rescue Errno::ECONNREFUSED, SocketError
        # Unable to connect, wait 10, try again
        sleep 10
      end
    end
  end
end
still_alive?() click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 153
# Asks the message bus whether the peer identified by @peer_server_id is
# still alive, and returns the bus's answer.
def still_alive?
  peer_id = @peer_server_id
  @message_bus.still_alive?(peer_id)
end