class Amberletters::Process

Constants

DEFAULT_LOG_LEVEL
DEFAULT_TIMEOUT
END_MARKER
RUBY
RUBY_EXT

Shamelessly stolen from Rake

Attributes

blocker[RW]
command[R]
cwd[R]
input[R]
input_buffer[R]
output[R]
output_buffer[R]
pid[R]
status[R]
timeout[RW]
triggers[R]

Public Class Methods

new(*args) click to toggle source
# File lib/amberletters.rb, line 246
def initialize(*args)
  options         = if args.last.is_a?(Hash) then args.pop else {} end
  @command        = args
  @triggers       = []
  @blocker        = nil
  @input_buffer   = StringIO.new
  @output_buffer  = StringScanner.new("")
  @env            = options.fetch(:env) {{}}
  @cwd            = options.fetch(:cwd) {Dir.pwd}
  @logger   = options.fetch(:logger) {
    l = ::Logger.new($stdout)
    l.level = DEFAULT_LOG_LEVEL
    l
  }
  @state         = :not_started
  @shell         = options.fetch(:shell) { '/bin/sh' }
  @transcript    = options.fetch(:transcript) {
    t = Object.new
    def t.<<(*)
      # NOOP
    end
    t
  }
  @history = TranscriptHistoryBuffer.new(@transcript)
  @timeout = options.fetch(:timeout) { DEFAULT_TIMEOUT }

  ObjectSpace.define_finalizer(self) do
    kill!
  end
end

Public Instance Methods

