class Aggkit::Watcher::ProcessHandler

Attributes

command[RW]
options[RW]
process[RW]
stderr[RW]
stdin[RW]
stdout[RW]
watcher[RW]

Public Class Methods

capture(cmd) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 7
# Runs +cmd+ through IO.popen and collects everything it writes to stdout.
#
# The block form of IO.popen is used so the pipe is closed (and the child
# reaped, which sets $?) even when reading raises; the previous
# open/read/close sequence left the pipe open in that case.
#
# cmd - anything IO.popen accepts as a command (String or Array).
#
# Returns a two-element Array: [output String, Process::Status of the child].
def self.capture(cmd)
  output = IO.popen(cmd, &:read)
  [output, $?]
end
new(watcher, *cmd) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 18
# Builds the child process for +cmd+, starts the output reader threads and
# then starts the process.
#
# watcher - owner object; stored in @watcher.
# cmd     - command parts. A trailing Hash, if present, is removed from the
#           list and stored as @options. The remaining parts are flattened,
#           converted to stripped strings, and empty entries are dropped.
def initialize(watcher, *cmd)
  @watcher = watcher
  @options = cmd.last.is_a?(Hash) ? cmd.pop : {}
  @command = cmd.flatten.map { |part| part.to_s.strip }.reject(&:empty?)

  @process = build_process(*command)

  # Reader threads are created before the process is started.
  initialize_streams
  @process.start
end

Public Instance Methods

build_process(*cmd) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 59
# Builds (without starting) a child process object for +cmd+ and wires its
# output into two fresh pipes.
#
# Side effects: sets @stdout and @stderr to the read ends of the new pipes
# and @stdin to whatever the process object's io currently reports as stdin.
#
# NOTE(review): io.stdin is read before duplex is enabled and before the
# process is started; confirm ::Aggkit::ChildProcess already exposes a usable
# stdin at this point.
# NOTE(review): the write ends of both pipes are handed to the process object
# and are not closed here; verify who closes them.
#
# Returns the configured process object.
def build_process(*cmd)
  ::Aggkit::ChildProcess.build(*cmd).tap do |child|
    @stdout, out_writer = IO.pipe
    @stderr, err_writer = IO.pipe

    @stdin = child.io.stdin

    child.io.stdout = out_writer
    child.io.stderr = err_writer
    child.duplex = true
  end
end
handled?() click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 72
# Reports whether the wrapped process object has a truthy exit_code.
#
# Returns true or false (never the exit code itself).
def handled?
  @process.exit_code ? true : false
end
initialize_streams() click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 35
# Starts two background threads, stored in @threads: one copies lines from
# the stdout reader to STDOUT, the other from the stderr reader to STDERR.
# Each thread keeps calling synchro_readline until it returns a falsy value.
def initialize_streams
  @threads = []

  @threads << Thread.new do
    keep_reading = true
    keep_reading = synchro_readline(stdout, STDOUT) while keep_reading
  end

  @threads << Thread.new do
    keep_reading = true
    keep_reading = synchro_readline(stderr, STDERR) while keep_reading
  end
end
method_missing(m, *args, &block) click to toggle source
Calls superclass method
# File lib/aggkit/watcher/process_handler.rb, line 126
def method_missing(m, *args, &block)
  if @process.respond_to? m
    @process.send(m, *args, &block)
  else
    super
  end
end
stop(timeout = (@options[:timeout] || 5)) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 103
# Stops the child process, escalating to a kill when it does not exit in time.
#
# Flow: return at once if exited? is true; otherwise call terminate and wait
# via poll_for_exit(timeout). If that raises TimeoutError, send a kill to the
# process object and block in wait. clean_all runs on every path, including
# the early returns, because it sits in the method-level ensure.
#
# exited?, poll_for_exit and wait are not defined in the visible source;
# presumably they are forwarded to @process by method_missing.
#
# timeout - passed to poll_for_exit; defaults to @options[:timeout], or 5
#           when that option is nil or false (presumably seconds; confirm
#           against poll_for_exit).
#
# Returns nil when already exited, the result of poll_for_exit on a timely
# exit, or the result of wait after the kill.
def stop(timeout = (@options[:timeout] || 5))
  return if exited?

  terminate

  begin
    return poll_for_exit(timeout)
  rescue TimeoutError
    # The process did not exit within the timeout: fall through to the kill.
    # NOTE(review): the bare TimeoutError constant is resolved from this
    # class's scope; confirm it names the error poll_for_exit actually raises.
  end

  begin
    @process.send(:send_kill)
  rescue Errno::ECHILD, Errno::ESRCH
    # Handle the race where the process dies between the timeout and
    # send_kill: there is nothing left to signal, so ignore the error.
  end

  wait
ensure
  clean_all
end
stop!(status) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 76
# Records +status+ as the exit code on the wrapped process object, then runs
# stop.
#
# status - value handed to the process object's set_exit_code (invoked
#          through send).
#
# NOTE(review): if exited? reports true once an exit code is set, the stop
# call below skips termination and only performs its cleanup; confirm that
# this is the intent.
#
# Returns whatever stop returns.
def stop!(status)
  @process.send(:set_exit_code, status)
  stop
end
synchro_readline(io, out) click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 51
# Reads one line from +io+ and writes it to +out+ while holding the watcher's
# iolock, so lines from different readers are not interleaved.
#
# io  - readable stream (responds to gets).
# out - writable stream (responds to puts).
#
# Returns true when a line was copied. Returns false when +io+ has reached
# end-of-file (gets returned nil) or when reading/writing raised a
# StandardError, which tells the calling loop to stop. Previously EOF was
# reported as success after printing an empty line, so a reader loop would
# spin forever once the stream ended.
def synchro_readline(io, out)
  line = io.gets
  return false if line.nil?

  @watcher.iolock.synchronize { out.puts(line) }
  true
rescue StandardError
  false
end
terminate() click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 81
# Asks the child process to terminate, at most once per handler.
#
# Does nothing when exited? is true or a previous call already set
# @terminating. When @options[:termcmd] is set, '%PID%' in it is replaced by
# the process pid and the resulting command is run through
# ProcessHandler.capture; if that command reports failure, a term signal is
# sent to the process object instead. Without a termcmd the term signal is
# sent directly.
def terminate
  return if exited? || @terminating

  @terminating = true

  template = @options[:termcmd]
  return @process.send(:send_term) unless template

  termcmd = template.gsub('%PID%', pid.to_s)
  @watcher.log "Terminating by #{termcmd}..."
  output, status = ::Aggkit::Watcher::ProcessHandler.capture(termcmd)
  return @watcher.log("Success: #{output}") if status.success?

  # The custom command failed: report it and fall back to the term signal.
  @watcher.error "Failed: #{output}"
  @process.send(:send_term)
end

Private Instance Methods

clean_all() click to toggle source
# File lib/aggkit/watcher/process_handler.rb, line 136
# Best-effort cleanup: closes the three stream handles, then terminates and
# joins every reader thread, and finally resets @threads to an empty list.
# Any StandardError raised by an individual close/terminate/join is ignored
# so that the remaining resources are still released.
def clean_all
  [@stdin, @stdout, @stderr].each do |stream|
    begin
      stream.close
    rescue StandardError
      nil
    end
  end

  @threads.each do |worker|
    begin
      worker.terminate
    rescue StandardError
      nil
    end

    begin
      worker.join
    rescue StandardError
      nil
    end
  end

  @threads = []
end