class LogStash::Inputs::Jdbc
Attributes
database[R]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 307 def close @scheduler.shutdown if @scheduler end
register()
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 235 def register @logger = self.logger require "rufus/scheduler" prepare_jdbc_connection if @use_column_value # Raise an error if @use_column_value is true, but no @tracking_column is set if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end end unless @statement.nil? ^ @statement_filepath.nil? raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end @statement = ::File.read(@statement_filepath) if @statement_filepath # must validate prepared statement mode after trying to read in from @statement_filepath if @use_prepared_statements validation_errors = validate_prepared_statement_mode unless validation_errors.empty? raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + validation_errors.join(", ")) end end set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") end @jdbc_password = LogStash::Util::Password.new(::File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath if enable_encoding? encodings = @columns_charset.values encodings << @charset if @charset @converters = encodings.each_with_object({}) do |encoding, converters| converter = LogStash::Util::Charset.new(encoding) converter.logger = self.logger converters[encoding] = converter end end end
run(queue)
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 293 def run(queue) load_driver if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @scheduler.cron @schedule do execute_query(queue) end @scheduler.join else execute_query(queue) end end
set_statement_logger(instance)
click to toggle source
test injection points
# File lib/logstash/inputs/jdbc.rb, line 285 def set_statement_logger(instance) @statement_handler = LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self, instance) end
set_value_tracker(instance)
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 289 def set_value_tracker(instance) @value_tracker = instance end
stop()
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 311 def stop close_jdbc_connection @scheduler.shutdown(:wait) if @scheduler end
Private Instance Methods
convert(column_name, value)
click to toggle source
make sure the encoding is uniform over fields
# File lib/logstash/inputs/jdbc.rb, line 354 def convert(column_name, value) return value unless value.is_a?(String) column_charset = @columns_charset[column_name] if column_charset converter = @converters[column_charset] converter.convert(value) elsif @charset converter = @converters[@charset] converter.convert(value) else value end end
enable_encoding?()
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 349 def enable_encoding? @enable_encoding end
execute_query(queue)
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 334 def execute_query(queue) execute_statement do |row| if enable_encoding? ## do the necessary conversions to string elements row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }] end event = targeted_event_factory.new_event(row) decorate(event) queue << event end @value_tracker.write end
validate_prepared_statement_mode()
click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 318 def validate_prepared_statement_mode error_messages = [] if @prepared_statement_name.empty? error_messages << "must provide a name for the Prepared Statement, it must be unique for the db session" end if @statement.count("?") != @prepared_statement_bind_values.size # mismatch in number of bind value elements to placeholder characters error_messages << "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements" end if @jdbc_paging_enabled # Pagination is not supported when using prepared statements error_messages << "JDBC pagination cannot be used at this time" end error_messages end