class Abid::Waiter

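Non-blocking waiter: wait returns a Concurrent::IVar immediately, and a background thread repeatedly calls the given block until it returns a truthy value or the timeout is exceeded.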

Constants

Entry

Public Class Methods

new() click to toggle source
# File lib/abid/waiter.rb, line 7
# Sets up an idle waiter: no worker thread, no recorded shutdown error,
# an empty entry queue, and the mutex / condition variable pair that
# guards the queue.
def initialize
  @error = nil
  @thread = nil
  @queue = MultiRBTree.new
  @mutex = Mutex.new
  @cv = ConditionVariable.new
end

Public Instance Methods

alive?() click to toggle source
# File lib/abid/waiter.rb, line 38
# Reports whether the background worker thread is running.
#
# The worker is created lazily, so @thread is still nil until one has been
# started; in that case this returns false rather than raising
# NoMethodError on nil.
#
# @return [Boolean] true only when a worker thread exists and is alive
def alive?
  !@thread.nil? && @thread.alive?
end
empty?() click to toggle source
# File lib/abid/waiter.rb, line 34
# Tells whether the entry queue currently holds nothing.
#
# @return [Boolean] whatever @queue.empty? answers
def empty?
  pending = @queue
  pending.empty?
end
shutdown(error = nil) click to toggle source
# File lib/abid/waiter.rb, line 25
# Shuts the waiter down: records +error+, rejects every queued entry's ivar
# with it, and empties the queue. Once @error is set, push raises it, so no
# new entries are accepted.
#
# @param error [Exception, nil] reason handed to the pending ivars;
#   defaults to a RuntimeError with the message 'waiter is shutting down'
def shutdown(error = nil)
  error ||= RuntimeError.new('waiter is shutting down')
  @mutex.synchronize do
    @error = error
    # Wake anything parked on @cv (e.g. a timed wait for an entry's next
    # poll) so it observes @error now instead of sleeping out its interval.
    @cv.broadcast
    @queue.each do |_, e|
      begin
        e.ivar.fail(error)
      rescue Concurrent::MultipleAssignmentError
        # This ivar was already completed elsewhere (e.g. cancelled).
        # Skip it so the remaining entries are still failed and the queue
        # is still cleared.
        nil
      end
    end
    @queue.clear
  end
end
wait(interval: 5, timeout: 60, &block) click to toggle source
# File lib/abid/waiter.rb, line 15
# Registers +block+ for polling and returns immediately.
#
# Makes sure the worker thread has been started, then queues an Entry whose
# first poll is due +interval+ seconds from now. The block is later called
# with the elapsed time in seconds.
#
# @param interval [Numeric] seconds between polls
# @param timeout [Numeric] seconds after which a still-falsy poll fails the
#   ivar; a value of 0 or less disables the check
# @return [Concurrent::IVar] the ivar stored in the queued entry
def wait(interval: 5, timeout: 60, &block)
  run_thread

  Concurrent::IVar.new.tap do |result|
    started_at = Time.now.to_f
    first_poll_at = started_at + interval
    push(Entry.new(result, started_at, first_poll_at, interval, timeout, block))
  end
end

Private Instance Methods

proc_entry(entry) click to toggle source
# File lib/abid/waiter.rb, line 70
# Handles one dequeued entry.
#
# * If the wait for the entry's due time did not complete, the entry is
#   pushed back unchanged.
# * If its ivar is already complete (cancelled), nothing happens.
# * Otherwise the block is called with the seconds elapsed since
#   entry.start_time:
#   - a truthy result is stored via ivar.try_set;
#   - a falsy result past a positive timeout raises 'timeout exceeded';
#   - any other falsy result reschedules the entry one interval later.
#
# Every exception raised along the way (including the timeout and errors
# from push) is handed to the entry's ivar via ivar.fail; a
# Concurrent::MultipleAssignmentError from that call is ignored.
def proc_entry(entry)
  if !sleep_until_next_time(entry)
    # failed to wait. retry.
    push(entry)
    nil
  elsif entry.ivar.complete?
    nil # canceled
  else
    checked_at = Time.now.to_f
    waited = checked_at - entry.start_time
    outcome = entry.block.call(waited)

    if outcome
      entry.ivar.try_set(outcome)
    elsif entry.timeout > 0 && entry.timeout < waited
      raise 'timeout exceeded'
    else
      entry.next_time = checked_at + entry.interval
      push(entry)
    end
  end
rescue Exception => failure
  begin
    entry.ivar.fail(failure)
  rescue Concurrent::MultipleAssignmentError
    nil
  end
end
push(entry) click to toggle source
# File lib/abid/waiter.rb, line 44
# Queues +entry+ under its next_time and signals @cv, all while holding
# @mutex.
#
# Raises the recorded @error instead when one has been set.
def push(entry)
  @mutex.synchronize do
    raise @error if @error

    due_at = entry.next_time
    @queue[due_at] = entry
    @cv.signal
  end
end
run_thread() click to toggle source
# File lib/abid/waiter.rb, line 98
# Starts the background worker thread unless one has already been created.
#
# The worker keeps taking entries with shift and handing them to proc_entry
# until @error is set. If the loop dies with an exception, that exception
# is passed to shutdown and then re-raised so the thread still terminates
# with it.
#
# The exception is captured with an explicit rescue instead of the
# $ERROR_INFO global, which is only populated once the English library has
# been required. Creation happens under @mutex so that concurrent callers
# cannot both observe a nil @thread and start two workers.
#
# @return [Thread] the worker thread
def run_thread
  @mutex.synchronize do
    @thread ||= Thread.new do
      begin
        proc_entry(shift) while @error.nil?
      rescue Exception => err
        shutdown(err)
        raise
      end
    end
  end
end
shift() click to toggle source
# File lib/abid/waiter.rb, line 53
# Removes the first [key, entry] pair from the queue and returns the
# entry, waiting on @cv (with @mutex held) for as long as the queue is
# empty.
def shift
  _due_at, entry = @mutex.synchronize do
    while @queue.empty?
      @cv.wait(@mutex)
    end
    @queue.shift
  end
  entry
end
sleep_until_next_time(entry) click to toggle source
# File lib/abid/waiter.rb, line 61
# Waits until entry.next_time, using a timed wait on @cv so the wait can
# be cut short by a signal.
#
# @return [Boolean] true when the entry's next_time has been reached
#   (immediately if it is already due), false when the wait ended early
def sleep_until_next_time(entry)
  remaining = entry.next_time - Time.now.to_f

  if remaining > 0
    @mutex.synchronize { @cv.wait(@mutex, remaining) }
    Time.now.to_f >= entry.next_time
  else
    true
  end
end