class Ciri::P2P::NetworkState

NetworkState maintaining current connected peers

Constants

BLANK_PAYLOAD

Attributes

caps[R]
local_node_id[R]
peer_store[R]
peers[R]

Public Class Methods

new(protocols:, peer_store:, local_node_id:, max_outgoing: 10, max_incoming: 10, ping_interval_secs: 15) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 42
def initialize(protocols:, peer_store:, local_node_id:, max_outgoing: 10, max_incoming: 10, ping_interval_secs: 15)
  @peers = {}
  @peers_lock = Async::Semaphore.new
  @peer_store = peer_store
  @protocols = protocols
  @local_node_id = local_node_id
  @max_outgoing = max_outgoing
  @max_incoming = max_incoming
  @ping_interval_secs = ping_interval_secs
end

Public Instance Methods

disconnect_all() click to toggle source
# File lib/ciri/p2p/network_state.rb, line 102
def disconnect_all
  debug("[#{local_node_id.short_hex}] disconnect all")
  peers.each_value do |peer|
    disconnect_peer(peer, reason: "disconnect all...")
  end
end
disconnect_peer(peer, reason: nil) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 88
def disconnect_peer(peer, reason: nil)
  @peers_lock.acquire do
    # only disconnect from peers if direction correct to avoiding delete peer by mistake
    if (exist_peer = @peers[peer.raw_node_id]) && exist_peer.direction == peer.direction
      info("[#{local_node_id.short_hex}] disconnect peer: #{peer.inspect}, reason: #{reason}")
      remove_peer(peer)
      peer.disconnect
      @peer_store.update_peer_status(peer.raw_node_id, PeerStore::Status::DISCONNECTED)
    else
      debug("[#{local_node_id.short_hex}] Ignoring: disconnect peer: #{peer.inspect}, reason: #{reason}")
    end
  end
