class LogStash::Outputs::Jdbc
Write events to a SQL engine, using JDBC.
It is up to the user of the plugin to configure it correctly. This includes crafting the SQL statement correctly and making sure the number of parameters matches the statement.
Constants
- RETRYABLE_SQLSTATE_CLASSES
- STRFTIME_FMT
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/logstash/outputs/jdbc.rb, line 136
# Shuts the output down: flags the plugin as stopping (retrying_submit polls
# this flag and stops retrying), closes the connection pool, then calls the
# superclass implementation.
#
# @stopping and @pool are only assigned during +register+. The nil guards
# keep +close+ from raising NoMethodError if it is invoked after +register+
# aborted part-way (for example because load_jar_files! raised).
def close
  @stopping.make_true if @stopping
  @pool.close if @pool
  super
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 130
# Splits the incoming events into batches of at most @flush_size and hands
# each batch, in order, to retrying_submit.
#
# events:: enumerable collection of events to write
def multi_receive(events)
  events.each_slice(@flush_size) { |batch| retrying_submit(batch) }
end
register()
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 110
# Prepares the plugin for use: loads the JDBC driver jars, creates the
# stopping flag, logs warnings/errors for suspicious configuration and
# finally builds (and optionally tests) the connection pool.
#
# Configuration problems are only logged here; they do not abort start-up.
# Returns whatever setup_and_test_pool! returns.
def register
  @logger.info('JDBC - Starting up')

  load_jar_files!

  @stopping = Concurrent::AtomicBoolean.new(false)

  if @flush_size > 1000
    @logger.warn('JDBC - Flush size is set to > 1000')
  end

  @logger.error('JDBC - No statement provided. Configuration error.') if @statement.empty?

  # A statement with only the SQL text (no parameter entries) cannot carry
  # event data unless the SQL itself is interpolated per event.
  unless @unsafe_statement || @statement.length >= 2
    @logger.error("JDBC - Statement has no parameters. No events will be inserted into SQL as you're not passing any event data. Likely configuration error.")
  end

  setup_and_test_pool!
