class Fluent::SixpackOutput
Constants
- SIXPACK_PATH
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 4 def initialize super require 'net/http' require 'uri' require 'resolve/hostname' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 47 def configure(conf) super @mode = case @mode when 'count' then :count when 'modified' then :modified else :gauge end @auth = case @authentication when 'basic' then :basic else :none end @resolver = Resolve::Hostname.new(:system_resolver => true) end
connect_to()
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 104 def connect_to url = URI.parse(@sixpackapi_url) return url.host, url.port end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 234 def emit(tag, es, chain) events = [] es.each {|time,record| if SIXPACK_PATH.has_key?(record[@key_record_type].to_sym) events.push({:time => time, :tag => tag, :record => record}) end } if @thread @mutex.synchronize do @queue += events end else begin post_events(events) rescue => e log.warn "HTTP POST Error occures to sixpack server", :error_class => e.class, :error => e.message raise if @retry end end chain.next end
form_encode_params_convert(record)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 146 def form_encode_params_convert(record) params = { :experiment => record[@key_experiment], :client_id => record[@key_client_id], } params.merge!({:kpi => record[@key_kpi]}) if(record[@key_kpi]) return URI.encode_www_form(params) end
http_connection(host, port)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 109 def http_connection(host, port) http = Net::HTTP.new(@resolver.getaddress(host), port) if @timeout http.open_timeout = @timeout http.read_timeout = @timeout end if @ssl http.use_ssl = true unless @verify_ssl http.verify_mode = OpenSSL::SSL::VERIFY_NONE end end http end
map_sixpack_path(record)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 124 def map_sixpack_path(record) sixpack_path end
map_sixpack_path_with_query(record)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 128 def map_sixpack_path_with_query(record) sixpack_path = SIXPACK_PATH[record[@key_record_type].to_sym] case record[@key_record_type] when 'participate' return sixpack_path, URI.encode_www_form({ :experiment => record[@key_experiment], :alternatives => record[@key_alternatives].split(','), :alternative => record[@key_alternative], :client_id => record[@key_client_id], }) when 'convert' return sixpack_path, form_encode_params_convert(record) else log.warn 'failed to map sixpack path and query' raise end end
post(event)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 171 def post(event) url = @sixpackapi_url res = nil begin host,port = connect_to req = post_request(event) http = http_connection(host, port) res = http.start {|http| http.request(req) } rescue IOError, EOFError, SystemCallError # server didn't respond log.warn "net/http GET raises exception: #{$!.class}, '#{$!.message}'" end unless res and res.is_a?(Net::HTTPSuccess) log.warn "failed to post to sixpack #{url}, record#{event[:record]}, code: #{res && res.code}" end end
post_events(events)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 224 def post_events(events) if @keepalive post_keepalive(events) else events.each do |event| post(event) end end end
post_keepalive(events)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 188 def post_keepalive(events) # [{:tag=>'',:name=>'',:value=>X}] return if events.size < 1 # sixpack host/port is same for all events (host is from configuration) host,port = connect_to requests = events.map{|e| post_request(e)} http = nil requests.each do |req| begin unless http http = http_connection(host, port) http.start end res = http.request(req) unless res and res.is_a?(Net::HTTPSuccess) log.warn "failed to post to sixpack: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}" end rescue IOError, EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, SystemCallError log.warn "net/http keepalive POST raises exception: #{$!.class}, '#{$!.message}'" begin http.finish rescue # ignore all errors for connection with error end http = nil end end begin http.finish rescue # ignore end end
post_request(event)
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 156 def post_request(event) uri = URI.parse(@sixpackapi_url) uri.path, uri.query = map_sixpack_path_with_query(event[:record]) req = Net::HTTP::Get.new(uri.request_uri) if @auth and @auth == :basic req.basic_auth(@username, @password) end req['Host'] = uri.host if @keepalive req['Connection'] = 'Keep-Alive' end req end
poster()
click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 85 def poster while @running if @queue.size < 1 sleep(0.2) next end events = @mutex.synchronize { es,@queue = @queue,[] es } begin post_events(events) if events.size > 0 rescue => e log.warn "HTTP POST in background Error occures to sixpack server", :error_class => e.class, :error => e.message end end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 79 def shutdown @running = false @thread.join if @thread super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 65 def start super @running = true @thread = nil @queue = nil @mutex = nil if @background_post @mutex = Mutex.new @queue = [] @thread = Thread.new(&method(:poster)) end end