end
initialize_protocols(task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 53
def initialize_protocols(task: Async::Task.current)
  # initialize protocols
  @protocols.each do |protocol|
    context = ProtocolContext.new(self, protocol: protocol)
    task.async {protocol.initialized(context)}
  end
end
new_peer_connected(connection, handshake, direction:, task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 65
def new_peer_connected(connection, handshake, direction:, task: Async::Task.current)
  @peers_lock.acquire do
    peer = Peer.new(connection, handshake, @protocols, direction: direction)
    # disconnect already connected peers
    if @peers.include?(peer.raw_node_id)
      debug("[#{local_node_id.short_hex}] peer #{peer.inspect} is already connected")
      # disconnect duplicate connection
      peer.disconnect
      return
    end
    # check peers
    protocol_handshake_checks(handshake)
    @peers[peer.raw_node_id] = peer
    info "[#{local_node_id.short_hex}] connect to new peer #{peer.inspect}"
    @peer_store.update_peer_status(peer.raw_node_id, PeerStore::Status::CONNECTED)
    # run peer logic
    task.async do
      register_peer_protocols(peer)
      handling_peer(peer)
    end
  end
end
number_of_attemp_outgoing() click to toggle source
# File lib/ciri/p2p/network_state.rb, line 61
def number_of_attemp_outgoing
  @max_outgoing - @peers.values.select(&:outgoing?).count
end

Private Instance Methods

caps_hash() click to toggle source

{cap_name => cap_version}

# File lib/ciri/p2p/network_state.rb, line 219
def caps_hash
  @caps_hash ||= @protocols.sort_by do |cap|
    cap.version
  end.reduce({}) do |caps_hash, cap|
    caps_hash[cap.name] = cap.version
    caps_hash
  end
end
count_matching_protocols(caps) click to toggle source

calculate count of matched protocols caps

# File lib/ciri/p2p/network_state.rb, line 229
def count_matching_protocols(caps)
  caps.select do |cap|
    caps_hash[cap.name] == cap.version
  end.count
end
deregister_peer_protocols(peer, task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 130
def deregister_peer_protocols(peer, task: Async::Task.current)
  peer.protocol_ios.dup.each do |protocol_io|
    task.async do
      # Protocol#connected
      context = ProtocolContext.new(self, peer: peer, protocol: protocol_io.protocol, protocol_io: protocol_io)
      context.protocol.disconnected(context)
    rescue StandardError => e
      error("Protocol#disconnected error: {e}\nbacktrace: #{e.backtrace.join "\n"}")
      disconnect_peer(peer, reason: "Protocol#disconnected callback error: #{e}")
    end
  end
end
handle_message(peer, msg, task: Async::Task.current) click to toggle source

handle peer message

# File lib/ciri/p2p/network_state.rb, line 189
def handle_message(peer, msg, task: Async::Task.current)
  if msg.code == RLPX::Code::PING
    pong(peer)
  elsif msg.code == RLPX::Code::DISCONNECT
    reason = RLP.decode_with_type(msg.payload, Integer)
    raise DisconnectError.new("receive disconnect message, reason: #{reason}")
  elsif msg.code == RLPX::Code::PONG
    # TODO update peer node
  else
    # send msg to sub protocol
    if (protocol_io = peer.find_protocol_io_by_msg_code(msg.code)).nil?
      raise UnknownMessageCodeError.new("can't find protocol with msg code #{msg.code}")
    end
    # fix msg code
    msg.code -= protocol_io.offset
    task.async do
      # Protocol#received
      context = ProtocolContext.new(self, peer: peer, protocol: protocol_io.protocol, protocol_io: protocol_io)
      context.protocol.received(context, msg)
    end
  end
end
handling_peer(peer, task: Async::Task.current) click to toggle source

handling peer IO

# File lib/ciri/p2p/network_state.rb, line 144
def handling_peer(peer, task: Async::Task.current)
  start_peer_io(peer)
rescue Exception => e
  remove_peer(peer)
  error("remove peer #{peer}, error: #{e}")
end
ping(peer) click to toggle source

response pong to message

# File lib/ciri/p2p/network_state.rb, line 179
def ping(peer)
  peer.connection.send_data(RLPX::Code::PING, BLANK_PAYLOAD)
end
pong(peer) click to toggle source

response pong to message

# File lib/ciri/p2p/network_state.rb, line 184
def pong(peer)
  peer.connection.send_data(RLPX::Code::PONG, BLANK_PAYLOAD)
end
protocol_handshake_checks(handshake) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 212
def protocol_handshake_checks(handshake)
  if @protocols && count_matching_protocols(handshake.caps) == 0
    raise UselessPeerError.new('discovery useless peer')
  end
end
register_peer_protocols(peer, task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 117
def register_peer_protocols(peer, task: Async::Task.current)
  peer.protocol_ios.dup.each do |protocol_io|
    task.async do
      # Protocol#connected
      context = ProtocolContext.new(self, peer: peer, protocol: protocol_io.protocol, protocol_io: protocol_io)
      context.protocol.connected(context)
    rescue StandardError => e
      error("Protocol#connected error: #{e}\nbacktrace: #{e.backtrace.join "\n"}")
      disconnect_peer(peer, reason: "Protocol#connected callback error: #{e}")
    end
  end
end
remove_peer(peer) click to toggle source
# File lib/ciri/p2p/network_state.rb, line 111
def remove_peer(peer)
  @peers.delete(peer.raw_node_id)
  deregister_peer_protocols(peer)
end
start_peer_io(peer, task: Async::Task.current) click to toggle source

starting peer IO loop

# File lib/ciri/p2p/network_state.rb, line 152
def start_peer_io(peer, task: Async::Task.current)
  ping_timer = task.reactor.every(@ping_interval_secs) do
    task.async do
      ping(peer)
    rescue StandardError => e
      disconnect_peer(peer, reason: "ping error: #{e}")
    end
  end

  message_service = task.async do
    loop do
      raise DisconnectError.new("disconnect peer") if @disconnect
      msg = peer.connection.read_msg
      msg.received_at = Time.now
      handle_message(peer, msg)
    end
  rescue StandardError => e
    disconnect_peer(peer, reason: "io error: #{e}")
    error("io error: #{e}\n#{e.backtrace.join "\n"}")
  end

  message_service.wait
end