class Msgknife::Stream

Attributes

optpsr[RW]

Public Class Methods

new() click to toggle source
# File lib/msgknife.rb, line 13
# Prepares default state and registers the command-line switches that are
# common to every stream command.
#
# Switches registered on @optpsr (in this order):
#   -F      stores the flag in @in_fluentd
#   -T VAL  stores VAL in @ts_key
#   -N      stores the flag in @ignore_nil
#   -j      sets @output_fmt to 'json'
#   -m      sets @output_fmt to 'msgpack'
#
# @output_fmt starts as 'console' and @argv as an empty array.
def initialize
  @out_encode = false
  @argv = []
  @output_fmt = 'console'

  parser = OptionParser.new
  parser.on('-F', 'input as fluentd format') { |flag| @in_fluentd = flag }
  parser.on('-T VAL', 'key of timestamp in message') { |key| @ts_key = key }
  parser.on('-N', 'ignore if value is nil') { |flag| @ignore_nil = flag }
  parser.on('-j', 'output as json') { @output_fmt = 'json' }
  parser.on('-m', 'output as msgpack') { @output_fmt = 'msgpack' }
  @optpsr = parser
end

Public Instance Methods

read_io(io) click to toggle source
# File lib/msgknife.rb, line 40
# Decodes every MessagePack object readable from +io+ and passes each one
# to #recv as (obj, ts, tag).
#
# * When @in_fluentd is set, each decoded object is indexed as a triple:
#   element 0 is passed as the tag, element 1 as the timestamp and
#   element 2 as the record.
# * Otherwise, when @ts_key is set and the record is a Hash containing that
#   key, the value is converted to an integer timestamp:
#   String  -> Time.parse(...).to_i (nil when the string cannot be parsed),
#   Integer -> used as is,
#   Float   -> truncated with to_i,
#   anything else -> nil.
# * Records without a usable timestamp key (including non-Hash records)
#   are passed through with a nil timestamp.
#
# EOFError ends the read silently; Interrupt makes the method return early.
def read_io(io)
  unpacker = MessagePack::Unpacker.new(io)
  unpacker.each do |obj|
    if @in_fluentd
      recv(obj[2], obj[1], obj[0])
    elsif @ts_key.nil? || !obj.is_a?(Hash) || !obj.key?(@ts_key)
      # Non-Hash records cannot carry a timestamp key; forward them untouched.
      recv(obj, nil, nil)
    else
      ts_val = obj[@ts_key]
      ts =
        case ts_val
        when String
          begin
            Time.parse(ts_val).to_i
          rescue StandardError
            # Unparseable text yields no timestamp rather than epoch 0.
            nil
          end
        when Integer
          # Integer covers what used to be Fixnum/Bignum (Fixnum is gone in Ruby 3.2+).
          ts_val
        when Float
          ts_val.to_i
        end

      recv(obj, ts, nil)
    end
  end
rescue EOFError
  # ignore
rescue Interrupt
  return
end
read_stream(files=nil) click to toggle source
# File lib/msgknife.rb, line 73
# Reads MessagePack input from the given paths, or from STDIN when no path
# is supplied.
#
# +files+ may be nil, a single path String, or a list of paths. A path that
# is a directory is walked recursively with Find.find and every
# non-directory entry below it is read. Each file is handed to #read_io as
# an open IO and is closed as soon as #read_io returns (or raises), so
# walking a large directory tree does not accumulate open descriptors.
def read_stream(files = nil)
  if files.nil? || files.empty?
    read_io(STDIN)
  else
    Array(files).each do |path|
      if File.directory?(path)
        Find.find(path) do |entry|
          next if File.directory?(entry)

          # Binary mode: the content is MessagePack, not text.
          File.open(entry, 'rb') { |io| read_io(io) }
        end
      else
        File.open(path, 'rb') { |io| read_io(io) }
      end
    end
  end
end
recv(obj, ts, tag) click to toggle source
# File lib/msgknife.rb, line 115
# Receives one decoded record. This base implementation always raises a
# RuntimeError; the message indicates the method is meant to be overridden.
#
# +obj+ is the record, +ts+ its timestamp (may be nil) and +tag+ its tag
# (may be nil), as supplied by #read_io.
def recv(obj, ts, tag)
  # The message names this method so the failure points at the right override.
  raise 'recv(obj, ts, tag) must be implemented'
end
run(cmd_argv, range = nil) click to toggle source
# File lib/msgknife.rb, line 26
# Entry point for a command: parses +cmd_argv+ with @optpsr, optionally
# carves the command's own positional arguments out of what is left, then
# calls #setup, #read_stream and #teardown in that order.
#
# +range+, when given, selects the positional arguments that are removed
# from the parsed list and stored in @argv; the remaining entries are
# passed to #read_stream. Raises RuntimeError ("Not enough arguments") when
# fewer than range.last + 1 positional arguments are present.
def run(cmd_argv, range = nil)
  positional = @optpsr.parse(cmd_argv)

  unless range.nil?
    required = range.last + 1
    raise "Not enough arguments" if positional.size < required

    @argv = positional.slice!(range)
  end

  setup(@argv)
  read_stream(positional)
  teardown
end
setup(argv) click to toggle source
# File lib/msgknife.rb, line 111
# Hook called by #run with the command's positional arguments before any
# input is read. This base version accepts an empty +argv+ and returns nil;
# a non-empty +argv+ raises a RuntimeError.
def setup(argv)
  return if argv.size.zero?

  raise 'setup(argv) must be implemented'
end
teardown() click to toggle source
# File lib/msgknife.rb, line 119
# Hook called by #run after all input has been read. Does nothing here and
# returns nil.
def teardown
end
write_stream(obj, ts=nil, tag=nil, io=STDOUT) click to toggle source
# File lib/msgknife.rb, line 97
# Writes one record to +io+ in the format selected by @output_fmt:
# 'console' pretty-prints with PP, 'json' uses JSON.dump and 'msgpack'
# writes the to_msgpack serialization. Any other format writes nothing.
#
# When both +ts+ and +tag+ are nil the record is emitted on its own;
# otherwise it is wrapped as [tag, ts, obj].
#
# Errno::EPIPE raised while writing is swallowed and the method returns nil.
def write_stream(obj, ts = nil, tag = nil, io = STDOUT)
  payload =
    if ts.nil? && tag.nil?
      obj
    else
      [tag, ts, obj]
    end

  case @output_fmt
  when 'console' then PP.pp(payload, io)
  when 'json'    then JSON.dump(payload, io)
  when 'msgpack' then io.write(payload.to_msgpack)
  end
rescue Errno::EPIPE
  nil
end