add_blocking_trigger(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 319
def add_blocking_trigger(event, *args, &block)
  t = add_trigger(event, *args, &block)
  t.time_to_live = 1
  @logger.debug "waiting for #{t}"
  self.blocker = t
  catchup_trigger!(t)
  t
end
add_nonblocking_trigger(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 292
def add_nonblocking_trigger(event, *args, &block)
  t = add_trigger(event, *args, &block)
  catchup_trigger!(t)
  t
end
add_trigger(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 298
def add_trigger(event, *args, &block)
  t = build_trigger(event, *args, &block)
  triggers << t
  @logger.debug "added trigger on #{t}"
  t
end
alive?() click to toggle source
# File lib/amberletters.rb, line 357
def alive?
  ::Process.kill(0, @pid)
  true
rescue Errno::ESRCH, Errno::ENOENT
  false
end
blocked?() click to toggle source
# File lib/amberletters.rb, line 364
def blocked?
  @blocker
end
ended?() click to toggle source

Have we seen the end marker yet?

# File lib/amberletters.rb, line 381
def ended?
  @state == :ended
end
exited?() click to toggle source
# File lib/amberletters.rb, line 376
def exited?
  @state == :exited
end
flush_output_buffer!() click to toggle source
# File lib/amberletters.rb, line 345
def flush_output_buffer!
  @logger.debug "flushing output buffer"
  @output_buffer.terminate
end
kill!(signal="TERM") click to toggle source
# File lib/amberletters.rb, line 350
def kill!(signal="TERM")
  handle_child_exit do
    @logger.info "Killing process #{@pid}"
    ::Process.kill(signal, @pid)
  end
end
not_started?() click to toggle source
# File lib/amberletters.rb, line 372
def not_started?
  @state == :not_started
end
on(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 277
def on(event, *args, &block)
  add_nonblocking_trigger(event, *args, &block)
end
prepend_trigger(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 311
def prepend_trigger(event, *args, &block)
  t = build_trigger(event, *args, &block)
  triggers.unshift(t)
  @logger.debug "prepended trigger on #{t}"
  t
end
remove_trigger(t) click to toggle source
# File lib/amberletters.rb, line 305
def remove_trigger(t)
  triggers.delete(t)
  @logger.debug "removed trigger on #{t}"
  t
end
running?() click to toggle source
# File lib/amberletters.rb, line 368
def running?
  @state == :running
end
start!() click to toggle source
# File lib/amberletters.rb, line 328
def start!
  raise StateError, "Already started!" unless not_started?
  @logger.debug "installing end marker handler for #{END_MARKER}"
  prepend_trigger(:output, /#{END_MARKER}/, :exclusive => false, :time_to_live => 1) do |process, data|
    handle_end_marker
  end
  handle_child_exit do
    cmd = wrapped_command
    @logger.debug "executing #{cmd.join(' ')}"
    merge_environment(@env) do
      @output, @input, @pid = PTY.spawn(*cmd)
    end
    @state = :running
    @logger.debug "spawned pid #{@pid}; in: #{@input.inspect}; out: #{@output.inspect}"
  end
end
time() click to toggle source
# File lib/amberletters.rb, line 385
def time
  Time.now
end
to_s() click to toggle source
# File lib/amberletters.rb, line 389
def to_s
  "Process<pid: #{pid}; in: #{input.inspect}; out: #{output.inspect}>"
end
wait_for(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 281
def wait_for(event, *args, &block)
  raise "Already waiting for #{blocker}" if blocker
  t = add_blocking_trigger(event, *args, &block)
  @logger.debug "Entering wait cycle for #{event}"
  process_events
rescue
  unblock!
  triggers.delete(t)
  raise
end

Private Instance Methods

build_trigger(event, *args, &block) click to toggle source
# File lib/amberletters.rb, line 395
def build_trigger(event, *args, &block)
  klass = trigger_class_for_event(event)
  t = klass.new(*args, &block)
  t.logger = @logger if @logger
  t
end
catchup_trigger!(trigger) click to toggle source
# File lib/amberletters.rb, line 616
def catchup_trigger!(trigger)
  @logger.debug "Catching up trigger #{trigger}"
  check_trigger(trigger)
end
check_trigger(trigger) { || ... } click to toggle source
# File lib/amberletters.rb, line 530
def check_trigger(trigger)
  if trigger.call(self)         # match
    @logger.debug "match trigger #{trigger}"
    if blocker.equal?(trigger)
      unblock!
    end
    if trigger.time_to_live
      if trigger.time_to_live > 1
        trigger.time_to_live -= 1
        @logger.debug "trigger ttl reduced to #{trigger.time_to_live}"
      else
        triggers.delete(trigger)
        @logger.debug "trigger removed"
      end
    end
    yield if block_given?
  else
    @logger.debug "no match"
  end
end
collect_remaining_output() click to toggle source
# File lib/amberletters.rb, line 465
def collect_remaining_output
  if @output.nil?
    @logger.debug "unable to collect output for missing output handle"
    return
  end
  @logger.debug "collecting remaining output"
  while data = @output.read_nonblock(1024)
    output_buffer << data
    @logger.debug "read #{data.size} bytes"
  end
rescue EOFError, Errno::EIO => error
  @logger.debug error.message
end
flush_triggers!(kind) click to toggle source
# File lib/amberletters.rb, line 585
def flush_triggers!(kind)
  @logger.debug "flushing triggers matching #{kind}"
  triggers.delete_if{|t| kind === t}
end
format_input_for_log(text) click to toggle source
# File lib/amberletters.rb, line 625
def format_input_for_log(text)
  "\n" + text.split("\n").map{|l| "<< #{l}"}.join("\n")
end
format_output_for_log(text) click to toggle source
# File lib/amberletters.rb, line 621
def format_output_for_log(text)
  "\n" + text.split("\n").map{|l| ">> #{l}"}.join("\n")
end
handle_child_exit() { || ... } click to toggle source
# File lib/amberletters.rb, line 568
def handle_child_exit
  handle_eio do
    yield
  end
rescue PTY::ChildExited => error
  @logger.debug "caught PTY::ChildExited"
  collect_remaining_output
  handle_exit(error.status)
end
handle_eio() { || ... } click to toggle source
# File lib/amberletters.rb, line 578
def handle_eio
  yield
rescue Errno::EIO => error
  @logger.debug "Errno::EIO caught"
  wait_for_child_to_die
end
handle_end_marker() click to toggle source
# File lib/amberletters.rb, line 551
def handle_end_marker
  return false if ended?
  @logger.debug "end marker found"
  output_buffer.string.gsub!(/#{END_MARKER}\s*/, '')
  output_buffer.unscan
  @state = :ended
  @logger.debug "end marker expunged from output buffer"
  @logger.debug "acknowledging end marker"
  self.puts
end
handle_exit(status=status_from_waitpid) click to toggle source
# File lib/amberletters.rb, line 499
def handle_exit(status=status_from_waitpid)
  return false if exited?
  @logger.debug "handling exit of process #{@pid}"
  @state  = :exited
  @status = status
  handle_triggers(:exit)
  if status == 0
    process_interruption(:exit)
  else
    process_interruption(:abnormal_exit)
  end
end
handle_triggers(event) click to toggle source
# File lib/amberletters.rb, line 517
def handle_triggers(event)
  klass = trigger_class_for_event(event)
  matches = 0
  triggers.grep(klass).each do |t|
    @logger.debug "checking #{event} against #{t}"
    check_trigger(t) do
      matches += 1
      break if t.exclusive?
    end
  end
  matches > 0
end
merge_environment(new_env) { || ... } click to toggle source
# File lib/amberletters.rb, line 590
  def merge_environment(new_env)
    old_env = new_env.inject({}) do |old, (key, value)|
      old[key] = ENV[key]
      ENV[key] = value
      old
    end
    yield
  ensure
    old_env.each_pair do |key, value|
      if value.nil? then ENV.delete(key) else ENV[key] = value end
    end
  end

  def process_interruption(reason)
    if blocked?
      self.interruption = reason
      unless handle_triggers(:unsatisfied)
        raise SystemError,
              "Interrupted (#{reason}) while waiting for #{blocker}.\n" \
              "Recent activity:\n" +
              @history.buffer + "\n" + ("-" * 60) + "\n"
      end
      unblock!
    end
  end

  def catchup_trigger!(trigger)
    @logger.debug "Catching up trigger #{trigger}"
    check_trigger(trigger)
  end

  def format_output_for_log(text)
    "\n" + text.split("\n").map{|l| ">> #{l}"}.join("\n")
  end

  def format_input_for_log(text)
    "\n" + text.split("\n").map{|l| "<< #{l}"}.join("\n")
  end

  def shortest_timeout
    result = triggers.grep(TimeoutTrigger).map{|t|
      t.expiration_time - Time.now
    }.min
    if result.nil? then result = @timeout end
    if result < 0 then result = 0 end
    result
  end
end
process_error(handle) click to toggle source
# File lib/amberletters.rb, line 488
def process_error(handle)
  @logger.debug "error on #{handle.inspect}"
  raise "Error running <#{@command}>\n#{@history.buffer.split(/\r?\n/).grep_v(END_MARKER).join("\n")}"
end
process_events() click to toggle source
# File lib/amberletters.rb, line 418
def process_events
  raise StateError, "Process not started!" if not_started?
  handle_child_exit do
    while blocked?
      input_handles  = input_buffer.string.empty? ? [] : [@input]
      output_handles = [@output]
      error_handles  = [@input, @output].uniq
      timeout        = shortest_timeout
      @logger.debug "select() on #{[output_handles, input_handles, error_handles, timeout].inspect}"

      ready_handles = IO.select(
        output_handles, input_handles, error_handles, timeout)

      if ready_handles.nil?
        process_timeout
      else
        ready_outputs, ready_inputs, ready_errors = *ready_handles
        ready_errors.each do |handle| process_error(handle) end
        ready_outputs.each do |handle| process_output(handle) end
        ready_inputs.each do |handle| process_input(handle) end
      end
    end
  end
end
process_input(handle) click to toggle source
# File lib/amberletters.rb, line 443
def process_input(handle)
  @logger.debug "input ready #{handle.inspect}"
  handle.write(input_buffer.string)
  @logger.debug format_output_for_log(input_buffer.string)
  @logger.debug "wrote #{input_buffer.string.size} bytes"
  input_buffer.string = ""
end
process_interruption(reason) click to toggle source
# File lib/amberletters.rb, line 603
def process_interruption(reason)
  if blocked?
    self.interruption = reason
    unless handle_triggers(:unsatisfied)
      raise SystemError,
            "Interrupted (#{reason}) while waiting for #{blocker}.\n" \
            "Recent activity:\n" +
            @history.buffer + "\n" + ("-" * 60) + "\n"
    end
    unblock!
  end
end
process_output(handle) click to toggle source
# File lib/amberletters.rb, line 451
def process_output(handle)
  @logger.debug "output ready #{handle.inspect}"
  data = handle.readpartial(1024)
  output_buffer << data
  @history << data
  @logger.debug format_input_for_log(data)
  @logger.debug "read #{data.size} bytes"
  handle_triggers(:bytes)
  handle_triggers(:output)
  flush_triggers!(OutputTrigger) if ended?
  flush_triggers!(BytesTrigger) if ended?
  # flush_output_buffer! unless ended?
end
process_timeout() click to toggle source
# File lib/amberletters.rb, line 493
def process_timeout
  @logger.debug "timeout"
  handle_triggers(:timeout)
  process_interruption(:timeout)
end
shortest_timeout() click to toggle source
# File lib/amberletters.rb, line 629
def shortest_timeout
  result = triggers.grep(TimeoutTrigger).map{|t|
    t.expiration_time - Time.now
  }.min
  if result.nil? then result = @timeout end
  if result < 0 then result = 0 end
  result
end
status_from_waitpid() click to toggle source
# File lib/amberletters.rb, line 512
def status_from_waitpid
  @logger.debug "waiting for exist status of #{@pid}"
  ::Process.waitpid2(@pid)[1]
end
trigger_class_for_event(event) click to toggle source
# File lib/amberletters.rb, line 402
def trigger_class_for_event(event)
  ::Amberletters.const_get("#{event.to_s.capitalize}Trigger")
end
unblock!() click to toggle source
# File lib/amberletters.rb, line 562
def unblock!
  @logger.debug "unblocked"
  triggers.delete(@blocker)
  @blocker = nil
end
wait_for_child_to_die() click to toggle source
# File lib/amberletters.rb, line 479
def wait_for_child_to_die
  # Soon we should get a PTY::ChildExited
  while running? || ended?
    @logger.debug "waiting for child #{@pid} to die"
    PTY.check(@pid, true) if PTY.respond_to? :check
    sleep 0.1
  end
end
wrapped_command() click to toggle source
# File lib/amberletters.rb, line 408
def wrapped_command
  [RUBY,
    '-C', cwd,
    '-e', "system(*#{command.inspect})",
    '-e', "puts(#{END_MARKER.inspect})",
    '-e', "gets",
    '-e', "exit $?.exitstatus"
  ]
end