class LogStash::Outputs::Faye
Public Instance Methods
# File lib/logstash/outputs/faye.rb, line 118
def close
  client.close
end

# File lib/logstash/outputs/faye.rb, line 56
def multi_receive(events)
  events.each {|event| receive(event, :parallel)}
  client.execute!
end
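The queue-then-execute pattern in `multi_receive` can be illustrated without an HTTP client. The sketch below is a plain-Ruby stand-in, not Manticore's API: `BatchRunner`, `enqueue`, and `send_batch` are hypothetical names chosen for illustration. It only shows why a blocking `execute!` means every queued item has finished by the time the call returns.

```ruby
# Hypothetical stand-in for a client that queues work and runs it as a batch.
class BatchRunner
  def initialize
    @pending = []
  end

  # Queue a unit of work; nothing runs yet.
  def enqueue(&work)
    @pending << work
    self
  end

  # Run every queued unit in its own thread and block until all have finished.
  # Results come back in the order the work was queued.
  def execute!
    threads = @pending.map { |work| Thread.new(&work) }
    @pending = []
    threads.map(&:value) # Thread#value joins the thread before returning
  end
end

# Mirrors the shape of multi_receive: queue each event, then run the batch.
def send_batch(events)
  runner = BatchRunner.new
  events.each { |event| runner.enqueue { "sent #{event}" } }
  runner.execute!
end
```

Calling `send_batch(%w[a b c])` returns only after all three blocks have run, which is the guarantee the batch path relies on.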
Once we no longer need to support Logstash < 2.2 (pre-ng-pipeline), we will not need to handle :background style requests.
We use :background style requests for Logstash < 2.2 because, before the microbatching pipeline, performance is greatly improved by having some degree of async behavior.
In Logstash 2.2 and later things are much simpler: we just run each batch in parallel. This makes performance much easier to reason about and, more importantly, lets us guarantee that when `multi_receive` returns, all items have been sent.
# File lib/logstash/outputs/faye.rb, line 70
def receive(event, async_type=:background)
  if @faye_token
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event), :ext => {:auth_token => @faye_token}})
  else
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event)})
  end

  # Block waiting for a token
  token = @request_tokens.pop if async_type == :background

  # Send the request
  url = event.sprintf(@url)
  headers = event_headers(event)

  # Create an async request
  request = client.send(async_type).send(@http_method, url, :body => body, :headers => headers)

  request.on_complete do
    # Make sure we return the token to the pool
    @request_tokens << token if async_type == :background
  end

  request.on_success do |response|
    if response.code < 200 || response.code > 299
      log_failure(
        "Encountered non-200 HTTP code #{response.code}",
        :response_code => response.code,
        :url => url,
        :event => event.to_hash)
    end
  end

  request.on_failure do |exception|
    # body is already a JSON string, so it is logged as-is
    log_failure("Could not fetch URL",
                :url => url,
                :method => @http_method,
                :body => body,
                :headers => headers,
                :message => exception.message,
                :class => exception.class.name,
                :backtrace => exception.backtrace
    )
  end

  request.call if async_type == :background
end
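The two branches at the top of `receive` differ only in whether the `:ext` block carrying the auth token is attached. A minimal sketch of that payload construction follows, assuming the standard-library `JSON` module in place of `LogStash::Json` and a hypothetical helper name, `build_faye_body`; `data` stands in for the result of `map_event(event)`.

```ruby
require "json"

# Hypothetical helper: build the JSON body for a Faye publish request.
# Uses stdlib JSON here rather than LogStash::Json (an assumption for the sketch).
def build_faye_body(channel, data, faye_token = nil)
  message = { :channel => channel, :data => data }
  # Attach the auth extension only when a token is configured.
  message[:ext] = { :auth_token => faye_token } if faye_token
  JSON.generate(message)
end
```

With a token, the parsed body contains `channel`, `data`, and `ext.auth_token`; without one, the `ext` key is absent entirely.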
# File lib/logstash/outputs/faye.rb, line 42
def register
  @http_method = :post
  @content_type = "application/json"

  # We count outstanding requests with this queue
  # This queue tracks the requests to create backpressure
  # When this queue is empty no new requests may be sent,
  # tokens must be added back by the client when a request completes
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times {|t| @request_tokens << true }
  @requests = Array.new
end
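The token queue set up in `register` acts as a counting semaphore: a sender must pop a token before issuing a request and push it back afterwards, so at most `@pool_max` requests are in flight. The sketch below isolates that idea in plain Ruby. `TokenPool`, `with_token`, `available`, and `max_concurrency` are hypothetical names for illustration, and the `sleep` stands in for network time.

```ruby
require "thread"

# Hypothetical helper: a bounded pool of tokens used to limit concurrency.
class TokenPool
  def initialize(size)
    @tokens = SizedQueue.new(size)
    size.times { @tokens << true }
  end

  # Block until a token is free, run the block, then always return the token.
  def with_token
    token = @tokens.pop
    begin
      yield
    ensure
      @tokens << token
    end
  end

  # Number of tokens currently free.
  def available
    @tokens.size
  end
end

# Run job_count jobs in threads and report the highest concurrency observed.
def max_concurrency(pool_max, job_count)
  pool = TokenPool.new(pool_max)
  lock = Mutex.new
  running = 0
  peak = 0
  threads = Array.new(job_count) do
    Thread.new do
      pool.with_token do
        lock.synchronize do
          running += 1
          peak = [peak, running].max
        end
        sleep 0.01 # stand-in for the time a request spends in flight
        lock.synchronize { running -= 1 }
      end
    end
  end
  threads.each(&:join)
  peak
end
```

Because a thread cannot enter the block without holding a token, the observed peak never exceeds the pool size, however many jobs are submitted.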
Private Instance Methods
# File lib/logstash/outputs/faye.rb, line 156
def custom_headers(event)
  return nil unless @headers

  @headers.reduce({}) do |acc,kv|
    k,v = kv
    acc[k] = event.sprintf(v)
    acc
  end
end

# File lib/logstash/outputs/faye.rb, line 150
def event_headers(event)
  headers = custom_headers(event) || {}
  headers["Content-Type"] = @content_type
  headers
end
This is split into a separate method mostly to make testing easier.
# File lib/logstash/outputs/faye.rb, line 125
def log_failure(message, opts)
  @logger.error("[HTTP Output Failure] #{message}", opts)
end

# File lib/logstash/outputs/faye.rb, line 138
def map_event(event)
  if @mapping
    @mapping.reduce({}) do |acc,kv|
      k,v = kv
      acc[k] = event.sprintf(v)
      acc
    end
  else
    event.to_hash
  end
end
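Both `map_event` and `custom_headers` expand each configured template against the event and collect the results into a new hash. A minimal sketch of that expansion follows. `FakeEvent` is a hypothetical stand-in for a Logstash event, and its `sprintf` is a simplified assumption that only handles `%{field}` references; `map_fields` is likewise an illustrative name.

```ruby
# Hypothetical stand-in for a Logstash event. Only to_hash and a simplified
# sprintf that expands %{field} references are modelled.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def to_hash
    @fields.dup
  end

  # Replace each %{name} with the field's value; unknown names are left as-is.
  def sprintf(template)
    template.gsub(/%\{([^}]+)\}/) { @fields.fetch($1, "%{#{$1}}").to_s }
  end
end

# Expand every template in the mapping, or fall back to the whole event.
def map_fields(event, mapping)
  return event.to_hash unless mapping

  mapping.each_with_object({}) do |(key, template), acc|
    acc[key] = event.sprintf(template)
  end
end
```

Given an event with `host` set to `web1`, a mapping of `{"source" => "%{host}"}` produces `{"source" => "web1"}`, while a `nil` mapping returns the event's full hash.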
Manticore doesn't provide a good way to attach handlers to background or async requests; it wants you to use futures. The async method mostly works but expects single-threaded batches, and background only returns futures. A proposed fix to Manticore is here: github.com/cheald/manticore/issues/32
# File lib/logstash/outputs/faye.rb, line 133
def request_async_background(request)
  @method ||= client.executor.java_method(:submit, [java.util.concurrent.Callable.java_class])
  @method.call(request)
end