class Fluent::IlmHbaseOutput

Constants

DEFAULT_FORMAT_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 6
def initialize
  super
  require 'massive_record'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 39
def configure(conf)
  super
  @fields_to_columns = @fields_to_columns_mapping.split(",").map { |src_to_dst|
    src_to_dst.split("=>")
  }
  @mapping = Hash[*@fields_to_columns.flatten]
  @rowchangeflag = @include_custom_row_key

  if(@rowchangeflag == true)
  @rowkeys = @custom_key_list.split(",")
  end
  
  @customrow = SecureRandom.uuid
  @formatter = formatter_create
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_ilm_hbase.rb, line 84
def format(tag, time, record)
  row_values = {}
  new_record = (use_format==true)? @formatter.format(tag, time, record):record
  row_values[@tag_column_name] = tag unless @tag_column_name.nil?
  row_values[@time_column_name] = time unless @time_column_name.nil?

    log.warn("#{record.values}")
    log.warn("#{new_record.values}")
  @fields_to_columns.each {|field,column|

    next if field.nil? or column.nil?

    components = field.split(".")
    value = new_record
    for c in components
      value = value[c]

      break if value.nil?
    end

    row_values[column] = value
  }
    log.warn("test point 1")
  row_values.to_msgpack
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_ilm_hbase.rb, line 55
    def start
      super

      @conn = MassiveRecord::Wrapper::Connection.new(:host => @hbase_host, :port => @hbase_port)
      @table = MassiveRecord::Wrapper::Table.new(@conn, @hbase_table.intern)

      unless @table.exists?
        columns = ([@tag_column_name, @time_column_name] + @mapping.values).reject(&:nil?)
        column_families = columns.map {|column_family_with_column|
          column_family, column = column_family_with_column.split(":")

          if column.nil? or column_family.nil?
            raise <<MESSAGE
Unexpected format for column name: #{column_family_with_column}
Each destination column in the 'record_to_columns_mapping' option
must be specified in the format of \"column_family:column\".
Are you sure you included ':' in column names?
MESSAGE
          end

          column_family.intern
        }
        column_families.uniq!

        @table.create_column_families(column_families)
        @table.save
      end
    end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_ilm_hbase.rb, line 110
    def write(chunk)
        row_key = ""
        row_attrs = Array.new
        if @rowchangeflag
        chunk.msgpack_each {|row_values|
        
        row_attrs << row_values.each do |column_family_and_column, value| 
                if  @rowkeys.include? column_family_and_column
                  row_key << value
                  row_key << "-"
                end
        end
   
        }
        @customrow = SecureRandom.uuid + ":" + row_key[0,row_key.length-1]
        else
        @customrow = SecureRandom.uuid
        end
log.warn("test point 2")
      chunk.msgpack_each {|row_values|
        event = {}

        row_values.each {|column_family_and_column, value|
          column_family, column = column_family_and_column.split(":")

          (event[column_family.intern] ||= {}).update({column => value})
        }
log.warn("test point 3")
        row = MassiveRecord::Wrapper::Row.new
        row.id = @customrow
        row.values = event
        row.table = @table
        row.save
      }
    end