class Levdon::Worker

Attributes

pid[R]

Public Class Methods

new(&block) click to toggle source
# File lib/levdon.rb, line 134
# Prepares a worker without starting it.
#
# Stores the job block, creates the two one-way pipes used to talk to the
# child process, and wraps the parent-side ends in a NonBlockLineStream for
# the non-blocking API. No process is forked here; that happens in #run.
#
# @param block [Proc] the block invoked in the child for every message
def initialize(&block)
  @block = block

  # Parent -> child channel: the child reads what the parent writes.
  @child_read, @parent_write = create_pipe
  # Child -> parent channel: the parent reads what the child writes.
  @parent_read, @child_write = create_pipe

  @io_stream = NonBlockLineStream.new(@parent_read, @parent_write)
end

Public Instance Methods

alive?() click to toggle source
# File lib/levdon.rb, line 179
# Reports whether the worker's child process can still be signalled.
#
# Signal 0 performs the existence/permission check without delivering a
# signal. When no child has been forked yet (@pid is nil) the worker is
# reported as not alive instead of raising a TypeError from Process.kill.
#
# NOTE(review): on POSIX systems a child that has exited but has not been
# reaped yet (a zombie) is still signalable, so this can return true until
# the process is waited for.
#
# @return [Boolean] true if the process exists, false otherwise
def alive?
  return false unless @pid

  Process.kill(0, @pid)
  true
rescue Errno::ESRCH
  false
end
async_execute(*msg) click to toggle source
# File lib/levdon.rb, line 169
# Sends a message to the child without waiting for a reply.
#
# The positional arguments are collected into an Array and handed to
# nonblock_write_to_child. Nothing is read here; a reply has to be fetched
# separately (see nonblock_read_from_child).
#
# @param msg [Array] arguments forwarded to the child as one message
# @return whatever nonblock_write_to_child returns
def async_execute(*msg)
  nonblock_write_to_child(msg)
end
create_pipe() click to toggle source
# File lib/levdon.rb, line 141
# Creates a pipe whose two ends are both switched to binary encoding.
#
# Marshal output is raw bytes, so both ends are set to ASCII-8BIT for the
# external and internal encoding.
#
# @return [Array<IO>] the pair [read_end, write_end]
def create_pipe
  reader, writer = IO.pipe
  [reader, writer].each { |io| io.set_encoding("ASCII-8BIT", "ASCII-8BIT") }
end
execute(*msg) click to toggle source
# File lib/levdon.rb, line 164
# Sends a message to the child and reads the reply on a background thread.
#
# The write happens synchronously in the caller; only the read of the reply
# is moved to the new thread. Use Thread#value on the result to obtain the
# child's answer.
#
# @param msg [Array] arguments forwarded to the child as one message
# @return [Thread] thread whose value is the object read from the child
def execute(*msg)
  write_to_child(msg)

  Thread.new do
    read_from_child
  end
