class Volt::MessageBus::PeerToPeer

Constants

DEAD_TIME

How long without an update before we mark an instance as dead (in seconds)

Attributes

server_id[R]

Public Class Methods

new(volt_app) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 60
def initialize(volt_app)
  @volt_app = volt_app

  if Volt::DataStore.fetch.connected?
    # Generate a guid
    @server_id = SecureRandom.uuid
    # The PeerConnection's to peers
    @peer_connections = {}
    # The server id's for each peer we're connected to
    @peer_server_ids = {}

    setup_peer_server
    start_tracker

    @peer_connection_threads = []

    @connect_thread = Thread.new do
      connect_to_peers
    end
  else
    Volt.logger.error('Unable to connect to the database.  Currently Volt requires running mongodb for a few things to work.  Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled.  Also, the store collection can not be used without a database.  This means updates will not be propagated between instances (server, console, runners, etc...)')
  end
end

Public Instance Methods

add_peer_connection(peer_connection) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 138
def add_peer_connection(peer_connection)
  @peer_connections[peer_connection] = true
  @peer_server_ids[peer_connection.peer_server_id] = true
end
connect_to_peers() click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 120
def connect_to_peers
  peers.each do |peer|
    # Start connecting to all at the same time.  Since most will connect or
    # timeout, this is the desired behaviour.
    # sometimes we get nil peers for some reason
    if peer
      peer_connection = PeerConnection.new(nil, peer._ips, peer._port, self, false, peer._server_id)
      add_peer_connection(peer_connection)
    end
  end
end
disconnect!() click to toggle source

Blocks until all peers have connected or timed out.

# File lib/volt/server/message_bus/peer_to_peer.rb, line 133
def disconnect!
  # Wait for disconnect on each
  @peer_connections.keys.each(&:disconnect!)
end
handle_message(message) click to toggle source

Called when a message comes in

# File lib/volt/server/message_bus/peer_to_peer.rb, line 149
def handle_message(message)
  channel_name, message = message.split('|', 2)
  trigger!(channel_name, message)
end
peers() click to toggle source

Return an array of peer records.

# File lib/volt/server/message_bus/peer_to_peer.rb, line 114
def peers
  instances = @volt_app.store._active_volt_instances

  instances.where(server_id: {'$ne' => @server_id}).all.sync
end
publish(channel, message) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 100
def publish(channel, message)
  full_msg = "#{channel}|#{message}"
  @peer_connections.keys.each do |peer|
    begin
      # Queue message on each peer
      peer.publish(full_msg)
    rescue IOError => e
      # Connection to peer lost
      Volt.logger.warn("Message bus connection to peer lost: #{e}")
    end
  end
end
remove_duplicate_connections() click to toggle source

We only want one connection between two instances, this loops through each connection

# File lib/volt/server/message_bus/peer_to_peer.rb, line 156
def remove_duplicate_connections
  peer_server_ids = {}

  # remove any we are connected to twice
  @peer_connections.keys.each do |peer|
    peer_id = peer.peer_server_id

    if peer_id
      # peer is connected

      if peer_server_ids[peer_id]
        # Peer is already connected somewhere else, remove connection
        peer.disconnect!

        # remove the connection
        @peer_connections.delete(peer)
      else
        # Mark that we are connected
        peer_server_ids[peer_id] = true
      end
    end
  end
end
remove_peer_connection(peer_connection) click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 143
def remove_peer_connection(peer_connection)
  @peer_connections.delete(peer_connection)
  @peer_server_ids.delete(peer_connection.peer_server_id)
end
setup_peer_server() click to toggle source

The peer server maintains a socket other instances can connect to.

# File lib/volt/server/message_bus/peer_to_peer.rb, line 85
def setup_peer_server
  @peer_server = PeerServer.new(self)
end
start_tracker() click to toggle source

The tracker updates the socket ip's and port and a timestamp into the database every minute. If the timestamp is more than 2 minutes old, an instance is marked as “dead” and removed.

# File lib/volt/server/message_bus/peer_to_peer.rb, line 92
def start_tracker
  @server_tracker = ServerTracker.new(@volt_app, @server_id, @peer_server.port)

  # Do the initial registration, and wait until its done before connecting
  # to peers.
  @server_tracker.register()
end
still_alive?(peer_server_id) click to toggle source

Returns true if the server is still reporting as alive.

# File lib/volt/server/message_bus/peer_to_peer.rb, line 181
def still_alive?(peer_server_id)
  # Unable to write to the socket, retry until the instance is no
  # longer marking its self as active in the database
  peer_table = @volt_app.store.active_volt_instances
  peer = peer_table.where(server_id: peer_server_id).first.sync
  if peer
    # Found the peer, retry if it has reported in in the last 2
    # minutes.
    if peer._time > (Time.now.to_i - DEAD_TIME)
      # Peer reported in less than 2 minutes ago
      return true
    else
      # Delete the entry
      peer.destroy
    end
  end

  false
end