class Volt::MessageBus::PeerToPeer
Constants
- DEAD_TIME
How long without an update before we mark an instance as dead (in seconds)
Attributes
server_id[R]
Public Class Methods
new(volt_app)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 60 def initialize(volt_app) @volt_app = volt_app if Volt::DataStore.fetch.connected? # Generate a guid @server_id = SecureRandom.uuid # The PeerConnection's to peers @peer_connections = {} # The server id's for each peer we're connected to @peer_server_ids = {} setup_peer_server start_tracker @peer_connection_threads = [] @connect_thread = Thread.new do connect_to_peers end else Volt.logger.error('Unable to connect to the database. Currently Volt requires running mongodb for a few things to work. Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled. Also, the store collection can not be used without a database. This means updates will not be propagated between instances (server, console, runners, etc...)') end end
Public Instance Methods
add_peer_connection(peer_connection)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 138 def add_peer_connection(peer_connection) @peer_connections[peer_connection] = true @peer_server_ids[peer_connection.peer_server_id] = true end
connect_to_peers()
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 120 def connect_to_peers peers.each do |peer| # Start connecting to all at the same time. Since most will connect or # timeout, this is the desired behaviour. # sometimes we get nil peers for some reason if peer peer_connection = PeerConnection.new(nil, peer._ips, peer._port, self, false, peer._server_id) add_peer_connection(peer_connection) end end end
disconnect!()
click to toggle source
Blocks until all peers have connected or timed out.
# File lib/volt/server/message_bus/peer_to_peer.rb, line 133 def disconnect! # Wait for disconnect on each @peer_connections.keys.each(&:disconnect!) end
handle_message(message)
click to toggle source
Called when a message comes in
# File lib/volt/server/message_bus/peer_to_peer.rb, line 149 def handle_message(message) channel_name, message = message.split('|', 2) trigger!(channel_name, message) end
peers()
click to toggle source
Return an array of peer records.
# File lib/volt/server/message_bus/peer_to_peer.rb, line 114 def peers instances = @volt_app.store._active_volt_instances instances.where(server_id: {'$ne' => @server_id}).all.sync end
publish(channel, message)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 100 def publish(channel, message) full_msg = "#{channel}|#{message}" @peer_connections.keys.each do |peer| begin # Queue message on each peer peer.publish(full_msg) rescue IOError => e # Connection to peer lost Volt.logger.warn("Message bus connection to peer lost: #{e}") end end end
remove_duplicate_connections()
click to toggle source
We only want one connection between two instances, this loops through each connection
# File lib/volt/server/message_bus/peer_to_peer.rb, line 156 def remove_duplicate_connections peer_server_ids = {} # remove any we are connected to twice @peer_connections.keys.each do |peer| peer_id = peer.peer_server_id if peer_id # peer is connected if peer_server_ids[peer_id] # Peer is already connected somewhere else, remove connection peer.disconnect! # remove the connection @peer_connections.delete(peer) else # Mark that we are connected peer_server_ids[peer_id] = true end end end end
remove_peer_connection(peer_connection)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer.rb, line 143 def remove_peer_connection(peer_connection) @peer_connections.delete(peer_connection) @peer_server_ids.delete(peer_connection.peer_server_id) end
setup_peer_server()
click to toggle source
The peer server maintains a socket other instances can connect to.
# File lib/volt/server/message_bus/peer_to_peer.rb, line 85 def setup_peer_server @peer_server = PeerServer.new(self) end
start_tracker()
click to toggle source
The tracker updates the socket ip's and port and a timestamp into the database every minute. If the timestamp is more than 2 minutes old, an instance is marked as “dead” and removed.
# File lib/volt/server/message_bus/peer_to_peer.rb, line 92 def start_tracker @server_tracker = ServerTracker.new(@volt_app, @server_id, @peer_server.port) # Do the initial registration, and wait until its done before connecting # to peers. @server_tracker.register() end
still_alive?(peer_server_id)
click to toggle source
Returns true if the server is still reporting as alive.
# File lib/volt/server/message_bus/peer_to_peer.rb, line 181 def still_alive?(peer_server_id) # Unable to write to the socket, retry until the instance is no # longer marking its self as active in the database peer_table = @volt_app.store.active_volt_instances peer = peer_table.where(server_id: peer_server_id).first.sync if peer # Found the peer, retry if it has reported in in the last 2 # minutes. if peer._time > (Time.now.to_i - DEAD_TIME) # Peer reported in less than 2 minutes ago return true else # Delete the entry peer.destroy end end false end