class Fluent::PullForwardInput
Constants
- DEFAULT_PULLFORWARD_LISTEN_PORT
Attributes
hostname_resolver[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pull_forward.rb, line 21
# Sets the plugin up through the superclass constructor, then loads the
# libraries the other methods of this class rely on (hostname resolution,
# HTTP/HTTPS, OpenSSL and the Yajl JSON parser). The requires run when an
# instance is created rather than when this file is loaded.
def initialize
  super

  # Load order matches the original one-require-per-line sequence.
  %w[resolve/hostname net/http net/https openssl yajl].each do |library|
    require library
  end
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pull_forward.rb, line 35
# Applies the configuration through the superclass implementation, then
# derives the TLS verification mode and builds the hostname resolver used
# when fetching.
#
# conf - configuration object; forwarded unchanged to the superclass.
def configure(conf)
  super

  # Peer certificates are verified unless self-signed ones were explicitly
  # allowed via @allow_self_signed_certificate.
  @verify_mode = @allow_self_signed_certificate ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER

  @resolver = Resolve::Hostname.new(:system_resolver => true)
end
fetch(server)
click to toggle source
# File lib/fluent/plugin/in_pull_forward.rb, line 74
# Pulls events from one PullForward server over HTTPS and emits them.
#
# Steps: resolve server.host through @resolver, issue GET '/' with HTTP
# basic auth, parse the response body as JSON with Yajl, group the
# [tag, time, record] entries by tag and hand each group to
# Fluent::Engine.emit_stream.
#
# Every failure (resolution, connection, TLS, timeout, non-2xx response,
# unparsable or unexpectedly shaped content) is logged at warn level and
# makes the method return nil without emitting anything. Nothing is raised
# to the caller, so one bad server cannot terminate the polling thread that
# calls this method.
#
# server - object responding to host, port, username and password.
#
# Returns nil when nothing could be fetched or parsed.
def fetch(server)
  body = nil

  begin
    address = @resolver.getaddress(server.host)

    # NOTE(review): the connection is opened against the resolved address,
    # not the host name; with VERIFY_PEER the certificate identity check is
    # presumably done against that address -- confirm this is intended.
    https = Net::HTTP.new(address, server.port)
    https.open_timeout = @timeout
    https.read_timeout = @timeout
    https.use_ssl = true
    https.verify_mode = @verify_mode

    req = Net::HTTP::Get.new('/')
    req.basic_auth server.username, server.password

    res = https.start { https.request(req) }

    if res && res.is_a?(Net::HTTPSuccess)
      body = res.body
    else
      log.warn "failed to GET from Fluentd PullForward: #{server.host}, #{address}:#{server.port}, by #{res.class}"
    end
  rescue StandardError => e
    # Connect/read timeouts (Timeout::Error family), OpenSSL::SSL::SSLError,
    # SocketError and resolver errors are not IOError/SystemCallError; they
    # must be caught here as well, otherwise they propagate out of the
    # polling loop and stop all further fetching.
    log.warn "net/http GET raised an exception: #{e.class}, '#{e.message}'"
  end
  return unless body

  data = nil
  begin
    data = Yajl::Parser.parse(body)
  rescue => e # maybe parse error
    log.warn "an error occured for parse of transferred content: #{e.class}, '#{e.message}'"
  end
  return unless data

  # The payload is iterated as a list of [tag, time, record] entries; any
  # other JSON value (object, number, string) cannot be emitted meaningfully.
  unless data.is_a?(Array)
    log.warn "unexpected content transferred from Fluentd PullForward: #{server.host}, #{data.class}"
    return
  end

  # Group events per tag so that each tag is emitted as a single stream.
  bundle = {}
  data.each do |tag, time, record|
    bundle[tag] ||= Fluent::MultiEventStream.new
    bundle[tag].add(time, record)
  end
  bundle.each do |tag, es|
    Fluent::Engine.emit_stream(tag, es)
  end
end
fetcher()
click to toggle source
# File lib/fluent/plugin/in_pull_forward.rb, line 58
# Polling loop: while @running is set, fetches from every entry of @servers
# once the scheduled time has been reached, then schedules the next round
# @fetch_interval seconds after the round finished.
#
# The loop wakes up once per second so that clearing @running is noticed
# quickly, both between rounds and between servers within a round.
def fetcher
  due_at = Time.now

  while @running
    if due_at <= Time.now
      @servers.each do |target|
        # Skip the remaining servers as soon as a stop was requested.
        next unless @running
        fetch(target)
      end
      due_at = Time.now + @fetch_interval
    end

    # Re-check before sleeping so a stop request does not cost an extra second.
    break unless @running
    sleep 1
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pull_forward.rb, line 52
# Stops the plugin: runs the superclass shutdown, clears @running so the
# polling loop exits, and waits for the polling thread to finish.
def shutdown
  super

  @running = false

  # @thread is only assigned by #start; guard so that shutting down an
  # instance that was never started does not raise NoMethodError on nil.
  @thread.join if @thread
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_pull_forward.rb, line 46
# Starts the plugin: runs the superclass start, marks the plugin as running
# and launches the background thread that executes #fetcher.
def start
  super

  # The flag must be set before the thread starts, since #fetcher loops
  # only while @running is truthy.
  @running = true
  @thread = Thread.new { fetcher }
end