end
Private Instance Methods
add_statement_event_params(statement, event)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 270
# Binds the configured parameters (every element of @statement after the SQL
# text at index 0) onto a prepared statement, taking values from +event+.
#
# Each configured parameter is resolved to a value as follows:
# * the event-as-JSON keyword (when enabled) -> the whole event as JSON
# * any other String -> event.get(param); when that is nil and the string
#   contains "%{", event.sprintf(param) is used instead
# * anything that is not a String -> used as a literal value
#
# The value's Ruby class then selects the set* method used for binding;
# values of any unhandled class (including nil) are bound as a nil string.
#
# statement:: prepared statement that receives one set* call per parameter
# event::     object responding to get, sprintf and to_json
#
# Returns the statement that was passed in.
def add_statement_event_params(statement, event)
  @statement[1..-1].each_with_index do |param, idx|
    position = idx + 1 # parameter positions passed to set* start at 1

    value =
      if @enable_event_as_json_keyword == true && param.is_a?(String) && param == @event_as_json_keyword
        event.to_json
      elsif param.is_a?(String)
        field = event.get(param)
        if field.nil? && param =~ /%\{/
          event.sprintf(param)
        else
          field
        end
      else
        param
      end

    case value
    when Time
      # See LogStash::Timestamp, below, for the why behind strftime.
      statement.setString(position, value.strftime(STRFTIME_FMT))
    when LogStash::Timestamp
      # XXX: Using setString as opposed to setTimestamp, because setTimestamp
      # doesn't behave correctly in some drivers (Known: sqlite)
      #
      # Additionally this does not use `to_iso8601`, since some SQL databases
      # choke on the 'T' in the string (Known: Derby).
      #
      # strftime appears to be the most reliable across drivers.
      statement.setString(position, value.time.strftime(STRFTIME_FMT))
    when Integer
      # Integer alone is sufficient: Fixnum was only ever a subclass of it and
      # the constant no longer exists on current Rubies (NameError).
      if value > 2147483647 || value < -2147483648
        statement.setLong(position, value)
      else
        statement.setInt(position, value)
      end
    when BigDecimal
      statement.setBigDecimal(position, value.to_java)
    when Float
      # Ruby Floats are double precision; setFloat would narrow them to a
      # 32-bit float and silently lose precision.
      statement.setDouble(position, value)
    when String
      statement.setString(position, value)
    when Array, Hash
      statement.setString(position, value.to_json)
    when true, false
      statement.setBoolean(position, value)
    else
      statement.setString(position, nil)
    end
  end

  statement
end
load_jar_files!()
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 176
# Loads the JDBC driver jar(s) via +require+.
#
# When @driver_jar_path is set, only that file is loaded. Otherwise every
# jar under vendor/jar/jdbc is loaded, looked up below $LOGSTASH_HOME when
# that environment variable is set and relative to this source file when not.
#
# Raises LogStash::ConfigurationError when the configured jar does not exist
# or when no jar is found in the vendor directory.
def load_jar_files!
  # An explicitly configured driver jar takes precedence.
  unless @driver_jar_path.nil?
    unless File.exist?(@driver_jar_path)
      raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.'
    end

    require @driver_jar_path
    return
  end

  # No path given: fall back to the original behaviour of loading
  # everything from the vendor directory.
  logstash_home = ENV['LOGSTASH_HOME']
  jarpath =
    if logstash_home
      File.join(logstash_home, '/vendor/jar/jdbc/*.jar')
    else
      File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
    end

  @logger.trace('JDBC - jarpath', path: jarpath)

  jars = Dir.glob(jarpath)
  if jars.empty?
    raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?'
  end

  jars.each do |jar_file|
    @logger.trace('JDBC - Loaded jar', jar: jar_file)
    require jar_file
  end
end
log_jdbc_exception(exception, retrying, event)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 327
# Logs an exception and, when the exception exposes getNextException, every
# chained exception after it. Each entry carries the exception, the SQL text
# (@statement[0]) and the event.
#
# exception:: the exception to log; it is always logged at least once
# retrying::  truthy -> logged at warn level as "Retrying";
#             falsy  -> logged at error level as "Not retrying"
# event::     event (or its representation) to attach to the log entry
#
# Returns nil.
def log_jdbc_exception(exception, retrying, event)
  level = retrying ? 'warn' : 'error'
  message = "JDBC - Exception. #{retrying ? 'Retrying' : 'Not retrying'}"

  cause = exception
  loop do
    # TODO reformat event output so that it only shows the fields necessary.
    @logger.send(level, message, :exception => cause, :statement => @statement[0], :event => event)

    # Walk the chain until an exception has no successor.
    break unless cause.respond_to? 'getNextException'

    cause = cause.getNextException()
    break if cause == nil
  end
end
next_sleep_interval(current_interval)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 348
# Computes the next retry delay: twice the current one, capped at
# @retry_max_interval.
#
# current_interval:: the delay that was just used
#
# Returns the delay to use for the next retry.
def next_sleep_interval(current_interval)
  candidate = current_interval * 2
  return @retry_max_interval if candidate > @retry_max_interval

  candidate
end
retry_exception?(exception, event)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 320
# Decides whether a failed write should be retried and logs the exception
# via log_jdbc_exception with that decision.
#
# An exception is retryable when it exposes getSQLState and either the first
# two characters of that state are listed in RETRYABLE_SQLSTATE_CLASSES or
# the full state is listed in @retry_sql_states.
#
# exception:: the exception raised while writing the event
# event::     passed through to log_jdbc_exception
#
# Returns true when the event should be retried, false otherwise.
def retry_exception?(exception, event)
  retrying = false

  if exception.respond_to? 'getSQLState'
    sql_state = exception.getSQLState
    state_class = sql_state.to_s[0, 2]
    retrying = RETRYABLE_SQLSTATE_CLASSES.include?(state_class) || @retry_sql_states.include?(sql_state)
  end

  log_jdbc_exception(exception, retrying, event)
  retrying
end
retrying_submit(actions)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 238
# Submits a batch of events, re-submitting whatever +submit+ reports as
# failed until nothing is left, the plugin is stopping, or the failure
# budget is used up.
#
# Only submissions that +submit+ flags as counting (second element of its
# return value is true) consume the @max_flush_exceptions budget; a value of
# 0 or less disables the budget. When the budget is exceeded the remaining
# events are dropped and an error is logged.
#
# Between submissions the method sleeps, starting at @retry_initial_interval
# and growing via next_sleep_interval; the sleep is cut short once @stopping
# becomes true.
#
# actions:: the events to submit; nil or empty is a no-op
#
# Returns nil.
def retrying_submit(actions)
  # Initially we submit the full list of actions
  submit_actions = actions
  attempts = 1
  sleep_interval = @retry_initial_interval

  while @stopping.false? && submit_actions && !submit_actions.empty?
    # We retry whatever didn't succeed
    submit_actions, count_as_attempt = submit(submit_actions)

    # Everything was a success!
    break if !submit_actions || submit_actions.empty?

    if @max_flush_exceptions > 0 && count_as_attempt == true
      attempts += 1

      if attempts > @max_flush_exceptions
        @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details.")
        break
      end
    end

    # If we're retrying the action sleep for the recommended interval
    # Double the interval for the next time through to achieve exponential backoff
    Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
    sleep_interval = next_sleep_interval(sleep_interval)
  end
end
setup_and_test_pool!()
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 144
# Creates the Hikari connection pool in @pool from the plugin settings and,
# when @connection_test is set, borrows one connection to check it is valid.
#
# Driver class, username and password are only applied when configured. A
# @connection_test_query longer than one character is installed both as the
# pool's connection test query and as its connection init SQL.
#
# The validity check uses half of @connection_timeout after dividing it by
# 1000 (presumably milliseconds to seconds - confirm against the setting's
# documentation). A connection that reports invalid is only logged as a
# warning; it does not abort start-up.
#
# Returns nil.
def setup_and_test_pool!
  # Setup pool
  @pool = Java::ComZaxxerHikari::HikariDataSource.new

  @pool.setAutoCommit(@driver_auto_commit)
  @pool.setDriverClassName(@driver_class) if @driver_class

  @pool.setJdbcUrl(@connection_string)

  @pool.setUsername(@username) if @username
  @pool.setPassword(@password) if @password

  @pool.setMaximumPoolSize(@max_pool_size)
  @pool.setConnectionTimeout(@connection_timeout)

  validate_connection_timeout = (@connection_timeout / 1000) / 2

  if !@connection_test_query.nil? && @connection_test_query.length > 1
    @pool.setConnectionTestQuery(@connection_test_query)
    @pool.setConnectionInitSql(@connection_test_query)
  end

  return unless @connection_test

  # Test connection
  test_connection = @pool.getConnection
  begin
    unless test_connection.isValid(validate_connection_timeout)
      @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
    end
  ensure
    # Hand the connection back to the pool even if the validity check raised.
    test_connection.close
  end

  nil
end
submit(events)
click to toggle source
# File lib/logstash/outputs/jdbc.rb, line 203
# Writes each event with its own prepared statement on a single pooled
# connection.
#
# When no connection can be obtained the failure is logged and the whole
# batch is handed back without counting as an attempt. Otherwise an event
# whose write raises is collected for retry only if retry_exception? says
# so; other failed events are dropped (retry_exception? logs them).
#
# The connection is released in an +ensure+ so it goes back to the pool even
# when an error escapes the per-event handling (for example a failing
# statement.close). Each event gets a fresh +statement+ local so a failed
# prepareStatement never re-closes the previous event's statement.
#
# events:: the events to write
#
# Returns a two-element array: [events to retry, whether this submission
# counts towards the retry limit].
def submit(events)
  begin
    connection = @pool.getConnection
  rescue => e
    log_jdbc_exception(e, true, nil)
    # If a connection is not available, then the server has gone away
    # We're not counting that towards our retry count.
    return events, false
  end

  events_to_retry = []

  begin
    events.each do |event|
      statement = nil
      begin
        # With unsafe_statement the SQL text itself is interpolated per event.
        sql = @unsafe_statement == true ? event.sprintf(@statement[0]) : @statement[0]
        statement = connection.prepareStatement(sql)
        statement = add_statement_event_params(statement, event) if @statement.length > 1
        statement.execute
      rescue => e
        events_to_retry.push(event) if retry_exception?(e, event.to_json)
      ensure
        statement.close unless statement.nil?
      end
    end
  ensure
    connection.close unless connection.nil?
  end

  return events_to_retry, true
end