class Ciri::P2P::Discovery::Service
Implements the DiscV4 protocol (github.com/ethereum/devp2p/blob/master/discv4.md). Note the difference between PeerStore and Kad: we use PeerStore to store all peers we know of (up to 8192), and use Kad to store our neighbours for discovery queries.
Constants
- MESSAGE_EXPIRATION_IN
Attributes
host[R]
local_node_id[R]
peer_store[R]
tcp_port[R]
udp_port[R]
Public Class Methods
new(peer_store:, host:, udp_port:, tcp_port:, private_key:, discovery_interval_secs: 15)
click to toggle source
We should consider searching from peer_store instead of connecting to the bootnodes every time.
# File lib/ciri/p2p/discovery/service.rb, line 60
#
# Build a discovery service and seed its Kademlia routing table.
#
# @param peer_store store that is queried for bootnodes and node addresses
# @param host local host; later used for binding and as the ping sender ip
# @param udp_port local UDP port for discovery traffic
# @param tcp_port local TCP port advertised in outgoing pings
# @param private_key key used to derive the local node id and to sign packets
# @param discovery_interval_secs seconds between two discovery rounds
def initialize(peer_store:, host:, udp_port:, tcp_port:, private_key:, discovery_interval_secs: 15)
  # local endpoint
  @host = host
  @udp_port = udp_port
  @tcp_port = tcp_port

  # collaborators and settings
  @peer_store = peer_store
  @discovery_interval_secs = discovery_interval_secs
  @cache = Set.new

  # identity
  @private_key = private_key
  @local_node_id = NodeID.new(private_key)

  # routing table centred on the local node
  local_kad_node = Kad::Node.new(@local_node_id.to_bytes)
  @kad_table = Kad::RoutingTable.new(local_node: local_kad_node)
  setup_kad_table
end
Public Instance Methods
run(task: Async::Task.current)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 73
#
# Start the service inside +task+: one child task runs the listener, and a
# reactor timer spawns a discovery round every @discovery_interval_secs.
#
# @param task async task used to spawn the child tasks
def run(task: Async::Task.current)
  # listener
  task.async { start_listen }

  # periodic peer search
  task.reactor.every(@discovery_interval_secs) do
    task.async { perform_discovery }
  end
end
Private Instance Methods
find_neighbours(raw_node_id, count)
click to toggle source
Find nearby neighbours.
# File lib/ciri/p2p/discovery/service.rb, line 215
#
# Ask the routing table for up to +count+ neighbours of +raw_node_id+ and pair
# each one with its first address from the peer store. Nodes for which the
# peer store yields no address are left out.
#
# @param raw_node_id target node id handed to the routing table
# @param count passed to the routing table as +k+
# @return [Array] list of [raw_node_id, address] pairs
def find_neighbours(raw_node_id, count)
  candidates = @kad_table.find_neighbours(raw_node_id, k: count)
  candidates.each_with_object([]) do |node, reachable|
    node_id = node.raw_node_id
    first_address = @peer_store.get_node_addresses(node.raw_node_id)&.first
    reachable << [node_id, first_address] unless first_address.nil?
  end
