class Embulk::Input::Redis

Public Class Methods

transaction(config) { |task, columns, task_count| ... }
# File lib/embulk/input/rediskeys.rb, line 10
# Builds the task settings and output schema, then hands control to Embulk.
#
# Reads host/port/db, key_prefix, match_key_as_key and encode from +config+.
# When 'columns' is not configured, every Redis key matching
# "<key_prefix>*" becomes a string column named after that key.
#
# @param config [Object] Embulk plugin configuration (responds to #param)
# @yield [task, columns, task_count] Embulk's control block; one task is used
# @return [Hash] an empty config diff
def self.transaction(config, &control)
  # configuration code:
  task = {
    'host' => config.param('host', :string, :default => 'localhost'),
    'port' => config.param('port', :integer, :default => 6379),
    'db' => config.param('db', :integer, :default => 0),
    'key_prefix' => config.param('key_prefix', :string, :default => ''),
    'match_key_as_key' => config.param('match_key_as_key', :bool, :default => true),
    'encode' => config.param('encode', :string, :default => 'json')
  }

  # Only scan Redis when the user did not declare the schema: KEYS is O(N)
  # and blocks the server while it runs.
  column_defs = config.param('columns', :array, :default => nil)
  if column_defs.nil?
    redis = ::Redis.new(:host => task['host'], :port => task['port'], :db => task['db'])
    begin
      column_defs = redis.keys("#{task['key_prefix']}*").map { |k| { 'name' => k, 'type' => 'string' } }
    ensure
      # This connection is only needed for schema discovery; don't leak it.
      redis.close
    end
    puts "keys:#{column_defs}"
  end

  # column name => type symbol; insertion order defines the column order.
  task['columns'] = column_defs.each_with_object({}) do |col, types|
    types[col['name']] = col['type'].to_sym
  end

  columns = task['columns'].map.with_index { |(name, type), i| Column.new(i, name, type) }

  puts "Redis input started."
  task_reports = yield(task, columns, 1)
  puts "Redis input finished. Commit reports = #{task_reports.to_json}"

  {}
end

Public Instance Methods

deserialize_element(name, x)
# File lib/embulk/input/rediskeys.rb, line 70
# Converts a raw value read from Redis into the Ruby type of its column.
#
# The column type is looked up in @task['columns'] (column name => type
# symbol, as built by transaction). Conversion failures are logged to STDERR
# and yield nil, so one bad value does not abort the whole load.
#
# @param name [String] column name
# @param x [Object] raw value (a String from Redis or a value decoded from JSON)
# @return [Object, nil] the converted value; nil when +x+ is nil, the column
#   is unknown, or conversion fails
def deserialize_element(name, x)
  # A JSON null / missing field is a legitimate NULL, not a conversion error.
  return nil if x.nil?

  type = @task['columns'][name]
  val = x
  begin
    case type && type.to_sym
    when :boolean
      # Accepts real booleans and their string forms ("true", "FALSE", ...);
      # anything else maps to nil.
      case val.to_s.downcase
      when 'true' then true
      when 'false' then false
      end
    when :long
      # Force base 10 for strings: plain Integer("010") would be parsed as
      # octal (8) and Integer("09") would raise.
      val.is_a?(String) ? Integer(val, 10) : Integer(val)
    when :double
      Float(val)
    when :string
      val
    when :timestamp
      Time.parse(val)
    else
      raise "Shouldn't reach here: val:#{val}, col_name:#{name}, col_type:#{type}"
    end
  rescue => e
    STDERR.puts "Failed to deserialize: val:#{val}, col_name:#{name}, col_type:#{type}, error:#{e.inspect}"
    nil
  end
