class Abid::Waiter
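Non-blocking waiter: `wait` returns a `Concurrent::IVar` immediately, and the given block is polled on a background thread until it returns a truthy value, the timeout is exceeded, or the waiter is shut down.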
Constants
- Entry
Public Class Methods
new()
click to toggle source
# File lib/abid/waiter.rb, line 7
#
# Sets up an idle waiter: the lock and condition variable used to guard and
# signal the queue, an empty MultiRBTree holding pending entries, and no
# worker thread or shutdown error yet.
def initialize
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @queue = MultiRBTree.new
  @thread = @error = nil
end
Public Instance Methods
alive?()
click to toggle source
# File lib/abid/waiter.rb, line 38
#
# Reports whether the background worker thread is currently running.
#
# The worker thread is only created on the first call to #wait, so @thread
# is nil on a fresh waiter; in that case this returns false instead of
# raising NoMethodError on nil.
#
# @return [Boolean] true if the worker thread exists and is alive
def alive?
  !@thread.nil? && @thread.alive?
end
empty?()
click to toggle source
# File lib/abid/waiter.rb, line 34
#
# Tells whether there are no entries left in the pending queue.
# Delegates directly to the queue's own #empty?.
def empty?
  pending = @queue
  pending.empty?
end
shutdown(error = nil)
click to toggle source
# File lib/abid/waiter.rb, line 25
#
# Stops the waiter: records the shutdown error (so later #push calls raise
# it), fails the ivar of every queued entry with that error, and empties
# the queue.
#
# @param error [Exception, nil] reason delivered to pending ivars; defaults
#   to a RuntimeError with the message 'waiter is shutting down'
# @return the result of clearing the queue
def shutdown(error = nil)
  error ||= RuntimeError.new('waiter is shutting down')
  @mutex.synchronize do
    @error = error
    @queue.each do |_, e|
      begin
        e.ivar.fail(error)
      rescue Concurrent::MultipleAssignmentError
        # The ivar was already completed (e.g. canceled by its owner).
        # Skip it so the remaining entries are still failed and the queue
        # is still cleared.
        nil
      end
    end
    # Wake the worker thread if it is sleeping on an entry that has already
    # been taken out of the queue, so that entry observes @error promptly
    # instead of only after its interval elapses.
    @cv.broadcast
    @queue.clear
  end
end
wait(interval: 5, timeout: 60, &block)
click to toggle source
# File lib/abid/waiter.rb, line 15
#
# Registers a block to be polled in the background and returns right away.
#
# Makes sure the worker thread is started, then enqueues an Entry whose
# first poll is due `interval` seconds from now.
#
# @param interval [Numeric] seconds between polls (default 5)
# @param timeout [Numeric] seconds compared against the elapsed time by the
#   worker; values <= 0 are never treated as exceeded (default 60)
# @param block called by the worker with the elapsed seconds
# @return [Concurrent::IVar] the ivar stored in the queued entry
def wait(interval: 5, timeout: 60, &block)
  run_thread
  Concurrent::IVar.new.tap do |ivar|
    started_at = Time.now.to_f
    first_poll_at = started_at + interval
    push(Entry.new(ivar, started_at, first_poll_at, interval, timeout, block))
  end
end
Private Instance Methods
proc_entry(entry)
click to toggle source
# File lib/abid/waiter.rb, line 70
#
# Handles one queue entry: waits until it is due, polls its block once, and
# then completes its ivar, fails it, or schedules the next poll.
#
# Any exception raised along the way (including the timeout failure and an
# error raised by #push) is delivered to the entry's ivar rather than
# propagated.
def proc_entry(entry)
  due = sleep_until_next_time(entry)
  unless due
    # Woken before the entry was due: put it back and let the caller pick
    # whichever entry is next.
    push(entry)
    return
  end
  # Somebody else already completed the ivar (canceled); nothing to do.
  return if entry.ivar.complete?

  woke_at = Time.now.to_f
  waited = woke_at - entry.start_time
  outcome = entry.block.call(waited)
  return entry.ivar.try_set(outcome) if outcome

  # A timeout that is not positive is never treated as exceeded.
  timed_out = entry.timeout > 0 && entry.timeout < waited
  fail 'timeout exceeded' if timed_out

  entry.next_time = woke_at + entry.interval
  push(entry)
rescue Exception => failure
  begin
    entry.ivar.fail(failure)
  rescue Concurrent::MultipleAssignmentError
    # The ivar was completed concurrently; ignore.
    nil
  end
end
push(entry)
click to toggle source
# File lib/abid/waiter.rb, line 44
#
# Adds an entry to the queue under its next_time and signals the condition
# variable so a thread waiting on it wakes up.
#
# Raises the stored shutdown error instead when one has been recorded.
def push(entry)
  @mutex.synchronize do
    raise @error if @error

    due_at = entry.next_time
    @queue[due_at] = entry
    @cv.signal
  end
end
run_thread()
click to toggle source
# File lib/abid/waiter.rb, line 98
#
# Starts the background worker thread unless one has already been created.
#
# The worker repeatedly takes the next entry off the queue and processes it
# until a shutdown error has been recorded in @error. If the loop itself
# dies with an exception, the waiter is shut down with that exception so
# queued entries are failed, and the exception is re-raised in the thread.
#
# @return [Thread, nil] the new thread, or nil when one already existed
def run_thread
  return if @thread
  @thread = Thread.new do
    begin
      proc_entry(shift) while @error.nil?
    rescue Exception => err
      # Capture the exception explicitly rather than via $ERROR_INFO, which
      # is nil unless the English library has been required; that would
      # silently skip the shutdown. The exception is re-raised below, so
      # nothing is swallowed.
      shutdown(err)
      raise
    end
  end
end
shift()
click to toggle source
# File lib/abid/waiter.rb, line 53
#
# Removes the first pair from the queue and returns its entry, blocking on
# the condition variable for as long as the queue is empty.
def shift
  pair = @mutex.synchronize do
    while @queue.empty?
      @cv.wait(@mutex)
    end
    @queue.shift
  end
  _key, entry = pair
  entry
end
sleep_until_next_time(entry)
click to toggle source
# File lib/abid/waiter.rb, line 61
#
# Waits on the condition variable until the entry's next_time, at most.
#
# Returns true when the entry is due: either it was already due on entry,
# or its next_time has passed by the time the wait ends. Returns false when
# the wait ended before next_time (for example because #push signalled the
# condition variable).
def sleep_until_next_time(entry)
  remaining = entry.next_time - Time.now.to_f
  return true if remaining <= 0

  @mutex.synchronize do
    @cv.wait(@mutex, remaining)
  end
  deadline = entry.next_time
  deadline <= Time.now.to_f
end