class Volt::MessageBus::PeerConnection
Constants
- CONNECT_TIMEOUT
Attributes
peer_server_id[R]
The server id for the connected server
socket[R]
The socket used to communicate with the connected server
Public Class Methods
new(socket, ips, port, message_bus, server=false, peer_server_id=nil)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 17
#
# Stores the connection settings and starts the worker thread that drives
# this peer connection.
#
# socket         - an already-open socket, or nil to have the worker thread
#                  open one through reconnect!
# ips            - comma separated list of ips (split on ',' by connect!)
# port           - port handed to SocketWithTimeout by connect!
# message_bus    - owning message bus; supplies server_id and receives
#                  incoming messages
# server         - truthy when this side accepted the connection; such
#                  connections wait for the peer's announcement and never
#                  reconnect
# peer_server_id - server id of the peer, when already known
def initialize(socket, ips, port, message_bus, server=false, peer_server_id=nil)
  @socket = socket
  @ips = ips
  @port = port
  @server = server

  @message_bus = message_bus
  @server_id = message_bus.server_id
  @peer_server_id = peer_server_id

  @message_queue = SizedQueue.new(500)
  @reconnect_mutex = Mutex.new

  # The encoder handles things like formatting and encryption
  @message_encoder = MessageEncoder.new

  @worker_thread = Thread.new do
    # Without a socket (connection created from the active_volt_instances
    # collection) reconnect! has to set up @socket first.
    next unless @socket || reconnect!

    # announce returns false when the handshake failed or when we
    # connected to ourselves.
    next unless announce

    # Incoming messages are read on their own thread ...
    @listen_thread = Thread.new { listen }

    # ... while this thread sends the queued outgoing messages.
    run_worker
  end
end
Public Instance Methods
announce()
click to toggle source
Tells the other connection its server_id. In the event we connected to ourselves, close the connection.
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 54
#
# Exchanges server ids with the peer. The accepting (server) side waits for
# the peer's id and then replies with its own; the connecting side sends
# first and then waits for the reply.
#
# Returns true when the handshake succeeded and the peer is a different
# server. Returns false, after calling disconnect!, when the handshake
# failed or when the peer reported our own server id.
def announce
  failed = false

  begin
    if @server
      # Wait for announcement
      @peer_server_id = @message_encoder.receive_message(@socket)
      @message_encoder.send_message(@socket, @server_id)
    else
      # Announce
      @message_encoder.send_message(@socket, @server_id)
      @peer_server_id = @message_encoder.receive_message(@socket)
    end
  rescue Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
    # Same connection errors that listen treats as a dropped connection.
    # Errno::* errors are not IOErrors, so rescuing IOError alone would let
    # them escape and skip the disconnect! below.
    failed = true
  end

  # Make sure we aren't already connected
  @message_bus.remove_duplicate_connections

  # Don't connect to self
  if failed || @peer_server_id == @server_id
    # Close the connection
    disconnect!
    return false
  end

  # Success
  true
end
disconnect!()
click to toggle source
Close the socket, kill listener thread, wait for worker thread to send all messages, and remove from message_bus's peer_connections.
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 86
#
# Marks the connection as disconnected, tells the worker to quit, closes the
# socket, stops the listen thread, waits for the worker thread, and finally
# removes this connection from the message bus.
#
# May be called from any thread, including the worker and listen threads
# themselves (both reach it through reconnect!).
def disconnect!
  @disconnected = true

  if Thread.current == @worker_thread
    # The worker is the only consumer of the queue, so a blocking push onto
    # a full queue from the worker itself would never return. The worker
    # stops on its own once this call returns, so a dropped :QUIT is fine.
    begin
      @message_queue.push(:QUIT, true)
    rescue ThreadError
      # Queue is full, nothing to do
    end
  else
    @message_queue.push(:QUIT)
  end

  begin
    @socket.close
  rescue
    # Ignore close error, since we may not be connected
  end

  # Never kill the calling thread: that would abort this method before the
  # connection is removed from the message bus. When the listen thread is
  # the caller, its loop ends by itself because reconnect! returns false.
  @listen_thread.kill if @listen_thread && @listen_thread != Thread.current

  # Wait for the worker to finish
  @worker_thread.join if Thread.current != @worker_thread && @worker_thread

  @message_bus.remove_peer_connection(self)
