class Cult::Paramap::Job

Attributes

block[R]
ident[R]
pid[R]
pipe[R]
rebind[R]
value[R]

Public Class Methods

new(ident, value, rebind: {}, &block) click to toggle source
# File lib/cult/paramap.rb, line 24
# Forks a child process that calls +block+ with +value+ and reports the
# outcome back to the parent over a pipe.
#
# ident  - stored in @ident; not used further in this method.
# value  - argument handed to the block inside the child.
# rebind - stored in @rebind (defaults to an empty Hash).
# block  - the work the child performs.
#
# The child writes a '=' response carrying the block's return value, or a
# '!' response carrying whatever exception the block raised.
def initialize(ident, value, rebind: {}, &block)
  @ident  = ident
  @value  = value
  @rebind = rebind
  @block  = block

  @pipe = IO.pipe
  @pid = fork do
    # Child side: the read end is not needed here.
    @pipe[0].close
    prepare_forked_environment!
    begin
      outcome = block.call(value)
      write_response!('=', outcome)
    rescue Exception => error
      # Every exception class is caught so it can be reported as '!'.
      write_response!('!', error)
    end
  end
  # Parent side: the write end is not needed here.
  @pipe[1].close
end

Public Instance Methods

exception() click to toggle source
# File lib/cult/paramap.rb, line 99
# Returns the value stored in @exception after making sure the child's
# response has been fetched; nil when no exception was stored.
def exception
  fetch_response!
  instance_variable_get(:@exception)
end
fetch_response!() click to toggle source
# File lib/cult/paramap.rb, line 75
# Reads the child's response from the read end of the pipe and stores the
# decoded payload in @result (status byte '=') or @exception (status '!').
#
# The read end is consumed until EOF and closed once a response has been
# decoded, so later calls return immediately. A payload that cannot be
# unmarshalled is stored as nil.
#
# Raises RuntimeError when the response is empty or does not begin with a
# known status byte. The pipe is left open in that case, so subsequent
# calls raise again rather than reporting a nil result.
def fetch_response!
  reader = pipe[0]
  return if reader.closed?

  data = reader.read
  scode = data[0]
  unless ['!', '='].include?(scode)
    fail "paramap job (pid #{@pid.inspect}) produced no valid response " \
         "(status byte: #{scode.inspect})"
  end

  ivar = scode == '!' ? :exception : :result
  obj =
    begin
      Marshal.load(data[1..-1])
    rescue
      # Best effort: an undecodable payload becomes nil.
      nil
    end
  instance_variable_set("@#{ivar}", obj)
  reader.close
end
prepare_forked_environment!() click to toggle source
# File lib/cult/paramap.rb, line 54
# Prepares the forked child before the job block runs: rebinds the streams
# via rebind_streams!, then replaces Kernel#exec with a stub that raises.
def prepare_forked_environment!
  rebind_streams!
  # exec has caused problems in the past, so any call to it now fails.
  Kernel.send(:define_method, :exec) { |*_args| fail "don't use Kernel\#exec inside of a paramap job" }
end
rebind_streams!() click to toggle source
# File lib/cult/paramap.rb, line 40
# Reopens standard streams according to the +rebind+ mapping.
#
# Each key names the stream to redirect (:stdout, :stdin or :stderr) and
# each value names its new target: another of those streams, or nil for
# '/dev/null' (opened read/write).
def rebind_streams!
  targets = { stdout: STDOUT, stdin: STDIN, stderr: STDERR, nil => '/dev/null' }

  rebind.each do |from, to|
    source = targets[from]
    destination = targets[to]
    # A String entry is a path and has to be opened before reopening onto it.
    destination = File.open(destination, 'w+') if destination.is_a?(String)
    source.reopen(destination)
  end
end
result() click to toggle source
# File lib/cult/paramap.rb, line 94
# Returns the value stored in @result after making sure the child's
# response has been fetched; nil when no result was stored.
def result
  fetch_response!
  instance_variable_get(:@result)
end
success?() click to toggle source
# File lib/cult/paramap.rb, line 104
# True when +exception+ returns nil, i.e. no exception was stored for the job.
def success?
  failure = exception
  failure.nil?
end
write_response!(scode, obj) click to toggle source
# File lib/cult/paramap.rb, line 62
# Writes a response to the write end of the pipe, then flushes and closes it.
#
# scode - status byte: '=' or '!'; anything else raises RuntimeError.
# obj   - payload, serialised with Marshal. If dumping raises a TypeError
#         whose message mentions _dump_data, nil is sent instead; any other
#         TypeError is re-raised.
def write_response!(scode, obj)
  raise unless ['!', '='].include?(scode)

  payload =
    begin
      Marshal.dump(obj)
    rescue TypeError => error
      # Unmarshallable object: fall back to nil for the _dump_data case only.
      raise unless error.message =~ /_dump_data/
      Marshal.dump(nil)
    end

  writer = pipe[1]
  writer.write(scode + payload)
  writer.flush
  writer.close
end