MessagePack FixArray length is 3
# File lib/fluent/plugin/out_forward.rb, line 44
#
# Puts the plugin into its initial, not-yet-started state: runs the parent
# initializer, loads the socket utilities and resets the bookkeeping fields.
def initialize
  super

  # Loaded when an instance is created rather than when this file is loaded.
  require 'fluent/plugin/socket_util'

  @finished = false # set to true by #shutdown
  @thread   = nil   # assigned by #start when heartbeats are enabled
  @loop     = nil   # assigned by #start when heartbeats are enabled
  @nodes    = []    #=> [Node]
end
# File lib/fluent/plugin/out_forward.rb, line 119
#
# Validates the configuration and creates one node object per <server>.
#
# Steps, in order:
# 1. derive @read_interval (seconds) and @recover_sample_size,
# 2. reject dns_round_robin combined with a :udp heartbeat,
# 3. append a node to @nodes for every entry of @servers,
# 4. reconcile the plugin's compress setting with the buffer's,
# 5. require at least one node and an ack_response_timeout of at least 1.
#
# @param conf the configuration passed through to the parent class
# @raise [ConfigError] for an unsupported heartbeat type or when no node exists
# @raise [Fluent::ConfigError] when @ack_response_timeout is smaller than 1
def configure(conf)
  super

  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  if @dns_round_robin && @heartbeat_type == :udp
    raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
  end

  @servers.each do |server|
    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    name = server.name || "#{server.host}:#{server.port}"
    log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id

    # Without heartbeats a dedicated node class is used.
    node_class = @heartbeat_type == :none ? NoneHeartbeatNode : Node
    @nodes << node_class.new(self, server, failure: failure)
  end

  if @compress == :gzip && @buffer.compress == :text
    # Switch the buffer to gzip as well when the plugin itself is set to gzip.
    @buffer.compress = :gzip
  elsif @compress == :text && @buffer.compress == :gzip
    log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
  end

  raise ConfigError, "forward output plugin requires at least one <server> is required" if @nodes.empty?

  if @ack_response_timeout < 1
    raise Fluent::ConfigError, "ack_response_timeout must be a positive integer"
  end
end
# File lib/fluent/plugin/out_forward.rb, line 231
#
# Reader for the FORWARD_HEADER constant.
#
# @return the value of FORWARD_HEADER (the constant is defined outside this
#   excerpt; presumably the MessagePack FixArray header -- confirm at its
#   definition)
def forward_header
  FORWARD_HEADER
end
# File lib/fluent/plugin/out_forward.rb, line 194
#
# Body of the thread created in #start: runs the event loop, if there is one.
# Any StandardError escaping the loop is logged together with its backtrace
# instead of being re-raised.
def run
  @loop.run if @loop
rescue => e
  log.error "unexpected error", error: e.to_s
  log.error_backtrace
end
# File lib/fluent/plugin/out_forward.rb, line 182
#
# Stops the plugin: flags it as finished (which makes #on_timer return
# early), detaches every watcher and stops the event loop, waits for the
# loop thread, closes the shared socket and finally calls the parent's
# shutdown.
def shutdown
  @finished = true

  if @loop
    @loop.watchers.each(&:detach)
    @loop.stop
  end

  # Both are nil when #start never created them.
  @thread.join if @thread
  @usock.close if @usock

  super
end
# File lib/fluent/plugin/out_forward.rb, line 156
#
# Starts the plugin: builds the initial weight array and, unless heartbeats
# are disabled, creates the event loop, attaches the heartbeat watchers and
# runs the loop in its own thread.
#
# @return [Thread, nil] the loop thread, or nil when @heartbeat_type is :none
def start
  super

  # Captured once; rebuild_weight_array seeds its shuffle with this value.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0
  @usock = nil

  return if @heartbeat_type == :none

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    @usock = SocketUtil.create_udp_socket(@nodes.first.host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)
  @thread = Thread.new(&method(:run))