end
listen()
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 126
#
# Receive loop: reads messages from the socket and hands each one to the
# message bus. When the connection drops, a connecting (non-server) side
# tries to reconnect and keeps listening; in every other case the loop ends.
def listen
  loop do
    begin
      while (incoming = @message_encoder.receive_message(@socket))
        break if @disconnected

        @message_bus.handle_message(incoming)
      end
      # A nil message from the encoder ends the inner loop
    rescue Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # Treated like a dropped connection, handled below
    end

    # Explicitly disconnected and server-side connections never reconnect
    break if @disconnected || @server

    # Connection was dropped; die when we can't reconnect
    break unless reconnect!
  end
end
publish(message)
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 104
#
# Queues a message for the worker thread to send to the peer. Blocks while
# the queue is full.
#
# Once the connection has been disconnected the message is dropped and nil
# is returned: the worker stops consuming the queue after disconnect!, so
# queued messages would never be sent and the bounded queue would
# eventually block the caller forever.
def publish(message)
  return if @disconnected

  @message_queue.push(message)
end
run_worker()
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 108
#
# Send loop: pops messages off the queue and writes them to the socket until
# :QUIT is popped. When a send fails with a connection error the worker
# tries to reconnect and then resends the same message; if reconnecting
# fails the worker stops.
def run_worker
  while (message = @message_queue.pop)
    break if message == :QUIT

    begin
      @message_encoder.send_message(@socket, message)
      # 'Error: closed stream' comes in sometimes
    rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # ECONNRESET is handled here as well (listen already treats it as a
      # dropped connection); otherwise a peer reset during a send would
      # kill the worker instead of triggering a reconnect.
      retry if reconnect!

      # Unable to reconnect, die
      break
    end
  end
end
Private Instance Methods
connect!()
click to toggle source
Because servers can have many IPs, we try each IP in turn until we are able to connect to one.
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 159
#
# Tries every ip in the comma separated @ips list, in order, and stores the
# first socket that connects in @socket.
#
# Returns nil once connected.
# Raises Errno::ECONNREFUSED when none of the ips could be reached.
def connect!
  @ips.split(',').each do |raw_ip|
    # Tolerate "a, b" style lists and skip blank entries
    ip = raw_ip.strip
    next if ip.empty?

    begin
      @socket = SocketWithTimeout.new(ip, @port, CONNECT_TIMEOUT)
      return
    rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ETIMEDOUT, SocketError
      # Unable to connect to this ip, try the next one. EHOSTUNREACH is
      # included so a single unroutable ip does not abort the whole attempt.
      next
    end
  end

  raise Errno::ECONNREFUSED
end
reconnect!()
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 176
#
# Keeps trying to re-establish the connection for as long as the message bus
# reports the peer as alive, waiting 10 seconds between attempts.
#
# Returns true once connect! succeeded. Returns false when this connection
# is already disconnected, when it is a server side connection, or when the
# peer is no longer alive (in which case disconnect! is called).
def reconnect!
  # Stop trying to reconnect if we are disconnected
  return false if @disconnected

  # Don't reconnect on the server instances
  return false if @server

  outcome = @reconnect_mutex.synchronize do
    loop do
      # Another thread may have disconnected us while we were waiting for
      # the mutex or sleeping between attempts
      break :disconnected if @disconnected

      # Server is no longer reporting as alive, give up on reconnecting
      break :peer_gone unless still_alive?

      begin
        connect!
        break :connected
      rescue Errno::ECONNREFUSED, SocketError
        # Unable to connect, wait 10, try again
        sleep 10
      end
    end
  end

  # disconnect! joins the worker thread, which may itself be waiting for
  # @reconnect_mutex, so it has to run after the mutex is released.
  disconnect! if outcome == :peer_gone

  outcome == :connected
end
still_alive?()
click to toggle source
# File lib/volt/server/message_bus/peer_to_peer/peer_connection.rb, line 153
#
# Asks the message bus whether the peer this connection points at is still
# alive, and returns the message bus's answer unchanged.
def still_alive?
  peer_id = @peer_server_id
  @message_bus.still_alive?(peer_id)
end