class Andromeda::Sync::Sync

Comparable to a join in the join calculus; it is called Sync here so that the name Join stays reserved for map_reduce.rb.

Public Class Methods

new(config = {}) click to toggle source
Calls superclass method Andromeda::Plan::new
# File lib/andromeda/sync.rb, line 10
# Builds the sync plan: forwards +config+ to the superclass constructor and
# sets up the lock, the condition variable and the initial state.
#
# @param config [Hash] configuration passed through to the superclass
def initialize(config = {})
  super(config)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  # The state lives inside a one-element array (a "box") so that the
  # reference is kept after clone.
  @state = [state_init]
end

Protected Instance Methods

run_chunk(pool, scope, name, meth, k, chunk, &thunk) click to toggle source
Calls superclass method
# File lib/andromeda/sync.rb, line 31
# Stores +chunk+ under key +k+ in the shared state and, once the state is
# ready, forwards the whole state downstream through the superclass
# implementation.
#
# The whole operation runs while holding @mutex. If the slot for +k+ is not
# empty (per state_empty?), the caller blocks on the condition variable until
# it is woken and the slot has become empty.
#
# @param pool, scope, name, meth passed unchanged to the superclass
# @param k     key of the incoming chunk
# @param chunk the incoming chunk
# @param thunk optional block, passed unchanged to the superclass
# @return [Object] +self+ if the state is not ready after storing the chunk,
#   otherwise whatever the superclass run_chunk returns for the full state
def run_chunk(pool, scope, name, meth, k, chunk, &thunk)
  @mutex.synchronize do
    loop do
      # Re-read the boxed state on every pass: another thread may have
      # replaced it with a fresh one while this thread was waiting.
      state = @state[0]
      if state_empty?(state, k, chunk)
        state = state_updated(state, k, chunk)
        @state[0] = state
        if state_ready?(state)
          @state[0] = state_init
          # The reset empties every slot, so wake all waiters, not just one.
          @cv.broadcast
          new_k = state_chunk_key(name, state)
          return super pool, scope, name, meth, new_k, state, &thunk
        else
          @cv.signal
          return self
        end
      else
        # Releases @mutex while sleeping and re-acquires it on wake-up.
        @cv.wait @mutex
      end
    end
  end
end
state_chunk_key(name, state) click to toggle source
# File lib/andromeda/sync.rb, line 29
# Key under which a completed +state+ is forwarded; delegates to chunk_key.
def state_chunk_key(name, state)
  chunk_key(name, state)
end
state_empty?(state, k, chunk) click to toggle source
# File lib/andromeda/sync.rb, line 26
# Tells whether the slot for key +k+ in +state+ holds nil.
# +chunk+ is accepted but not consulted here.
def state_empty?(state, k, chunk)
  slot = state[k]
  slot.nil?
end
state_init() click to toggle source
# File lib/andromeda/sync.rb, line 20
# Produces a fresh, empty Hash to serve as the initial state.
def state_init
  Hash.new
end
state_ready?(state) click to toggle source
# File lib/andromeda/sync.rb, line 22
# Placeholder for the readiness check on +state+; this version always raises.
#
# RuntimeError is used because RuntimeException is not a Ruby constant, so
# referencing it would fail with a NameError instead of the intended message.
#
# @param state the current state
# @raise [RuntimeError] always, with the message 'Not implemented'
def state_ready?(state)
  raise RuntimeError, 'Not implemented'
end
state_updated(state, k, chunk) click to toggle source
# File lib/andromeda/sync.rb, line 27
# Writes +chunk+ into +state+ under key +k+ (mutating +state+ in place) and
# hands back that same +state+ object.
def state_updated(state, k, chunk)
  state.tap { |slots| slots[k] = chunk }
end