end
install_exit_handler() click to toggle source
# File lib/levdon.rb, line 243
# Registers an at_exit hook that kills and reaps the worker's child process
# when the owning process exits.
#
# at_exit hooks are inherited by every process forked after registration
# (for example the child of a worker started later). Without a guard, such a
# process would SIGKILL this worker when it exits. The hook therefore only
# acts in the process that installed it.
def install_exit_handler
  owner_pid = Process.pid

  at_exit do
    # Inherited copy of the hook running in some other process: do nothing.
    next unless Process.pid == owner_pid
    next unless alive?

    begin
      Process.kill("KILL", @pid)
      Process.wait(@pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # noop: the child is already gone or has already been reaped
    rescue => e
      puts "error at_exit: #{ e }"
      raise
    end
  end
end
install_signal_handler() click to toggle source
# File lib/levdon.rb, line 258
# Traps INT and QUIT so the signal is forwarded to the worker's child, the
# child is reaped, and the previously installed disposition then takes effect.
#
# Signal.trap returns the previous handler, which is a Proc only when one
# was installed; otherwise it is a String such as "DEFAULT" or "IGNORE", or
# nil. Calling it blindly raises NoMethodError, so each case is handled:
# * callable            -> called (with no arguments, as before)
# * DEFAULT/SYSTEM_DEFAULT -> restored, then the signal is re-sent to self
# * EXIT                -> the process exits
# * anything else (IGNORE, nil, ...) -> nothing more is done
def install_signal_handler
  [:INT, :QUIT].each do |signal|
    previous = Signal.trap(signal) do
      begin
        Process.kill(signal, @pid)
        Process.wait(@pid)
      rescue Errno::ESRCH, Errno::ECHILD
        # The child already exited or was reaped; still chain to the
        # previous disposition below.
      end

      if previous.respond_to?(:call)
        previous.call
      else
        case previous
        when "DEFAULT", "SYSTEM_DEFAULT"
          # Put the original disposition back and deliver the signal again so
          # the process reacts exactly as it would have without this trap.
          Signal.trap(signal, previous)
          Process.kill(signal, Process.pid)
        when "EXIT"
          exit
        end
      end
    end
  end
end
nonblock_read_from_child() click to toggle source
# File lib/levdon.rb, line 200
# Fetches one message from the child through the non-blocking stream.
#
# A frame is the Marshal dump of the object with every "\n" replaced by the
# token "@NDELIMITER@", terminated by a single "\n". Only that terminator is
# removed here: String#chomp would also strip a preceding "\r", and Marshal
# data can legitimately end in "\r" (e.g. Marshal.dump(8)), which corrupted
# such messages.
#
# NOTE(review): a payload whose dump already contains the literal text
# "@NDELIMITER@" is altered by the unescape step; fixing that requires
# changing the writer side as well.
#
# NOTE(review): presumably @io_stream.read returns one complete line, or
# nil/false when none is available -- confirm against NonBlockLineStream.
#
# @return [Object, nil] the decoded object, or nil when nothing was read
def nonblock_read_from_child()
  line = @io_stream.read
  return nil unless line

  payload = line.end_with?("\n") ? line.byteslice(0, line.bytesize - 1) : line
  Marshal.load(payload.gsub("@NDELIMITER@", "\n"))
end
nonblock_write_to_child(obj) click to toggle source
# File lib/levdon.rb, line 208
# Queues one message for the child through the non-blocking stream.
#
# The object is marshalled, every "\n" in the dump is replaced by the token
# "@NDELIMITER@" so the frame occupies exactly one line, and a terminating
# "\n" is appended.
#
# @param obj [Object] any object Marshal can dump
# @return whatever @io_stream.write returns
def nonblock_write_to_child(obj)
  escaped = Marshal.dump(obj).gsub("\n", "@NDELIMITER@")
  @io_stream.write(escaped << "\n")
end
poll() click to toggle source
# File lib/levdon.rb, line 186
# Delegates to @io_stream.poll and returns its result.
#
# @io_stream is the NonBlockLineStream created in #initialize around the
# parent-side pipe ends.
# NOTE(review): presumably this drives pending non-blocking reads/writes --
# confirm against NonBlockLineStream#poll.
def poll
  @io_stream.poll
end
read_from_child() click to toggle source
# File lib/levdon.rb, line 213
# Reads one message sent by the child (parent side).
#
# Decodes a single line from @parent_read via read_object.
#
# @return [Object] the object decoded by read_object
def read_from_child
  read_object(@parent_read)
end
read_from_parent() click to toggle source
# File lib/levdon.rb, line 221
# Reads one message sent by the parent (child side).
#
# Decodes a single line from @child_read via read_object.
#
# @return [Object] the object decoded by read_object
def read_from_parent
  read_object(@child_read)
end
read_object(read) click to toggle source
# File lib/levdon.rb, line 195
# Reads one framed message from +read+ and decodes it.
#
# A frame is the Marshal dump of the object with every "\n" replaced by the
# token "@NDELIMITER@", terminated by a single "\n". Only that terminator is
# removed here: String#chomp would also strip a preceding "\r", and Marshal
# data can legitimately end in "\r" (e.g. Marshal.dump(8)), which corrupted
# such messages.
#
# NOTE(review): a payload whose dump already contains the literal text
# "@NDELIMITER@" is altered by the unescape step; fixing that requires
# changing the writer side as well.
#
# @param read [#gets] stream to read one line from
# @return [Object] the decoded object
# @raise [EOFError] if the stream is at end-of-file (peer closed the pipe)
def read_object(read)
  line = read.gets
  # gets returns nil at EOF; fail with a meaningful error rather than a
  # NoMethodError on nil.
  raise EOFError, "stream closed before a message was received" if line.nil?

  payload = line.end_with?("\n") ? line.byteslice(0, line.bytesize - 1) : line
  Marshal.load(payload.gsub("@NDELIMITER@", "\n"))
end
run() click to toggle source
# File lib/levdon.rb, line 145
# Forks the child process and starts its message loop.
#
# In the child: the parent-side pipe ends are closed, :ready is sent as a
# handshake, and then each message read from the parent is splatted into the
# job block and the block's result is written back. The loop ends when the
# message :stop arrives, after which the child closes its own pipe ends and
# the forked block returns.
#
# In the parent: @pid holds the child's pid and wait_after_fork is invoked.
#
# @return the result of wait_after_fork in the parent
def run
  @pid = fork do
    # Child only needs its own ends of the two pipes.
    [@parent_read, @parent_write].each(&:close)
    write_to_parent(:ready)

    loop do
      args = read_from_parent
      break if args == :stop

      write_to_parent(@block.call(*args))
    end

    [@child_read, @child_write].each(&:close)
  end

  return unless @pid

  wait_after_fork
end
stop() click to toggle source
# File lib/levdon.rb, line 173
# Asks the child to leave its message loop and waits for it to exit.
#
# Does nothing when the child is not alive. Otherwise the :stop message is
# written to the child and the caller blocks in Process.wait until the child
# has terminated.
#
# @return [Integer, nil] the reaped pid, or nil when the child was not alive
def stop
  if alive?
    write_to_child(:stop)
    Process.wait(@pid)
  end
end
wait_after_fork() click to toggle source
# File lib/levdon.rb, line 229
# Parent-side setup performed right after the fork.
#
# Closes the child's pipe ends in the parent, installs the exit and signal
# handlers, and starts a thread that reads the child's handshake. The thread
# raises if the first message is not :ready; that error only surfaces when
# the thread is joined or its value is requested.
#
# @return [Thread] thread whose value is :ready on a successful handshake
def wait_after_fork
  [@child_read, @child_write].each(&:close)

  install_exit_handler
  install_signal_handler

  Thread.new do
    read_from_child.tap do |handshake|
      raise "Failed to start worker pid #{ @pid }" unless handshake == :ready
    end
  end
end
write_object(obj, write) click to toggle source
# File lib/levdon.rb, line 190
# Encodes +obj+ as one line and writes it to +write+.
#
# The object is marshalled, every "\n" in the dump is replaced by the token
# "@NDELIMITER@" so the frame occupies exactly one line, and a terminating
# "\n" is appended.
#
# @param obj [Object] any object Marshal can dump
# @param write [#write] destination stream
# @return whatever write.write returns
def write_object(obj, write)
  encoded = Marshal.dump(obj)
  line = encoded.gsub("\n", "@NDELIMITER@") << "\n"
  write.write(line)
end
write_to_child(obj) click to toggle source
# File lib/levdon.rb, line 217
# Sends one message to the child (parent side).
#
# Encodes +obj+ with write_object and writes it to @parent_write.
#
# @param obj [Object] any object Marshal can dump
# @return whatever write_object returns
def write_to_child(obj)
  write_object(obj, @parent_write)
end
write_to_parent(obj) click to toggle source
# File lib/levdon.rb, line 225
# Sends one message to the parent (child side).
#
# Encodes +obj+ with write_object and writes it to @child_write.
#
# @param obj [Object] any object Marshal can dump
# @return whatever write_object returns
def write_to_parent(obj)
  write_object(obj, @child_write)
end