class Magellan::LogFunnel

Constants

VERSION

Public Class Methods

new(argv) click to toggle source
# File lib/magellan/log_funnel.rb, line 18
# Builds a funnel around a raw command-line argument list.
#
# argv:: the argument array; it is stored as-is (not copied) in @argv.
def initialize(argv)
  @argv = argv
end
start(argv) click to toggle source
# File lib/magellan/log_funnel.rb, line 14
# Convenience entry point: builds an instance for +argv+ and runs it.
#
# argv:: the argument array handed to the constructor.
#
# Returns whatever the instance's +run+ returns.
def self.start(argv)
  new(argv).run
end

Public Instance Methods

connect_fluentd() click to toggle source
# File lib/magellan/log_funnel.rb, line 47
# Creates the fluentd logger used by post_msg and stores it in @logger.
#
# The logger is always created with the "worker_logs" tag prefix. When a
# --fluentd URI was parsed into @conf[:fluentd], its host and port are passed
# to the logger so the option actually takes effect; parts the URI does not
# carry are left to the logger's own defaults. Without the option the logger
# is created exactly as before, with no explicit endpoint.
#
# Returns the new logger.
def connect_fluentd
  endpoint = @conf && @conf[:fluentd]

  options = {}
  if endpoint
    options[:host] = endpoint.host if endpoint.host
    options[:port] = endpoint.port if endpoint.port
  end

  @logger =
    if options.empty?
      Fluent::Logger::FluentLogger.new("worker_logs")
    else
      Fluent::Logger::FluentLogger.new("worker_logs", options)
    end
end
now() click to toggle source
# File lib/magellan/log_funnel.rb, line 51
# Returns the current time as a Time object.
#
# post_msg goes through this method rather than calling Time.now directly.
def now
  Time.now
end
parse_options(argv) click to toggle source
# File lib/magellan/log_funnel.rb, line 22
# Parses the funnel's own options from +argv+ into a fresh @conf hash.
#
# Recognized options and the @conf key each one fills:
#   --fluentd URI            :fluentd   (converted with URI())
#   --project PROJECT        :project
#   --stage STAGE            :stage
#   --worker-version VERSION :version
#   --container NAME         :container
#   -d, --[no-]daemon        :daemon    (true / false)
#
# argv:: the argument array; parsed with OptionParser#order!, which removes
#        the switches it consumes from the array in place.
#
# Returns the OptionParser instance.
def parse_options(argv)
  @conf = {}

  parser = OptionParser.new
  parser.on("--fluentd URI") { |uri| @conf[:fluentd] = URI(uri) }

  # Options whose argument is stored verbatim, in registration order.
  plain_options = {
    "--project PROJECT" => :project,
    "--stage STAGE" => :stage,
    "--worker-version VERSION" => :version,
    "--container NAME" => :container
  }
  plain_options.each do |spec, key|
    parser.on(spec) { |value| @conf[key] = value }
  end

  parser.on("-d", "--[no-]daemon") { |flag| @conf[:daemon] = flag }

  parser.order!(argv)
  parser
end
post_msg(msg) click to toggle source
# File lib/magellan/log_funnel.rb, line 59
# Posts one log message to the logger under the stage tag.
#
# msg:: the message text to send.
#
# The record carries the current time as float seconds (now.to_f), the
# project, stage, version and container values from @conf, and the message.
#
# Returns whatever @logger.post returns.
def post_msg(msg)
  tag = stage_tag
  record = {
    time: now.to_f,
    project: @conf[:project],
    stage: @conf[:stage],
    version: @conf[:version],
    container: @conf[:container],
    message: msg
  }
  @logger.post(tag, record)
end
run() click to toggle source
# File lib/magellan/log_funnel.rb, line 70
# Runs the wrapped command and forwards its output lines through post_msg.
#
# Steps:
# 1. parse_options(@argv) fills @conf; @argv is then used as the command to
#    spawn.
# 2. connect_fluentd sets up the logger.
# 3. If @conf[:daemon] is set, the process daemonizes via Process.daemon.
# 4. The command is spawned with stdout and stderr redirected into two pipes;
#    one reader thread per pipe posts every line (without its trailing
#    newline) via post_msg.
# 5. After both readers finish, the child is waited for.
#
# Returns the child's exit status, or 127 when Process::Status#exitstatus is
# nil.
#
# If the method is left before the child has been waited for (an exception,
# an interrupt, ...), the child is sent SIGTERM. A child that no longer
# exists at that point is ignored, so the cleanup cannot replace the
# exception that is already propagating.
def run
  parse_options(@argv)
  connect_fluentd
  Process.daemon if @conf[:daemon]

  IO.pipe do |out_r, out_w|
    IO.pipe do |err_r, err_w|
      @pid = Process.spawn(*@argv, out: out_w, err: err_w)
      # Drop our copies of the write ends so the readers see EOF once the
      # child (the only remaining writer) is done.
      out_w.close
      err_w.close

      readers = [out_r, err_r].map do |stream|
        Thread.start do
          begin
            while (line = stream.gets)
              post_msg(line.chomp)
            end
          rescue IOError
            # The pipe was closed underneath us; nothing more to forward.
          end
        end
      end
      readers.each(&:join)

      Process.waitpid(@pid)
      @child_status = $?
      return @child_status.exitstatus || 127
    end
  end
ensure
  # No status recorded means we are leaving early (signal, error, ...):
  # stop the child if one was started.
  if @pid && !@child_status
    begin
      Process.kill(:TERM, @pid)
    rescue Errno::ESRCH
      # The child is already gone; there is nothing left to stop.
    end
  end
end
stage_tag() click to toggle source
# File lib/magellan/log_funnel.rb, line 55
# Returns the tag used when posting: "<project>.<stage>", built from
# @conf[:project] and @conf[:stage].
#
# The string is built on first use and cached in @stage_tag, so later changes
# to @conf do not affect it.
def stage_tag
  return @stage_tag if @stage_tag

  project = @conf[:project]
  stage = @conf[:stage]
  @stage_tag = "#{project}.#{stage}"
end