class LogStash::Outputs::Faye

Public Instance Methods

close()
# File lib/logstash/outputs/faye.rb, line 118
def close
  client.close
end
multi_receive(events)
# File lib/logstash/outputs/faye.rb, line 56
def multi_receive(events)
  events.each {|event| receive(event, :parallel)}
  client.execute!
end
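
As a rough illustration of the queue-then-execute pattern that multi_receive relies on, the following sketch uses plain Ruby threads rather than the real HTTP client. The `SketchBatchClient` class and its `enqueue` method are hypothetical stand-ins invented for this example; only the idea that `execute!` blocks until all queued work has finished is taken from the method above.

```ruby
# Hypothetical stand-in for a client that queues work and runs it on demand.
# It is NOT the plugin's HTTP client; it only mimics the blocking behaviour.
class SketchBatchClient
  def initialize
    @pending = []
  end

  # Queue a unit of work without running it
  def enqueue(&work)
    @pending << work
  end

  # Run everything queued so far in parallel and block until all of it is done
  def execute!
    threads = @pending.map { |work| Thread.new(&work) }
    @pending = []
    threads.each(&:join)
  end
end

client = SketchBatchClient.new
delivered = Queue.new

events = %w[a b c]
events.each { |event| client.enqueue { delivered << event } }

before = delivered.size # nothing has run yet
client.execute!
after = delivered.size  # every queued item has run by the time execute! returns
```

Because `execute!` joins every thread before returning, the caller can treat its return as "the whole batch has been attempted", which is the guarantee the batch path is meant to provide.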
receive(event, async_type=:background)

Once we no longer need to support Logstash < 2.2 (the pre-ng pipeline), we will not need to handle :background style requests.

We use :background style requests for Logstash < 2.2 because, before the microbatching pipeline, performance is greatly improved by having some degree of async behavior.

In Logstash 2.2 and later things are much simpler: we just run each batch in parallel. This makes performance much easier to reason about and, more importantly, lets us guarantee that when `multi_receive` returns, all items have been sent.

# File lib/logstash/outputs/faye.rb, line 70
def receive(event, async_type=:background)

  if @faye_token
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event), :ext => {:auth_token => @faye_token}})
  else
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event)})
  end

  # Block waiting for a token
  token = @request_tokens.pop if async_type == :background

  # Send the request
  url = event.sprintf(@url)
  headers = event_headers(event)

  # Create an async request
  request = client.send(async_type).send(@http_method, url, :body => body, :headers => headers)

  request.on_complete do
    # Make sure we return the token to the pool
    @request_tokens << token if async_type == :background
  end

  request.on_success do |response|
    if response.code < 200 || response.code > 299
      log_failure(
        "Encountered non-2xx HTTP code #{response.code}",
        :response_code => response.code,
        :url => url,
        :event => event.to_hash)
    end
  end

  request.on_failure do |exception|
    log_failure("Could not fetch URL",
                :url => url,
                :method => @http_method,
                :body => body,
                :headers => headers,
                :message => exception.message,
                :class => exception.class.name,
                :backtrace => exception.backtrace
    )
  end

  request.call if async_type == :background
end
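
The branch at the top of receive builds a message with a channel, a data payload and, when a token is configured, an ext section carrying it. A minimal sketch of that shape is shown below, assuming the standard library's JSON module in place of LogStash::Json; the `faye_message` helper name and the sample channel and token values are made up for illustration.

```ruby
require "json"

# Hypothetical helper mirroring the if/else at the top of `receive`.
# The stdlib JSON module stands in for LogStash::Json here.
def faye_message(channel, data, auth_token = nil)
  message = { :channel => channel, :data => data }
  # Only attach the ext section when a token is configured
  message[:ext] = { :auth_token => auth_token } if auth_token
  JSON.generate(message)
end

# Parse the results back so the structure is easy to inspect
with_token    = JSON.parse(faye_message("/logs", { "message" => "hello" }, "secret"))
without_token = JSON.parse(faye_message("/logs", { "message" => "hello" }))
```

Building the hash once and adding `:ext` conditionally avoids repeating the channel and data keys in both branches.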
register()
# File lib/logstash/outputs/faye.rb, line 42
def register
  @http_method = :post
  @content_type = "application/json"

  # This queue holds one token per allowed in-flight request, which
  # creates backpressure: when the queue is empty no new requests may
  # be sent. Tokens are returned to the queue when a request completes.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times {|t| @request_tokens << true }

  @requests = Array.new
end
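
The token queue set up in register can be tried in isolation. The sketch below assumes a pool size of 2 and uses a short sleep in place of an HTTP request; `POOL_MAX`, the worker threads and the counters are illustrative names, not part of the plugin.

```ruby
# Hypothetical pool size, standing in for the plugin's @pool_max setting
POOL_MAX = 2

# Pre-fill the queue so that POOL_MAX requests may be in flight at once
request_tokens = SizedQueue.new(POOL_MAX)
POOL_MAX.times { request_tokens << true }

completed     = Queue.new
lock          = Mutex.new
in_flight     = 0
max_in_flight = 0

workers = 5.times.map do |i|
  # Blocks while POOL_MAX requests are already in flight
  token = request_tokens.pop
  lock.synchronize do
    in_flight += 1
    max_in_flight = [max_in_flight, in_flight].max
  end

  Thread.new do
    begin
      sleep 0.01 # stand-in for the HTTP round trip
      completed << i
    ensure
      # Always hand the token back, whether the work succeeded or failed
      lock.synchronize { in_flight -= 1 }
      request_tokens << token
    end
  end
end

workers.each(&:join)
```

Taking the token before starting the work and returning it in an `ensure` block is what keeps the number of outstanding requests bounded even when a request fails.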

Private Instance Methods

custom_headers(event)
# File lib/logstash/outputs/faye.rb, line 156
def custom_headers(event)
  return nil unless @headers

  @headers.reduce({}) do |acc,kv|
    k,v = kv
    acc[k] = event.sprintf(v)
    acc
  end
end
event_headers(event)
# File lib/logstash/outputs/faye.rb, line 150
def event_headers(event)
  headers = custom_headers(event) || {}
  headers["Content-Type"] = @content_type
  headers
end
log_failure(message, opts)

This is split into a separate method, mostly to make testing easier.

# File lib/logstash/outputs/faye.rb, line 125
def log_failure(message, opts)
  @logger.error("[HTTP Output Failure] #{message}", opts)
end
map_event(event)
# File lib/logstash/outputs/faye.rb, line 138
def map_event(event)
  if @mapping
    @mapping.reduce({}) do |acc,kv|
      k,v = kv
      acc[k] = event.sprintf(v)
      acc
    end
  else
    event.to_hash
  end
end
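
To show what map_event produces with and without a mapping, here is a small sketch that runs outside Logstash. `SketchEvent` is a hypothetical stand-in for a Logstash event, and its `sprintf` only handles simple %{field} references; the real method supports considerably more. The field names and the mapping are invented for the example.

```ruby
# Hypothetical stand-in for a Logstash event; the real `sprintf` is much
# richer than this simplified version.
class SketchEvent
  def initialize(fields)
    @fields = fields
  end

  # Replace each %{name} with the matching field value.
  # In this sketch, references to unknown fields are left untouched.
  def sprintf(template)
    template.gsub(/%\{([^}]+)\}/) { @fields.key?($1) ? @fields[$1].to_s : $& }
  end

  def to_hash
    @fields.dup
  end
end

# Same logic as map_event, written with each_with_object instead of reduce
def map_event_sketch(event, mapping = nil)
  return event.to_hash unless mapping
  mapping.each_with_object({}) do |(key, template), acc|
    acc[key] = event.sprintf(template)
  end
end

event    = SketchEvent.new("host" => "web-1", "message" => "disk full")
mapped   = map_event_sketch(event, "summary" => "%{host}: %{message}")
unmapped = map_event_sketch(event)
```

With a mapping, only the mapped keys are sent and each value is interpolated per event; without one, the whole event hash is used as the payload.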
request_async_background(request)

Manticore doesn't provide a good way to attach handlers to background or async requests; it wants you to use futures. The async method kind of works but expects single-threaded batches, and background only returns futures. A fix has been proposed to Manticore here: github.com/cheald/manticore/issues/32

# File lib/logstash/outputs/faye.rb, line 133
def request_async_background(request)
  @method ||= client.executor.java_method(:submit, [java.util.concurrent.Callable.java_class])
  @method.call(request)
end