class Fibril
Constants
- VERSION
Attributes
current[RW]
guards[RW]
id_seq[RW]
loop_thread[RW]
queue[RW]
running[RW]
stopped[RW]
task_count[RW]
block[RW]
fiber[RW]
guards[RW]
id[RW]
Public Class Methods
deplete_guard(guard, result)
click to toggle source
# File lib/fibril/core.rb, line 109 def self.deplete_guard(guard, result) return unless waiter_list = guards[guard.id] waiter_list.each do |waiters| switches = waiters[:switches] switches[guard.id] = true if waiters.has_key?(:to_fulfill) Fibril.enqueue waiters[:to_fulfill] if switches.values.all? waiters[:result] ||= [] waiters[:result] << result else waiters[:result] ||= [] waiters[:result] << result waiters[:block][*sort_results(waiters[:result], waiters[:guards])] if waiters[:block] && switches.values.all? end end end
enqueue(fibril)
click to toggle source
# File lib/fibril/core.rb, line 94 def self.enqueue(fibril) Fibril.log "Enqueing fibril #{fibril.id}" Fibril.queue << fibril end
guard()
click to toggle source
# File lib/fibril/core.rb, line 27 def self.guard @@guard ||= OpenStruct.new end
log(msg)
click to toggle source
# File lib/fibril/core.rb, line 19 def self.log(msg) # puts msg end
new(&blk)
click to toggle source
Calls superclass method
# File lib/fibril/core.rb, line 39 def initialize(&blk) self.id = Fibril.id_seq += 1 self.block = blk self.guards = [] define_singleton_method :execute_fibril, self.block if Fibril.running super(&method(:execute)) Fibril.enqueue self else Fibril.task_count = 0 Fibril.stopped = false Fibril.running = true super(&method(:execute)) Fibril.enqueue self Fibril.start end end
pending_tasks?()
click to toggle source
# File lib/fibril/core.rb, line 209 def self.pending_tasks? ((@task_count > 0 || !@queue.empty?) && !@stopped) end
profile(test) { || ... }
click to toggle source
# File lib/fibril/core.rb, line 192 def self.profile(test) starts = Time.now result = yield ends = Time.now Fibril.log "#{test} took #{ends - starts}" return result end
sort_results(results, guards)
click to toggle source
# File lib/fibril/core.rb, line 140 def self.sort_results(results, guards) by_complete_order = guards.sort_by(&:depleted_at) results.zip(by_complete_order).sort do |(_, guard_a), (_, guard_b)| guards.index(guard_a) <=> guards.index(guard_b) end.map(&:first) end
start()
click to toggle source
# File lib/fibril/core.rb, line 187 def self.start self.start_loop if !queue.empty? self.running = false end
start_loop()
click to toggle source
# File lib/fibril/core.rb, line 200 def self.start_loop Fibril.log "Starting loop inside #{Fibril.current}" Fibril.loop_thread = Thread.current while pending_tasks? Fibril.current = nil Fibril.queue.shift.resume while !queue.empty? end end
stop()
click to toggle source
# File lib/fibril/core.rb, line 175 def self.stop Fibril do Fibril.stopped = true end end
variables()
click to toggle source
# File lib/fibril/core.rb, line 35 def self.variables @@variables ||= OpenStruct.new end
Public Instance Methods
Guard(i, fibril)
click to toggle source
# File lib/fibril/core.rb, line 213 def Guard(i, fibril) return Guard.new(i, fibril) end
await(*guards, &block)
click to toggle source
# File lib/fibril/core.rb, line 147 def await(*guards, &block) guards.map!{|guard| guard.kind_of?(Symbol) ? Fibril.guard.send(guard) : guard} raise "Invalid guard given #{guards}" unless guards.all?{|g| g.kind_of?(Guard) || g.kind_of?(Future)} if block_given? return block[*guards.map(&:result)] if guards.all?(&:result?) await_block = { switches: Hash[guards.map{|guard| [guard.id, false]}], block: block, guards: guards } guards.each do |guard| Fibril.guards[guard.id] << await_block end else guard = guards.first guard.kind_of?(Future) ? await_future(guard) : await_fibril(guards) end end
await_all(*futures)
click to toggle source
# File lib/fibril/core.rb, line 171 def await_all(*futures) futures.map(&:await) end
await_fibril(guards)
click to toggle source
# File lib/fibril/core.rb, line 126 def await_fibril(guards) singular = guards.one? return singular ? guards[0].result : guards.map(&:result) if guards.all?(&:result?) await_block = { switches: Hash[guards.map{|guard| [guard.id, false]}], to_fulfill: Fibril.current } guards.each do |guard| Fibril.guards[guard.id] << await_block end self.yield return singular ? await_block[:result][0] : Fibril.sort_results(await_block[:result], guards) end
await_future(future)
click to toggle source
# File lib/fibril/core.rb, line 166 def await_future(future) tick while future.alive? future.await end
current()
click to toggle source
# File lib/fibril/core.rb, line 105 def current self end
enqueue()
click to toggle source
# File lib/fibril/core.rb, line 90 def enqueue Fibril.enqueue(self) end
execute()
click to toggle source
# File lib/fibril/core.rb, line 63 def execute Fibril.task_count += 1 exception = nil result = begin execute_fibril rescue Exception => e exception = e end self.guards.each do |guard| guard.visit(result) end Fibril.task_count -= 1 Fibril.log "Ending #{id}" raise exception if exception end
guard()
click to toggle source
# File lib/fibril/core.rb, line 23 def guard Fibril.guard end
reset(guard)
click to toggle source
# File lib/fibril/core.rb, line 57 def reset(guard) copy = Fibril.new(&self.block) copy.guards << guard return copy end
resume()
click to toggle source
Calls superclass method
# File lib/fibril/core.rb, line 181 def resume Fibril.current = self Fibril.log "Resuming #{id}" super end
tick()
click to toggle source
# File lib/fibril/core.rb, line 79 def tick if Thread.current != Fibril.loop_thread Fibril.log "Current thread is #{Thread.current.object_id}" Fibril.log "Fibril thread is #{Fibril.loop_thread.object_id}" Fibril.log "WARN: Cannot tick inside async code outside of main loop thread. This will be a noop" elsif !Fibril.queue.empty? Fibril.enqueue self self.yield end end
variables()
click to toggle source
# File lib/fibril/core.rb, line 31 def variables Fibril.variables end
yield() { |self| ... }
click to toggle source
# File lib/fibril/core.rb, line 99 def yield Fibril.log "Yielding #{id}" yield(self) if block_given? Fiber.yield end