class LogStash::Filters::Jdbc::Lookup

Attributes

id[R]
parameters[R]
query[R]
target[R]

Public Class Methods

find_validation_errors(array_of_options) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 37
def self.find_validation_errors(array_of_options)
  if !array_of_options.is_a?(Array)
    return "The options must be an Array"
    end
  errors = []
  array_of_options.each_with_index do |options, i|
    instance = new(options, {}, "lookup-#{i.next}")
    unless instance.valid?
      errors << instance.formatted_errors
    end
  end
  return nil if errors.empty?
  errors.join("; ")
end
new(options, globals, default_id) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 54
def initialize(options, globals, default_id)
  @id = options["id"] || default_id
  @target = options["target"]
  @id_used_as_target = @target.nil?
  if @id_used_as_target
    # target shouldn't be nil if ecs_compatibility is not :disabled
    if globals[:ecs_compatibility] != :disabled
      logger.info('ECS compatibility is enabled but no ``target`` option was specified, it is recommended'\
                        ' to set the option to avoid potential schema conflicts (if your data is ECS compliant or'\
                        ' non-conflicting feel free to ignore this message)')
    end
    @target = @id
  end
  @options = options
  @globals = globals
  @valid = false
  @option_errors = []
  @default_result = nil
  @prepared_statement = nil
  @symbol_parameters = nil
  parse_options
  @load_method_ref = method(:load_data_from_local)
end

Public Instance Methods

enhance(local, event) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 90
def enhance(local, event)
  result = retrieve_local_data(local, event, &@load_method_ref) # return a LookupResult
  if result.failed? || result.parameters_invalid?
    tag_failure(event)
  end

  if result.valid?
    if @use_default && result.empty?
      tag_default(event)
      process_event(event, @default_result)
    else
      process_event(event, result)
    end
    true
  else
    false
  end
end
formatted_errors() click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 86
def formatted_errors
  @option_errors.join(", ")
end
id_used_as_target?() click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 78
def id_used_as_target?
  @id_used_as_target
end
prepare(local) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 113
def prepare(local)
  hash = {}
  @prepared_parameters.each_with_index { |v, i| hash[:"$p#{i}"] = v }
  @prepared_param_placeholder_map = hash
  @prepared_statement = local.prepare(query, hash.keys)
  @load_method_ref = method(:load_data_from_prepared)
end
use_prepared_statement?() click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 109
def use_prepared_statement?
  @prepared_parameters && !@prepared_parameters.empty?
end
valid?() click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 82
def valid?
  @valid
end

Private Instance Methods

load_data_from_local(local, query, params, result) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 135
def load_data_from_local(local, query, params, result)
  local.fetch(query, params).each do |row|
    stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
    result.push(stringified)
  end
end
load_data_from_prepared(_local, _query, params, result) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 142
def load_data_from_prepared(_local, _query, params, result)
  @prepared_statement.call(params).each do |row|
    stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
    result.push(stringified)
  end
end
parse_options() click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 195
def parse_options
  @query = @options["query"]
  unless @query && @query.is_a?(String)
    @option_errors << "The options for '#{@id}' must include a 'query' string"
  end

  if @options["parameters"] && @options["prepared_parameters"]
    @option_errors << "Can't specify 'parameters' and 'prepared_parameters' in the same lookup"
  else
    @parameters = @options["parameters"]
    @prepared_parameters = @options["prepared_parameters"]
    @parameters_specified = false
    if @parameters
      if !@parameters.is_a?(Hash)
        @option_errors << "The 'parameters' option for '#{@id}' must be a Hash"
      else
        # this is done once per lookup at start, i.e. Sprintfier.new et.al is done once.
        @symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash }
        # the user might specify an empty hash parameters => {}
        # maybe due to an unparameterised query
        @parameters_specified = !@symbol_parameters.empty?
      end
    elsif @prepared_parameters
      if !@prepared_parameters.is_a?(Array)
        @option_errors << "The 'prepared_parameters' option for '#{@id}' must be an Array"
      elsif @query.count("?") != @prepared_parameters.size
        @option_errors << "The 'prepared_parameters' option for '#{@id}' doesn't match count with query's placeholder"
      else
        #prepare the map @symbol_parameters :n => sprintf_or_get
        hash = {}
        @prepared_parameters.each_with_index {|v,i| hash[:"p#{i}"] = sprintf_or_get(v)}
        @symbol_parameters = hash
        @parameters_specified = !@prepared_parameters.empty?
      end
    end
  end

  default_hash = @options["default_hash"]
  if default_hash && !default_hash.empty?
    @default_result = LookupResult.new()
    @default_result.push(default_hash)
  end

  @use_default = !@default_result.nil?

  @tag_on_failure = @options["tag_on_failure"] || @globals["tag_on_failure"] || []
  @tag_on_default_use = @options["tag_on_default_use"] || @globals["tag_on_default_use"] || []

  @valid = @option_errors.empty?
end
prepare_parameters_from_event(event, result) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 183
def prepare_parameters_from_event(event, result)
  @symbol_parameters.inject({}) do |hash,(k,v)|
    value = v.fetch(event, result)
    hash[k] = value.is_a?(::LogStash::Timestamp) ? value.time : value
    hash
  end
end
process_event(event, result) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 178
def process_event(event, result)
  # use deep clone here so other filter function don't taint the payload by reference
  event.set(@target, ::LogStash::Util.deep_clone(result.payload))
end
retrieve_local_data(local, event, &proc) click to toggle source

the &block is invoked with 4 arguments: local, query, params, result the result is used as accumulator return variable

# File lib/logstash/filters/jdbc/lookup.rb, line 151
def retrieve_local_data(local, event, &proc)
  result = LookupResult.new()
  if @parameters_specified
    params = prepare_parameters_from_event(event, result)
    if result.parameters_invalid?
      logger.warn? && logger.warn("Parameter field not found in event", :lookup_id => @id, :invalid_parameters => result.invalid_parameters)
      return result
    end
  else
    params = {}
  end
  begin
    logger.debug? && logger.debug("Executing Jdbc query", :lookup_id => @id, :statement => query, :parameters => params)
    proc.call(local, query, params, result)
  rescue => e
    # In theory all exceptions in Sequel should be wrapped in Sequel::Error
    # However, there are cases where other errors can occur - a `SQLTransactionRollbackException`
    # may be thrown during `prepareStatement`. Let's handle these cases here, where we can tag and warn
    # appropriately rather than bubble up and potentially crash the plugin.
    result.failed!
    logger.warn? && logger.warn("Exception when executing Jdbc query", :lookup_id => @id, :exception => e.message, :backtrace => e.backtrace.take(8))
  end
  # if either of: no records or a Sequel exception occurs the payload is
  # empty and the default can be substituted later.
  result
end
sprintf_or_get(v) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 191
def sprintf_or_get(v)
  v.match(/%{([^}]+)}/) ? Sprintfier.new(v) : Getfier.new(v)
end
tag_default(event) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 129
def tag_default(event)
  @tag_on_default_use.each do |tag|
    event.tag(tag)
  end
end
tag_failure(event) click to toggle source
# File lib/logstash/filters/jdbc/lookup.rb, line 123
def tag_failure(event)
  @tag_on_failure.each do |tag|
    event.tag(tag)
  end
end