class Fluent::IlmHbaseOutput
Constants
- DEFAULT_FORMAT_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 6
# Sets up the plugin instance.
#
# Runs the superclass initializer first, then loads the `massive_record`
# library. The require is done here rather than at file load time, so the
# library is only needed once an instance is actually created.
def initialize
  super
  require "massive_record"
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 39
# Parses the plugin configuration into the lookup structures used by the
# other methods.
#
# Reads:
#   @fields_to_columns_mapping - comma-separated "field=>column" entries
#   @include_custom_row_key    - flag enabling custom row keys
#   @custom_key_list           - comma-separated column names (only read
#                                when the flag is set)
# Writes:
#   @fields_to_columns - array of [field, column] pairs
#   @mapping           - Hash of field => column
#   @rowchangeflag     - copy of @include_custom_row_key
#   @rowkeys           - array of column names (only when the flag is set)
#   @customrow         - a fresh UUID
#   @formatter         - result of formatter_create
#
# @param conf [Object] configuration object, forwarded to the superclass
# @raise [ArgumentError] when a mapping entry is not of the form
#   "field=>column" with both sides non-empty
def configure(conf)
  super

  # Surrounding whitespace and empty entries (e.g. a trailing comma) are
  # ignored so that "a => cf:a, b => cf:b" behaves like "a=>cf:a,b=>cf:b".
  entries = @fields_to_columns_mapping.split(",").map(&:strip).reject(&:empty?)
  @fields_to_columns = entries.map do |src_to_dst|
    pair = src_to_dst.split("=>").map(&:strip)
    # Validate each entry on its own; flattening malformed entries into
    # Hash[] would either mis-pair fields and columns or fail with an
    # unhelpful "odd number of arguments" error.
    unless pair.size == 2 && pair.none?(&:empty?)
      raise ArgumentError,
            "Invalid fields_to_columns_mapping entry #{src_to_dst.inspect}: " \
            "expected \"field=>column_family:column\""
    end
    pair
  end
  @mapping = Hash[@fields_to_columns]

  @rowchangeflag = @include_custom_row_key
  # Truthiness check, matching how #write tests the same flag. A missing
  # key list yields an empty list instead of a NoMethodError.
  if @rowchangeflag
    @rowkeys = @custom_key_list.to_s.split(",").map(&:strip).reject(&:empty?)
  end

  @customrow = SecureRandom.uuid
  @formatter = formatter_create
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_ilm_hbase.rb, line 84
# Converts one event into the MessagePack-encoded column map that #write
# later reads back with chunk.msgpack_each.
#
# The resulting Hash maps destination column names to values:
# * @tag_column_name  => tag   (only when @tag_column_name is set)
# * @time_column_name => time  (only when @time_column_name is set)
# * one entry per [field, column] pair in @fields_to_columns, where the
#   field is a dot-separated path looked up in the record; a path that
#   cannot be resolved yields nil.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [Object] whatever Hash#to_msgpack returns for the column map
def format(tag, time, record)
  # The strict comparison with true is kept as-is because the type of
  # use_format is declared outside this view.
  new_record = use_format == true ? @formatter.format(tag, time, record) : record

  # Per-event dumps are diagnostics only, so they are logged at debug level;
  # inspect is used so a non-Hash formatter result cannot raise here.
  log.debug("record: #{record.inspect}")
  log.debug("formatted record: #{new_record.inspect}")

  row_values = {}
  row_values[@tag_column_name] = tag unless @tag_column_name.nil?
  row_values[@time_column_name] = time unless @time_column_name.nil?

  @fields_to_columns.each do |field, column|
    next if field.nil? || column.nil?

    # Walk the dotted path one key at a time. Only Hashes are descended
    # into: indexing a String or Integer with a key would return a
    # substring or raise TypeError instead of signalling "not found".
    row_values[column] = field.split(".").inject(new_record) do |node, key|
      node.is_a?(Hash) ? node[key] : nil
    end
  end

  row_values.to_msgpack
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 55
# Opens the HBase connection and makes sure the destination table exists.
#
# Builds @conn from @hbase_host / @hbase_port and @table from @hbase_table.
# When the table does not exist yet, the column families are derived from
# every destination column (@tag_column_name, @time_column_name and the
# values of @mapping), the families are created and the table is saved.
# Nothing is validated or created when the table already exists.
#
# @raise [RuntimeError] when a destination column is not of the form
#   "column_family:column"
def start
  super

  @conn = MassiveRecord::Wrapper::Connection.new(:host => @hbase_host, :port => @hbase_port)
  @table = MassiveRecord::Wrapper::Table.new(@conn, @hbase_table.intern)
  return if @table.exists?

  columns = ([@tag_column_name, @time_column_name] + @mapping.values).compact
  column_families = columns.map do |column_family_with_column|
    column_family, column = column_family_with_column.split(":")
    # Both halves must be present and non-empty; ":col" would otherwise
    # slip through with an empty column family name.
    if column_family.nil? || column_family.empty? || column.nil? || column.empty?
      # NOTE(review): the option name below follows @fields_to_columns_mapping;
      # confirm it against the config_param declaration.
      raise [
        "Unexpected format for column name: #{column_family_with_column}",
        "Each destination column in the 'fields_to_columns_mapping' option " \
        "must be specified in the format of \"column_family:column\".",
        "Are you sure you included ':' in column names?"
      ].join("\n")
    end
    column_family.intern
  end

  @table.create_column_families(column_families.uniq)
  @table.save
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_ilm_hbase.rb, line 110
# Saves every record of a buffered chunk as one row in @table.
#
# Each record is a Hash of "column_family:column" => value. It is regrouped
# into { column_family_symbol => { column => value } } and saved through a
# MassiveRecord::Wrapper::Row.
#
# Row ids are generated per record, so records in one chunk are never saved
# under the same id. An id is a fresh UUID; when @rowchangeflag is set, the
# values of that record's columns listed in @rowkeys are appended as
# ":v1-v2-...". If the record has none of those columns, the id is the bare
# UUID. @customrow is updated to the most recently generated id.
#
# @param chunk [Object] buffer chunk responding to msgpack_each
# @return [Object] whatever chunk.msgpack_each returns
def write(chunk)
  # Array() keeps this safe if the flag is set but @rowkeys never was.
  key_columns = @rowchangeflag ? Array(@rowkeys) : []

  chunk.msgpack_each do |row_values|
    event = {}
    key_parts = []

    row_values.each do |column_family_and_column, value|
      # to_s: key values may be nil or numeric, and String#<< with an
      # Integer would append a codepoint rather than its digits.
      key_parts << value.to_s if key_columns.include?(column_family_and_column)

      column_family, column = column_family_and_column.split(":")
      (event[column_family.intern] ||= {})[column] = value
    end

    row_id = SecureRandom.uuid
    row_id = "#{row_id}:#{key_parts.join('-')}" unless key_parts.empty?
    @customrow = row_id
    log.debug("writing row #{row_id}")

    row = MassiveRecord::Wrapper::Row.new
    row.id = row_id
    row.values = event
    row.table = @table
    row.save
  end
end