class LogStash::Outputs::Jdbc

Write events to a SQL engine, using JDBC.

It is up to the user of the plugin to configure it correctly. This includes crafting the SQL statement correctly and making sure the number of parameters matches the number of placeholders in the statement.

Constants

RETRYABLE_SQLSTATE_CLASSES
STRFTIME_FMT

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/logstash/outputs/jdbc.rb, line 136
def close
  @stopping.make_true
  @pool.close
  super
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 130
def multi_receive(events)
  events.each_slice(@flush_size) do |slice|
    retrying_submit(slice)
  end
end
register() click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 110
def register
  @logger.info('JDBC - Starting up')

  load_jar_files!

  @stopping = Concurrent::AtomicBoolean.new(false)

  @logger.warn('JDBC - Flush size is set to > 1000') if @flush_size > 1000

  if @statement.empty?
    @logger.error('JDBC - No statement provided. Configuration error.')
  end

  if !@unsafe_statement && @statement.length < 2
    @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
  end

  setup_and_test_pool!
end

Private Instance Methods

add_statement_event_params(statement, event) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 270
def add_statement_event_params(statement, event)
  @statement[1..-1].each_with_index do |i, idx|
    if @enable_event_as_json_keyword == true and i.is_a? String and i == @event_as_json_keyword
      value = event.to_json
    elsif i.is_a? String
      value = event.get(i)
      if value.nil? and i =~ /%\{/
        value = event.sprintf(i)
      end
    else
      value = i
    end

    case value
    when Time
      # See LogStash::Timestamp, below, for the why behind strftime.
      statement.setString(idx + 1, value.strftime(STRFTIME_FMT))
    when LogStash::Timestamp
      # XXX: Using setString as opposed to setTimestamp, because setTimestamp
      # doesn't behave correctly in some drivers (Known: sqlite)
      #
      # Additionally this does not use `to_iso8601`, since some SQL databases
      # choke on the 'T' in the string (Known: Derby).
      #
      # strftime appears to be the most reliable across drivers.
      statement.setString(idx + 1, value.time.strftime(STRFTIME_FMT))
    when Fixnum, Integer
      if value > 2147483647 or value < -2147483648
        statement.setLong(idx + 1, value)
      else
        statement.setInt(idx + 1, value)
      end
    when BigDecimal
      statement.setBigDecimal(idx + 1, value.to_java)
    when Float
      statement.setFloat(idx + 1, value)
    when String
      statement.setString(idx + 1, value)
    when Array, Hash
      statement.setString(idx + 1, value.to_json)
    when true, false
      statement.setBoolean(idx + 1, value)
    else
      statement.setString(idx + 1, nil)
    end
  end

  statement
end
load_jar_files!() click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 176
def load_jar_files!
  # Load jar from driver path
  unless @driver_jar_path.nil?
    raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
    require @driver_jar_path
    return
  end

  # Revert original behaviour of loading from vendor directory
  # if no path given
  jarpath = if ENV['LOGSTASH_HOME']
              File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
            else
              File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
            end

  @logger.trace('JDBC - jarpath', path: jarpath)

  jars = Dir[jarpath]
  raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?

  jars.each do |jar|
    @logger.trace('JDBC - Loaded jar', jar: jar)
    require jar
  end
end
log_jdbc_exception(exception, retrying, event) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 327
def log_jdbc_exception(exception, retrying, event)
  current_exception = exception
  log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying') 
  
  log_method = (retrying ? 'warn' : 'error')

  loop do
    # TODO reformat event output so that it only shows the fields necessary.

    @logger.send(log_method, log_text, :exception => current_exception, :statement => @statement[0], :event => event)

    if current_exception.respond_to? 'getNextException'
      current_exception = current_exception.getNextException()
    else
      current_exception = nil
    end

    break if current_exception == nil
  end
end
next_sleep_interval(current_interval) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 348
def next_sleep_interval(current_interval)
  doubled = current_interval * 2
  doubled > @retry_max_interval ? @retry_max_interval : doubled
end
retry_exception?(exception, event) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 320
def retry_exception?(exception, event)
  retrying = (exception.respond_to? 'getSQLState' and (RETRYABLE_SQLSTATE_CLASSES.include?(exception.getSQLState.to_s[0,2]) or @retry_sql_states.include?(exception.getSQLState)))
  log_jdbc_exception(exception, retrying, event)

  retrying
end
retrying_submit(actions) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 238
def retrying_submit(actions)
  # Initially we submit the full list of actions
  submit_actions = actions
  count_as_attempt = true

  attempts = 1

  sleep_interval = @retry_initial_interval
  while @stopping.false? and (submit_actions and !submit_actions.empty?)
    return if !submit_actions || submit_actions.empty? # If everything's a success we move along
    # We retry whatever didn't succeed
    submit_actions, count_as_attempt = submit(submit_actions)

    # Everything was a success!
    break if !submit_actions || submit_actions.empty?

    if @max_flush_exceptions > 0 and count_as_attempt == true
      attempts += 1

      if attempts > @max_flush_exceptions
        @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
        break
      end
    end

    # If we're retrying the action sleep for the recommended interval
    # Double the interval for the next time through to achieve exponential backoff
    Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
    sleep_interval = next_sleep_interval(sleep_interval)
  end
end
setup_and_test_pool!() click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 144
def setup_and_test_pool!
  # Setup pool
  @pool = Java::ComZaxxerHikari::HikariDataSource.new

  @pool.setAutoCommit(@driver_auto_commit)
  @pool.setDriverClassName(@driver_class) if @driver_class

  @pool.setJdbcUrl(@connection_string)

  @pool.setUsername(@username) if @username
  @pool.setPassword(@password) if @password

  @pool.setMaximumPoolSize(@max_pool_size)
  @pool.setConnectionTimeout(@connection_timeout)

  validate_connection_timeout = (@connection_timeout / 1000) / 2

  if !@connection_test_query.nil? and @connection_test_query.length > 1
    @pool.setConnectionTestQuery(@connection_test_query)
    @pool.setConnectionInitSql(@connection_test_query)
  end

  return unless @connection_test

  # Test connection
  test_connection = @pool.getConnection
  unless test_connection.isValid(validate_connection_timeout)
    @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
  end
  test_connection.close
end
submit(events) click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 203
def submit(events)
  connection = nil
  statement = nil
  events_to_retry = []

  begin
    connection = @pool.getConnection
  rescue => e
    log_jdbc_exception(e, true, nil)
    # If a connection is not available, then the server has gone away
    # We're not counting that towards our retry count.
    return events, false
  end

  events.each do |event|
    begin
      statement = connection.prepareStatement(
        (@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0]
      )
      statement = add_statement_event_params(statement, event) if @statement.length > 1
      statement.execute
    rescue => e
      if retry_exception?(e, event.to_json())
        events_to_retry.push(event)
      end
    ensure
      statement.close unless statement.nil?
    end
  end

  connection.close unless connection.nil?

  return events_to_retry, true
end