class Reacto::Trackable
Constants
- EXECUTOR_ALIASES
- TOPICS
Public Class Methods
close(executor: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 53
# Builds a Trackable (via +make+) whose behaviour only calls +on_close+
# on the subscriber it is given.
#
# executor - optional executor (or alias) forwarded to +make+.
def close(executor: nil)
  make(executor) do |tracker|
    tracker.on_close
  end
end
combine(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 33
# Combines the given trackables through +combine_create+, using
# Subscriptions::CombiningSubscription as the combining subscription type.
# The block is forwarded as the combinator.
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end
Also aliased as: combine_latest
combine_last(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 43
# Same as +combine+, but the combining subscription type is
# Subscriptions::CombiningLastSubscription.
def combine_last(*trackables, &block)
  subscription_type = Subscriptions::CombiningLastSubscription
  combine_create(subscription_type, *trackables, &block)
end
concat(*trackables)
click to toggle source
# File lib/reacto/trackable.rb, line 39
# Folds the given trackables left to right, calling the instance-level
# +concat+ of the accumulated trackable with each following one.
# With no arguments the fold yields nil.
def concat(*trackables)
  trackables.reduce do |joined, following|
    joined.concat(following)
  end
end
enumerable(enumerable, executor: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 162
# Builds a Trackable whose behaviour is Behaviours.enumerable(enumerable).
#
# executor - optional executor (or alias) forwarded to +make+.
def enumerable(enumerable, executor: nil)
  behaviour = Behaviours.enumerable(enumerable)
  make(executor, &behaviour)
end
error(err, executor: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 57
# Builds a Trackable whose behaviour only calls +on_error(err)+ on the
# subscriber it is given.
#
# executor - optional executor (or alias) forwarded to +make+.
def error(err, executor: nil)
  make(executor) { |tracker| tracker.on_error(err) }
end
interval( interval, enumerator = Behaviours.integers_enumerator, executor: nil )
click to toggle source
# File lib/reacto/trackable.rb, line 88
# Builds a Trackable that pushes successive values of +enumerator+ to its
# tracker, paced by +interval+.
#
# interval   - pause between values; given to Kernel#sleep in the immediate
#              branch and to Concurrent::TimerTask as execution_interval
#              otherwise.
# enumerator - source of values; StopIteration ends the emission.
# executor   - an executor or a key of EXECUTOR_ALIASES. Only its type is
#              inspected here: a Concurrent::ImmediateExecutor selects the
#              blocking, sleep-based branch.
def interval(
  interval, enumerator = Behaviours.integers_enumerator, executor: nil
)
  executor = EXECUTOR_ALIASES[executor] || executor

  if executor.is_a?(Concurrent::ImmediateExecutor)
    make do |tracker|
      Behaviours.with_close_and_error(tracker) do |subscriber|
        while subscriber.subscribed?
          sleep interval if subscriber.subscribed?
          # The subscriber may have gone away while we were sleeping.
          break unless subscriber.subscribed?

          begin
            subscriber.on_value(enumerator.next)
          rescue StopIteration
            break
          end
        end
      end
    end
  else
    make do |tracker|
      # NOTE: this is a process-wide setting, not local to this Trackable.
      Thread.abort_on_exception = true

      # The timer only signals; the worker thread does the emitting.
      ticks = Queue.new
      timer = Concurrent::TimerTask.new(execution_interval: interval) do
        ticks.push('ready')
      end

      worker = Thread.new do
        begin
          loop do
            ticks.pop
            break unless tracker.subscribed?

            begin
              value = enumerator.next
              tracker.on_value(value)
            rescue StopIteration
              tracker.on_close if tracker.subscribed?
              break
            rescue StandardError => failure
              tracker.on_error(failure) if tracker.subscribed?
              break
            end
          end
        ensure
          timer.shutdown
        end
      end

      timer.execute
      resource = Reacto::Resources::ExecutorResource.new(
        timer, threads: [worker]
      )
      tracker.add_resource(resource)
    end
  end
end
later(secs, value, executor: Reacto::Executors.tasks)
click to toggle source
# File lib/reacto/trackable.rb, line 70
# Builds a Trackable that hands +value+ to
# Behaviours.single_tracker_value after +secs+.
#
# executor - an executor or a key of EXECUTOR_ALIASES. With a
#            Concurrent::ImmediateExecutor the delay is a blocking sleep;
#            otherwise a Concurrent::ScheduledTask is scheduled on it.
def later(secs, value, executor: Reacto::Executors.tasks)
  executor = EXECUTOR_ALIASES[executor] || executor
  immediate = executor.is_a?(Concurrent::ImmediateExecutor)

  if immediate
    make do |tracker|
      sleep secs
      Behaviours.single_tracker_value(tracker, value)
    end
  else
    make do |tracker|
      Concurrent::ScheduledTask.execute(secs, executor: executor) do
        Behaviours.single_tracker_value(tracker, value)
      end
    end
  end
end
make(executor_param = nil, executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 63
# Creates a new instance with the given behaviour block (NO_ACTION when no
# block is passed).
#
# The executor may be given positionally or by keyword; the positional one
# wins when it is truthy.
def make(executor_param = nil, executor: nil, &block)
  behaviour = block_given? ? block : NO_ACTION
  new(executor_param || executor, &behaviour)
end
never()
click to toggle source
# File lib/reacto/trackable.rb, line 29
# Creates an instance with no executor and no behaviour block.
def never
  new
end
new(executor = nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 180
# Stores the behaviour block (NO_ACTION when none is given) and the
# executor, translating the executor through EXECUTOR_ALIASES when it is a
# known alias.
def initialize(executor = nil, &block)
  @behaviour =
    if block_given?
      block
    else
      NO_ACTION
    end
  @executor = EXECUTOR_ALIASES[executor] || executor
end
repeat(array, int: 0.1, executor: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 152
# Delegates to +interval+ with the enumerator produced by
# Behaviours.array_repeat_enumerator(array).
#
# int      - interval passed on to +interval+ (default 0.1).
# executor - forwarded unchanged to +interval+.
def repeat(array, int: 0.1, executor: nil)
  repeating = Behaviours.array_repeat_enumerator(array)
  interval(int, repeating, executor: executor)
end
value(value, executor: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 158
# Builds a Trackable whose behaviour is Behaviours.single_value(value).
#
# executor - optional executor (or alias) forwarded to +make+.
def value(value, executor: nil)
  behaviour = Behaviours.single_value(value)
  make(executor, &behaviour)
end
zip(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 49
# Same as +combine+, but the combining subscription type is
# Subscriptions::ZippingSubscription.
def zip(*trackables, &block)
  subscription_type = Subscriptions::ZippingSubscription
  combine_create(subscription_type, *trackables, &block)
end
Private Class Methods
combine_create(type, *trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 170
# Shared implementation of combine / combine_last / zip.
#
# For every new subscriber a +type+ instance is created from the
# combinator block and the subscriber; each source trackable is then
# tracked with a fresh subscription obtained from +subscription!+.
def combine_create(type, *trackables, &block)
  make do |subscriber|
    combined = type.new(block, subscriber)
    trackables.each { |source| source.do_track(combined.subscription!) }
  end
end
Public Instance Methods
[](x)
click to toggle source
# File lib/reacto/trackable.rb, line 521
# Lifts an Operations::Drop built with (x, 1).
def [](x)
  operation = Operations::Drop.new(x, 1)
  lift(operation)
end
act(on: Operations::Act::ALL, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 579
# Lifts an Operations::Act built from the block and the +on+ selector
# (Operations::Act::ALL by default).
def act(on: Operations::Act::ALL, &block)
  operation = Operations::Act.new(block, on)
  lift(operation)
end
all?(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 188
# Lifts an Operations::BlockingEnumerable for :all? with the given block.
def all?(&block)
  operation = Operations::BlockingEnumerable.new(:all?, block)
  lift(operation)
end
any?(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 192
# Lifts an Operations::BlockingEnumerable for :any? with the given block.
def any?(&block)
  operation = Operations::BlockingEnumerable.new(:any?, block)
  lift(operation)
end
append(to_append, condition: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 533
# Lifts an Operations::Append built from +to_append+ and the optional
# +condition+ keyword.
def append(to_append, condition: nil)
  operation = Operations::Append.new(to_append, condition: condition)
  lift(operation)
end
await(subscription, timeout = nil)
click to toggle source
# File lib/reacto/trackable.rb, line 655
# Blocks until +subscription+ reports close or error, or until +timeout+
# elapses (nil is handed to the latch unchanged).
#
# Returns immediately (nil) when the subscription is no longer subscribed.
# Any exception whose message contains 'No live threads left' is swallowed
# and nil is returned; every other exception is re-raised.
# NOTE(review): Exception (not StandardError) is rescued on purpose -
# presumably to catch the interpreter's deadlock error; confirm before
# narrowing.
def await(subscription, timeout = nil)
  return unless subscription.subscribed?

  done = Concurrent::CountDownLatch.new(1)
  release = Subscriptions.on_close_and_error { done.count_down }
  subscription.add(release)
  done.wait(timeout)
rescue Exception => e
  raise e unless e.message.include?('No live threads left')
end
buffer(count: nil, delay: nil)
click to toggle source
# File lib/reacto/trackable.rb, line 545
# Lifts an Operations::Buffer configured with the +count+ and +delay+
# keywords (both default to nil).
def buffer(count: nil, delay: nil)
  operation = Operations::Buffer.new(count: count, delay: delay)
  lift(operation)
end
chunk(executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 223
# Lifts an Operations::Chunk built from the block.
#
# Returns self when no block is given. The executor keyword is resolved
# through +retrieve_executor+; when that yields nil, this trackable's own
# executor is used.
def chunk(executor: nil, &block)
  return self unless block_given?

  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved
  lift(Operations::Chunk.new(block, executor: chosen))
end
chunk_while(executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 232
# Lifts an Operations::ChunkWhile built from the block.
#
# The executor keyword is resolved through +retrieve_executor+; when that
# yields nil, this trackable's own executor is used.
def chunk_while(executor: nil, &block)
  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved
  lift(Operations::ChunkWhile.new(block, executor: chosen))
end
combine(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 609
# Instance form of the class-level +combine+: this trackable is placed
# first, followed by the given ones. Returns self when no block is given.
def combine(*trackables, &block)
  return self unless block_given?

  sources = [self] + trackables
  self.class.combine(*sources, &block)
end
Also aliased as: combine_latest
combine_last(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 603
# Instance form of the class-level +combine_last+: this trackable is
# placed first, followed by the given ones. Returns self when no block is
# given.
def combine_last(*trackables, &block)
  return self unless block_given?

  sources = [self] + trackables
  self.class.combine_last(*sources, &block)
end
concat(trackable)
click to toggle source
# File lib/reacto/trackable.rb, line 537
# Lifts an Operations::Concat built from +trackable+.
def concat(trackable)
  operation = Operations::Concat.new(trackable)
  lift(operation)
end
count(value = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 263
# Counts notifications by mapping each one to 1, summing with +inject+ and
# taking +last+.
#
# value - when given, only values accepted by
#         Behaviours.same_predicate(value) are counted.
# block - when given (and no value), only values selected by the block
#         are counted.
def count(value = NO_VALUE, &block)
  source =
    if value != NO_VALUE
      select(&Behaviours.same_predicate(value))
    elsif block_given?
      select(&block)
    else
      self
    end

  ones = source.map(1)
  running_total = ones.inject(&:+)
  running_total.last
end
cycle(n = nil)
click to toggle source
# File lib/reacto/trackable.rb, line 239
# Lifts an Operations::Cycle built from this trackable's behaviour and the
# optional +n+.
def cycle(n = nil)
  operation = Operations::Cycle.new(@behaviour, n)
  lift(operation)
end
delay(delay)
click to toggle source
# File lib/reacto/trackable.rb, line 549
# Shortcut for +buffer+ with only the +delay+ keyword set.
def delay(delay)
  buffer(count: nil, delay: delay)
end
delay_each(delay)
click to toggle source
# File lib/reacto/trackable.rb, line 553
# Lifts an Operations::DelayEach built from +delay+.
def delay_each(delay)
  operation = Operations::DelayEach.new(delay)
  lift(operation)
end
depend_on(trackable, key: :data, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 561
# Lifts an Operations::DependOn built from +trackable+, the +key+ keyword
# (default :data) and the block, which is passed as the accumulator.
def depend_on(trackable, key: :data, &block)
  operation = Operations::DependOn.new(
    trackable, key: key, accumulator: block
  )
  lift(operation)
end
diff(initial = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 478
# Lifts an Operations::Diff built from the block (or
# Operations::Diff::DEFAULT_FN when no block is given) and +initial+.
def diff(initial = NO_VALUE, &block)
  fn = block_given? ? block : Operations::Diff::DEFAULT_FN
  lift(Operations::Diff.new(fn, initial))
end
do_track(subscription)
click to toggle source
# File lib/reacto/trackable.rb, line 679
# Runs this trackable's behaviour for +subscription+: posted to the
# executor when one is set, called directly otherwise.
def do_track(subscription)
  return @behaviour.call(subscription) unless @executor

  @executor.post(subscription, &@behaviour)
end
drop(how_many_to_drop)
click to toggle source
# File lib/reacto/trackable.rb, line 484
# Lifts an Operations::Drop built from +how_many_to_drop+.
def drop(how_many_to_drop)
  operation = Operations::Drop.new(how_many_to_drop)
  lift(operation)
end
Also aliased as: skip
drop_errors()
click to toggle source
# File lib/reacto/trackable.rb, line 493
# Lifts a new Operations::DropErrors.
def drop_errors
  operation = Operations::DropErrors.new
  lift(operation)
end
Also aliased as: skip_errors
drop_while(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 488
# Lifts an Operations::DropWhile built from the block; FALSE_PREDICATE is
# used when no block is given.
def drop_while(&block)
  predicate =
    if block_given?
      block
    else
      FALSE_PREDICATE
    end
  lift(Operations::DropWhile.new(predicate))
end
each_cons(n, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 276
# Lifts an Operations::EachCollect of size +n+ whose reset action keeps
# everything but the first element of the current collection.
#
# Raises ArgumentError ('invalid size') when n <= 0. For n == 1 it simply
# delegates to +each+. When a block is given the lifted trackable is
# subscribed to with it; otherwise the lifted trackable is returned.
def each_cons(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0
  return each(&block) if n == 1

  drop_oldest = ->(current) { current[1..-1] }
  operation = Operations::EachCollect.new(
    n, reset_action: drop_oldest, on_error: NO_ACTION, on_close: NO_ACTION
  )
  collected = lift(operation)

  return collected unless block_given?

  collected.on(&block)
end
each_slice(n, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 288
# Lifts an Operations::EachCollect of size +n+.
#
# Raises ArgumentError ('invalid size') when n <= 0. When a block is given
# the lifted trackable is subscribed to with it; otherwise the lifted
# trackable is returned.
def each_slice(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0

  sliced = lift(Operations::EachCollect.new(n))
  return sliced unless block_given?

  sliced.on(&block)
end
each_with_index(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 296
# Lifts an Operations::EachCollect of size 2 whose collect action appends
# the value followed by a running index, and whose init action resets that
# index to 0.
#
# The counter lives in this method call's closure, so it is shared by
# everything that uses the returned trackable.
# When a block is given the lifted trackable is subscribed to with it;
# otherwise the lifted trackable is returned.
def each_with_index(&block)
  position = 0

  append_with_position = lambda do |val, collection|
    collection << val
    collection << position
    position += 1
  end
  restart = -> { position = 0 }

  operation = Operations::EachCollect.new(
    2,
    collect_action: append_with_position,
    init_action: restart,
    on_error: NO_ACTION,
    on_close: NO_ACTION
  )
  indexed = lift(operation)

  return indexed unless block_given?

  indexed.on(&block)
end
each_with_object(obj, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 474
# Lifts an Operations::EachWithObject built from the block and +obj+.
def each_with_object(obj, &block)
  operation = Operations::EachWithObject.new(block, obj)
  lift(operation)
end
entries(n = nil)
click to toggle source
# File lib/reacto/trackable.rb, line 313
# Subscribes to this trackable, waits (via +await+) for the subscription
# to finish and returns the collected values as an Array.
#
# n - when an Integer: returns [] right away if n <= 0, otherwise only
#     +take(n)+ of this trackable is collected. Non-Integer values are
#     ignored.
def entries(n = nil)
  limited = n.is_a?(Integer)
  return [] if limited && n <= 0

  source = limited ? take(n) : self

  collected = []
  collector = ->(v) { collected << v }
  subscription = source.on(value: collector)
  source.await(subscription)

  collected
end
execute_on(executor)
click to toggle source
# File lib/reacto/trackable.rb, line 648
# Returns a new instance of the same class with the same behaviour, bound
# to +executor+ (translated through EXECUTOR_ALIASES when it is a known
# alias).
def execute_on(executor)
  target = EXECUTOR_ALIASES[executor] || executor
  self.class.new(target, &@behaviour)
end
find(if_none = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 243
# Returns +select(&block).first+; when +if_none+ is supplied, it is
# appended with the :source_empty condition.
def find(if_none = NO_VALUE, &block)
  first_match = select(&block).first
  fallback_given = if_none != NO_VALUE

  if fallback_given
    first_match.append(if_none, condition: :source_empty)
  else
    first_match
  end
end
Also aliased as: detect
find_index(value = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 253
# Lifts an Operations::FindIndex. The predicate is an equality check
# against +value+ when one is supplied, otherwise the given block.
def find_index(value = NO_VALUE, &block)
  predicate = value != NO_VALUE ? ->(v) { value == v } : block
  lift(Operations::FindIndex.new(predicate))
end
first(n = 1)
click to toggle source
# File lib/reacto/trackable.rb, line 515
# Delegates to +take(n)+ (n defaults to 1).
#
# Raises ArgumentError ('negative array size') when n is negative.
def first(n = 1)
  raise ArgumentError, 'negative array size' if n < 0

  take(n)
end
flat_map(transform = nil, label: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 373
# Lifts a flat-map operation. The transformation is the block when one is
# given, +transform+ otherwise.
#
# label - when truthy, an Operations::OperationOnLabeled with
#         op: :flat_map is lifted instead of Operations::FlatMap.
def flat_map(transform = nil, label: nil, &block)
  mapper = block_given? ? block : transform

  operation =
    if label
      Operations::OperationOnLabeled.new(label, mapper, op: :flat_map)
    else
      Operations::FlatMap.new(mapper)
    end
  lift(operation)
end
Also aliased as: collect_concat
flat_map_latest(transform = nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 383
# Lifts an Operations::FlatMapLatest built from the block when one is
# given, +transform+ otherwise.
def flat_map_latest(transform = nil, &block)
  mapper = block_given? ? block : transform
  lift(Operations::FlatMapLatest.new(mapper))
end
flatten()
click to toggle source
# File lib/reacto/trackable.rb, line 511
# Lifts a new Operations::Flatten.
def flatten
  operation = Operations::Flatten.new
  lift(operation)
end
flatten_labeled(initial: NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 571
# Lifts an Operations::FlattenLabeled built from the block and the
# +initial+ keyword.
def flatten_labeled(initial: NO_VALUE, &block)
  operation = Operations::FlattenLabeled.new(block, initial)
  lift(operation)
end
grep(pattern, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 427
# Selects the values for which +pattern === value+ holds and, when a block
# is given, maps them through it (see +select_map+).
def grep(pattern, &block)
  matches = ->(v) { pattern === v }
  select_map(matches, &block)
end
grep_v(pattern, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 431
# Inverse of +grep+: selects the values for which +pattern === value+ does
# not hold and, when a block is given, maps them through it.
def grep_v(pattern, &block)
  mismatches = ->(v) { !(pattern === v) }
  select_map(mismatches, &block)
end
group_by_label(executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 567
# Lifts an Operations::GroupByLabel built from the block and the
# +executor+ keyword (passed on as given).
def group_by_label(executor: nil, &block)
  operation = Operations::GroupByLabel.new(block, executor)
  lift(operation)
end
Also aliased as: group_by
include?(obj)
click to toggle source
# File lib/reacto/trackable.rb, line 435
# Lifts an Operations::Include built from +obj+.
def include?(obj)
  operation = Operations::Include.new(obj)
  lift(operation)
end
inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 461
# Lifts an inject operation built from the block. Returns self when no
# block is given.
#
# The initial value may be passed positionally (+initial+) or through the
# +initial_value+ keyword; the positional one wins when supplied.
# label - when truthy, an Operations::OperationOnLabeled with op: :inject
#         is lifted instead of Operations::Inject.
def inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block)
  return self unless block_given?

  init =
    if initial != NO_VALUE
      initial
    else
      initial_value
    end

  operation =
    if label
      Operations::OperationOnLabeled.new(
        label, block, op: :inject, initial_value: init
      )
    else
      Operations::Inject.new(block, init)
    end
  lift(operation)
end
Also aliased as: reduce
last()
click to toggle source
# File lib/reacto/trackable.rb, line 525
# Lifts a new Operations::Last.
def last
  operation = Operations::Last.new
  lift(operation)
end
lazy()
click to toggle source
# File lib/reacto/trackable.rb, line 439
# Returns self; present only so the interface matches Enumerable.
def lazy
  self
end
lift(operation = nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 361
# Creates a new trackable (via +create_lifted+) whose behaviour applies
# +operation+ to the incoming subscription and then runs this trackable's
# behaviour against the result (via +lift_behaviour+).
#
# operation - any object responding to +call+; a given block takes
#             precedence over it.
#
# When the operation returns NOTHING the behaviour is not run. Any
# exception raised in the process is reported through +on_error+ on the
# incoming subscription.
def lift(operation = nil, &block)
  operation = block if block_given?

  create_lifted do |downstream|
    begin
      upstream = operation.call(downstream)
      lift_behaviour(upstream) unless upstream == NOTHING
    rescue Exception => e
      downstream.on_error(e)
    end
  end
end
map(val = NO_VALUE, error: nil, close: nil, label: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 387
# Lifts a map operation.
#
# The mapping action is the block when one is given; otherwise
# IDENTITY_ACTION when +val+ is not supplied, or Behaviours.constant(val).
# error / close - forwarded as keywords to the operation.
# label         - when truthy, an Operations::OperationOnLabeled is lifted
#                 instead of Operations::Map.
def map(val = NO_VALUE, error: nil, close: nil, label: nil, &block)
  action =
    if block_given?
      block
    elsif val == NO_VALUE
      IDENTITY_ACTION
    else
      Behaviours.constant(val)
    end

  operation =
    if label
      Operations::OperationOnLabeled.new(
        label, action, error: error, close: close
      )
    else
      Operations::Map.new(action, error: error, close: close)
    end
  lift(operation)
end
Also aliased as: collect
max(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 403
# Lifts an Operations::Extremums with the block passed as +action+ and the
# operation's default type.
def max(&block)
  operation = Operations::Extremums.new(action: block)
  lift(operation)
end
max_by(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 407
# Lifts an Operations::Extremums with the block passed as +by+ and the
# operation's default type.
def max_by(&block)
  operation = Operations::Extremums.new(by: block)
  lift(operation)
end
merge(*trackables, delay_error: false)
click to toggle source
# File lib/reacto/trackable.rb, line 541
# Lifts an Operations::Merge built from the given trackables and the
# +delay_error+ keyword (default false).
def merge(*trackables, delay_error: false)
  operation = Operations::Merge.new(trackables, delay_error: delay_error)
  lift(operation)
end
min(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 411
# Lifts an Operations::Extremums of type :min with the block passed as
# +action+.
def min(&block)
  operation = Operations::Extremums.new(action: block, type: :min)
  lift(operation)
end
min_by(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 415
# Lifts an Operations::Extremums of type :min with the block passed as
# +by+.
def min_by(&block)
  operation = Operations::Extremums.new(by: block, type: :min)
  lift(operation)
end
minmax(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 419
# Lifts an Operations::Extremums of type :minmax with the block passed as
# +action+.
def minmax(&block)
  operation = Operations::Extremums.new(action: block, type: :minmax)
  lift(operation)
end
minmax_by(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 423
# Lifts an Operations::Extremums of type :minmax with the block passed as
# +by+.
def minmax_by(&block)
  operation = Operations::Extremums.new(by: block, type: :minmax)
  lift(operation)
end
none?(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 196
# Lifts an Operations::BlockingEnumerable for :none? with the given block.
def none?(&block)
  operation = Operations::BlockingEnumerable.new(:none?, block)
  lift(operation)
end
off(notification_tracker = nil)
click to toggle source
# File lib/reacto/trackable.rb, line 346
# Placeholder: currently does nothing and returns nil.
#
# notification_tracker - accepted but unused.
def off(notification_tracker = nil)
  # Clean-up logic
end
on(trackers = {}, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 335
# Subscribes to this trackable with the given handlers.
#
# trackers - Hash of topic => callable; every key must be one of TOPICS.
#            The hash passed by the caller is never modified.
# block    - when given, it is used as the :value handler (overriding a
#            :value entry in +trackers+).
#
# Returns whatever +track+ returns for a Tracker built from the handlers.
# Raises RuntimeError when a key outside TOPICS is passed.
def on(trackers = {}, &block)
  # Merge into a fresh hash instead of assigning into the caller's hash.
  handlers = block_given? ? trackers.merge(value: block) : trackers

  unless (handlers.keys - TOPICS).empty?
    raise "This Trackable supports only #{TOPICS}, " \
      "but #{handlers.keys} were passed."
  end

  track(Tracker.new(handlers))
end
Also aliased as: each, each_entry
one?(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 200
# Lifts an Operations::BlockingEnumerable for :one? with the given block.
def one?(&block)
  operation = Operations::BlockingEnumerable.new(:one?, block)
  lift(operation)
end
partition(executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 214
# Lifts an Operations::Partition built from the block.
#
# Returns self when no block is given. The executor keyword is resolved
# through +retrieve_executor+; when that yields nil, this trackable's own
# executor is used.
def partition(executor: nil, &block)
  return self unless block_given?

  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved
  lift(Operations::Partition.new(block, executor: chosen))
end
prepend(enumerable)
click to toggle source
# File lib/reacto/trackable.rb, line 529
# Lifts an Operations::Prepend built from +enumerable+.
def prepend(enumerable)
  operation = Operations::Prepend.new(enumerable)
  lift(operation)
end
reject(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 457
# Inverse of +select+: keeps the values for which the block returns a
# falsy result.
#
# Returns self when no block is given, like +select+ does. Without this
# guard the negating lambda would wrap a nil block and fail with
# NoMethodError once a value arrived.
def reject(&block)
  return self unless block_given?

  negated = ->(val) { !block.call(val) }
  select(&negated)
end
rescue_and_replace_error(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 593
# Lifts an Operations::RescueAndReplaceError built from the block.
# Returns self when no block is given.
def rescue_and_replace_error(&block)
  return self unless block_given?

  operation = Operations::RescueAndReplaceError.new(block)
  lift(operation)
end
rescue_and_replace_error_with(trackable)
click to toggle source
# File lib/reacto/trackable.rb, line 599
# Shortcut for +rescue_and_replace_error+ with a block that ignores the
# error and always answers +trackable+.
def rescue_and_replace_error_with(trackable)
  rescue_and_replace_error do |_ignored|
    trackable
  end
end
retry(retries = 1)
click to toggle source
# File lib/reacto/trackable.rb, line 583
# Lifts an Operations::Retry built from this trackable's behaviour and
# +retries+ (default 1).
def retry(retries = 1)
  operation = Operations::Retry.new(@behaviour, retries)
  lift(operation)
end
retry_when(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 587
# Lifts an Operations::RetryWhen built from this trackable's behaviour and
# the block. Returns self when no block is given.
def retry_when(&block)
  return self unless block_given?

  operation = Operations::RetryWhen.new(@behaviour, block)
  lift(operation)
end
select(label: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 447
# Lifts a select operation built from the block. Returns self when no
# block is given.
#
# label - when truthy, an Operations::OperationOnLabeled with op: :select
#         is lifted instead of Operations::Select.
def select(label: nil, &block)
  return self unless block_given?

  operation =
    if label
      Operations::OperationOnLabeled.new(label, block, op: :select)
    else
      Operations::Select.new(block)
    end
  lift(operation)
end
Also aliased as: find_all
slice(pattern = NO_ACTION, type:, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 623
# Lifts an Operations::Slice of the given +type+.
#
# pattern - when supplied, the predicate is +pattern === value+;
#           otherwise the block is used as the predicate.
# type    - required keyword, passed on to Operations::Slice
#           (+slice_after+ / +slice_before+ pass :after / :before).
#
# The default for +pattern+ is NO_VALUE, the same sentinel the guard below
# compares against. It used to be NO_ACTION, which made a block-only call
# take the pattern branch and ignore the block.
def slice(pattern = NO_VALUE, type:, &block)
  predicate =
    if pattern != NO_VALUE
      ->(val) { pattern === val }
    else
      block
    end

  lift(Operations::Slice.new(predicate, type: type))
end
slice_after(pattern = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 615
# Delegates to +slice+ with type :after, forwarding pattern and block.
def slice_after(pattern = NO_VALUE, &block)
  slice_type = :after
  slice(pattern, type: slice_type, &block)
end
slice_before(pattern = NO_VALUE, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 619
# Delegates to +slice+ with type :before, forwarding pattern and block.
def slice_before(pattern = NO_VALUE, &block)
  slice_type = :before
  slice(pattern, type: slice_type, &block)
end
slice_when(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 633
# Lifts an Operations::SliceWhen built from the block.
def slice_when(&block)
  operation = Operations::SliceWhen.new(block)
  lift(operation)
end
sort(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 204
# Lifts an Operations::BlockingEnumerable for :sort with the given block.
def sort(&block)
  operation = Operations::BlockingEnumerable.new(:sort, block)
  lift(operation)
end
sort_by(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 208
# Lifts an Operations::BlockingEnumerable for :sort_by with the given
# block. Returns self when no block is given.
def sort_by(&block)
  return self unless block_given?

  operation = Operations::BlockingEnumerable.new(:sort_by, block)
  lift(operation)
end
split_labeled(label, executor: nil, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 575
# Lifts an Operations::SplitLabeled built from +label+, the block and the
# +executor+ keyword (passed on as given).
def split_labeled(label, executor: nil, &block)
  operation = Operations::SplitLabeled.new(label, block, executor)
  lift(operation)
end
take(how_many_to_take)
click to toggle source
# File lib/reacto/trackable.rb, line 497
# Lifts an Operations::Take built from +how_many_to_take+.
def take(how_many_to_take)
  operation = Operations::Take.new(how_many_to_take)
  lift(operation)
end
take_while(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 501
# Lifts an Operations::TakeWhile built from the block. Returns self when
# no block is given.
def take_while(&block)
  return self unless block_given?

  operation = Operations::TakeWhile.new(block)
  lift(operation)
end
throttle(delay)
click to toggle source
# File lib/reacto/trackable.rb, line 557
# Lifts an Operations::Throttle built from +delay+.
def throttle(delay)
  operation = Operations::Throttle.new(delay)
  lift(operation)
end
to_a()
click to toggle source
# File lib/reacto/trackable.rb, line 327
# Returns the result of +entries+ called without a limit.
def to_a
  entries
end
to_h()
click to toggle source
# File lib/reacto/trackable.rb, line 331
# Converts the result of +to_a+ with Array#to_h.
def to_h
  collected = to_a
  collected.to_h
end
track(notification_tracker, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 350
# Subscribes +notification_tracker+ to this trackable.
#
# When a block is given the call is handed over to +on+ with that block
# and +notification_tracker+ is not used. Otherwise a
# Subscriptions::TrackerSubscription is created, the behaviour is started
# through +do_track+, and the subscription is returned wrapped in a
# Subscriptions::SubscriptionWrapper.
def track(notification_tracker, &block)
  return on(&block) if block_given?

  tracker_subscription = Subscriptions::TrackerSubscription.new(
    notification_tracker, self
  )
  do_track(tracker_subscription)

  Subscriptions::SubscriptionWrapper.new(tracker_subscription)
end
track_on(executor)
click to toggle source
# File lib/reacto/trackable.rb, line 641
# Lifts an Operations::TrackOn built from +executor+ (translated through
# EXECUTOR_ALIASES when it is a known alias).
def track_on(executor)
  target = EXECUTOR_ALIASES[executor] || executor
  lift(Operations::TrackOn.new(target))
end
uniq()
click to toggle source
# File lib/reacto/trackable.rb, line 507
# Lifts a new Operations::Uniq.
def uniq
  operation = Operations::Uniq.new
  lift(operation)
end
wrap(**args)
click to toggle source
# File lib/reacto/trackable.rb, line 443
# Lifts an Operations::Wrap built from the given keyword arguments, which
# are handed over as a single Hash.
def wrap(**args)
  operation = Operations::Wrap.new(args)
  lift(operation)
end
zip(*trackables, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 637
# Instance form of the class-level +zip+: this trackable is placed first,
# followed by the given ones; the block is forwarded.
def zip(*trackables, &block)
  sources = [self] + trackables
  self.class.zip(*sources, &block)
end
Protected Instance Methods
create_lifted(&block)
click to toggle source
# File lib/reacto/trackable.rb, line 689
# Creates a new instance of the same class that keeps this trackable's
# executor and uses the given block as its behaviour.
def create_lifted(&block)
  lifted_class = self.class
  lifted_class.new(@executor, &block)
end
Private Instance Methods
lift_behaviour(lifted_tracker_subscription)
click to toggle source
# File lib/reacto/trackable.rb, line 709
# Signals +on_open+ on the lifted subscription and then runs this
# trackable's behaviour against it. Any exception raised by either step is
# reported through the subscription's +on_error+.
def lift_behaviour(lifted_tracker_subscription)
  lifted_tracker_subscription.on_open
  @behaviour.call(lifted_tracker_subscription)
rescue Exception => e
  lifted_tracker_subscription.on_error(e)
end
retrieve_executor(executor)
click to toggle source
# File lib/reacto/trackable.rb, line 702
# Translates an executor alias through EXECUTOR_ALIASES.
#
# Returns nil for nil, the aliased executor when the lookup is truthy, and
# the argument itself otherwise.
def retrieve_executor(executor)
  return if executor.nil?

  EXECUTOR_ALIASES[executor] || executor
end
select_map(predicate, &block)
click to toggle source
# File lib/reacto/trackable.rb, line 695
# Selects with +predicate+ and, when a block is given, maps the selection
# through that block. Shared by +grep+ and +grep_v+.
def select_map(predicate, &block)
  filtered = select(&predicate)
  return filtered unless block_given?

  filtered.map(&block)
end