class Actory::Receiver::Worker
Public Class Methods
new(protocol="tcp", target: nil)
click to toggle source
# File lib/actory/receiver/worker.rb, line 5 def initialize(protocol="tcp", target: nil) protocol = RECEIVER['protocol'] if RECEIVER['protocol'] num = Parallel.processor_count target ||= Actory::Receiver::EventHandler Parallel.map(0..num, :in_processes => num) do |n| @@logger.info "Starting Actory Receiver Worker ##{n + 1}/#{num} (PID = #{Process.pid}, PGROUP = #{Process.getpgrp}, protocol = #{protocol})" is_retried = false begin worker = send(protocol, target, n) Signal.trap(:TERM) { worker.stop } Signal.trap(:INT) { worker.stop } worker.run rescue => e @@logger.error(Actory::Errors::Generator.new.json(level: "error", message: e, backtrace: $@)) unless is_retried is_retried = true retry end end end
Private Instance Methods
tcp(target, num)
click to toggle source
# File lib/actory/receiver/worker.rb, line 27 def tcp(target, num) worker = MessagePack::RPC::Server.new worker.listen(RECEIVER['address'], RECEIVER['port'] + num, target.new) worker end
udp(target, num)
click to toggle source
# File lib/actory/receiver/worker.rb, line 33 def udp(target, num) address = MessagePack::RPC::Address.new(RECEIVER['address'], RECEIVER['port'] + num) listener = MessagePack::RPC::UDPServerTransport.new(address) worker = MessagePack::RPC::Server.new worker.listen(listener, target.new) worker end