class Actory::Sender::Dispatcher
Attributes
actors[RW]
my_processor_count[RW]
receiver_count[RW]
system_info[RW]
trusted_hosts[RW]
Public Class Methods
new(actors: [])
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 7
#
# Prepares a dispatcher: resets its bookkeeping, records the local processor
# count, authenticates against the receivers and opens the RPC connections.
#
# Any StandardError raised on the way is logged through @@logger and the
# process is terminated with exit status 1.
#
# @param actors [Array<String>] receiver entries forwarded to
#   initial_handshaking (written as "host:port" there)
def initialize(actors: [])
  @actors = []
  @trusted_hosts = []
  @receiver_count = 0
  @system_info = []
  @my_processor_count = Parallel.processor_count

  handshake = initial_handshaking(actors)
  # Inspect @trusted_hosts directly instead of relying only on the helper's
  # return value, so a handshake that authenticated nobody is always detected
  # here, whatever the helper hands back.
  raise StandardError, "no receiver passed the handshake" if handshake == 0 || @trusted_hosts.empty?

  raise StandardError, "no connection could be established" if establish_connections == 0
rescue => e
  @@logger.error Actory::Errors::Generator.new.json(level: "error", message: "Initialization failed.", backtrace: e.backtrace)
  exit 1
end
Public Instance Methods
message(method, args=[], results=[])
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 22
#
# Sends +method+ to the receivers, one call per distinct argument, and
# gathers the replies.
#
# Jobs are paired with actors by assign_jobs and executed through
# Parallel.map using @receiver_count processes. When a call fails, the job is
# moved to another actor (change_actor) and retried. Retries are bounded by
# the number of known actors; once the bound is reached the last error is
# re-raised instead of retrying forever.
#
# @param method  [String] name forwarded to the receiver's "receive" endpoint
# @param args    [Array] job arguments; an empty list becomes a single nil job
# @param results [Array] accumulator the replies are appended to
# @return [Array] flattened accumulator holding one
#   { actor address => reply } hash per job
# @raise [StandardError] the last failure when every attempt for a job failed
def message(method, args=[], results=[])
  args = [nil] if args.empty?
  assignment = assign_jobs(args)
  pbar = ProgressBar.new(method, @receiver_count) if SENDER['show_progress']
  # At least one attempt, at most one per known actor.
  max_attempts = [@actors.size, 1].max

  results << Parallel.map(assignment, :in_processes => @receiver_count) do |arg, actor|
    if SENDER['show_progress']
      begin
        pbar.set pbar.current + 1 if pbar.current <= @receiver_count
      rescue
        # Progress display is best-effort; a failure here must not stop the job.
      end
    end

    # Declared outside the begin block so that `retry` does not reset it.
    attempts = 0
    begin
      actor.send("receive", "reload") if SENDER['reload_receiver_plugins']
      res = actor.send("receive", method, arg)
      sleep SENDER['get_interval']
      ret = res.get
      ret.flatten!
      {actor.address.to_s => ret}
    rescue => e
      @@logger.warn Actory::Errors::Generator.new.json(level: "warn", message: "Something wrong with sending a message to #{actor.address}", backtrace: e.backtrace)
      attempts += 1
      raise if attempts >= max_attempts
      actor = change_actor(actor)
      retry
    end
  end
  results.flatten
end
Private Instance Methods
assign_jobs(args)
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 132
#
# Pairs every distinct argument with an actor, walking round-robin over a
# random sample of at most @my_processor_count actors.
#
# @param args [Array] job arguments; repeated values are assigned only once
# @return [Hash] argument => actor, in order of first appearance
def assign_jobs(args)
  pool = @actors.sample(@my_processor_count)
  cursor = 0

  params = args.each_with_object({}) do |arg, plan|
    next if plan.key?(arg)

    # Wrap around once the cursor has run past the end of the sample.
    cursor = 0 unless pool[cursor]
    plan[arg] = pool[cursor]
    cursor += 1
  end

  @@logger.debug params
  params
end
change_actor(previous_actor)
click to toggle source
# Chooses the actors to work with according to SENDER['policy']:
# "random" and "safe-random" take a random sample of at most
# @my_processor_count actors; "even" and any other value use every actor.
#
# @return [Array] the selected actors
def select_actors
  case SENDER['policy']
  when "random", "safe-random"
    @actors.sample(@my_processor_count)
  else
    @actors
  end
end
# File lib/actory/sender/dispatcher.rb, line 160
#
# Picks a random actor other than +previous_actor+.
#
# The candidates are filtered up front rather than re-sampling until a
# different actor turns up, so the call always terminates, even when
# +previous_actor+ is the only actor known.
#
# @param previous_actor [Object] the actor to move away from
# @return [Object, nil] another actor when one exists; +previous_actor+ itself
#   when it is the only choice; nil when @actors is empty
def change_actor(previous_actor)
  candidates = @actors.reject { |actor| actor == previous_actor }
  return candidates.sample unless candidates.empty?

  @actors.empty? ? nil : previous_actor
