class LogStash::PluginMixins::JdbcStreaming::PreparedStatementHandler

Attributes

bind_values_array[R]
name[R]
prepared[R]
statement_prepared[R]

Public Instance Methods

cache_lookup(db, event) click to toggle source

Gets the payload from the cache, or performs a remote lookup and saves the result to the cache. @param db [Sequel::Database] @param event [LogStash::Event] @return [CachePayload]

# File lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb, line 106
# Makes sure the prepared statement is available on +db+, then hands the
# lookup over to +common_cache_lookup+.
#
# @param db [Sequel::Database] database handle used for the lookup
# @param event [LogStash::Event] event being enriched
# @return [CachePayload] whatever +common_cache_lookup+ returns
def cache_lookup(db, event)
  # The statement has to exist before the shared lookup path runs.
  build_prepared_statement(db)
  common_cache_lookup(db, event)
end

Private Instance Methods

build_prepared_statement(db) click to toggle source
# File lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb, line 127
# Prepares the statement the first time it is needed and makes sure the
# given database handle knows about it.
#
# On first use, every key of +parameters+ is turned into a "$"-prefixed
# symbol and passed, together with +statement+, to +db[]+; the resulting
# dataset is prepared as a +:select+ under +name+ and kept in +@prepared+.
#
# @param db [Sequel::Database] database handle to prepare/register against
# @return [Object, nil] the result of +db.set_prepared_statement+, or nil
#   when +db+ already had a statement registered under +name+
def build_prepared_statement(db)
  if statement_prepared.false?
    placeholders = parameters.keys.map { |key| :"$#{key}" }
    dataset = db[statement, *placeholders]
    @prepared = dataset.prepare(:select, name)
    statement_prepared.make_true
  end

  # A db instance that has not seen this statement yet gets it registered.
  db.set_prepared_statement(name, prepared) if db.prepared_statement(name).nil?
end
create_bind_values_hash() click to toggle source
# File lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb, line 140
# Builds a hash from +bind_values_array+ where the element at position i
# is stored under the symbol key +:p<i>+ (:p0, :p1, ...).
#
# @return [Hash{Symbol => Object}] positional keys mapped to bind values
def create_bind_values_hash
  bind_values_array.each_with_index.each_with_object({}) do |(value, position), bindings|
    bindings[:"p#{position}"] = value
  end
end
execute_extract_records(db, params, result) click to toggle source
# File lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb, line 113
# Runs the prepared statement registered under +name+ and appends every
# returned row to +result+, with each row's keys converted to strings.
#
# @param db [Sequel::Database] database handle; +db.call(name, params)+ is
#   expected to yield an array of hashes
# @param params [Hash] bind values passed through to +db.call+
# @param result [#push] collector that receives one hash per row
# @return [Object] the records collection returned by +db.call+
def execute_extract_records(db, params, result)
  rows = db.call(name, params)
  rows.each do |row|
    stringified = row.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
    result.push(stringified)
  end
end
post_init(plugin) click to toggle source
# File lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb, line 120
# Copies the prepared-statement settings from +plugin+ into instance state.
#
# Sets +@name+ (as a symbol), +@bind_values_array+, a
# +Concurrent::AtomicBoolean+ starting at false in +@statement_prepared+,
# and +@parameters+ from +create_bind_values_hash+.
#
# @param plugin [Object] must respond to +prepared_statement_name+ and
#   +prepared_statement_bind_values+
# @return [Object] the value assigned to +@parameters+
def post_init(plugin)
  statement_name = plugin.prepared_statement_name
  @name = statement_name.to_sym
  @bind_values_array = plugin.prepared_statement_bind_values
  @statement_prepared = Concurrent::AtomicBoolean.new(false)
  # Must run after @bind_values_array is assigned: the hash is built from it.
  @parameters = create_bind_values_hash
end