class Fluent::HttpInput

Constants

EMPTY_GIF_IMAGE
EVENT_RECORD_PARAMETER

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::DetachMultiProcessMixin.new
# File lib/fluent/plugin/in_http.rb, line 35
# Creates the input plugin instance.
#
# Loads 'webrick/httputils' at construction time (rather than at file load)
# and then hands off to the superclass initializer with the same arguments.
def initialize
  require 'webrick/httputils'
  super
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 63
# Applies the plugin configuration and selects the request-body parser.
#
# After the superclass has processed +conf+, a per-instance +parse_params+
# method is installed:
# * when @format is 'default' it is bound to +parse_params_default+;
# * otherwise a parser is created via Plugin.new_parser(@format), configured
#   with +conf+, stored in @parser, and +parse_params+ is bound to
#   +parse_params_with_parser+.
#
# conf:: the configuration element passed to the superclass and the parser
def configure(conf)
  super

  handler_name = :parse_params_default
  unless @format == 'default'
    @parser = Plugin.new_parser(@format)
    @parser.configure(conf)
    handler_name = :parse_params_with_parser
  end

  # Bind the chosen handler on this instance only, so other instances with a
  # different @format are unaffected.
  define_singleton_method(:parse_params, method(handler_name))
end
on_request(path_info, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 144
# Handles one HTTP request and emits the event(s) it carries.
#
# The tag is derived from the request path ("/foo/bar" becomes "foo.bar").
# +parse_params+ turns +params+ into an optional record time plus either a
# single record or an Array of records (batch request).
#
# Event time precedence:
# * a non-zero 'time' request parameter, converted to Fluent::EventTime;
# * otherwise the time returned by +parse_params+, if any;
# * otherwise Engine.now.
# For batch requests a "time" field inside an individual record is removed
# from that record and used as its time instead.
#
# path_info:: request path, including the leading "/"
# params::    Hash of request parameters; also holds the 'HTTP_*' and
#             'REMOTE_ADDR' entries copied into records when enabled
#
# Returns a three-element Array [status line, headers Hash, body String]:
# * "200 OK" when the event was emitted, or when the parsed record is nil;
# * "400 Bad Request" when the request cannot be turned into events;
# * "500 Internal Server Error" when emitting to the router fails.
def on_request(path_info, params)
  ok_response = if @respond_with_empty_img
                  ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
                else
                  ["200 OK", {'Content-Type'=>'text/plain'}, ""]
                end

  # Everything that depends on what the client sent is done inside this
  # block, so malformed input (including malformed batch elements) is
  # reported as a 400 rather than as a server error.
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_params(params)

    # Skip nil record
    return ok_response if record.nil?

    time = if param_time = params['time']
             param_time = param_time.to_f
             param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
           else
             record_time.nil? ? Engine.now : record_time
           end

    # Support batched requests
    mes = nil
    if record.is_a?(Array)
      mes = MultiEventStream.new
      record.each do |single_record|
        add_params_to_record(single_record, params)
        # A per-record "time" field wins over the request-level time.
        single_time = single_record.delete("time") || time
        mes.add(single_time, single_record)
      end
    else
      add_params_to_record(record, params)
    end
  rescue => e
    return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  # TODO server error
  begin
    if mes
      router.emit_stream(tag, mes)
    else
      router.emit(tag, time, record)
    end
  rescue => e
    return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  ok_response
end

# Copies request metadata into +record+ according to the plugin options:
# every 'HTTP_*' entry of +params+ when @add_http_headers is set, and the
# 'REMOTE_ADDR' entry when @add_remote_addr is set. Mutates +record+.
def add_params_to_record(record, params)
  if @add_http_headers
    params.each_pair do |k, v|
      record[k] = v if k.start_with?("HTTP_")
    end
  end
  record['REMOTE_ADDR'] = params['REMOTE_ADDR'] if @add_remote_addr
end
private :add_params_to_record
run() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 137
# Runs the event loop stored in @loop, passing @blocking_timeout to it.
#
# Any StandardError raised by the loop is logged (message plus backtrace)
# instead of being propagated to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue => e
    log.error "unexpected error", error: e.to_s
    log.error_backtrace
  end
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_http.rb, line 128
# Stops the HTTP listener and then runs the superclass shutdown.
#
# Teardown order: detach every watcher from the event loop, stop the loop,
# close the listening socket, and wait for the thread stored in @thread to
# finish.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_http.rb, line 102
# Starts listening for HTTP requests on @bind:@port.
#
# The listening socket is obtained from the ServerEngine socket manager whose
# location is read from the SERVERENGINE_SOCKETMANAGER_PATH environment
# variable. Inside +detach_multi_process+ the superclass +start+ is run, a
# Coolio TCP server and keepalive manager are attached to a fresh event loop,
# and the loop is driven by +run+ on a new thread.
def start
  log.debug "listening http on #{@bind}:#{@port}"

  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  # On Windows the value is converted to an Integer (presumably a port
  # number rather than a socket path; confirm against ServerEngine).
  manager_path = manager_path.to_i if Fluent.windows?
  socket_client = ServerEngine::SocketManager::Client.new(manager_path)
  listen_socket = socket_client.listen_tcp(@bind, @port)

  detach_multi_process do
    super
    @km = KeepaliveManager.new(@keepalive_timeout)
    @lsock = Coolio::TCPServer.new(listen_socket, nil, Handler, @km, method(:on_request),
                                   @body_size_limit, @format, log,
                                   @cors_allow_origins)
    # Only override the listen backlog when one was configured.
    @lsock.listen(@backlog) unless @backlog.nil?

    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)

    @thread = Thread.new { run }
  end
end

Private Instance Methods

parse_params_default(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 217
# Decodes the event record from the default request parameters.
#
# The 'msgpack' parameter is preferred; 'json' is used only when 'msgpack'
# is absent.
#
# params:: Hash of request parameters
#
# Returns [nil, record]; the nil is the record time, which this format does
# not supply.
# Raises RuntimeError when neither 'msgpack' nor 'json' is present.
def parse_params_default(params)
  packed = params['msgpack']
  return [nil, Engine.msgpack_factory.unpacker.feed(packed).read] if packed

  json_text = params['json']
  raise "'json' or 'msgpack' parameter is required" unless json_text

  [nil, JSON.parse(json_text)]
end
parse_params_with_parser(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 230
# Decodes the event record with the parser stored in @parser.
#
# The raw payload is read from the EVENT_RECORD_PARAMETER entry of +params+.
#
# params:: Hash of request parameters
#
# Returns [time, record] as yielded by the parser.
# Raises RuntimeError when the payload parameter is missing, or when the
# parser yields a nil record.
def parse_params_with_parser(params)
  content = params[EVENT_RECORD_PARAMETER]
  raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" unless content

  @parser.parse(content) do |time, record|
    raise "Received event is not #{@format}: #{content}" if record.nil?
    # Returns from the method itself, not just from the block.
    return [time, record]
  end
end