class Fluent::ForwardOutput

Constants

FORWARD_HEADER

MessagePack FixArray length is 3

LISTEN_PORT

Attributes

nodes[R]
read_interval[R]
recover_sample_size[R]

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 44
def initialize
  super
  require 'fluent/plugin/socket_util'
  @nodes = []  #=> [Node]
  @loop = nil
  @thread = nil
  @finished = false
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 119
def configure(conf)
  super

  @read_interval = @read_interval_msec / 1000.0
  @recover_sample_size = @recover_wait / @heartbeat_interval

  if @dns_round_robin
    if @heartbeat_type == :udp
      raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
    end
  end

  @servers.each do |server|
    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
    name = server.name || "#{server.host}:#{server.port}"

    log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
    if @heartbeat_type == :none
      @nodes << NoneHeartbeatNode.new(self, server, failure: failure)
    else
      @nodes << Node.new(self, server, failure: failure)
    end
  end

  if @compress == :gzip && @buffer.compress == :text
    @buffer.compress = :gzip
  elsif @compress == :text && @buffer.compress == :gzip
    log.info "buffer is compressed.  If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
  end

  if @nodes.empty?
    raise ConfigError, "forward output plugin requires at least one <server> is required"
  end

  raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
forward_header() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 231
def forward_header
  FORWARD_HEADER
end
run() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 194
def run
  @loop.run if @loop
rescue
  log.error "unexpected error", error: $!.to_s
  log.error_backtrace
end
shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Output#shutdown
# File lib/fluent/plugin/out_forward.rb, line 182
def shutdown
  @finished = true
  if @loop
    @loop.watchers.each {|w| w.detach }
    @loop.stop
  end
  @thread.join if @thread
  @usock.close if @usock

  super
end
start() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 156
def start
  super

  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0
  @usock = nil

  unless @heartbeat_type == :none
    @loop = Coolio::Loop.new

    if @heartbeat_type == :udp
      # assuming all hosts use udp
      @usock = SocketUtil.create_udp_socket(@nodes.first.host)
      @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
      @loop.attach(@hb)
    end

    @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
    @loop.attach(@timer)

    @thread = Thread.new(&method(:run))
  end
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 201
def write_objects(tag, chunk)
  return if chunk.empty?

  error = nil

  wlen = @weight_array.length
  wlen.times do
    @rr = (@rr + 1) % wlen
    node = @weight_array[@rr]

    if node.available?
      begin
        node.send_data(tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end

Private Instance Methods

on_heartbeat(sockaddr, msg) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 330
def on_heartbeat(sockaddr, msg)
  if node = @nodes.find {|n| n.sockaddr == sockaddr }
    #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
    if node.heartbeat
      rebuild_weight_array
    end
  end
end
on_timer() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 292
def on_timer
  return if @finished
  @nodes.each {|n|
    if n.tick
      rebuild_weight_array
    end
    begin
      log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
      n.usock = @usock if @usock
      n.send_heartbeat
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED
      log.debug "failed to send heartbeat packet", host: n.host, port: n.port, heartbeat_type: @heartbeat_type, error: $!
    end
  }
end
rebuild_weight_array() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 237
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n|
    n.standby?
  }

  lost_weight = 0
  regular_nodes.each {|n|
    unless n.available?
      lost_weight += n.weight
    end
  }
  log.debug "rebuilding weight array", lost_weight: lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      if n.available?
        regular_nodes << n
        log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
        lost_weight -= n.weight
        break if lost_weight <= 0
      end
    }
  end

  weight_array = []
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  regular_nodes.each {|n|
    (n.weight / gcd).times {
      weight_array << n
    }
  }

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end