end
# File lib/fluent/plugin/out_forward.rb, line 201
#
# Sends a chunk to the next available node, walking the weight array in
# round-robin order starting after the previously used position (@rr).
#
# Each entry is tried at most once per call. A node that is not available is
# skipped; a node whose send_data raises is remembered and the next one is
# tried.
#
# @param tag   passed through to the node's send_data
# @param chunk passed through to the node's send_data; nothing is sent when
#   it is empty
# @return [nil]
# @raise the error of the last failed send when every attempt failed
# @raise [RuntimeError] "no nodes are available" when no node was available
def write_objects(tag, chunk)
  return if chunk.empty?

  # Work on a snapshot. rebuild_weight_array assigns a brand-new array to
  # @weight_array from the heartbeat thread started in #start; re-reading the
  # instance variable inside the loop could index a shorter array with a
  # length taken from the old one and yield nil.
  candidates = @weight_array
  wlen = candidates.length
  error = nil

  wlen.times do
    @rr = (@rr + 1) % wlen
    node = candidates[@rr]
    next unless node.available?

    begin
      node.send_data(tag, chunk)
      return
    rescue => e
      # for load balancing during detecting crashed servers
      error = e # use the latest error
    end
  end

  raise error if error

  raise "no nodes are available" # TODO message
end
# File lib/fluent/plugin/out_forward.rb, line 330
#
# Heartbeat callback (registered with HeartbeatHandler in #start). Looks up
# the node whose sockaddr matches the sender and records the heartbeat on it;
# the weight array is rebuilt when the node's #heartbeat returns a truthy
# value.
#
# @param sockaddr address of the sender, compared against each node's sockaddr
# @param msg      not used by this method
def on_heartbeat(sockaddr, msg)
  sender = @nodes.find { |candidate| candidate.sockaddr == sockaddr }
  return unless sender

  rebuild_weight_array if sender.heartbeat
end
# File lib/fluent/plugin/out_forward.rb, line 292
#
# Timer callback (registered with HeartbeatRequestTimer in #start). Does
# nothing once the plugin is finished. Otherwise, for every node: ticks it,
# rebuilds the weight array when the tick returns a truthy value, and sends a
# heartbeat.
#
# Only the Errno classes listed in the rescue clause are logged and
# swallowed; any other exception propagates to the caller.
def on_timer
  return if @finished

  @nodes.each do |node|
    rebuild_weight_array if node.tick

    begin
      log.trace "sending heartbeat", host: node.host, port: node.port, heartbeat_type: @heartbeat_type
      # Hand the shared socket to the node when one exists.
      node.usock = @usock if @usock
      node.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED => e
      log.debug "failed to send heartbeat packet", host: node.host, port: node.port, heartbeat_type: @heartbeat_type, error: e
    end
  end
end
# File lib/fluent/plugin/out_forward.rb, line 237
#
# Recomputes @weight_array, the shuffled list of nodes that write_objects
# walks in round-robin order.
#
# 1. Split @nodes into standby and regular nodes.
# 2. Sum the weight of the regular nodes that are not available; while that
#    lost weight is positive, add available standby nodes to the regular set.
# 3. Repeat every selected node (weight / gcd-of-all-weights) times.
# 4. Multiply the array so that it holds roughly six entries per node, then
#    shuffle it with a generator seeded by @rand_seed, so the order is the
#    same on every rebuild for a given node set.
#
# When nothing can be selected (no regular node, or no positive weight) an
# empty array is published instead of failing with ZeroDivisionError;
# write_objects then reports "no nodes are available".
#
# @return [Array] the new weight array (also stored in @weight_array)
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition { |n| n.standby? }

  lost_weight = regular_nodes.reject { |n| n.available? }.inject(0) { |total, n| total + n.weight }
  log.debug "rebuilding weight array", lost_weight: lost_weight

  if lost_weight > 0
    standby_nodes.each do |n|
      next unless n.available?

      regular_nodes << n
      log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
      lost_weight -= n.weight
      break if lost_weight <= 0
    end
  end

  weight_array = []
  gcd = regular_nodes.map { |n| n.weight }.inject(0) { |r, w| r.gcd(w) }

  # gcd is 0 when there is no regular node or every weight is 0; dividing by
  # it would raise.
  unless gcd.zero?
    regular_nodes.each { |n| (n.weight / gcd).times { weight_array << n } }
  end

  if weight_array.empty?
    # Dividing by weight_array.size below would raise ZeroDivisionError.
    log.warn "no nodes are available to build the weight array"
    return @weight_array = weight_array
  end

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end