class Andromeda::Sync::Sync
Comparable to a join in join calculus, called Sync
here to reserve the name Join for map_reduce.rb
Public Class Methods
new(config = {})
click to toggle source
Calls superclass method
Andromeda::Plan::new
# File lib/andromeda/sync.rb, line 10 def initialize(config = {}) super config @mutex = Mutex.new @cv = ConditionVariable.new # box value to keep ref after clone @state = [ state_init ] end
Protected Instance Methods
run_chunk(pool, scope, name, meth, k, chunk, &thunk)
click to toggle source
Calls superclass method
# File lib/andromeda/sync.rb, line 31 def run_chunk(pool, scope, name, meth, k, chunk, &thunk) @mutex.synchronize do state = @state[0] while true if state_empty?(state, k, chunk) @state[0] = (state = state_updated(state, k, chunk)) if state_ready?(state) @state[0] = state_init cv.signal new_k = state_chunk_key(name, state) return super pool, scope, name, meth, new_k, state, &thunk else cv.signal return self end else cv.wait @mutex end end end end
state_chunk_key(name, state)
click to toggle source
# File lib/andromeda/sync.rb, line 29 def state_chunk_key(name, state) ; chunk_key(name, state) end
state_empty?(state, k, chunk)
click to toggle source
# File lib/andromeda/sync.rb, line 26 def state_empty?(state, k, chunk) ; state[k].nil? end
state_init()
click to toggle source
# File lib/andromeda/sync.rb, line 20 def state_init ; {} end
state_ready?(state)
click to toggle source
# File lib/andromeda/sync.rb, line 22 def state_ready?(state) raise RuntimeException, 'Not implemented' end
state_updated(state, k, chunk)
click to toggle source
# File lib/andromeda/sync.rb, line 27 def state_updated(state, k, chunk) ; state[k] = chunk; state end