class Object

Public Instance Methods

format(tag, time, record) click to toggle source
# File lib/fluent/tail.rb, line 62
def format(tag, time, record)
  "#{Time.at(time).localtime} #{tag}: #{@output_proc.call(record)}"
end
main() click to toggle source
# File lib/fluent/tail.rb, line 66
def main
  opts = parse_options

  @output_proc = case opts[:output_type]
                 when :json then Proc.new {|record| Yajl.dump(record) }
                 when :hash then Proc.new {|record| record.to_s }
                 end

  unless opts[:unix].nil?
    uri = "drbunix:#{opts[:unix]}"
  else
    uri = "druby://#{opts[:host]}:#{opts[:port]}"
  end

  $remote_engine = DRb::DRbObject.new_with_uri(uri)

  remote_code = <<-CODE
    alias :original_emit_staream :emit_stream
    @fluent_tail_queue = Queue.new
    @fluent_tail_match_pattern = Fluent::MatchPattern.create("#{opts[:pattern]}")
    @fluent_tail_match_cache = {}

    def emit_stream(tag, es)
      matched = @fluent_tail_match_cache[tag]

      if matched.nil?
        matched = @fluent_tail_match_pattern.match(tag)
        @fluent_tail_match_cache[tag] = matched
      end

      @fluent_tail_queue.push([tag, es.dup]) if matched

      original_emit_staream(tag, es)
    end

    def pop
      @fluent_tail_queue.pop
    end
  CODE

  if $remote_engine.respond_to?(:original_emit_staream)
    abort 'another client has already connected to the server. abort.'
  end

  begin
    $remote_engine.method_missing(:instance_eval, remote_code)

    while e = $remote_engine.pop
      tag, es = e
      es.each do |time,record|
        STDOUT.puts format(tag, time, record)
      end
    end
  ensure
    if not $remote_engine.nil? and $remote_engine.respond_to?(:original_emit_staream)
      remote_code = <<-CODE
        @fluent_tail_queue = nil
        alias :emit_stream :original_emit_staream
        undef :original_emit_staream
      CODE
      $remote_engine.method_missing(:instance_eval, remote_code)
    end
  end
end
parse_options() click to toggle source
# File lib/fluent/tail.rb, line 5
def parse_options
  op = OptionParser.new
  op.banner += ' <pattern>'

  (class<<self;self;end).module_eval do
    define_method(:usage) do |msg|
      puts op.to_s
      puts "error: #{msg}" if msg
      exit 1
    end
  end

  opts = {
    host: '127.0.0.1',
    port: 24230,
    unix: nil,
    pattern: nil,
    output_type: :json,
  }

  op.on('-h', '--host HOST', "fluent host (default: #{opts[:host]})") {|v|
    opts[:host] = v
  }

  op.on('-p', '--port PORT', "debug_agent tcp port (default: #{opts[:host]})", Integer) {|v|
    opts[:port] = v
  }

  op.on('-u', '--unix PATH', "use unix socket instead of tcp") {|v|
    opts[:unix] = b
  }

  op.on('-t', '--output-type TYPE', "output format of record. available types are 'json' or 'hash'. (default: #{opts[:output_type]})") {|v|
    case v.downcase
    when 'json'
      opts[:output_type] = :json
    when 'hash'
      opts[:output_type] = :hash
    else
      raise ConfigError, "output_type must be 'json' or 'hash'"
    end
  }

  begin
    op.parse!(ARGV)
    opts[:pattern] = ARGV.shift

    if opts[:pattern].nil?
      usage "a pattern must be specified"
    end
  rescue
    usage $!.to_s
  end

  opts
end