class Reacto::Operations::DependOn
Public Class Methods
new(trackable, key: :data, accumulator: nil)
click to toggle source
# File lib/reacto/operations/depend_on.rb, line 9 def initialize(trackable, key: :data, accumulator: nil) @key = key @trackable = if accumulator.nil? trackable.first else trackable.inject(NO_VALUE, &accumulator) end @lock = Mutex.new @close = false @error = nil end
Public Instance Methods
buffer()
click to toggle source
# File lib/reacto/operations/depend_on.rb, line 24 def buffer @buffer ||= [] end
call(tracker)
click to toggle source
# File lib/reacto/operations/depend_on.rb, line 57 def call(tracker) value = ->(v) do if @result.nil? @lock.synchronize do buffer << v check_ready_and_track(tracker) end else tracker.on_value( OpenStruct.new({ value: v }.merge(@key => @result)) ) end end error = ->(e) do if @result.nil? @lock.synchronize do @error = e check_ready_and_track(tracker) end else tracker.on_error(e) end end close = -> do if @result.nil? @lock.synchronize do @close = true check_ready_and_track(tracker) end else tracker.on_close end end Subscriptions::OperationSubscription.new( tracker, value: value, error: error, close: close ) end
check_ready_and_track(tracker)
click to toggle source
# File lib/reacto/operations/depend_on.rb, line 44 def check_ready_and_track(tracker) depend_value = ->(v) { @result = v } depend_error = ->(e) { flush(e) } depend_close = -> { flush } if @subscription.nil? @tracker = tracker @subscription = @trackable.on( value: depend_value, error: depend_error, close: depend_close ) end end
flush(error = nil)
click to toggle source
# File lib/reacto/operations/depend_on.rb, line 28 def flush(error = nil) unless error.nil? @tracker.on_error(error) return end buffer.each do |val| @tracker.on_value( OpenStruct.new({ value: val }.merge(@key => @result)) ) end @tracker.on_close if @close @tracker.on_error(@error) if @error end