class Fluent::CassandraCqlOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 26
#
# Validates the plugin settings and normalises +schema+ and +data_keys+.
#
# After the superclass has processed +conf+, every required setting is
# checked for presence. +schema+ is then converted from its textual form
# into a Hash (column name => type) and +data_keys+ from a comma-separated
# string into an Array of key names.
#
# conf - the configuration object handed to the plugin; passed to +super+.
#
# Raises ConfigError when a required setting is missing, when the schema
# names fewer than two columns, or when the schema does not evaluate to a
# hash-like object.
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Host' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if self.host.nil?
  raise ConfigError, "'Port' is required by Cassandra output (ex: 9160)" if self.port.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: {:id => :string, :ts => :string, :payload => :string})" if self.schema.nil?
  # Cheap sanity check on the raw text: two columns need at least one comma.
  raise ConfigError, "'Schema' must contain at least two column names (ex: {:id => :string, :ts => :string, :payload => :string})" if self.schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if self.data_keys.nil?

  # convert schema from string to hash
  # NOTE(review): eval runs whatever Ruby the configuration file contains.
  #               The original author accepted this on the grounds that the
  #               value is not a user supplied string; that only holds while
  #               the configuration file itself is trusted.
  self.schema = eval(self.schema)

  # The rest of the plugin calls schema.keys and schema[column], so reject
  # anything that is not hash-like here instead of failing on first write.
  unless self.schema.respond_to?(:keys)
    raise ConfigError, "'Schema' must evaluate to a hash of column names to types (ex: {:id => :string, :ts => :string, :payload => :string})"
  end

  # convert data keys from string to array; surrounding whitespace is
  # dropped so that "tag, created_at" yields the key "created_at"
  self.data_keys = self.data_keys.split(',').map(&:strip)
end
connection()
click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 22
#
# Returns the connection for this output, creating it on first use.
#
# The first call builds a connection from the configured host, port and
# keyspace via get_connection and caches it in @connection; later calls
# return the cached object.
def connection
  return @connection if @connection

  @connection = get_connection(self.host, self.port, self.keyspace)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 56
#
# Serialises one event for buffering.
#
# Only the record is kept; +tag+ and +time+ are accepted but not used.
#
# tag    - event tag (unused).
# time   - event time (unused).
# record - object responding to +to_msgpack+.
#
# Returns whatever +record.to_msgpack+ returns.
def format(tag, time, record)
  record.to_msgpack
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 52
#
# Stops the output. Nothing plugin-specific happens here; the call is
# handed straight to the superclass implementation.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_cql.rb, line 47
#
# Starts the output: runs the superclass start-up, then touches
# +connection+ so the connection is created (and cached) up front.
def start
  super

  connection
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 60
#
# Inserts every record of a buffered chunk into the configured column family.
#
# For each record yielded by +chunk.msgpack_each+ one
# "INSERT INTO ... VALUES (...) USING TTL ..." statement is built from the
# schema columns, the values produced by build_insert_values_string and the
# configured TTL, and is then executed on the connection.
#
# chunk - object responding to +msgpack_each+, yielding one record per call.
def write(chunk)
  # The column list is the same for every record, so build it once per chunk.
  columns = self.schema.keys
  column_list = columns.join(',')

  chunk.msgpack_each do |record|
    values = build_insert_values_string(columns, self.data_keys, record, self.pop_data_keys)
    cql = "INSERT INTO #{self.columnfamily} (#{column_list}) " \
          "VALUES (#{values}) " \
          "USING TTL #{self.ttl}"

    # Use the memoising accessor rather than @connection directly, so the
    # statement is not sent to nil when no connection has been cached yet.
    connection.execute(cql)
  end
end
Private Instance Methods
build_insert_values_string(schema_keys, data_keys, record, pop_data_keys)
click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 77
#
# Builds the comma-separated VALUES fragment of the INSERT statement for
# one record.
#
# Data keys are paired with schema columns by position. A value whose
# column is typed :string in +schema+ is emitted as a single-quoted
# literal; any other value is interpolated as-is. When there is exactly one
# more schema column than data keys, what remains of the record is stored
# in that last column as a JSON string ('' when nothing remains).
#
# Single quotes inside quoted values are doubled so that record content
# cannot terminate the literal early and corrupt (or inject into) the
# statement.
#
# schema_keys   - Array of column names, in INSERT order.
# data_keys     - Array of record keys, positionally matching schema_keys.
# record        - Hash-like event record; mutated when pop_data_keys is set.
# pop_data_keys - when truthy, each data key is removed from the record as
#                 it is read, so it does not reappear in the JSON column.
#
# Returns the values joined with ','.
def build_insert_values_string(schema_keys, data_keys, record, pop_data_keys)
  # A single quote inside a quoted literal is escaped by doubling it.
  quote = lambda { |value| "'#{value.to_s.gsub("'", "''")}'" }

  values = data_keys.each_with_index.map do |key, index|
    value = pop_data_keys ? record.delete(key) : record[key]
    # NOTE(review): non-string columns are still interpolated verbatim;
    #               confirm upstream guarantees those values are safe.
    schema[schema_keys[index]] == :string ? quote.call(value) : value
  end

  # if we have one more schema key than data keys,
  # we can then infer that we should store the event
  # as a string representation of the corresponding
  # json object in the last schema column
  if schema_keys.count == data_keys.count + 1
    values << if record.count > 0
                quote.call(record.to_json)
              else
                # by this point, the extra schema column has been
                # added to insert cql statement, so we must put
                # something in it
                # TODO: detect this scenario earlier and don't
                #       specify the column name/value at all
                #       when constructing the cql stmt
                "''"
              end
  end

  values.join(',')
end
get_connection(host, port, keyspace)
click to toggle source
# File lib/fluent/plugin/out_cassandra_cql.rb, line 72
#
# Opens a new CassandraCQL database handle.
#
# host     - server host name or address.
# port     - server port.
# keyspace - keyspace name; passed wrapped in double quotes.
#
# Returns a new ::CassandraCQL::Database for "host:port", created with the
# quoted keyspace and CQL version "3.0.0".
def get_connection(host, port, keyspace)
  options = {
    :keyspace    => "\"#{keyspace}\"",
    :cql_version => "3.0.0"
  }

  ::CassandraCQL::Database.new("#{host}:#{port}", options)
end