class Fluent::CassandraCqlOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 26
# Validates the plugin settings and normalises +schema+ and +data_keys+.
#
# After the superclass has processed +conf+, every required setting is
# checked for presence, then:
# * +schema+ is converted from its string form into a Hash
# * +data_keys+ is converted from a comma separated string into an Array
#
# conf:: the configuration object, forwarded unchanged to the superclass
#
# Raises ConfigError when a required setting is missing or the schema
# string lists fewer than two comma separated entries.
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Host' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if self.host.nil?
  raise ConfigError, "'Port' is required by Cassandra output (ex: 9160)" if self.port.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if self.schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if self.schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if self.data_keys.nil?

  # convert schema from string to hash
  # SECURITY(review): the schema string is evaluated as Ruby code. It is
  #   read from the plugin settings, so anyone able to edit them can run
  #   arbitrary code in this process. Kept as-is because the accepted
  #   format is a Ruby hash literal; consider replacing eval with a
  #   restricted parser.
  self.schema = eval(self.schema)

  # convert data keys from string to array; surrounding whitespace is
  # dropped so "tag, created_at" still matches the record keys
  self.data_keys = self.data_keys.split(',').map(&:strip)
end
connection() click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 22
# Returns the memoised database connection, opening it through
# get_connection (with the configured host, port and keyspace) the
# first time it is requested.
def connection
  return @connection if @connection

  @connection = get_connection(self.host, self.port, self.keyspace)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 56
# Serialises a single event for buffering.
#
# Only +record+ is used; +tag+ and +time+ are accepted but ignored.
# Returns whatever +record.to_msgpack+ returns.
def format(tag, time, record)
  packed = record.to_msgpack
  packed
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 52
# Shuts the plugin down by delegating entirely to the superclass;
# no additional work is performed here.
def shutdown
  super()
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 47
# Starts the plugin: runs the superclass start logic first, then calls
# +connection+ so the connection is requested up front.
# Returns the value of +connection+.
def start
  super()
  self.connection
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 60
# Writes every record of a buffered chunk as one CQL INSERT statement.
#
# For each record yielded by +chunk.msgpack_each+ an INSERT into the
# configured column family is built from the schema keys, the values
# produced by build_insert_values_string and the configured TTL, and
# is executed on the connection.
#
# chunk:: object responding to +msgpack_each+, yielding one record per event
def write(chunk)
  # the column list does not change between records, so build it once
  columns = self.schema.keys.join(',')

  chunk.msgpack_each do |record|
    values = build_insert_values_string(self.schema.keys, self.data_keys, record, self.pop_data_keys)
    cql = "INSERT INTO #{self.columnfamily} (#{columns}) " \
          "VALUES (#{values}) " \
          "USING TTL #{self.ttl}"
    # go through the memoised accessor rather than @connection so a
    # connection is opened on demand if none exists yet
    connection.execute(cql)
  end
end

Private Instance Methods

build_insert_values_string(schema_keys, data_keys, record, pop_data_keys) click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 77
# Builds the comma separated VALUES list for one INSERT statement.
#
# schema_keys::   column names, positionally aligned with +data_keys+
# data_keys::     record keys whose values fill the leading columns
# record::        the event hash; mutated when +pop_data_keys+ is truthy
# pop_data_keys:: when truthy each data key is removed from +record+ as
#                 it is read, otherwise the record is left untouched
#
# Columns whose type in +schema+ is :string are emitted as single-quoted
# CQL literals; any other value is emitted as-is. Embedded single quotes
# are doubled so that record content cannot terminate the literal early.
#
# Returns the values joined with ','.
def build_insert_values_string(schema_keys, data_keys, record, pop_data_keys)
  # CQL escapes a single quote inside a string literal by doubling it
  quote = lambda { |raw| "'#{raw.to_s.gsub("'", "''")}'" }

  values = data_keys.each_with_index.map do |key, index|
    value = pop_data_keys ? record.delete(key) : record[key]
    schema[schema_keys[index]] == :string ? quote.call(value) : value
  end

  # if we have one more schema key than data keys,
  # we can then infer that we should store the event
  # as a string representation of the corresponding
  # json object in the last schema column
  if schema_keys.count == data_keys.count + 1
    # by this point, the extra schema column has been
    # added to insert cql statement, so we must put
    # something in it even when nothing is left to store
    # TODO: detect this scenario earlier and don't
    #       specify the column name/value at all
    #       when constructing the cql stmt
    values << (record.empty? ? "''" : quote.call(record.to_json))
  end

  values.join(',')
end
get_connection(host, port, keyspace) click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 72
# Opens a new CassandraCQL database handle.
#
# The server address is passed as "host:port"; the keyspace name is
# wrapped in double quotes and the CQL version is fixed at 3.0.0.
# Returns the new ::CassandraCQL::Database instance.
def get_connection(host, port, keyspace)
  options = {
    :keyspace    => %("#{keyspace}"),
    :cql_version => "3.0.0"
  }
  ::CassandraCQL::Database.new("#{host}:#{port}", options)
end