class Embulk::InputRedis
Public Class Methods
guess()
click to toggle source
# File lib/embulk/input/redis.rb, line 9 def self.guess end
transaction(config) { |task, columns, 1| ... }
click to toggle source
# File lib/embulk/input/redis.rb, line 12 def self.transaction(config, &control) task = { 'host' => config.param('host', :string, :default => 'localhost'), 'port' => config.param('port', :integer, :default => 6379), 'db' => config.param('db', :integer, :default => 0), 'key_prefix' => config.param('key_prefix', :string, :default => ''), 'encode' => config.param('encode', :string, :default => 'json'), 'columns' => config.param('columns', :array, :default => []).inject({}){|a, col| a[col['name']] = col['type'].to_sym a }, 'rows' => 0 } columns = task['columns'].map.with_index{|(name, type), i| Column.new(i, name, type) } puts "Redis input started." commit_reports = yield(task, columns, 1) puts "Redis input finished. Commit reports = #{commit_reports.to_json}" return {} end
Public Instance Methods
deserialize_element(x)
click to toggle source
# File lib/embulk/input/redis.rb, line 38 def deserialize_element(x) @task['columns'].map{|(name, type)| begin val = x[name] case type.to_sym # Converted to String implicitly? when :boolean if val.is_a?(TrueClass) || val.is_a?(FalseClass) val else downcased_val = val.downcase case downcased_val when 'true' then true when 'false' then false else nil end end when :long Integer(val) when :double Float(val) when :string val when :timestamp Time.parse(val) else raise "Shouldn't reach here: val:#{val}, col_name:#{name}, col_type:#{type}" end rescue => e STDERR.puts "Failed to deserialize: val:#{val}, col_name:#{name}, col_type:#{type}, error:#{e.inspect}" end } end
run()
click to toggle source
# File lib/embulk/input/redis.rb, line 71 def run puts "Redis input thread #{@index}..." r = Redis.new(:host => @task['host'], :port => @task['port'], :db => @task['db']) r.keys("#{@task['key_prefix']}*").each do |k| case @task['encode'] when 'json' v = r.get(k) x = JSON.parse(v) @page_builder.add(deserialize_element(x)) when 'hash' x = r.hgetall(k) @page_builder.add(deserialize_element(x)) end @task['rows'] += 1 end @page_builder.finish # don't forget to call finish :-) commit_report = { "rows" => @task['rows'] } return commit_report end