class Embulk::Input::Redis
Public Class Methods
transaction(config) { |task, columns, 1| ... }
click to toggle source
# File lib/embulk/input/rediskeys.rb, line 10 def self.transaction(config, &control) # configuration code: task = { 'host' => config.param('host', :string, :default => 'localhost'), 'port' => config.param('port', :integer, :default => 6379), 'db' => config.param('db', :integer, :default => 0), 'key_prefix' => config.param('key_prefix', :string, :default => ''), 'match_key_as_key' => config.param('match_key_as_key', :bool, :default => true), 'encode' => config.param('encode', :string, :default => 'json') } redis = ::Redis.new(:host => task['host'], :port => task['port'], :db => task['db']) keys = redis.keys("#{task['key_prefix']}*").inject([]){|col, k| col.push({'name' => k, 'type' => 'string'}) col } puts "keys:#{keys}" task['columns'] = config.param('columns', :array, :default => keys).inject({}){|a, col| a[col['name']] = col['type'].to_sym a } columns = task['columns'].map.with_index{|(name, type), i| Column.new(i, name, type) } #resume(task, columns, 1, &control) puts "Redis input started." task_reports = yield(task, columns, 1) puts "Redis input finished. Commit reports = #{task_reports.to_json}" return {} end
Public Instance Methods
deserialize_element(name, x)
click to toggle source
# File lib/embulk/input/rediskeys.rb, line 70 def deserialize_element(name, x) begin type = nil @task['columns'].each do |key, value| if key == name type = value break end end val = x case type.to_sym # Converted to String implicitly? when :boolean if val.is_a?(TrueClass) || val.is_a?(FalseClass) val else downcased_val = val.downcase case downcased_val when 'true' then true when 'false' then false else nil end end when :long Integer(val) when :double Float(val) when :string val when :timestamp Time.parse(val) else raise "Shouldn't reach here: val:#{val}, col_name:#{name}, col_type:#{type}" end rescue => e STDERR.puts "Failed to deserialize: val:#{val}, col_name:#{name}, col_type:#{type}, error:#{e.inspect}" end end
expect_value_is_json()
click to toggle source
# File lib/embulk/input/rediskeys.rb, line 127 def expect_value_is_json @redis.keys("#{@task['key_prefix']}*").each do |k| case @task['encode'] when 'json' @page_builder.add(JSON.parse(@redis.get(k)).each_with_object({}) {|(key, value), new_hash| new_hash[key] = deserialize_element(key, value) }.values) @rows += 1 when 'list' @redis.lrange(k, 0, -1).each do |r| @page_builder.add(JSON.parse(r).each_with_object({}) {|(key, value), new_hash| new_hash[key] = deserialize_element(key, value) }.values) @rows += 1 end when 'hash' @page_builder.add(JSON.parse(@redis.hgetall(k).to_json).each_with_object({}) {|(key, value), new_hash| new_hash[key] = deserialize_element(key, value) }.values) @rows += 1 end end end
init()
click to toggle source
TODO
def self.guess(config)
sample_records = [ {"example"=>"a", "column"=>1, "value"=>0.1}, {"example"=>"a", "column"=>2, "value"=>0.2}, ] columns = Guess::SchemaGuess.from_hash_records(sample_records) return {"columns" => columns}
end
Calls superclass method
# File lib/embulk/input/rediskeys.rb, line 62 def init # initialization code: puts "Redis input thread #{index}..." super @rows = 0 @redis = ::Redis.new(:host => task['host'], :port => task['port'], :db => task['db']) end
match_key_as_key()
click to toggle source
# File lib/embulk/input/rediskeys.rb, line 108 def match_key_as_key records = [] @redis.keys("#{@task['key_prefix']}*").each do |k| case @task['encode'] when 'json' v = @redis.get(k) when 'list' v = @redis.lrange(k, 0, -1) when 'hash' v = @redis.hgetall(k).to_json end v = "{\"#{k}\":#{v}}" x = JSON.parse(v) records.push(deserialize_element(k, x)) @rows += 1 end @page_builder.add(records) end
run()
click to toggle source
# File lib/embulk/input/rediskeys.rb, line 151 def run if @task['match_key_as_key'] then match_key_as_key() else expect_value_is_json() end @page_builder.finish # don't forget to call finish :-) task_report = { "rows" => @rows } return task_report end