class RSwim::MemberPool
Public Class Methods
new(node_member_id, seed_member_ids)
click to toggle source
# File lib/rswim/member_pool.rb, line 5 def initialize(node_member_id, seed_member_ids) seed_member_ids -= [node_member_id] @node_member_id = node_member_id @me = Member::Me.new(node_member_id) @members = { node_member_id => @me } seed_member_ids.each { |id| member(id) } @subscribers = [] end
Public Instance Methods
forward_ack_to(member_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 76 def forward_ack_to(member_id) member(member_id).forward_ack end
halt_member(member_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 80 def halt_member(member_id) member(member_id).halt end
member_failed_to_reply(member_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 93 def member_failed_to_reply(member_id) member(member_id).failed_to_reply end
member_replied_in_time(member_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 89 def member_replied_in_time(member_id) member(member_id).replied_in_time end
prepare_output()
click to toggle source
# File lib/rswim/member_pool.rb, line 45 def prepare_output update_entries = @members.map { |_k, member| member.prepare_update_entry } # .select { |entry| entry.propagation_count < 5 } .sort_by { |entry| entry.propagation_count } # sort ascending! .take(15) # TODO: constant update_entries.each do |entry| publish(entry.member_id, entry.status) if entry.propagation_count.zero? member(entry.member_id).increment_propagation_count end ms = @members.values.flat_map(&:prepare_output) ms.each { |message| message.payload[:updates] = update_entries } ms end
remove_member(member_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 84 def remove_member(member_id) raise 'boom' if member_id == @node_member_id @members.delete(member_id) end
send_ping_request_to_k_members(target_id)
click to toggle source
# File lib/rswim/member_pool.rb, line 70 def send_ping_request_to_k_members(target_id) @members.inject([]) { |acc, (id, m)| id != target_id && m.can_be_pinged? ? (acc << m) : acc } .take(K) .each { |m| m.ping_request!(target_id) } end
send_ping_to_random_healthy_member()
click to toggle source
# File lib/rswim/member_pool.rb, line 61 def send_ping_to_random_healthy_member ms = @members.values.select(&:can_be_pinged?) return if ms.empty? index = ms.one? ? 0 : rand(ms.size) member = ms[index] member.ping! end
status_report()
click to toggle source
# File lib/rswim/member_pool.rb, line 36 def status_report StatusReport.print(@node_member_id, @members) end
subscribe(&block)
click to toggle source
# File lib/rswim/member_pool.rb, line 40 def subscribe(&block) @subscribers << block end
update_member(message)
click to toggle source
# File lib/rswim/member_pool.rb, line 14 def update_member(message) updates = message.payload[:updates] update_suspicions(updates) unless updates.nil? sender = member(message.from) # NB: records member if not seen before case message.type when :ping @me.schedule_ack(message.from) when :ack sender.replied_with_ack when :ping_req target_id = message.payload[:target_id] member(target_id).ping_from!(message.from) else raise 'bad message type' end end
update_members(elapsed_seconds)
click to toggle source
# File lib/rswim/member_pool.rb, line 32 def update_members(elapsed_seconds) @members.values.each { |m| m.update(elapsed_seconds) } end
Private Instance Methods
member(id)
click to toggle source
# File lib/rswim/member_pool.rb, line 109 def member(id) raise 'boom' if id.nil? @members[id] ||= Member::Peer.new(id, @node_member_id, self) end
publish(member_id, status)
click to toggle source
# File lib/rswim/member_pool.rb, line 99 def publish(member_id, status) @subscribers.each { |s| s.call(member_id, status) } end
update_suspicions(updates)
click to toggle source
# File lib/rswim/member_pool.rb, line 103 def update_suspicions(updates) updates.each do |u| member(u.member_id).update_suspicion(u.status, u.incarnation_number) end end