class LogStash::Outputs::Rethinkdb

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 53
def close
  connection.close
end
receive(event) click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 57
def receive(event)
  @codec.encode(event)
rescue LocalJumpError
  # This LocalJumpError rescue clause is required to test for regressions
  # for https://github.com/logstash-plugins/logstash-output-redis/issues/26
  # see specs. Without it the LocalJumpError is rescued by the StandardError
  raise
rescue StandardError => e
  @logger.warn('Error encoding event', exception: e,
                                       event: event)
end
register() click to toggle source

# If batch is set to true, the maximum amount of time between RPUSH commands # when there are pending events to flush. config :batch_timeout, :validate => :number, :default => 5

# File lib/logstash/outputs/rethinkdb.rb, line 48
def register
  @codec.on_event(&method(:send_to_rethink))
  create_table
end

Protected Instance Methods

connection() click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 88
def connection
  @connection ||= begin
          ssl = ({ ca_certs: @ca_certs } if @ca_certs)
          if @auth_key == ''
            r.connect(
              host: @host,
              port: @port,
              user: @user,
              password: @password,
              ssl: ssl
            )
          else
            r.connect(
              host: @host,
              port: @port,
              auth_key: @auth_key,
              ssl: ssl
            )
          end
        end
end
create_table() click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 71
def create_table
  return false if self.database.table_list.run(self.connection).include?(@table)
  self.database.table_create(@table).run(self.connection)
end
database() click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 110
def database
  r.db(@database)
end
identity() click to toggle source

A string used to identify a RethinkDB instance in log messages

# File lib/logstash/outputs/rethinkdb.rb, line 119
def identity
  # "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
  'rethinkdb://TODO'
end
send_to_rethink(event, payload) click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 76
def send_to_rethink(event, payload)
  table.insert(JSON.parse(payload, symbolize_names: true)).run(self.connection)
rescue => e
  pp e
  @logger.warn('Failed to send event to Redis', event: event,
                                                identity: identity, exception: e,
                                                backtrace: e.backtrace)
  sleep @reconnect_interval
  @redis = nil
  retry
end
table() click to toggle source
# File lib/logstash/outputs/rethinkdb.rb, line 114
def table
  r.table(@table)
end