class Fluent::SixpackOutput

Constants

SIXPACK_PATH

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 4
def initialize
  super
  require 'net/http'
  require 'uri'
  require 'resolve/hostname'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 47
def configure(conf)
  super

  @mode = case @mode
          when 'count' then :count
          when 'modified' then :modified
          else
            :gauge
          end

  @auth = case @authentication
          when 'basic' then :basic
          else
            :none
          end
  @resolver = Resolve::Hostname.new(:system_resolver => true)
end
connect_to() click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 104
def connect_to
  url = URI.parse(@sixpackapi_url)
  return url.host, url.port
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 234
def emit(tag, es, chain)
  events = []

  es.each {|time,record|
    if SIXPACK_PATH.has_key?(record[@key_record_type].to_sym)
      events.push({:time => time, :tag => tag, :record => record})
    end
  }

  if @thread
    @mutex.synchronize do
      @queue += events
    end
  else
    begin
      post_events(events)
    rescue => e
      log.warn "HTTP POST Error occures to sixpack server", :error_class => e.class, :error => e.message
      raise if @retry
    end
  end

  chain.next
end
form_encode_params_convert(record) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 146
def form_encode_params_convert(record)
  params = {
     :experiment   => record[@key_experiment],
     :client_id    => record[@key_client_id],
  }
  params.merge!({:kpi => record[@key_kpi]}) if(record[@key_kpi])

  return URI.encode_www_form(params)
end
http_connection(host, port) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 109
def http_connection(host, port)
  http = Net::HTTP.new(@resolver.getaddress(host), port)
  if @timeout
    http.open_timeout = @timeout
    http.read_timeout = @timeout
  end
  if @ssl
    http.use_ssl = true
    unless @verify_ssl
      http.verify_mode = OpenSSL::SSL::VERIFY_NONE
    end
  end
  http
end
map_sixpack_path(record) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 124
def map_sixpack_path(record)
  sixpack_path
end
map_sixpack_path_with_query(record) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 128
def map_sixpack_path_with_query(record)
  sixpack_path = SIXPACK_PATH[record[@key_record_type].to_sym]
  case record[@key_record_type]
  when 'participate'
    return sixpack_path, URI.encode_www_form({
             :experiment   => record[@key_experiment],
             :alternatives => record[@key_alternatives].split(','),
             :alternative  => record[@key_alternative],
             :client_id    => record[@key_client_id],
           })
  when 'convert'
    return sixpack_path, form_encode_params_convert(record)
  else
    log.warn 'failed to map sixpack path and query'
    raise
  end
end
post(event) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 171
def post(event)
  url = @sixpackapi_url
  res = nil
  begin
    host,port = connect_to
    req = post_request(event)
    http = http_connection(host, port)
    res = http.start {|http| http.request(req) }
  rescue IOError, EOFError, SystemCallError
    # server didn't respond
    log.warn "net/http GET raises exception: #{$!.class}, '#{$!.message}'"
  end
  unless res and res.is_a?(Net::HTTPSuccess)
    log.warn "failed to post to sixpack #{url}, record#{event[:record]}, code: #{res && res.code}"
  end
end
post_events(events) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 224
def post_events(events)
  if @keepalive
    post_keepalive(events)
  else
    events.each do |event|
      post(event)
    end
  end
end
post_keepalive(events) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 188
def post_keepalive(events) # [{:tag=>'',:name=>'',:value=>X}]
  return if events.size < 1

  # sixpack host/port is same for all events (host is from configuration)
  host,port = connect_to

  requests = events.map{|e| post_request(e)}

  http = nil
  requests.each do |req|
    begin
      unless http
        http = http_connection(host, port)
        http.start
      end
      res = http.request(req)
      unless res and res.is_a?(Net::HTTPSuccess)
        log.warn "failed to post to sixpack: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
      end
    rescue IOError, EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, SystemCallError
      log.warn "net/http keepalive POST raises exception: #{$!.class}, '#{$!.message}'"
      begin
        http.finish
      rescue
        # ignore all errors for connection with error
      end
      http = nil
    end
  end
  begin
    http.finish
  rescue
    # ignore
  end
end
post_request(event) click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 156
def post_request(event)
  uri = URI.parse(@sixpackapi_url)
  uri.path, uri.query = map_sixpack_path_with_query(event[:record])
  req = Net::HTTP::Get.new(uri.request_uri)
  if @auth and @auth == :basic
    req.basic_auth(@username, @password)
  end
  req['Host'] = uri.host
  if @keepalive
    req['Connection'] = 'Keep-Alive'
  end

  req
end
poster() click to toggle source
# File lib/fluent/plugin/out_sixpack.rb, line 85
def poster
  while @running
    if @queue.size < 1
      sleep(0.2)
      next
    end

    events = @mutex.synchronize {
      es,@queue = @queue,[]
      es
    }
    begin
      post_events(events) if events.size > 0
    rescue => e
      log.warn "HTTP POST in background Error occures to sixpack server", :error_class => e.class, :error => e.message
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 79
def shutdown
  @running = false
  @thread.join if @thread
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sixpack.rb, line 65
def start
  super

  @running = true
  @thread = nil
  @queue = nil
  @mutex = nil
  if @background_post
    @mutex = Mutex.new
    @queue = []
    @thread = Thread.new(&method(:poster))
  end
end