class LogStash::Inputs::Http

Using this input you can receive single or multiline events over http(s). Applications can send a HTTP POST request with a body to the endpoint started by this input and Logstash will convert it into an event for subsequent processing. Users can pass plain text, JSON, or any formatted data and use a corresponding codec with this input. For Content-Type `application/json` the `json` codec is used, but for all other data formats, `plain` codec is used.

This input can also be used to receive webhook requests to integrate with other services and applications. By taking advantage of the vast plugin ecosystem available in Logstash you can trigger actionable events right from your application.

Security

This plugin supports standard HTTP basic authentication headers to identify the requester. You can pass in an username, password combination while sending data to this input

You can also setup SSL and send data securely over https, with an option of validating the client's certificate. Currently, the certificate setup is through docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html[Java Keystore format]

Constants

BUSY_RESPONSE
REJECTED_HEADERS

useless headers puma adds to the requests mostly due to rack compliance

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/http.rb, line 95
def register
  require "logstash/util/http_compressed_requests"
  @server = ::HTTPInputWebServer.new(nil) # we'll set the rack handler later
  if @user && @password then
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    @auth_token = "Basic #{token}"
  end
  if @ssl
    if @keystore.nil? || @keystore_password.nil?
      raise(LogStash::ConfigurationError, "Settings :keystore and :keystore_password are required because :ssl is enabled.")
    end
    ctx = Puma::MiniSSL::Context.new
    ctx.keystore = @keystore
    ctx.keystore_pass = @keystore_password.value
    ctx.verify_mode = case @verify_mode
                      when 'peer'
                        Puma::MiniSSL::VERIFY_PEER
                      when 'force_peer'
                        Puma::MiniSSL::VERIFY_PEER | Puma::MiniSSL::VERIFY_FAIL_IF_NO_PEER_CERT
                      when 'none'
                        Puma::MiniSSL::VERIFY_NONE
                      end
    @server.add_ssl_listener(@host, @port, ctx)
  else
    @server.add_tcp_listener(@host, @port)
  end
  @server.min_threads = 0
  # The actual number of threads is one higher to let us reject additional requests
  @server.max_threads = @threads + 1

  # set the default codec in the @codecs hash so that it is also cloned in the write_slots
  @codecs = { :default => @codec }

  @additional_codecs.each do |content_type, codec|
    @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
  end

  @write_slots = java.util.concurrent.ArrayBlockingQueue.new(threads)
  threads.times do
    # Freeze these guys just in case, since they aren't threadsafe
    @write_slots.put(Hash[@codecs.map {|k,v| [k.freeze, v.clone].freeze }.freeze].freeze)
  end

end
run(queue) click to toggle source
# File lib/logstash/inputs/http.rb, line 141
def run(queue)
  # proc needs to be defined at this context
  # to capture @codecs, @logger and lowercase_keys
  p = Proc.new do |req|
    begin
      remote_host = req['puma.socket'].peeraddr[3]
      REJECTED_HEADERS.each {|k| req.delete(k) }
      req = lowercase_keys(req)
      body = req.delete("rack.input")
      local_codecs = @write_slots.poll()
      if !local_codecs # No free write slot
        next [429, {}, BUSY_RESPONSE]
      end
      begin
        codec = local_codecs[req["content_type"]] || local_codecs[:default]

        codec.decode(body.read) { |event| push_decorated_event(queue, event, remote_host, req) }

        # since payloads are self-contained and we don't handle multipart we should flush
        # the codec after each request.
        codec.flush { |event| push_decorated_event(queue, event, remote_host, req) }
      ensure
        @write_slots.put(local_codecs)
      end
      ['200', @response_headers, ['ok']]
    rescue => e
      @logger.error(
        "unable to process event.", 
        :request => req,
        :message => e.message,
        :class => e.class.name,
        :backtrace => e.backtrace
      )
      ['500', @response_headers, ['internal error']]
    end
  end

  auth = Proc.new do |username, password|
    username == @user && password == @password.value
  end if (@user && @password)

  @server.app = Rack::Builder.new do
    use(Rack::Auth::Basic, &auth) if auth
    use CompressedRequests
    run(p)
  end
  @server.run.join
end
stop() click to toggle source
# File lib/logstash/inputs/http.rb, line 190
def stop
  return unless @server
  @server.stop(true)
  @server.binder.close if @server.binder
rescue IOError
  # do nothing
end

Private Instance Methods

lowercase_keys(hash) click to toggle source
# File lib/logstash/inputs/http.rb, line 200
def lowercase_keys(hash)
  new_hash = {}
  hash.each_pair do |k,v|
    new_hash[k.downcase] = v
  end
  new_hash
end
push_decorated_event(queue, event, host, headers) click to toggle source
# File lib/logstash/inputs/http.rb, line 208
def push_decorated_event(queue, event, host, headers)
  event.set("host", host)
  event.set("headers", headers)
  decorate(event)
  queue << event
end