class Embulk::Output::GroongaOutputPlugin

Public Class Methods

transaction(config, schema, count) { |task| ... } click to toggle source
# File lib/embulk/output/groonga.rb, line 10
      def self.transaction(config, schema, count, &control)
        # configuration code:
        task = {
          "host" => config.param("host", :string),
          "port" => config.param("port", :integer, default: 10041),
          "protocol" => config.param("protocol", :string, default: 'http'),
          "user" => config.param("user", :string, default: nil),
          "password" => config.param("password", :string, default: nil),
          "key_column" => config.param("key_column",:string),
          "table" => config.param("table",:string),
#          "create_table" => config.param("create_table",:string)
        }
        prot = task['protocol']
        raise RuntimeError,"Unknown protocol #{prot}. supported protocol: gqtp, http" unless %w[gqtp http].include?(prot)

        # resumable output:
        # resume(task, schema, count, &control)

        # non-resumable output:
        commit_reports = yield(task)
        next_config_diff = {}
        return next_config_diff
      end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/groonga.rb, line 96
def abort
end
add(page) click to toggle source
# File lib/embulk/output/groonga.rb, line 64
def add(page)
  # output code:
  records = []
  idx = 0
  page.each_with_index do |record,idx|
    hash = Hash[schema.names.zip(record)]
    v = hash.delete(@key_column)
    hash['_key'] = v
    records << hash
    if( idx > 0 && idx % FLUSH_SIZE == 0 )
      ret = @client.load({:table => @out_table,
                          :values => records })
       Embulk.logger.info "groonga inserted #{ret.body} / #{records.size}"
       if( ret.body != records.size )
         raise RuntimeError,"inserted value does not match"
       end
       records.clear
    end
  end
  if( records.size > 0 )
    ret = @client.load({:table => @out_table,
                        :values => records })
     Embulk.logger.info "groonga inserted #{ret.body} / #{records.size}"
     if( ret.body != records.size )
       raise RuntimeError,"inserted value does not match"
     end
  end
end
close() click to toggle source
# File lib/embulk/output/groonga.rb, line 60
def close
  @client.close
end
commit() click to toggle source
# File lib/embulk/output/groonga.rb, line 99
def commit
  commit_report = {}
  return commit_report
end
finish() click to toggle source
# File lib/embulk/output/groonga.rb, line 93
def finish
end
init() click to toggle source

def self.resume(task, schema, count, &control)

commit_reports = yield(task)

next_config_diff = {}
return next_config_diff

end

# File lib/embulk/output/groonga.rb, line 41
      def init
        # initialization code:
        param = {}
        param[:host] = task["host"]
        param[:port] = task["port"]
        param[:protocol] = task["protocol"].to_sym
        if user = task["user"]
          param[:user] = user
        end
        if password = task["password"]
          param[:password] = password
        end
        @client = Groonga::Client.open(param)
        @key_column = task["key_column"]
        @out_table = task["table"]

#        create_table
      end

Private Instance Methods

create_table() click to toggle source
# File lib/embulk/output/groonga.rb, line 114
def create_table
  return if table_exist?(@out_table)
  create_table = @task['create_table']

  # TODO Error check

  @client.execute(create_table)

end
table_exist?(name) click to toggle source
# File lib/embulk/output/groonga.rb, line 109
def table_exist?(name)
  # TODO Error check
  table_names.include?(name)
end
table_names() click to toggle source
# File lib/embulk/output/groonga.rb, line 104
def table_names
  # TODO Error check
  @client.table_list.map{ |x| x['name'] }
end