class LogStash::Inputs::Http
Use this input to receive single-line or multiline events over HTTP(S). Applications can send an HTTP POST request with a body to the endpoint started by this input, and Logstash converts it into an event for subsequent processing. Users can pass plain text, JSON, or any other formatted data and use a corresponding codec with this input. For Content-Type `application/json` the `json` codec is used; for all other data formats, the `plain` codec is used.
This input can also be used to receive webhook requests to integrate with other services and applications. By taking advantage of the vast plugin ecosystem available in Logstash you can trigger actionable events right from your application.
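As a rough illustration of the POST request described above, the following Ruby sketch builds a JSON request with the standard library's Net::HTTP. The URL `http://localhost:8080/`, the payload, and the helper names `build_event_request` and `send_event` are assumptions for illustration; use the host and port your input is actually configured with.

```ruby
require "net/http"
require "json"
require "uri"

# Build a POST request carrying a JSON body. The Content-Type header is what
# lets the input pick the `json` codec instead of `plain`.
def build_event_request(uri, payload)
  request = Net::HTTP::Post.new(uri.request_uri)
  request["Content-Type"] = "application/json"
  request.body = JSON.generate(payload)
  request
end

# Send the request and return the Net::HTTPResponse.
def send_event(uri, payload)
  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
    http.request(build_event_request(uri, payload))
  end
end

# Assumed endpoint and payload, for illustration only:
# response = send_event(URI("http://localhost:8080/"), { "message" => "hello" })
```

Going by the `run` listing further down, the response status is 200 on success, 429 when no write slot is free, and 500 when processing raises an error.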
Security
This plugin supports standard HTTP basic authentication headers to identify the requester. You can pass a username and password combination when sending data to this input.
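For reference, a client can build the Authorization header value the same way the `register` method builds its own token, by Base64-encoding `user:password`. This is a minimal sketch; the helper name `basic_auth_header` and the credentials are hypothetical.

```ruby
require "base64"

# Build the value of an HTTP Basic Authorization header.
def basic_auth_header(user, password)
  "Basic " + Base64.strict_encode64("#{user}:#{password}")
end

# Hypothetical credentials, for illustration only.
header = basic_auth_header("user", "pass")
puts header # prints "Basic dXNlcjpwYXNz"
```

The resulting string goes into the request's `Authorization` header. Basic authentication only encodes the credentials, it does not encrypt them, so it is best combined with SSL.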
You can also set up SSL and send data securely over HTTPS, with the option of validating the client's certificate. Currently, certificates are configured through the Java Keystore format (docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html).
Constants
- BUSY_RESPONSE
- REJECTED_HEADERS
Headers that Puma adds to requests, mostly for Rack compliance; they carry no useful information and are deleted from each request before processing.
Public Instance Methods
# File lib/logstash/inputs/http.rb, line 95
def register
  require "logstash/util/http_compressed_requests"
  @server = ::HTTPInputWebServer.new(nil) # we'll set the rack handler later
  if @user && @password then
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    @auth_token = "Basic #{token}"
  end
  if @ssl
    if @keystore.nil? || @keystore_password.nil?
      raise(LogStash::ConfigurationError, "Settings :keystore and :keystore_password are required because :ssl is enabled.")
    end
    ctx = Puma::MiniSSL::Context.new
    ctx.keystore = @keystore
    ctx.keystore_pass = @keystore_password.value
    ctx.verify_mode = case @verify_mode
                      when 'peer'
                        Puma::MiniSSL::VERIFY_PEER
                      when 'force_peer'
                        Puma::MiniSSL::VERIFY_PEER | Puma::MiniSSL::VERIFY_FAIL_IF_NO_PEER_CERT
                      when 'none'
                        Puma::MiniSSL::VERIFY_NONE
                      end
    @server.add_ssl_listener(@host, @port, ctx)
  else
    @server.add_tcp_listener(@host, @port)
  end
  @server.min_threads = 0
  # The actual number of threads is one higher to let us reject additional requests
  @server.max_threads = @threads + 1

  # set the default codec in the @codecs hash so that it is also cloned in the write_slots
  @codecs = { :default => @codec }

  @additional_codecs.each do |content_type, codec|
    @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
  end

  @write_slots = java.util.concurrent.ArrayBlockingQueue.new(threads)
  threads.times do
    # Freeze these guys just in case, since they aren't threadsafe
    @write_slots.put(Hash[@codecs.map {|k,v| [k.freeze, v.clone].freeze }.freeze].freeze)
  end
end
# File lib/logstash/inputs/http.rb, line 141
def run(queue)

  # proc needs to be defined at this context
  # to capture @codecs, @logger and lowercase_keys
  p = Proc.new do |req|
    begin
      remote_host = req['puma.socket'].peeraddr[3]
      REJECTED_HEADERS.each {|k| req.delete(k) }
      req = lowercase_keys(req)
      body = req.delete("rack.input")
      local_codecs = @write_slots.poll()
      if !local_codecs # No free write slot
        next [429, {}, BUSY_RESPONSE]
      end
      begin
        codec = local_codecs[req["content_type"]] || local_codecs[:default]
        codec.decode(body.read) { |event| push_decorated_event(queue, event, remote_host, req) }
        # since payloads are self-contained and we don't handle multipart we should flush
        # the codec after each request.
        codec.flush { |event| push_decorated_event(queue, event, remote_host, req) }
      ensure
        @write_slots.put(local_codecs)
      end
      ['200', @response_headers, ['ok']]
    rescue => e
      @logger.error(
        "unable to process event.",
        :request => req,
        :message => e.message,
        :class => e.class.name,
        :backtrace => e.backtrace
      )
      ['500', @response_headers, ['internal error']]
    end
  end

  auth = Proc.new do |username, password|
    username == @user && password == @password.value
  end if (@user && @password)

  @server.app = Rack::Builder.new do
    use(Rack::Auth::Basic, &auth) if auth
    use CompressedRequests
    run(p)
  end
  @server.run.join
end
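The `run` method above limits concurrency with a pool of write slots: each request takes a codec set with a non-blocking `poll`, answers 429 if none is free, and returns the slot in an `ensure` block. The sketch below reproduces that pattern in plain Ruby under stated assumptions: `SlotPool` and `with_slot` are hypothetical names, and Ruby's `Queue` stands in for the `java.util.concurrent.ArrayBlockingQueue` that the plugin uses.

```ruby
# A fixed pool of reusable slots with non-blocking checkout.
# SlotPool is a hypothetical stand-in, not part of the plugin.
class SlotPool
  def initialize(size)
    @slots = Queue.new
    # Pre-fill the queue; the block builds one slot per index.
    size.times { |i| @slots << yield(i) }
  end

  # Yield a free slot to the block, or return :busy immediately when
  # every slot is checked out (the plugin answers 429 in that case).
  def with_slot
    slot = begin
      @slots.pop(true) # non-blocking: raises ThreadError when empty
    rescue ThreadError
      nil
    end
    return :busy if slot.nil?

    begin
      yield slot
    ensure
      @slots << slot # always hand the slot back, even on error
    end
  end
end

pool = SlotPool.new(1) { { default: "plain" } }
# The nested call finds the only slot taken and reports :busy.
nested = pool.with_slot { |_slot| pool.with_slot { :inner } }
# Once the outer block has finished, the slot is available again.
after = pool.with_slot { |slot| slot[:default] }
```

Rejecting immediately instead of waiting for a slot keeps a slow pipeline from piling up blocked request threads; the client sees the busy status and can retry later.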
# File lib/logstash/inputs/http.rb, line 190
def stop
  return unless @server
  @server.stop(true)
  @server.binder.close if @server.binder
rescue IOError
  # do nothing
end
Private Instance Methods
# File lib/logstash/inputs/http.rb, line 200
def lowercase_keys(hash)
  new_hash = {}
  hash.each_pair do |k,v|
    new_hash[k.downcase] = v
  end
  new_hash
end
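To show why `run` can look up `req["content_type"]`, the sketch below applies the same key-lowercasing to a small Rack-style environment and then picks a codec the way `run` does. The sample environment values are made up, and the strings "json" and "plain" are placeholders standing in for real codec instances.

```ruby
# Same logic as the private method above, repeated so the sketch is standalone.
def lowercase_keys(hash)
  new_hash = {}
  hash.each_pair do |k, v|
    new_hash[k.downcase] = v
  end
  new_hash
end

# Rack passes request headers with upper-case keys; these values are illustrative.
env = { "CONTENT_TYPE" => "application/json", "HTTP_USER_AGENT" => "curl" }
headers = lowercase_keys(env)

# Placeholder codec table: strings stand in for codec objects.
codecs = { :default => "plain", "application/json" => "json" }

# Exact match on the content type, falling back to the default codec.
chosen = codecs[headers["content_type"]] || codecs[:default]
fallback = codecs["text/csv"] || codecs[:default]

puts headers.keys.inspect # prints ["content_type", "http_user_agent"]
puts chosen               # prints json
puts fallback             # prints plain
```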
# File lib/logstash/inputs/http.rb, line 208
def push_decorated_event(queue, event, host, headers)
  event.set("host", host)
  event.set("headers", headers)
  decorate(event)
  queue << event
end