end
handle_request(raw_packet, address, now: Time.now.to_i)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 109
#
# Decode one raw discovery packet and react to it according to its type:
#
# * Ping      - record the sender, answer with a Pong, add it to the peer store
# * Pong      - refresh last-seen if it matches a ping we recorded, else ban
# * FindNode  - answer a previously seen sender with up to 20 neighbours
# * Neighbors - store every advertised node (except ourselves) from a
#               previously seen sender
#
# Expired packets are traced and ignored. Any StandardError raised while
# handling is logged; the sender is banned when the packet could be decoded.
# The method itself never raises a StandardError, so the receive loop that
# calls it keeps running.
#
# @param raw_packet raw bytes received from the socket
# @param address sender address as reported by the socket (only used for logging)
# @param now current unix time in seconds, used for the expiration check
def handle_request(raw_packet, address, now: Time.now.to_i)
  msg = Message.decode_message(raw_packet)
  msg.validate
  if msg.packet.expiration < now
    trace("ignore expired message, sender: #{msg.sender}, expired_at: #{msg.packet.expiration}")
    return
  end
  raw_node_id = msg.sender.to_bytes
  case msg.packet_type
  when Ping::CODE
    @kad_table.update(raw_node_id)
    from = msg.packet.from
    from_ip = IPAddr.new(from.sender_ip, Socket::AF_INET)
    from_udp_port = from.sender_udp_port
    from_tcp_port = from.sender_tcp_port
    from_address = Address.new(
      ip: from_ip,
      udp_port: from_udp_port,
      tcp_port: from_tcp_port
    )
    debug("receive ping msg from #{from_address.inspect}")
    # respond pong
    pong = Pong.new(to: To.from_host_port(from_ip, from_udp_port),
                    ping_hash: msg.message_hash,
                    expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
    pong_msg = Message.pack(pong, private_key: @private_key).encode_message
    send_msg(pong_msg, from_ip.to_s, from_udp_port)
    @peer_store.add_node(Node.new(raw_node_id: raw_node_id, addresses: [from_address]))
  when Pong::CODE
    # check pong
    if @peer_store.has_ping?(raw_node_id, msg.packet.ping_hash)
      # update peer last seen
      @peer_store.update_last_seen(raw_node_id)
    else
      @peer_store.ban_peer(raw_node_id)
    end
  when FindNode::CODE
    unless @peer_store.has_seen?(raw_node_id)
      # consider add to denylist
      return
    end
    # block params are named differently from the sender id to avoid shadowing it
    nodes = find_neighbours(msg.packet.target, 20).map do |neighbour_id, addr|
      Neighbors::Node.new(ip: addr.ip.to_i, udp_port: addr.udp_port, tcp_port: addr.tcp_port, node_id: neighbour_id)
    end
    neighbors = Neighbors.new(nodes: nodes, expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
    send_msg_to_node(Message.pack(neighbors, private_key: @private_key).encode_message, raw_node_id)
    @peer_store.update_last_seen(raw_node_id)
  when Neighbors::CODE
    unless @peer_store.has_seen?(raw_node_id)
      # consider add to denylist
      return
    end
    debug("receive neighours #{msg.packet.nodes.size} from #{raw_node_id.to_hex}")
    msg.packet.nodes.each do |node|
      raw_id = node.node_id
      next if raw_id == raw_local_node_id
      debug("receive neighour #{node} from #{raw_node_id.to_hex}")
      ip = IPAddr.new(node.ip, Socket::AF_INET)
      # use a dedicated local: assigning to `address` here would overwrite the
      # method parameter and make the error log below report the wrong address
      node_address = Address.new(ip: ip, udp_port: node.udp_port, tcp_port: node.tcp_port)
      @peer_store.add_node(Node.new(raw_node_id: raw_id, addresses: [node_address]))
      # add new discovered node_id
      @kad_table.update(raw_id)
    end
    @kad_table.update(raw_node_id)
    @peer_store.update_last_seen(raw_node_id)
  else
    # the sender is banned by the rescue clause below
    raise UnknownMessageCodeError.new("can't handle unknown code in discovery protocol, code: #{msg.packet_type}")
  end
rescue StandardError => e
  # msg is nil when decoding itself failed; there is no sender to ban then
  @peer_store.ban_peer(msg.sender.to_bytes) if msg
  error("discovery error: #{e} from address: #{address}\nbacktrace:#{e.backtrace.join("\n")}")
end
perform_discovery(count_of_query_nodes=15, task: Async::Task.current)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 233
#
# Run one discovery round: pick random nodes from the routing table and, for
# each one that has an address in the peer store, send a ping followed by a
# FindNode query for a freshly generated random target id.
#
# @param count_of_query_nodes how many random nodes to ask the routing table for
# @param task async task used to spawn one child task per queried node
def perform_discovery(count_of_query_nodes=15, task: Async::Task.current)
  # randomly search
  query_target = NodeID.new(Key.random).to_bytes
  # honour the caller-supplied count instead of a hard-coded 15
  @kad_table.get_random_nodes(count_of_query_nodes).each do |node|
    address = @peer_store.get_node_addresses(node.raw_node_id)&.first
    next unless address
    # start query node in async task
    task.async do
      debug("perform discovery #{address}")
      send_ping(node.raw_node_id, address.ip.to_s, address.udp_port)
      query = FindNode.new(target: query_target, expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)
      query_msg = Message.pack(query, private_key: @private_key).encode_message
      send_msg(query_msg, address.ip.to_s, address.udp_port)
    end
  end
end
raw_local_node_id()
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 210
#
# Byte form of the local node id, computed from @local_node_id on first use
# and cached in @raw_local_node_id afterwards.
def raw_local_node_id
  return @raw_local_node_id if @raw_local_node_id

  @raw_local_node_id = @local_node_id.to_bytes
end
send_msg(msg, host, port)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 205
#
# Send one datagram to host:port over a short-lived UDP socket.
#
# A fresh socket is opened for every message, so it is closed again once the
# send has finished (or failed); otherwise each message would leave an open
# file descriptor behind.
#
# @param msg encoded message bytes
# @param host destination host
# @param port destination UDP port
# @return whatever the socket's +send+ returns
def send_msg(msg, host, port)
  raw_socket = UDPSocket.new
  socket = Async::IO::UDPSocket.new(raw_socket)
  socket.send(msg, 0, host, port)
ensure
  # fall back to the raw socket if wrapping it failed
  (socket || raw_socket)&.close
end
send_msg_to_node(msg, raw_node_id)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 199
#
# Send +msg+ to the first address the peer store knows for +raw_node_id+.
#
# @param msg encoded message bytes
# @param raw_node_id id of the destination node
# @raise [ArgumentError] when the peer store has no address for the node
def send_msg_to_node(msg, raw_node_id)
  address = @peer_store.get_node_addresses(raw_node_id)&.first
  # ArgumentError is the Ruby core class; the original named `ArgumentsError`
  raise ArgumentError, "can't found peer address of #{raw_node_id.to_hex} from peer_store" unless address
  send_msg(msg, address.ip.to_s, address.udp_port)
end
send_ping(target_node_id, host, port)
click to toggle source
send discover ping to peer
# File lib/ciri/p2p/discovery/service.rb, line 187
#
# Send a discovery ping to host:port and record its hash in the peer store
# under +target_node_id+.
#
# @param target_node_id id of the node being pinged
# @param host destination host
# @param port destination UDP port
def send_ping(target_node_id, host, port)
  recipient = To.from_host_port(host, port)
  sender = From.new(
    sender_ip: IPAddr.new(@host).to_i,
    sender_udp_port: @udp_port,
    sender_tcp_port: @tcp_port
  )
  ping = Ping.new(to: recipient, from: sender, expiration: Time.now.to_i + MESSAGE_EXPIRATION_IN)

  signed_ping = Message.pack(ping, private_key: @private_key)
  send_msg(signed_ping.encode_message, host, port)
  # remember the hash of the ping that was just sent
  @peer_store.update_ping(target_node_id, signed_ping.message_hash)
end
send_ping_to_address(target_node_id, address)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 182
#
# Ping a node given an indexable address; element 3 is used as the host and
# element 1 as the port.
# NOTE(review): presumably the address array returned by a socket's
# recvfrom - confirm against callers.
#
# @param target_node_id id of the node being pinged
# @param address indexable address description
def send_ping_to_address(target_node_id, address)
  host = address[3]
  port = address[1]
  send_ping(target_node_id, host, port)
end
setup_kad_table()
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 223
#
# Seed an empty routing table with up to 20 bootnodes from the peer store,
# skipping the local node. Does nothing when the table already has entries.
def setup_kad_table
  return unless @kad_table.size.zero?

  @peer_store.find_bootnodes(20).each do |bootnode|
    unless raw_local_node_id == bootnode.raw_node_id
      debug("setup kad_table with #{bootnode}")
      @kad_table.update(bootnode.raw_node_id)
    end
  end
end
start_listen(task: Async::Task.current)
click to toggle source
# File lib/ciri/p2p/discovery/service.rb, line 87
#
# Bind a UDP endpoint on @host/@udp_port and hand every received packet to
# handle_request, forever. When @udp_port is zero it is replaced by the port
# of the bound socket's local address.
#
# @param task accepted for interface compatibility; not used in the body
def start_listen(task: Async::Task.current)
  Async::IO::Endpoint.udp(@host, @udp_port).bind do |socket|
    @local_address = socket.local_address
    # update port if port is zero
    @udp_port = @local_address.ip_port if @udp_port.zero?
    info "start discovery server on udp_port: #{@udp_port} tcp_port: #{@tcp_port}\nlocal_node_id: #{@local_node_id}"

    loop do
      # read discovery message
      packet, address = socket.recvfrom(Discovery::Protocol::Message::MAX_LEN)
      handle_request(packet, address)
    end
  end
end