class Reacto::Operations::Merge

Public Class Methods

new(trackables, delay_error: false) click to toggle source
# File lib/reacto/operations/merge.rb, line 7
def initialize(trackables, delay_error: false)
  @trackables = trackables

  @close_notifications =
    Concurrent::AtomicFixnum.new(@trackables.size + 1)
  @lock = Mutex.new
  @delay_error = delay_error
end

Public Instance Methods

call(tracker) click to toggle source
# File lib/reacto/operations/merge.rb, line 16
def call(tracker)
  error = nil

  close = -> () do
    @lock.synchronize do
      if @close_notifications.decrement == 0
        error.nil? ? tracker.on_close : tracker.on_error(error)
      end
    end
  end

  err =
    if @delay_error
      -> (er) do
        @lock.synchronize do
          error = er
          tracker.on_error(error) if @close_notifications.decrement == 0
        end
      end
    else
      tracker.method(:on_error)
    end

  sub = Subscriptions::OperationSubscription.new(
    tracker, close: close, error: err
  )

  @trackables.each { |trackable| trackable.send(:do_track, sub) }
  sub
end