class Ciri::P2P::Discovery::Service

Implement the DiscV4 protocol github.com/ethereum/devp2p/blob/master/discv4.md notice difference between PeerStore and Kad, we use PeerStore to store all peers we known(upon 8192), and use Kad to store our neighbours for discovery query.

Constants

MESSAGE_EXPIRATION_IN

Attributes

host[R]
local_node_id[R]
peer_store[R]
tcp_port[R]
udp_port[R]

Public Class Methods

new(peer_store:, host:, udp_port:, tcp_port:, private_key:, discovery_interval_secs: 15) click to toggle source

we should consider search from peer_store instead connect to bootnodes everytime

# File lib/ciri/p2p/discovery/service.rb, line 60
def initialize(peer_store:, host:, udp_port:, tcp_port:, private_key:, discovery_interval_secs: 15)
  @discovery_interval_secs = discovery_interval_secs
  @cache = Set.new
  @host = host
  @udp_port = udp_port
  @tcp_port = tcp_port
  @peer_store = peer_store
  @private_key = private_key
  @local_node_id = NodeID.new(private_key)
  @kad_table = Kad::RoutingTable.new(local_node: Kad::Node.new(@local_node_id.to_bytes))
  setup_kad_table
end

Public Instance Methods

run(task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 73
def run(task: Async::Task.current)
  # start listening
  task.async do
    start_listen
  end
  # search peers every x seconds
  task.reactor.every(@discovery_interval_secs) do
    task.async do
      perform_discovery
    end
  end
end

Private Instance Methods

find_neighbours(raw_node_id, count) click to toggle source

find nerly neighbours

# File lib/ciri/p2p/discovery/service.rb, line 215
def find_neighbours(raw_node_id, count)
  @kad_table.find_neighbours(raw_node_id, k: count).map do |node|
    [node.raw_node_id, @peer_store.get_node_addresses(node.raw_node_id)&.first]
  end.delete_if do |_, addr|
    addr.nil?
  end
end
handle_request(raw_packet, address, now: Time.now.to_i) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 109
def handle_request(raw_packet, address, now: Time.now.to_i)
  msg = Message.decode_message(raw_packet)
  msg.validate
  if msg.packet.expiration < now
    trace("ignore expired message, sender: #{msg.sender}, expired_at: #{msg.packet.expiration}")
    return
  end
  raw_node_id = msg.sender.to_bytes
  case msg.packet_type
  when Ping::CODE
    @kad_table.update(raw_node_id)
    from = msg.packet.from
    from_ip = IPAddr.new(from.sender_ip, Socket::AF_INET)
    from_udp_port = from.sender_udp_port
    from_tcp_port = from.sender_tcp_port
    from_address = Address.new(
      ip: from_ip,
      udp_port: from_udp_port, 
      tcp_port: from_tcp_port)
    debug("receive ping msg from #{from_address.inspect}")
    # respond pong
    pong = Pong.new(to: To.from_host_port(from_ip, from_udp_port), 
                    ping_hash: msg.message_hash, 
                    expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
    pong_msg = Message.pack(pong, private_key: @private_key).encode_message
    send_msg(pong_msg, from_ip.to_s, from_udp_port)
    @peer_store.add_node(Node.new(raw_node_id: raw_node_id, addresses: [from_address]))
  when Pong::CODE
    # check pong
    if @peer_store.has_ping?(raw_node_id, msg.packet.ping_hash)
      # update peer last seen
      @peer_store.update_last_seen(msg.sender.to_bytes)
    else
      @peer_store.ban_peer(msg.sender.to_bytes)
    end
  when FindNode::CODE
    unless @peer_store.has_seen?(raw_node_id)
      # consider add to denylist
      return
    end
    nodes = find_neighbours(msg.packet.target, 20).map do |raw_node_id, addr|
      Neighbors::Node.new(ip: addr.ip.to_i, udp_port: addr.udp_port, tcp_port: addr.tcp_port, node_id: raw_node_id)
    end
    neighbors = Neighbors.new(nodes: nodes, expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
    send_msg_to_node(Message.pack(neighbors, private_key: @private_key).encode_message, raw_node_id)
    @peer_store.update_last_seen(raw_node_id)
  when Neighbors::CODE
    unless @peer_store.has_seen?(raw_node_id)
      # consider add to denylist
      return
    end
    debug("receive neighours #{msg.packet.nodes.size} from #{raw_node_id.to_hex}")
    msg.packet.nodes.each do |node|
      raw_id = node.node_id
      next if raw_id == raw_local_node_id
      debug("receive neighour #{node} from #{raw_node_id.to_hex}")
      ip = IPAddr.new(node.ip, Socket::AF_INET)
      address = Address.new(ip: ip, udp_port: node.udp_port, tcp_port: node.tcp_port)
      @peer_store.add_node(Node.new(raw_node_id: raw_id, addresses: [address]))
      # add new discovered node_id
      @kad_table.update(raw_id)
    end
    @kad_table.update(raw_node_id)
    @peer_store.update_last_seen(raw_node_id)
  else
    @peer_store.ban_peer(msg.sender.to_bytes)
    raise UnknownMessageCodeError.new("can't handle unknown code in discovery protocol, code: #{msg.packet_type}")
  end
rescue StandardError => e
  @peer_store.ban_peer(msg.sender.to_bytes)
  error("discovery error: #{e} from address: #{address}\nbacktrace:#{e.backtrace.join("\n")}")
end
perform_discovery(count_of_query_nodes=15, task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 233
def perform_discovery(count_of_query_nodes=15, task: Async::Task.current)
  query_node = NodeID.new(Key.random)
  query_target = query_node.to_bytes
  # randomly search
  @kad_table.get_random_nodes(15).each do |node|
    address = @peer_store.get_node_addresses(node.raw_node_id)&.first
    next unless address
    # start query node in async task
    task.async do
      debug("perform discovery #{address}")
      send_ping(node.raw_node_id, address.ip.to_s, address.udp_port)
      query = FindNode.new(target: query_target, expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
      query_msg = Message.pack(query, private_key: @private_key).encode_message
      send_msg(query_msg, address.ip.to_s, address.udp_port)
    end
  end
end
raw_local_node_id() click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 210
def raw_local_node_id
    @raw_local_node_id ||= @local_node_id.to_bytes
end
send_msg(msg, host, port) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 205
def send_msg(msg, host, port)
  socket = Async::IO::UDPSocket.new(UDPSocket.new)
  socket.send(msg, 0, host, port)
end
send_msg_to_node(msg, raw_node_id) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 199
def send_msg_to_node(msg, raw_node_id)
  address = @peer_store.get_node_addresses(raw_node_id)&.first
  raise ArgumentsError.new("can't found peer address of #{raw_node_id.to_hex} from peer_store") unless address
  send_msg(msg, address.ip.to_s, address.udp_port)
end
send_ping(target_node_id, host, port) click to toggle source

send discover ping to peer

# File lib/ciri/p2p/discovery/service.rb, line 187
def send_ping(target_node_id, host, port)
  ping = Ping.new(to: To.from_host_port(host, port), 
                  from: From.new(
                    sender_ip: IPAddr.new(@host).to_i,
                    sender_udp_port: @udp_port,
                    sender_tcp_port: @tcp_port),
                    expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
  ping_msg = Message.pack(ping, private_key: @private_key)
  send_msg(ping_msg.encode_message, host, port)
  @peer_store.update_ping(target_node_id, ping_msg.message_hash)
end
send_ping_to_address(target_node_id, address) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 182
def send_ping_to_address(target_node_id, address)
  send_ping(target_node_id, address[3], address[1])
end
setup_kad_table() click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 223
def setup_kad_table
  if @kad_table.size.zero?
    @peer_store.find_bootnodes(20).each do |node|
      next if raw_local_node_id == node.raw_node_id
      debug("setup kad_table with #{node}")
      @kad_table.update(node.raw_node_id)
    end
  end
end
start_listen(task: Async::Task.current) click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 87
def start_listen(task: Async::Task.current)
  endpoint = Async::IO::Endpoint.udp(@host, @udp_port)
  endpoint.bind do |socket|
    @local_address = socket.local_address

    # update port if port is zero
    if @udp_port.zero?
      @udp_port = @local_address.ip_port
    end

    info "start discovery server on udp_port: #{@udp_port} tcp_port: #{@tcp_port}\nlocal_node_id: #{@local_node_id}"

    loop do
      # read discovery message
      packet, address = socket.recvfrom(Discovery::Protocol::Message::MAX_LEN)
      handle_request(packet, address)
    end
  end
end