end
expect_value_is_json()
# File lib/embulk/input/rediskeys.rb, line 127
# Loads rows when each stored record is an object whose fields map onto the
# configured columns.
#
# For every key matching "<key_prefix>*", @task['encode'] selects how
# records are read:
#   'json' - the key's string value is one JSON object (one row)
#   'list' - every list element is a JSON object (one row each)
#   'hash' - the Redis hash's field => value pairs form one row
# Keys are skipped (no rows added) for any other encode value, and for
# 'json' keys whose value has disappeared since KEYS ran.
def expect_value_is_json
  @redis.keys("#{@task['key_prefix']}*").each do |k|
    case @task['encode']
    when 'json'
      raw = @redis.get(k)
      add_json_record(JSON.parse(raw)) if raw
    when 'list'
      @redis.lrange(k, 0, -1).each { |r| add_json_record(JSON.parse(r)) }
    when 'hash'
      # hgetall already returns a Hash of field => value strings.
      add_json_record(@redis.hgetall(k))
    end
  end
end

# Adds +record+ (field name => raw value) as one page row and counts it.
# Values are emitted in the configured column order. Fields missing from
# the record become nil; fields with no matching column are ignored.
def add_json_record(record)
  row = @task['columns'].keys.map { |name| deserialize_element(name, record[name]) }
  @page_builder.add(row)
  @rows += 1
end
private :add_json_record
init()
TODO

# Entry point for `embulk guess`.
#
# NOTE(review): still a placeholder. It ignores +config+ and always returns
# the schema inferred from two fixed sample records, not from data stored
# in Redis.
#
# @param config [Object] guess configuration (unused)
# @return [Hash] {"columns" => guessed schema}
def self.guess(config)
  placeholder_records = [
    { "example" => "a", "column" => 1, "value" => 0.1 },
    { "example" => "a", "column" => 2, "value" => 0.2 },
  ]
  { "columns" => Guess::SchemaGuess.from_hash_records(placeholder_records) }
end

Calls superclass method
# File lib/embulk/input/rediskeys.rb, line 62
# Per-task setup hook.
#
# Logs the task index, runs the superclass hook, resets the row counter and
# opens this task's Redis connection using the host/port/db task settings.
def init
  # Announce which task index is starting before any setup runs.
  puts "Redis input thread #{index}..."
  super
  @rows = 0
  settings = task
  @redis = ::Redis.new(host: settings['host'], port: settings['port'], db: settings['db'])
end
match_key_as_key()
# File lib/embulk/input/rediskeys.rb, line 108
# Loads a single row in which every configured column is a Redis key and the
# column value is that key's stored value.
#
# @task['encode'] selects how each key is read:
#   'json' - the string value is JSON (scalars allowed) and is decoded
#   'list' - the list's elements, as an Array of strings
#   'hash' - the hash's field => value pairs, as a Hash
# A missing 'json' key yields nil. Values are emitted in the configured
# column order, and exactly one row is added and counted.
#
# @raise [ArgumentError] when @task['encode'] is not json/list/hash
def match_key_as_key
  encode = @task['encode']
  row = @task['columns'].keys.map do |key|
    raw =
      case encode
      when 'json'
        stored = @redis.get(key)
        # Wrap in a one-field object so scalar JSON values ("1", "\"a\"")
        # parse even with JSON parsers that reject non-container top levels.
        # key.to_json escapes quotes/backslashes in the key.
        stored && JSON.parse("{#{key.to_json}:#{stored}}")[key]
      when 'list'
        @redis.lrange(key, 0, -1)
      when 'hash'
        @redis.hgetall(key)
      else
        raise ArgumentError, "Unsupported encode for match_key_as_key: #{encode.inspect}"
      end
    deserialize_element(key, raw)
  end
  @page_builder.add(row)
  @rows += 1
end
run()
# File lib/embulk/input/rediskeys.rb, line 151
# Task body.
#
# Fills the page using the loader selected by @task['match_key_as_key'],
# flushes the page builder, and reports the number of rows added.
#
# @return [Hash] task report {"rows" => row count}
def run
  @task['match_key_as_key'] ? match_key_as_key : expect_value_is_json
  # The page builder must always be finished, or buffered rows are lost.
  @page_builder.finish
  { "rows" => @rows }
end