end
establish_connections()
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 72
#
# Opens the connections demanded by SENDER['policy'] and reports how many
# actors are registered afterwards.
#
# * "even" (and any unknown policy) - establish_connections_evenly
# * "random"      - @receiver_count connection rounds
# * "safe-random" - the local processor count split across the trusted hosts,
#   but never fewer than one round per host (the same floor
#   establish_connections_evenly applies), so a setup with more trusted hosts
#   than local processors still gets connections
#
# @return [Integer] number of entries in @actors; 0 right away for
#   "safe-random" when there is no trusted host
def establish_connections
  case SENDER['policy']
  when "even"
    establish_connections_evenly
  when "random"
    establish_connections_randomly(@receiver_count)
  when "safe-random"
    return 0 if @trusted_hosts.empty?

    per_host = @my_processor_count / @trusted_hosts.count
    # Integer division yields 0 when hosts outnumber processors.
    per_host = 1 if per_host <= 0
    establish_connections_randomly(per_host)
  else
    establish_connections_evenly
  end
  @actors.count
end
establish_connections_evenly()
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 91
#
# Splits the local processor count across the trusted hosts and opens that
# many connection rounds, using at least one round.
#
# @return [nil] when there is no trusted host; otherwise whatever
#   establish_connections_helper returns
def establish_connections_evenly
  return nil if @trusted_hosts.empty?

  share = @my_processor_count / @trusted_hosts.count
  rounds = share <= 0 ? 1 : share
  establish_connections_helper(rounds)
end
establish_connections_helper(num=0)
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 98
#
# Runs +num+ rounds over SENDER['actors']; in each round every String entry
# whose host is trusted gets a new MessagePack RPC client, registered in
# @actors. The port used is the configured port plus the zero-based round
# index.
#
# @param num [Integer] number of rounds
# @return whatever @@logger.debug returns
def establish_connections_helper(num=0)
  num.times do |offset|
    SENDER['actors'].each do |entry|
      next unless entry.class == String

      # "host:port" -> ["host", "port"]
      host, port = entry.gsub(/:/, " ").split
      next unless trusted_hosts.include?(host)

      client = MessagePack::RPC::Client.new(host, port.to_i + offset)
      client.timeout = SENDER['timeout']
      @actors << client
    end
  end
  @@logger.debug @actors
end
establish_connections_randomly(num=0)
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 87
#
# Connection set-up for the "random" policies; simply hands +num+ on to
# establish_connections_helper.
#
# @param num [Integer] number of connection rounds
# @return whatever establish_connections_helper returns
def establish_connections_randomly(num=0)
  establish_connections_helper(num)
end
get_receiver_count()
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 127
#
# Asks the receiver behind @cli for its "processor_count" and adds the first
# element of the reply to @receiver_count. The reply is fetched once and
# reused, rather than calling +get+ again for the addition.
#
# @return [Integer, nil] the updated @receiver_count, or nil when the reply's
#   first element is nil/false (the counter is left untouched)
def get_receiver_count
  res = @cli.send("receive", "processor_count")
  count = res.get[0]
  @receiver_count += count if count
end
get_system_info()
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 122
#
# Requests "system_info" from the receiver behind @cli.
#
# @return the first element of the reply
def get_system_info
  @cli.send("receive", "system_info").get[0]
end
get_trusted_hosts(host)
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 114
#
# Sends the shared key from SENDER['auth'] to the receiver behind @cli
# ("auth?") and records +host+ as trusted when the first element of the reply
# is truthy. Errors are logged as warnings and reported as nil.
#
# @param host [String] host name to record on success
# @return [Array, nil] @trusted_hosts after appending +host+, or nil when the
#   receiver did not accept the key or the call failed
def get_trusted_hosts(host)
  reply = @cli.send("receive", "auth?", SENDER['auth']['shared_key'])
  return nil unless reply.get[0]

  @trusted_hosts << host
rescue => e
  @@logger.warn Actory::Errors::Generator.new.json(level: "warn", message: "#{__method__} failed with #{host}", backtrace: $@)
  nil
end
initial_handshaking(actors=[])
click to toggle source
# File lib/actory/sender/dispatcher.rb, line 54
#
# Contacts every "host:port" entry, authenticates it through
# get_trusted_hosts and, for accepted hosts, stores their system info and
# adds their processor count to @receiver_count.
#
# Falls back to SENDER['actors'] when +actors+ is empty and that setting is
# present. Non-String entries are skipped. An error raised inside the loop is
# printed and ends the loop early.
#
# @param actors [Array<String>] receiver entries written as "host:port"
# @return [Integer] number of entries in @trusted_hosts when the method
#   finishes, so a caller can compare the result with 0 to detect a handshake
#   that authenticated nobody
def initial_handshaking(actors=[])
  actors = SENDER['actors'] if actors.empty? && !SENDER['actors'].nil?

  actors.each do |actor|
    next unless actor.class == String

    # "host:port" -> ["host", "port"]
    host, port = actor.gsub(/:/, " ").split
    @cli = MessagePack::RPC::Client.new(host, port.to_i)
    @cli.timeout = SENDER['auth']['timeout']
    next unless get_trusted_hosts(host)

    @system_info << {:host => host, :system_info => get_system_info}
    get_receiver_count
  end
  @trusted_hosts.count
rescue => e
  puts $@, e
  @trusted_hosts.count
end