class Msgknife::Stream
Attributes
optpsr[RW]
Public Class Methods
new()
click to toggle source
# File lib/msgknife.rb, line 13
# Builds the stream with its default state and registers the common
# command-line switches on the option parser exposed as +optpsr+.
#
# Defaults: output format 'console', no positional arguments (+@argv+ is
# empty) and +@out_encode+ false. The switches only record their values in
# instance variables when the parser later processes a command line:
# -F:: sets +@in_fluentd+
# -T VAL:: sets +@ts_key+ to VAL
# -N:: sets +@ignore_nil+
# -j / -m:: switch +@output_fmt+ to 'json' / 'msgpack'
def initialize
  @out_encode = false
  @argv = []
  @output_fmt = 'console'

  @optpsr = OptionParser.new do |parser|
    parser.on('-F', 'input as fluentd format') { |flag| @in_fluentd = flag }
    parser.on('-T VAL', 'key of timestamp in message') { |key| @ts_key = key }
    parser.on('-N', 'ignore if value is nil') { |flag| @ignore_nil = flag }
    parser.on('-j', 'output as json') { |_flag| @output_fmt = 'json' }
    parser.on('-m', 'output as msgpack') { |_flag| @output_fmt = 'msgpack' }
  end
end
Public Instance Methods
read_io(io)
click to toggle source
# File lib/msgknife.rb, line 40
# Unpacks every MessagePack object readable from +io+ and passes each one
# to #recv as (obj, ts, tag).
#
# * When +@in_fluentd+ is set, each object is indexed as a triple and
#   forwarded as recv(obj[2], obj[1], obj[0]).
# * Otherwise the object itself is forwarded with a nil tag. If +@ts_key+
#   is set and present in the object, the value under that key becomes the
#   timestamp: a String is parsed with Time.parse, an Integer is used
#   as-is and a Float is truncated. Any other value, or a String that
#   cannot be parsed, yields a nil timestamp.
#
# EOFError ends the read quietly; Interrupt stops reading and returns.
def read_io(io)
  unpacker = MessagePack::Unpacker.new(io)
  begin
    unpacker.each do |obj|
      if @in_fluentd
        recv(obj[2], obj[1], obj[0])
        next
      end

      ts = nil
      # Only objects that can answer key? are searched for a timestamp;
      # anything else is forwarded untouched instead of raising.
      if !@ts_key.nil? && obj.respond_to?(:key?) && obj.key?(@ts_key)
        ts_val = obj[@ts_key]
        ts = case ts_val
             when String
               begin
                 Time.parse(ts_val).to_i
               rescue StandardError
                 # An unparseable string means "no timestamp", not epoch 0.
                 nil
               end
             when Integer
               # Integer rather than Fixnum: Fixnum no longer exists in
               # current Ruby, and Integer also matches on older versions.
               ts_val
             when Float
               ts_val.to_i
             end
      end
      recv(obj, ts, nil)
    end
  rescue EOFError
    # ignore
  rescue Interrupt
    return
  end
end
read_stream(files=nil)
click to toggle source
# File lib/msgknife.rb, line 73
# Feeds input to #read_io.
#
# +files+ may be nil, a single path String, or a list of paths. With nil
# or an empty value, STDIN is read. Otherwise each path is read in turn;
# a directory is walked recursively with Find.find and every
# non-directory entry below it is read.
#
# Each file is opened in binary mode and closed as soon as #read_io
# returns (or raises), so no file handles are left open.
def read_stream(files=nil)
  if files.nil? || files.size == 0
    read_io(STDIN)
  else
    # Block form of File.open guarantees the handle is closed.
    read_file = lambda do |path|
      File.open(path, 'rb') { |fh| read_io(fh) }
    end

    Array(files).each do |fpath|
      if File.directory?(fpath)
        Find.find(fpath) do |entry|
          next if File.directory?(entry)
          read_file.call(entry)
        end
      else
        read_file.call(fpath)
      end
    end
  end
end
recv(obj, ts, tag)
click to toggle source
# File lib/msgknife.rb, line 115
# Receives one decoded message. This base implementation always raises;
# a concrete stream is expected to override it.
#
# obj:: the message
# ts::  timestamp for the message, or nil
# tag:: tag for the message, or nil
#
# Raises RuntimeError naming this method (the message previously referred
# to a non-existent exec(obj)).
def recv(obj, ts, tag)
  raise 'recv(obj, ts, tag) must be implemented'
end
run(cmd_argv, range = nil)
click to toggle source
# File lib/msgknife.rb, line 26
# Entry point: parses +cmd_argv+ with the option parser, then calls
# #setup, #read_stream and #teardown in that order.
#
# cmd_argv:: command-line arguments
# range::    optional Range selecting which of the arguments left over
#            after option parsing belong to the command itself. Those are
#            removed from the list, stored in +@argv+ and passed to
#            #setup; the remaining arguments go to #read_stream.
#
# Raises RuntimeError ("Not enough arguments") when fewer arguments remain
# than +range+ needs.
def run(cmd_argv, range = nil)
  args = @optpsr.parse(cmd_argv)
  unless range.nil?
    # An exclusive range (a...b) needs b arguments, an inclusive one
    # (a..b) needs b + 1.
    required = range.exclude_end? ? range.last : range.last + 1
    raise "Not enough arguments" if args.size < required
    @argv = args.slice!(range)
  end
  setup(@argv)
  read_stream(args)
  teardown
end
setup(argv)
click to toggle source
# File lib/msgknife.rb, line 111
# Hook called with the command's own arguments before any input is read.
#
# This base implementation accepts an empty +argv+ and does nothing. If
# arguments are supplied it raises RuntimeError, because handling them
# requires an overriding implementation.
def setup(argv)
  return unless argv.size > 0

  raise 'setup(argv) must be implemented'
end
teardown()
click to toggle source
# File lib/msgknife.rb, line 119
# Hook called with no arguments once input has been read. This base
# implementation does nothing and returns nil.
def teardown
end
write_stream(obj, ts=nil, tag=nil, io=STDOUT)
click to toggle source
# File lib/msgknife.rb, line 97
# Writes one message to +io+ in the format selected by +@output_fmt+.
#
# obj:: the message
# ts::  optional timestamp
# tag:: optional tag
# io::  destination, STDOUT by default
#
# When both +ts+ and +tag+ are nil the message is written on its own;
# otherwise it is wrapped as [tag, ts, obj]. 'console' pretty-prints,
# 'json' dumps JSON and 'msgpack' writes the packed bytes. Any other
# format writes nothing. A broken pipe (Errno::EPIPE) is ignored.
def write_stream(obj, ts=nil, tag=nil, io=STDOUT)
  payload = if ts.nil? && tag.nil?
              obj
            else
              [tag, ts, obj]
            end

  case @output_fmt
  when 'console' then PP.pp(payload, io)
  when 'json'    then JSON.dump(payload, io)
  when 'msgpack' then io.write(payload.to_msgpack)
  end
rescue Errno::EPIPE
  # The reader went away; nothing more to do.
  nil
end