module Rx::Observable
Time-based operations
Public Class Methods
Propagates the observable sequence that reacts first.
# File lib/rx/operators/multiple.rb, line 426
# Races all given observables against each other by folding them pairwise
# with the instance-level +amb+, seeded with Observable.never.
def amb(*args)
  args.inject(Observable.never) do |leader, contender|
    leader.amb(contender)
  end
end
# File lib/rx/linq/observable/case.rb, line 3
# Deferred lookup: on each subscription calls +selector+, uses its result as
# the key into +sources+, and falls back to the default source when the lookup
# is nil/false. A Scheduler passed as the third argument is turned into
# Observable.empty on that scheduler.
def case(selector, sources, defaultSourceOrScheduler = Observable.empty)
  defer do
    if Scheduler === defaultSourceOrScheduler
      defaultSourceOrScheduler = Observable.empty(defaultSourceOrScheduler)
    end
    chosen = sources[selector.call]
    chosen ? chosen : defaultSourceOrScheduler
  end
end
Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.
# File lib/rx/operators/multiple.rb, line 494
# Combines the latest values of every source in +args+. Once every source has
# produced at least one value, each new value emits result_selector applied to
# the current values (the values as an array when no block is given).
def combine_latest(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }
    source_count = args.length
    seen = Array.new(source_count, false)
    all_seen = false
    latest = Array.new(source_count)
    finished = Array.new(source_count, false)

    on_value = lambda do |i|
      seen[i] = true
      all_seen ||= seen.all?
      if all_seen
        begin
          combined = result_selector.call(*latest)
        rescue => e
          observer.on_error e
          next
        end
        observer.on_next(combined)
      elsif enumerable_select_with_index(finished) {|_, j| j != i }.all?
        # Every other source is done while some value is still missing.
        observer.on_completed
      end
    end

    on_done = lambda do |i|
      finished[i] = true
      observer.on_completed if finished.all?
    end

    gate = Monitor.new
    subscriptions = Array.new(source_count) do |i|
      slot = SingleAssignmentSubscription.new
      inner_observer = Observer.configure do |o|
        o.on_next do |x|
          latest[i] = x
          on_value.call i
        end
        o.on_error(&observer.method(:on_error))
        o.on_completed { on_done.call i }
      end
      slot.subscription = args[i].synchronize(gate).subscribe(inner_observer)
      slot
    end

    CompositeSubscription.new subscriptions
  end
end
Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.
# File lib/rx/operators/multiple.rb, line 553
# Subscribes to the given sources one after another, moving on to the next
# source only when the current one completes. An error from a source, or from
# enumerating the sources, is forwarded to the observer.
#
# +args+ is either a list of observables or a single Enumerator yielding them.
def concat(*args)
  AnonymousObservable.new do |observer|
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    gate = AsyncLock.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        break if disposed

        current = nil
        has_next = false
        err = nil
        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below through has_next.
        rescue => failure
          # Bound to its own name so the enumerator is not overwritten.
          err = failure
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error(&observer.method(:on_error))
          o.on_completed { this.call }
        end

        # Store the inner subscription in the slot held by the serial
        # subscription so it is reachable from the returned composite.
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true }}]
  end
end
Creates an observable sequence from a specified subscribe method implementation.
# File lib/rx/operators/creation.rb, line 17
# Builds an observable from a subscribe block. The block's return value becomes
# the subscription: a Subscription is used as is, a Proc is wrapped with
# Subscription.create, anything else yields Subscription.empty.
def create(&subscribe)
  AnonymousObservable.new do |observer|
    outcome = subscribe.call(observer)
    if Subscription === outcome
      outcome
    elsif Proc === outcome
      Subscription.create(&outcome)
    else
      Subscription.empty
    end
  end
end
Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
# File lib/rx/operators/creation.rb, line 32
# Calls the method's block on every subscription and subscribes the observer to
# the observable it returns. If the block raises, the observer is subscribed to
# Observable.raise_error carrying that exception instead.
def defer
  AnonymousObservable.new do |observer|
    produced = nil
    failure_subscription = nil
    begin
      produced = yield
    rescue => err
      failure_subscription = Observable.raise_error(err).subscribe(observer)
    end
    failure_subscription || produced.subscribe(observer)
  end
end
Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
# File lib/rx/operators/creation.rb, line 47
# Observable that only signals completion, scheduled on +scheduler+.
def empty(scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    complete = lambda { observer.on_completed }
    scheduler.schedule complete
  end
end
# File lib/rx/linq/observable/for.rb, line 3
# Concatenates the observables obtained by applying +result_selector+ to each
# element of +sources+.
#
# When no selector is given the elements themselves are concatenated. The
# default used to be a splat lambda, which wrapped every element in an Array;
# Observable.concat calls +subscribe+ on each yielded item, so those Arrays
# could not be subscribed to.
def for(sources, result_selector = nil)
  result_selector ||= lambda {|source| source }
  projected = Enumerator.new do |yielder|
    sources.each {|source| yielder << result_selector.call(source) }
  end
  Observable.concat(projected)
end
# File lib/rx/linq/observable/fork_join.rb, line 3
# Subscribes to all sources and, once every source has completed, emits a
# single array holding the last value of each source, then completes.
# Completes without emitting as soon as a source completes without having
# produced a value. The first error is forwarded and disposes the group.
def fork_join(*all_sources)
  AnonymousObservable.new {|subscriber|
    count = all_sources.length
    if count == 0
      subscriber.on_completed
      # Leave the subscribe block here; previously this value was computed and
      # discarded, and execution fell through to the general path.
      next Subscription.empty
    end

    group = CompositeSubscription.new
    finished = false
    has_results = Array.new(count)
    has_completed = Array.new(count)
    results = Array.new(count)

    count.times {|i|
      source = all_sources[i]
      group.push(
        source.subscribe(
          lambda {|value|
            unless finished
              has_results[i] = true
              results[i] = value
            end
          },
          lambda {|e|
            finished = true
            subscriber.on_error e
            group.dispose
          },
          lambda {
            next if finished

            unless has_results[i]
              # Mark the result as terminated so later notifications from the
              # other sources cannot emit after this completion.
              finished = true
              subscriber.on_completed
              next
            end

            has_completed[i] = true
            next unless has_completed.all?

            finished = true
            subscriber.on_next results
            subscriber.on_completed
          }
        )
      )
    }
    group
  }
end
# File lib/rx/linq/observable/from.rb, line 3
# Converts +iterable+ into an observable sequence, enumerating it recursively
# on +scheduler+. When +map_fn+ is a Proc it is called with (element, index)
# and its result is emitted instead of the element. Errors raised while
# enumerating or mapping are forwarded through on_error.
#
# The enumerator is created per subscription. It used to be created once when
# +from+ was called, so a second subscriber found it already exhausted and
# only received on_completed.
def from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new {|observer|
    enum = iterable.to_enum
    i = 0
    scheduler.schedule_recursive lambda {|this|
      begin
        result = enum.next
      rescue StopIteration
        observer.on_completed
        next
      rescue => e
        observer.on_error e
        next
      end

      if Proc === map_fn
        begin
          result = map_fn.call(result, i)
        rescue => e
          observer.on_error e
          next
        end
      end

      observer.on_next result
      i += 1
      this.call
    }
  }
end
# File lib/rx/operators/creation.rb, line 204
# Emits the elements of +array+ in index order, then completes. The index is
# carried as scheduler state through schedule_recursive_with_state.
def from_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_at = lambda do |index, this|
      if index >= array.size
        observer.on_completed
      else
        observer.on_next array[index]
        this.call(index + 1)
      end
    end
    scheduler.schedule_recursive_with_state 0, emit_at
  end
end
Generates an observable sequence by running a state-driven loop producing the sequence’s elements.
# File lib/rx/operators/creation.rb, line 56
# State-driven loop: starting from +initial_state+, emits
# result_selector.call(state) while condition.call(state) holds, advancing the
# state with +iterate+ before every step except the first. Completes when the
# condition fails; any exception raised by the callbacks goes to on_error.
def generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    state = initial_state
    first = true

    step = lambda do |this|
      has_result = false
      result = nil
      begin
        if first
          first = false
        else
          state = iterate.call(state)
        end
        has_result = condition.call(state)
        result = result_selector.call(state) if has_result
      rescue => err
        observer.on_error err
        next
      end

      unless has_result
        observer.on_completed
        next
      end

      observer.on_next result
      this.call
    end

    scheduler.schedule_recursive step
  end
end
# File lib/rx/linq/observable/if.rb, line 3
# Evaluates +condition+ immediately and returns +then_source+ when it is
# truthy, otherwise the else source. The third argument may be a Scheduler
# (empty observable on that scheduler), an Observable (used as is) or nil
# (Observable.empty).
def if(condition, then_source, else_source_or_scheduler = nil)
  else_source =
    if Scheduler === else_source_or_scheduler
      Observable.empty(else_source_or_scheduler)
    elsif Observable === else_source_or_scheduler
      else_source_or_scheduler
    elsif nil === else_source_or_scheduler
      Observable.empty
    end

  condition.call ? then_source : else_source
end
# File lib/rx/linq/observable/interval.rb, line 2
# Periodic timer whose initial delay equals its period; delegates to
# observable_timer_time_span_and_period.
def self.interval(period, scheduler = Rx::DefaultScheduler.instance)
  initial_delay = period
  observable_timer_time_span_and_period(initial_delay, period, scheduler)
end
Returns an observable sequence that contains a single element.
# File lib/rx/operators/creation.rb, line 99
# Observable that emits +value+ once and then completes, both from a single
# action scheduled on +scheduler+.
def just(value, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_and_finish = lambda do
      observer.on_next value
      observer.on_completed
    end
    scheduler.schedule emit_and_finish
  end
end
Merges elements from all of the specified observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
# File lib/rx/operators/multiple.rb, line 612
# Merges the given observables. If the first argument is a Scheduler it is
# removed from the list and used to enumerate the sources; otherwise the
# current-thread scheduler is used.
def merge_all(*args)
  scheduler = CurrentThreadScheduler.instance
  leading_scheduler = !args.empty? && Scheduler === args[0]
  scheduler = args.shift if leading_scheduler
  Observable.from_array(args, scheduler).merge_all
end
Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences, and using the specified scheduler for enumeration of and subscription to the sources.
# File lib/rx/operators/multiple.rb, line 607
# Turns +args+ into an observable of sources (enumerated on +scheduler+) and
# merges them with at most +max_concurrent+ passed to the instance-level
# merge_concurrent.
def merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args)
  sources = Observable.from_array(args, scheduler)
  sources.merge_concurrent(max_concurrent)
end
Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
# File lib/rx/operators/creation.rb, line 92
# Observable whose subscribe block does nothing: the observer is never called.
def never
  AnonymousObservable.new {|_observer| }
end
# File lib/rx/linq/observable/of.rb, line 3
# Emits the given arguments as a sequence via of_array. A leading Scheduler
# argument is removed from the values and used for enumeration.
def of(*args)
  scheduler = CurrentThreadScheduler.instance
  leading_scheduler = !args.empty? && Scheduler === args[0]
  scheduler = args.shift if leading_scheduler
  of_array(args, scheduler)
end
Converts an array to an observable sequence, using an optional scheduler to enumerate the array.
# File lib/rx/operators/creation.rb, line 110
# Emits each element of +array+ in order and then completes, advancing a
# per-subscription index from a recursively scheduled action.
def of_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    index = 0
    emit_next = lambda do |this|
      if index >= array.length
        observer.on_completed
      else
        observer.on_next array[index]
        index += 1
        this.call
      end
    end
    scheduler.schedule_recursive emit_next
  end
end
Converts an Enumerable to an observable sequence, using an optional scheduler to enumerate it.
# File lib/rx/operators/creation.rb, line 126
# Converts +enumerable+ by taking its enumerator (to_enum) and delegating to
# Observable.of_enumerator.
def of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance)
  enumerator = enumerable.to_enum
  Observable.of_enumerator(enumerator, scheduler)
end
Converts an Enumerator to an observable sequence, using an optional scheduler to enumerate it.
# File lib/rx/operators/creation.rb, line 131
# Pulls values from +enum+ with Enumerator#next inside a recursively scheduled
# action: each value is emitted, StopIteration completes the sequence and any
# other exception is forwarded through on_error.
def of_enumerator(enum, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    pull = lambda do |this|
      begin
        value = enum.next
      rescue StopIteration
        observer.on_completed
        next
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next value
      this.call
    end
    scheduler.schedule_recursive pull
  end
end
Concatenates all of the specified observable sequences, even if the previous observable sequence terminated exceptionally.
# File lib/rx/operators/multiple.rb, line 622
# Subscribes to the given sources one after another, moving on to the next
# source when the current one either completes or fails. Completes when the
# sources are exhausted; an error raised while enumerating the sources is
# forwarded to the observer.
#
# +args+ is either a list of observables or a single Enumerator yielding them.
def on_error_resume_next(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        break if disposed

        current = nil
        has_next = false
        err = nil
        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below through has_next.
        rescue => failure
          # Bound to its own name: the rescue used to reuse the enumerator's
          # variable and replaced the enumerator with the exception.
          err = failure
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error {|_| this.call }
          o.on_completed { this.call }
        end

        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
# File lib/rx/linq/observable/pairs.rb, line 3
# Delegates to of_enumerable with +obj+ and +scheduler+.
def pairs(obj, scheduler = CurrentThreadScheduler.instance)
  enumerable = obj
  of_enumerable(enumerable, scheduler)
end
Returns an observable sequence that terminates with an exception.
# File lib/rx/operators/creation.rb, line 155
# Observable that only signals +error+ through on_error, scheduled on
# +scheduler+.
def raise_error(error, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    fail_observer = lambda { observer.on_error error }
    scheduler.schedule fail_observer
  end
end
Generates an observable sequence of integral numbers within a specified range.
# File lib/rx/operators/creation.rb, line 164
# Emits +count+ consecutive values start, start + 1, ... and then completes.
# The offset is carried as scheduler state.
def range(start, count, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_offset = lambda do |offset, this|
      if offset >= count
        observer.on_completed
      else
        observer.on_next(start + offset)
        this.call(offset + 1)
      end
    end
    scheduler.schedule_recursive_with_state 0, emit_offset
  end
end
Generates an observable sequence that repeats the given element the specified number of times.
# File lib/rx/operators/creation.rb, line 183
# Builds Observable.just(value, scheduler) and repeats it +count+ times via the
# instance-level repeat.
def repeat(value, count, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat(count)
end
Generates an observable sequence that repeats the given element infinitely.
# File lib/rx/operators/creation.rb, line 178
# Builds Observable.just(value, scheduler) and repeats it without bound via the
# instance-level repeat_infinitely.
def repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat_infinitely
end
Continues an observable sequence that is terminated by an exception with the next observable sequence.
# File lib/rx/operators/multiple.rb, line 431
# Subscribes to the first source and switches to the next one each time the
# current source fails. Completes as soon as a source completes. When the
# sources run out, the last source error is forwarded (or completion if there
# was none). An error raised while enumerating the sources is forwarded too.
#
# +args+ is either a list of observables or a single Enumerator yielding them.
def rescue_error(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    last_error = nil

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        break if disposed

        current = nil
        has_next = false
        error = nil
        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below through has_next.
        rescue => failure
          # Bound to its own name: the rescue used to reuse the enumerator's
          # variable and replaced the enumerator with the exception.
          error = failure
        end

        if error
          observer.on_error error
          break
        end

        unless has_next
          if last_error
            observer.on_error last_error
          else
            observer.on_completed
          end
          break
        end

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error do |err|
            last_error = err
            this.call
          end
          o.on_completed(&observer.method(:on_completed))
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
# File lib/rx/linq/observable/start.rb, line 3
# Wraps +func+ with Observable.to_async and immediately invokes the resulting
# callable with no arguments, returning what that call returns.
def start(func, context, scheduler = DefaultScheduler.instance)
  async_func = Observable.to_async(func, context, scheduler)
  async_func.call
end
# File lib/rx/linq/observable/timer.rb, line 3
# Dispatches to one of the private timer builders. The second argument is a
# period when Numeric, or the scheduler when it is a Scheduler. A Time
# +due_time+ selects the absolute ("date") variants, anything else the
# relative ("time span") variants; a period selects the periodic variants.
def timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance)
  period = nil
  if Numeric === period_or_scheduler
    period = period_or_scheduler
  elsif Scheduler === period_or_scheduler
    scheduler = period_or_scheduler
  end

  absolute = Time === due_time
  if absolute && period.nil?
    observable_timer_date(due_time, scheduler)
  elsif absolute
    observable_timer_date_and_period(due_time, period, scheduler)
  elsif period.nil?
    observable_timer_time_span(due_time, scheduler)
  else
    observable_timer_time_span_and_period(due_time, period, scheduler)
  end
end
# File lib/rx/linq/observable/to_async.rb, line 3
# Returns a lambda that, when called, schedules +func+ with the call's
# arguments on +scheduler+ and returns an observable backed by an AsyncSubject.
# The subject receives the function's result followed by completion, or the
# exception it raised. With a +context+, +func+ is bound to it via proc_bind.
def to_async(func, context = nil, scheduler = DefaultScheduler.instance)
  lambda do |*args|
    subject = AsyncSubject.new

    work = lambda do
      begin
        if context
          result = proc_bind(func, context).call(*args)
        else
          result = func.call(*args)
        end
      rescue => e
        subject.on_error e
        next
      end
      subject.on_next result
      subject.on_completed
    end

    scheduler.schedule work
    subject.as_observable
  end
end
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.
# File lib/rx/operators/creation.rb, line 188
# On subscription, creates a resource with +resource_factory+, builds the
# source from it with +observable_factory+, and returns a composite of the
# source subscription and the resource (Subscription.empty when the resource
# is nil). If either factory raises, the observer receives the error instead.
def using(resource_factory, observable_factory)
  AnonymousObservable.new do |observer|
    source = nil
    resource_subscription = Subscription.empty
    begin
      resource = resource_factory.call
      resource_subscription = resource unless resource.nil?
      source = observable_factory.call resource
    rescue => e
      failure_subscription = self.raise_error(e).subscribe(observer)
      next CompositeSubscription.new [failure_subscription, resource_subscription]
    end
    source_subscription = source.subscribe(observer)
    CompositeSubscription.new [source_subscription, resource_subscription]
  end
end
# File lib/rx/linq/observable/when.rb, line 3
# Activates every plan against a shared table of join observers and a proxy
# observer. The proxy forwards values and completion to +observer+ and fans an
# error out to every join observer. The result completes when the last active
# plan deactivates. Returns a composite of all join observers after
# subscribing each of them.
def when(*plans)
  AnonymousObservable.new do |observer|
    active_plans = []
    external_subscriptions = {}

    out_observer = Observer.configure do |o|
      o.on_next(&observer.method(:on_next))
      o.on_error do |err|
        external_subscriptions.each_value {|join_observer| join_observer.on_error err }
      end
      o.on_completed(&observer.method(:on_completed))
    end

    deactivate = lambda do |active_plan|
      active_plans.delete(active_plan)
      active_plans.empty? && observer.on_completed
    end

    begin
      plans.each do |plan|
        active_plans.push plan.activate(external_subscriptions, out_observer, deactivate)
      end
    rescue => e
      Observable.raise_error(e).subscribe(observer)
    end

    group = CompositeSubscription.new
    external_subscriptions.each_value do |join_observer|
      join_observer.subscribe
      group.push join_observer
    end
    group
  end
end
# File lib/rx/linq/observable/while.rb, line 3
# Repeats +source+ for as long as +condition+ returns a truthy value. The
# condition is evaluated before each repetition; when it is falsy the result
# completes, and when it raises the exception is forwarded through on_error.
#
# All bookkeeping (disposed flag, serial subscription) is created per
# subscription. It used to live outside the subscribe block together with a
# shared enumerator, so once one subscriber had unsubscribed or the enumerator
# had run out, later subscribers received nothing or an immediate completion.
def while(condition, source)
  scheduler = ImmediateScheduler.instance

  AnonymousObservable.new do |observer|
    is_disposed = false
    subscription = SerialSubscription.new

    cancelable = scheduler.schedule_recursive lambda {|this|
      next if is_disposed

      begin
        keep_going = condition.call
      rescue => e
        observer.on_error e
        next
      end

      unless keep_going
        observer.on_completed
        next
      end

      d = SingleAssignmentSubscription.new
      subscription.subscription = d
      d.subscription = source.subscribe(
        observer.method(:on_next),
        observer.method(:on_error),
        lambda { this.call }
      )
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { is_disposed = true }]
  end
end
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
# File lib/rx/operators/multiple.rb, line 675
# Pairs up values from all sources by arrival index: whenever every source has
# at least one queued value, one value is taken from each queue and
# result_selector applied to them is emitted (the values as an array when no
# block is given). Completes when all sources are done, or when a value
# arrives while every other source is done and some queue is empty.
def zip(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }
    n = args.length
    queues = Array.new(n) { [] }
    is_done = Array.new(n, false)

    next_action = lambda do |i|
      if queues.all? {|q| q.length > 0 }
        res = queues.map {|q| q.shift }
        # Route selector failures to on_error, as the class-level
        # combine_latest does, instead of raising into the producing source.
        begin
          combined = result_selector.call(*res)
        rescue => e
          observer.on_error e
          next
        end
        observer.on_next(combined)
      elsif enumerable_select_with_index(is_done) {|_, j| j != i }.all?
        observer.on_completed
      end
    end

    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    gate = Monitor.new
    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          queues[i].push(x)
          next_action.call i
        end
        o.on_error(&observer.method(:on_error))
        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)
      sas
    end

    # Empty the queues in place. The previous block only rebound its own
    # block parameter and left the queued values untouched.
    subscriptions.push(Subscription.create { queues.each(&:clear) })
    CompositeSubscription.new subscriptions
  end
end
Private Class Methods
# File lib/rx/operators/multiple.rb, line 725
# Returns a new array with the items of +arr+ for which the block, called with
# (item, index), returns a truthy value.
def enumerable_select_with_index(arr, &block)
  selected = []
  arr.each_with_index do |item, index|
    selected << item if block.call(item, index)
  end
  selected
end
# File lib/rx/linq/observable/_observable_timer_date_and_period.rb, line 4
# Periodic timer with an absolute start: emits an increasing counter starting
# at 0, first at +due_time+ and then rescheduling itself through
# schedule_recursive_absolute. With a positive normalized period the next due
# time advances by one period, or is reset to now + period when that time has
# already passed.
def observable_timer_date_and_period(due_time, period, scheduler)
  AnonymousObservable.new do |observer|
    count = 0
    next_due = due_time
    interval = Scheduler.normalize(period)

    tick = lambda do |this|
      if interval > 0
        now = scheduler.now()
        next_due = next_due + interval
        next_due = now + interval if next_due <= now
      end
      observer.on_next(count)
      count += 1
      this.call(next_due)
    end

    scheduler.schedule_recursive_absolute(next_due, tick)
  end
end
# File lib/rx/linq/observable/_observable_timer_time_span.rb, line 4
# One-shot relative timer: after the normalized +due_time+ emits 0 and
# completes.
def observable_timer_time_span(due_time, scheduler)
  AnonymousObservable.new do |observer|
    fire = lambda do
      observer.on_next(0)
      observer.on_completed
    end
    scheduler.schedule_relative(Scheduler.normalize(due_time), fire)
  end
end
# File lib/rx/linq/observable/_observable_timer_time_span_and_period.rb, line 4
# Periodic relative timer. When the initial delay equals the period, a
# periodic schedule emits an increasing counter starting at 0. Otherwise the
# work is deferred to observable_timer_date_and_period with an absolute start
# of scheduler.now + due_time computed at subscription time.
def observable_timer_time_span_and_period(due_time, period, scheduler)
  unless due_time == period
    return Observable.defer {
      observable_timer_date_and_period(scheduler.now() + due_time, period, scheduler)
    }
  end

  AnonymousObservable.new do |observer|
    tick = lambda do |count|
      observer.on_next(count)
      count + 1
    end
    scheduler.schedule_periodic_with_state(0, period, tick)
  end
end
derived from Proc#to_method from Ruby Facets github.com/rubyworks/facets/blob/master/lib/core/facets/proc/to_method.rb
# File lib/rx/linq/observable/to_async.rb, line 29
# Returns a Method bound to +object+ whose body is +block+, so the block runs
# with +object+ as self. The block is briefly defined as a method with a
# time-derived name on the object's singleton class, captured as an
# UnboundMethod, removed again, and then bound to the object.
def proc_bind(block, object)
  time = Time.now
  method_name = "__bind_#{time.to_i}_#{time.usec}"
  singleton = object.singleton_class
  singleton.send(:define_method, method_name, &block)
  unbound = singleton.instance_method(method_name)
  singleton.send(:remove_method, method_name)
  unbound.bind(object)
end
Public Instance Methods
Subscribes the given observer to the observable sequence. @param [Observer] observer @return [Subscription]
# File lib/rx/core/observable.rb, line 37
# Wraps +observer+ in an AutoDetachObserver and subscribes it. When the
# current-thread scheduler requires scheduling, the subscription is scheduled
# through schedule_subscribe; otherwise subscribe_core runs inline and an
# exception from it is re-raised only if the wrapper's +fail+ returns a falsy
# value. Always returns the wrapper.
def _subscribe(observer)
  detaching = AutoDetachObserver.new observer

  if CurrentThreadScheduler.schedule_required?
    CurrentThreadScheduler.instance.schedule_with_state detaching, method(:schedule_subscribe)
    return detaching
  end

  begin
    detaching.subscription = subscribe_core detaching
  rescue => e
    raise e unless detaching.fail e
  end

  detaching
end
# File lib/rx/internal/util.rb, line 3
# Returns an observable whose subscriptions are a composite of
# +r.subscription+ and the subscription to this sequence.
def add_ref(r)
  AnonymousObservable.new do |observer|
    ref_subscription = r.subscription
    source_subscription = self.subscribe(observer)
    CompositeSubscription.new [ref_subscription, source_subscription]
  end
end
Determines whether all elements of an observable sequence satisfy the condition given by the block; without a block, determines whether all elements are true. @param [Proc] block @return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 64
# Yields a single boolean: whether every element satisfies +block+, or, when
# no block is given, whether every element is itself truthy.
#
# Implemented as "no element fails the test": failing elements are selected,
# any? reports whether one exists, and the answer is negated.
#
# The default test is the element's own truthiness. It used to be a block that
# always returned true, which made the block-less form report true for every
# sequence regardless of its values.
#
# @param [Proc] block optional predicate called with each element
# @return [Rx::Observable]
def all?(&block)
  block ||= lambda {|v| v }
  select {|v| !(block.call v) }.
    any?.
    map {|b| !b }
end
Propagates the observable sequence that reacts first.
# File lib/rx/operators/multiple.rb, line 26
# Races this sequence against +second+: the first side to deliver any
# notification is chosen, the other side's subscription is unsubscribed, and
# from then on only the chosen side's notifications reach +observer+.
def amb(second)
  AnonymousObservable.new do |observer|
    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new
    choice = :neither

    gate = Monitor.new

    left = AmbObserver.new
    right = AmbObserver.new

    # Runs +action+ only for the winning side; the first caller wins, cancels
    # its rival and points its AmbObserver at the downstream observer.
    elect = lambda do |side, rival_subscription, own_observer, &action|
      if choice == :neither
        choice = side
        rival_subscription.unsubscribe
        own_observer.observer = observer
      end
      action.call if choice == side
    end

    build_observer = lambda do |side, rival_subscription, own_observer|
      Observer.configure do |o|
        o.on_next {|x| elect.call(side, rival_subscription, own_observer) { observer.on_next x } }
        o.on_error {|err| elect.call(side, rival_subscription, own_observer) { observer.on_error err } }
        o.on_completed { elect.call(side, rival_subscription, own_observer) { observer.on_completed } }
      end
    end

    left_obs = build_observer.call(:left, right_subscription, left)
    right_obs = build_observer.call(:right, left_subscription, right)

    left.observer = Observer.allow_reentrancy(left_obs, gate)
    right.observer = Observer.allow_reentrancy(right_obs, gate)

    left_subscription.subscription = self.subscribe left
    right_subscription.subscription = second.subscribe right

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
# File lib/rx/linq/observable/and.rb, line 3
# Builds a Pattern from this sequence and +right+.
def and(right)
  sources = [self, right]
  Pattern.new(sources)
end
Determines whether any element of an observable sequence satisfies the condition given by the block; without a block, determines whether the sequence contains any elements at all. @return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 84
# Yields a single boolean: whether the sequence has any element, or, with a
# block, any element for which the block is truthy.
#
# Without a block the result emits true and completes on the first element,
# or emits false and completes when the source completes empty. Errors are
# forwarded.
#
# With a block the source is first filtered with +select+. The previous
# implementation used +map+, which turned every element into a boolean and
# therefore reported true for any non-empty sequence, even when the block was
# false for all elements.
#
# @return [Rx::Observable]
def any?(&block)
  return select(&block).any? if block_given?

  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |_|
        observer.on_next true
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next false
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
Hides the identity of an observable sequence.
# File lib/rx/operators/single.rb, line 16
# Wraps this sequence in a fresh AnonymousObservable that simply forwards
# subscriptions to it.
def as_observable
  AnonymousObservable.new do |observer|
    subscribe(observer)
  end
end
Computes the average of an observable sequence of values that are optionally obtained by invoking a transform function on each element of the input sequence if a block is given @param [Object] block @return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 109
# Averages the sequence (or the block's projection of it) by accumulating a
# running sum and count with +scan+, taking the +final+ accumulator and
# dividing. Raises 'Sequence contains no elements' inside the projection when
# the count is zero. The division uses Ruby's `/`, so Integer inputs give an
# Integer result.
#
# @param [Object] block optional transform applied to each element first
# @return [Rx::Observable]
def average(&block)
  return map(&block).average if block_given?

  seed = {:sum => 0, :count => 0}
  totals = scan(seed) do |acc, value|
    {:sum => acc[:sum] + value, :count => acc[:count] + 1 }
  end

  totals.final.map do |acc|
    raise 'Sequence contains no elements' if acc[:count] == 0
    acc[:sum] / acc[:count]
  end
end
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
# File lib/rx/operators/single.rb, line 21
# Collects elements into arrays of +count+ items, starting a new buffer every
# +skip+ items, by converting each window of window_with_count to an array and
# dropping empty arrays.
#
# Raises ArgumentError when +count+ or +skip+ is not greater than zero.
def buffer_with_count(count, skip = count)
  raise ArgumentError, 'Count must be greater than zero' if count <= 0
  raise ArgumentError, 'Skip must be greater than zero' if skip <= 0

  windows = window_with_count(count, skip)
  buffers = windows.flat_map(&:to_a)
  buffers.find_all {|buffer| buffer.length > 0 }
end
Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on timing information.
# File lib/rx/operators/time.rb, line 22
# Collects elements into arrays based on timing: each window produced by
# window_with_time(time_span, time_shift, scheduler) is converted to an array.
#
# Raises ArgumentError when +time_span+ or +time_shift+ is not greater than
# zero. The +time_shift+ check now names the right argument; it used to reuse
# the +time_span+ message.
def buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError, 'time_span must be greater than zero' if time_span <= 0
  raise ArgumentError, 'time_shift must be greater than zero' if time_shift <= 0
  window_with_time(time_span, time_shift, scheduler).flat_map(&:to_a)
end
Merges two observable sequences into one observable sequence by using the selector function whenever one of the observable sequences produces an element.
# File lib/rx/operators/multiple.rb, line 117
# Combines this sequence with +other+: once both sides have produced a value,
# every new value on either side emits result_selector(left, right) with the
# latest value of each side. Without a block the pair is emitted as an array,
# matching the class-level combine_latest. Completes when both sides are done,
# or when a value arrives while the other side finished without ever
# producing one. Errors from either side or from the selector are forwarded.
#
# Fixes relative to the previous version:
# * the handlers are blocks called after the configuring method has returned,
#   so they leave with +next+ rather than +break+;
# * a value arriving after the other side finished no longer completes the
#   result when that side still has a latest value to combine with;
# * a missing selector block no longer turns every pair into an error.
def combine_latest(other, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }

    has_left = false
    has_right = false

    left = nil
    right = nil

    left_done = false
    right_done = false

    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new

    gate = Monitor.new

    # Emits the combination of the current left/right values, or the error
    # raised by the selector.
    emit_pair = lambda do
      begin
        res = result_selector.call left, right
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next res
    end

    left_obs = Observer.configure do |o|
      o.on_next do |l|
        has_left = true
        left = l

        if has_right
          emit_pair.call
        elsif right_done
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        left_done = true
        observer.on_completed if right_done
      end
    end

    right_obs = Observer.configure do |o|
      o.on_next do |r|
        has_right = true
        right = r

        if has_left
          emit_pair.call
        elsif left_done
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        right_done = true
        observer.on_completed if left_done
      end
    end

    left_subscription.subscription = synchronize(gate).subscribe(left_obs)
    right_subscription.subscription = other.synchronize(gate).subscribe(right_obs)

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.
# File lib/rx/operators/multiple.rb, line 195
# Concatenates this sequence followed by the given ones by passing an
# enumerator over them to Observable.concat.
def concat(*other)
  sequence = [self] + other
  Observable.concat(sequence.to_enum)
end
# File lib/rx/linq/observable/concat_all.rb, line 3
# Implemented as merge_concurrent with a concurrency limit of 1.
def concat_all
  max_concurrent = 1
  merge_concurrent(max_concurrent)
end
# File lib/rx/linq/observable/concat_map.rb, line 3
# Projects each element through +selector+ and hands the projection to
# _concat_map. A non-Proc +selector+ is used as a constant projection. When
# +result_selector+ is a Proc, each inner value is mapped through
# result_selector(outer, inner, outer_index, inner_index); inner results that
# respond to +each+ are first converted with Observable.from.
def concat_map(selector, result_selector = nil)
  if Proc === result_selector
    paired_selector = lambda do |x, i|
      inner = selector.call(x, i)
      inner = Observable.from(inner) if inner.respond_to?(:each)
      inner.map_with_index {|y, i2| result_selector.call(x, y, i, i2) }
    end
    return concat_map(paired_selector)
  end

  projector = Proc === selector ? selector : lambda {|*_| selector }
  _concat_map(projector)
end
# File lib/rx/linq/observable/concat_map_observer.rb, line 3
# Maps every notification of this sequence to a value and applies concat_all
# to the resulting sequence: elements go through +on_next+ (with their index),
# an error goes through +on_error+ and completion through +on_completed+. The
# two terminal mappings are followed by completion. An exception raised by any
# callback is forwarded through the observer's on_error.
def concat_map_observer(on_next, on_error, on_completed)
  AnonymousObservable.new do |observer|
    index = 0

    # Shared tail for the two terminal notifications.
    emit_final = lambda do |callback, *callback_args|
      begin
        result = callback.call(*callback_args)
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next result
      observer.on_completed
    end

    subscribe(
      lambda {|x|
        begin
          result = on_next.call(x, index)
          index += 1
        rescue => e
          observer.on_error e
          next
        end
        observer.on_next result
      },
      lambda {|err| emit_final.call(on_error, err) },
      lambda { emit_final.call(on_completed) })
  end.concat_all
end
# File lib/rx/linq/observable/contains.rb, line 3
# Emits true and completes when an element equal (==) to +search_element+ is
# seen at a position >= +from_index+; emits false and completes when the
# source completes. A negative +from_index+ emits false and completes at once
# without subscribing to the source.
#
# The negative-index branch leaves the subscribe block with +next+. It used
# +return+, which is invalid there: the block runs after +contains+ itself has
# returned, so the return raised LocalJumpError.
def contains(search_element, from_index = 0)
  AnonymousObservable.new do |observer|
    i = 0
    n = from_index
    if n < 0
      observer.on_next false
      observer.on_completed
      next Subscription.empty
    end

    subscribe(
      lambda {|x|
        position = i
        i += 1
        if position >= n && x == search_element
          observer.on_next true
          observer.on_completed
        end
      },
      observer.method(:on_error),
      lambda {
        observer.on_next false
        observer.on_completed
      })
  end
end
Determines whether an observable sequence contains a specified element. @param [Object] item The value to locate in the source sequence. @return [Rx::Observable] An observable sequence containing a single element determining whether the source sequence contains an element that has the specified value.
# File lib/rx/operators/aggregates.rb, line 123
# Yields whether any element is +eql?+ to +item+, by filtering on that test
# and asking any? of the result.
def contains?(item)
  matches = select {|x| x.eql? item }
  matches.any?
end
Returns an observable sequence containing a single number: how many elements of the source sequence satisfy the block if one is given, otherwise the total number of elements in the source sequence.
# File lib/rx/operators/aggregates.rb, line 130
# Counts elements with reduce, adding one per element from a seed of 0. With a
# block, only elements selected by the block are counted.
def count(&block)
  if block_given?
    select(&block).count
  else
    reduce(0) {|total, _| total + 1 }
  end
end
# File lib/rx/linq/observable/debounce.rb, line 3
# Emits a value only if no newer value arrives within +due_time+: each element
# replaces the pending one and schedules a relative timer that emits it if it
# is still the most recent when the timer fires. On completion a pending value
# is emitted first; on error the pending timer is disposed and the error is
# forwarded.
def debounce(due_time, scheduler = DefaultScheduler.instance)
  AnonymousObservable.new do |observer|
    cancelable = SerialSubscription.new
    hasvalue = false
    value = nil
    id = 0

    # Drops the pending value and bumps the id so an in-flight timer is stale.
    invalidate = lambda do
      hasvalue = false
      id += 1
    end

    subscription = subscribe(
      lambda {|x|
        hasvalue = true
        value = x
        id += 1
        current_id = id
        d = SingleAssignmentSubscription.new
        cancelable.subscription = d
        fire = lambda do
          observer.on_next value if hasvalue && id == current_id
          hasvalue = false
        end
        d.subscription = scheduler.schedule_relative(due_time, fire)
      },
      lambda {|e|
        cancelable.dispose
        observer.on_error e
        invalidate.call
      },
      lambda {
        cancelable.dispose
        observer.on_next value if hasvalue
        observer.on_completed
        invalidate.call
      })

    CompositeSubscription.new [subscription, cancelable]
  end
end
Returns the elements of the specified sequence or the type parameter’s default value in a singleton sequence if the sequence is empty.
# File lib/rx/operators/standard_query_operators.rb, line 16
# Forwards the source unchanged, except that +default_value+ is emitted just
# before completion when the source produced no elements.
def default_if_empty(default_value = nil)
  AnonymousObservable.new do |observer|
    saw_element = false

    relay = Observer.configure do |o|
      o.on_next do |x|
        saw_element = true
        observer.on_next x
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next(default_value) unless saw_element
        observer.on_completed
      end
    end

    subscribe(relay)
  end
end
# File lib/rx/linq/observable/delay.rb, line 3
# Dispatches on the type of +due_time+: a Time goes to delay_date, anything
# else to delay_time_span.
def delay(due_time, scheduler = DefaultScheduler.instance)
  Time === due_time ? delay_date(due_time, scheduler) : delay_time_span(due_time, scheduler)
end
# File lib/rx/linq/observable/delay_with_selector.rb, line 3
# Delays each element until the observable returned by the selector for that
# element produces a value or completes. If the first argument is a Proc it is
# the per-element selector; otherwise it is an observable that delays the
# subscription to the source, and the selector is the second argument. The
# result completes once the source is done and no delays are outstanding.
def delay_with_selector(subscription_delay, delay_duration_selector = nil)
  if Proc === subscription_delay
    selector = subscription_delay
  else
    sub_delay = subscription_delay
    selector = delay_duration_selector
  end

  AnonymousObservable.new do |observer|
    delays = CompositeSubscription.new
    at_end = false

    done = lambda do
      observer.on_completed if at_end && delays.length == 0
    end

    subscription = SerialSubscription.new

    start = lambda do |*_|
      subscription.subscription = subscribe(
        lambda {|x|
          begin
            delay = selector.call(x)
          rescue => error
            observer.on_error error
            next
          end

          d = SingleAssignmentSubscription.new
          delays.push(d)

          # Releases the held element and retires its delay subscription.
          release = lambda do
            observer.on_next x
            delays.delete(d)
            done.call
          end

          d.subscription = delay.subscribe(
            lambda {|_| release.call },
            observer.method(:on_error),
            lambda { release.call })
        },
        observer.method(:on_error),
        lambda {
          at_end = true
          subscription.dispose
          done.call
        })
    end

    if sub_delay
      subscription.subscription = sub_delay.subscribe(
        start,
        observer.method(:on_error),
        start)
    else
      start.call
    end

    CompositeSubscription.new [subscription, delays]
  end
end
Dematerializes the explicit notification values of an observable sequence as implicit notifications.
# File lib/rx/operators/single.rb, line 28
# Treats each element as a notification object and replays it on the observer
# through the element's +accept+; the source's own error and completion are
# forwarded as they are.
def dematerialize
  AnonymousObservable.new do |observer|
    relay = Rx::Observer.configure do |o|
      o.on_next {|notification| notification.accept observer }
      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end
    subscribe relay
  end
end
Returns an observable sequence that contains only distinct elements according to the optional key_selector.
# File lib/rx/operators/standard_query_operators.rb, line 39
# Forwards only the first element for each key, where the key is the block's
# result (the element itself without a block). Keys are compared by their
# string form (to_s), so keys with equal to_s count as the same key. An
# exception from the key selector is forwarded through on_error.
def distinct(&key_selector)
  key_selector ||= lambda {|x| x}

  AnonymousObservable.new do |observer|
    seen = Hash.new

    relay = Observer.configure do |o|
      o.on_next do |x|
        is_new = false
        begin
          fingerprint = key_selector.call(x).to_s
          unless seen.key? fingerprint
            is_new = true
            seen[fingerprint] = true
          end
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next x if is_new
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(relay)
  end
end
Returns an observable sequence that contains only distinct contiguous elements according to the optional key_selector.
# File lib/rx/operators/single.rb, line 42
#
# Emits an element only when its key differs from the key of the previously
# emitted element.
#
# Whether a previous key exists is tracked with an explicit flag, so a key
# of nil or false is handled like any other key.
#
# @param key_selector [Proc] Optional block computing the key of an element;
#   defaults to the element itself.
# @return [AnonymousObservable]
def distinct_until_changed(&key_selector)
  key_selector ||= lambda { |x| x }

  AnonymousObservable.new do |observer|
    current_key = nil
    has_current = false

    new_obs = Rx::Observer.configure do |o|
      o.on_next do |value|
        key = nil
        begin
          key = key_selector.call value
        rescue => err
          observer.on_error err
          next
        end

        if !has_current || key != current_key
          has_current = true
          current_key = key
          observer.on_next value
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe new_obs
  end
end
# File lib/rx/linq/observable/do.rb, line 3
#
# Runs side-effect callbacks for each notification of the source and then
# forwards the notification unchanged.
#
# The element callback is taken from the block if one is given, else from a
# Proc passed as first argument, else from the +on_next+ / +on_error+ /
# +on_completed+ methods of the object passed as first argument.
# If a callback raises, the exception is sent downstream via +on_error+ and
# the original notification is not forwarded.
#
# @param observer_or_on_next [Proc, #on_next, nil] Element callback or observer.
# @param on_error_func [Proc, nil] Optional error callback.
# @param on_completed_func [Proc, nil] Optional completion callback.
# @return [AnonymousObservable]
def do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil, &block)
  if block
    # Explicit block capture; Proc.new without a block is not available on Ruby 3.
    on_next_func = block
  elsif observer_or_on_next.nil? || Proc === observer_or_on_next
    on_next_func = observer_or_on_next
  else
    on_next_func = observer_or_on_next.method(:on_next)
    on_error_func = observer_or_on_next.method(:on_error)
    on_completed_func = observer_or_on_next.method(:on_completed)
  end

  AnonymousObservable.new do |observer|
    subscribe(
      lambda { |x|
        begin
          on_next_func.call x if on_next_func
        rescue => e
          observer.on_error e
          return
        end
        observer.on_next x
      },
      lambda { |err|
        begin
          on_error_func.call(err) if on_error_func
        rescue => e
          observer.on_error e
          return
        end
        observer.on_error err
      },
      lambda {
        begin
          on_completed_func.call if on_completed_func
        rescue => e
          observer.on_error e
          return
        end
        observer.on_completed
      })
  end
end
Returns the element at a specified index in a sequence. @param [Numeric] index The zero-based index of the element to retrieve. @return [Rx::Observable] An observable sequence that produces the element at the specified position in the source sequence.
# File lib/rx/operators/aggregates.rb, line 139
#
# Emits the element at the given zero-based position and completes.
#
# If the source completes before reaching that position, a RuntimeError is
# delivered through the observer's +on_error+. Nothing further is emitted
# once the element has been delivered.
#
# @param index [Numeric] Zero-based index of the element to retrieve.
# @raise [ArgumentError] if +index+ is negative.
# @return [AnonymousObservable]
def element_at(index)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0

  AnonymousObservable.new do |observer|
    remaining = index
    found = false

    new_obs = Observer.configure do |o|
      o.on_next do |value|
        next if found

        if remaining == 0
          found = true
          observer.on_next value
          observer.on_completed
        else
          remaining -= 1
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # Only an error when the requested position was never reached.
        observer.on_error(RuntimeError.new('Sequence contains no elements')) unless found
      end
    end

    subscribe new_obs
  end
end
Returns the element at a specified index in a sequence or a default value if the index is out of range. @param [Numeric] index The zero-based index of the element to retrieve. @param [Object] default_value The default value to use if the index is out of range.
# File lib/rx/operators/aggregates.rb, line 164
#
# Emits the element at the given zero-based position, or +default_value+
# when the source completes before reaching that position, then completes.
# Nothing further is emitted once the element has been delivered.
#
# @param index [Numeric] Zero-based index of the element to retrieve.
# @param default_value [Object] Value emitted when the index is out of range.
# @raise [ArgumentError] if +index+ is negative.
# @return [AnonymousObservable]
def element_at_or_default(index, default_value = nil)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0

  AnonymousObservable.new do |observer|
    remaining = index
    found = false

    new_obs = Observer.configure do |o|
      o.on_next do |value|
        next if found

        if remaining == 0
          found = true
          observer.on_next value
          observer.on_completed
        else
          remaining -= 1
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # The default is only relevant when the position was never reached.
        unless found
          observer.on_next default_value
          observer.on_completed
        end
      end
    end

    subscribe new_obs
  end
end
Determines whether an observable sequence is empty. @return [Rx::Observable] An observable sequence containing a single element determining whether the source sequence is empty.
# File lib/rx/operators/aggregates.rb, line 242
#
# Negates the single boolean produced by +any?+.
def empty?
  has_elements = any?
  has_elements.map { |present| !present }
end
Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.
# File lib/rx/operators/single.rb, line 117
#
# Subscribes the observer to the source and returns a subscription that,
# when run, unsubscribes from the source and then yields to the block given
# to this method. The block is yielded to even if unsubscribing raises.
def ensures
  AnonymousObservable.new do |observer|
    source_subscription = subscribe observer

    Subscription.create do
      begin
        source_subscription.unsubscribe
      ensure
        yield
      end
    end
  end
end
# File lib/rx/operators/single.rb, line 390
#
# Builds an Enumerator that yields +value+ forever.
#
# @param value [Object] The object to repeat.
# @return [Enumerator]
def enumerator_repeat_infinitely(value)
  Enumerator.new do |yielder|
    loop { yielder << value }
  end
end
# File lib/rx/operators/single.rb, line 382
#
# Builds an Enumerator that yields +value+ exactly +num+ times.
#
# @param num [Integer] Number of repetitions.
# @param value [Object] The object to repeat.
# @return [Enumerator]
def enumerator_repeat_times(num, value)
  Enumerator.new do |yielder|
    num.times { yielder << value }
  end
end
Internal method to get the final value @return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 16
#
# Remembers the most recent element of the source and emits it when the
# source completes. If the source completes without any element, a
# RuntimeError is sent through +on_error+.
#
# @return [AnonymousObservable]
def final
  AnonymousObservable.new do |observer|
    latest = nil
    any_seen = false

    tail_observer = Observer.configure do |config|
      config.on_next do |item|
        latest = item
        any_seen = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        if any_seen
          observer.on_next latest
          observer.on_completed
        else
          observer.on_error(RuntimeError.new 'Sequence contains no elements')
        end
      end
    end

    subscribe tail_observer
  end
end
Returns the first element of an observable sequence that satisfies the condition in the predicate if a block is given, else the first item in the observable sequence. @param [Proc] block Optional predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if a block is given, else the first element.
# File lib/rx/operators/aggregates.rb, line 195
#
# Emits the first element of the source (or the first element accepted by
# the block, when one is given) and completes.
#
# If the source completes without producing an element, a RuntimeError is
# delivered through the observer's +on_error+. Elements after the first
# are ignored.
#
# @param block [Proc] Optional predicate; delegated to +select+.
# @return [AnonymousObservable]
def first(&block)
  return select(&block).first if block_given?

  AnonymousObservable.new do |observer|
    delivered = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        next if delivered

        delivered = true
        observer.on_next x
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # Reaching completion is only an error when nothing was delivered.
        observer.on_error(RuntimeError.new('Sequence contains no elements')) unless delivered
      end
    end

    subscribe new_obs
  end
end
Returns the first element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists. @param [Object] default_value The default value to use if the sequence is empty. @param [Proc] block An optional predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
# File lib/rx/operators/aggregates.rb, line 218
#
# Emits the first element of the source (or the first element accepted by
# the block, when one is given), or +default_value+ when there is none, and
# then completes. Elements after the first are ignored.
#
# @param default_value [Object] Value emitted when the source is empty.
# @param block [Proc] Optional predicate; delegated to +select+.
# @return [AnonymousObservable]
def first_or_default(default_value = nil, &block)
  return select(&block).first_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    delivered = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        next if delivered

        delivered = true
        observer.on_next x
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # The default is only emitted when no element was delivered.
        unless delivered
          observer.on_next default_value
          observer.on_completed
        end
      end
    end

    subscribe new_obs
  end
end
Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
# File lib/rx/operators/standard_query_operators.rb, line 108
#
# Applies +map+ with the given block and flattens the result with +merge_all+.
def flat_map(&block)
  projected = map(&block)
  projected.merge_all
end
Projects each element of an observable sequence to an observable sequence by incorporating the element’s index and merges the resulting observable sequences into one observable sequence.
# File lib/rx/operators/standard_query_operators.rb, line 113
#
# Applies +map_with_index+ with the given block and flattens the result
# with +merge_all+.
def flat_map_with_index(&block)
  projected = map_with_index(&block)
  projected.merge_all
end
# File lib/rx/linq/observable/group_join.rb, line 3
#
# Correlates the elements of this (left) sequence with the elements of
# +right+ based on overlapping durations.
#
# For every left element a window subject is opened and passed, together
# with the element, to +result_selector+; the selector's result is emitted.
# The window receives every right value that is active while the window is
# open: those already stored when it opens, and those arriving afterwards.
# A window / right value expires when the observable returned by the
# matching duration selector produces its first notification or completes.
#
# Any failure (either source, a duration observable or a selector) is sent
# to every open window and to the result observer.
#
# @param right [Observable] The right-hand sequence.
# @param left_duration_selector [Proc] element -> observable bounding a window.
# @param right_duration_selector [Proc] element -> observable bounding a right value.
# @param result_selector [Proc] (left element, window) -> result element.
# @return [AnonymousObservable]
def group_join(right, left_duration_selector, right_duration_selector, result_selector)
  AnonymousObservable.new do |observer|
    group = CompositeSubscription.new
    r = RefCountSubscription.new(group)
    left_map = {}   # id => open window subject
    right_map = {}  # id => active right value
    left_id = 0
    right_id = 0

    # Fails every open window and then the result sequence itself.
    fail_all = lambda do |error|
      left_map.values.each { |window| window.on_error(error) }
      observer.on_error(error)
    end

    left_obs = Observer.configure do |o|
      o.on_next { |value|
        s = Subject.new
        id = left_id
        left_id += 1
        left_map[id] = s

        begin
          result = result_selector.call(value, s.add_ref(r))
        rescue => err
          fail_all.call(err)
          next
        end
        observer.on_next(result)

        # Replay the right values that are currently active into the new window.
        right_map.values.each { |v| s.on_next(v) }

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          s.on_completed if left_map.delete(id)
          group.delete(md)
        }

        begin
          duration = left_duration_selector.call(value)
        rescue => err
          fail_all.call(err)
          next
        end

        md.subscription = duration.take(1).subscribe(lambda { |_| }, fail_all, expire)
      }

      o.on_error(&fail_all)
      o.on_completed(&observer.method(:on_completed))
    end
    group.push self.subscribe(left_obs)

    right_obs = Observer.configure do |o|
      o.on_next { |value|
        id = right_id
        right_id += 1
        right_map[id] = value

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          right_map.delete(id)
          group.delete(md)
        }

        begin
          duration = right_duration_selector.call(value)
        rescue => err
          # right_map stores plain values; the failure goes to the windows.
          fail_all.call(err)
          next
        end

        md.subscription = duration.take(1).subscribe(lambda { |_| }, fail_all, expire)

        # Deliver the new right value to every window that is still open.
        left_map.values.each { |window| window.on_next(value) }
      }

      o.on_error(&fail_all)
    end
    group.push right.subscribe(right_obs)

    r
  end
end
Ignores all elements in an observable sequence leaving only the termination messages.
# File lib/rx/operators/single.rb, line 131
#
# Drops every element of the source; only the error or completion
# notification is forwarded.
def ignore_elements
  AnonymousObservable.new do |observer|
    terminal_only = Rx::Observer.configure do |config|
      config.on_next { |_ignored| }
      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe terminal_only
  end
end
Returns the last element of an observable sequence that satisfies the condition in the predicate if a block is given, else the last element in the observable sequence. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or the last element in the observable sequence.
# File lib/rx/operators/aggregates.rb, line 251
#
# Emits the last element of the source once it completes. With a block the
# work is delegated to +select(&block).last+. An empty source results in a
# RuntimeError sent through +on_error+.
def last(&block)
  return select(&block).last if block_given?

  AnonymousObservable.new do |observer|
    latest = nil
    any_seen = false

    tail_observer = Observer.configure do |config|
      config.on_next do |item|
        latest = item
        any_seen = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        if any_seen
          observer.on_next latest
          observer.on_completed
        else
          observer.on_error(RuntimeError.new('Sequence contains no elements'))
        end
      end
    end

    subscribe tail_observer
  end
end
Returns the last element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists. @param [Object] default_value The default value to use if the sequence is empty. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
# File lib/rx/operators/aggregates.rb, line 286
#
# Emits the last element of the source once it completes, or
# +default_value+ if the source produced nothing. With a block the work is
# delegated to +select(&block).last_or_default+.
def last_or_default(default_value = nil, &block)
  return select(&block).last_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    latest = nil
    any_seen = false

    tail_observer = Observer.configure do |config|
      config.on_next do |item|
        latest = item
        any_seen = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        outcome = any_seen ? latest : default_value
        observer.on_next(outcome)
        observer.on_completed
      end
    end

    subscribe tail_observer
  end
end
Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. Each time a new inner observable sequence is received, unsubscribe from the previous inner observable sequence.
# File lib/rx/operators/multiple.rb, line 347
#
# Flattens a sequence of observables by forwarding values only from the
# most recently received inner observable.
#
# Receiving a new inner observable replaces (and thereby releases) the
# subscription to the previous one. The result completes when the outer
# sequence has completed and the current inner sequence, if any, has
# completed as well. Notifications from superseded inner sequences are
# ignored.
#
# @return [AnonymousObservable]
def latest
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    inner_subscription = SerialSubscription.new
    stopped = false
    latest_num = 0
    has_latest = false

    source_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        id = 0

        gate.synchronize do
          latest_num += 1
          id = latest_num
          has_latest = true
        end

        d = SingleAssignmentSubscription.new
        # Swapping the serial slot drops the previous inner subscription.
        inner_subscription.subscription = d

        inner_obs = Observer.configure do |io|
          io.on_next { |x| gate.synchronize { observer.on_next x if latest_num == id } }

          io.on_error do |err|
            gate.synchronize { observer.on_error err if latest_num == id }
          end

          io.on_completed do
            gate.synchronize do
              if latest_num == id
                has_latest = false
                observer.on_completed if stopped
              end
            end
          end
        end

        d.subscription = inner_source.subscribe inner_obs
      end

      o.on_error { |err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        gate.synchronize do
          stopped = true
          observer.on_completed unless has_latest
        end
      end
    end

    subscription = subscribe source_obs
    CompositeSubscription.new [subscription, inner_subscription]
  end
end
Projects each element of an observable sequence into a new form.
# File lib/rx/operators/standard_query_operators.rb, line 103
#
# Projects each element through the block; implemented on top of
# +map_with_index+ with the index discarded.
def map(&block)
  map_with_index { |item, _index| block.call item }
end
Projects each element of an observable sequence into a new form by incorporating the element’s index.
# File lib/rx/operators/standard_query_operators.rb, line 76
#
# Projects each element through the block, which receives the element and a
# zero-based counter. The counter advances only after a successful
# projection. A raising block terminates the result with +on_error+.
def map_with_index(&block)
  AnonymousObservable.new do |observer|
    projector = Observer.configure do |config|
      index = 0

      config.on_next do |item|
        projected = nil
        begin
          projected = block.call(item, index)
          index += 1
        rescue => error
          observer.on_error error
          next
        end

        observer.on_next projected
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(projector)
  end
end
Materializes the implicit notifications of an observable sequence as explicit notification values.
# File lib/rx/operators/single.rb, line 144
#
# Turns the implicit notifications of the source into explicit notification
# values: each element, the error, and the completion are emitted as
# Notification objects. After an error or completion notification has been
# emitted the result sequence completes.
#
# @return [AnonymousObservable]
def materialize
  AnonymousObservable.new do |observer|
    new_obs = Rx::Observer.configure do |o|
      o.on_next { |x| observer.on_next(Notification.create_on_next x) }

      o.on_error do |err|
        # An error must be wrapped as an error notification, not as a value.
        observer.on_next(Notification.create_on_error err)
        observer.on_completed
      end

      o.on_completed do
        observer.on_next(Notification.create_on_completed)
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
Returns the maximum element in an observable sequence. @param [Proc] block An optional selector function to produce an element. @return [Rx::Observable] The maximum element in an observable sequence.
# File lib/rx/operators/aggregates.rb, line 314
#
# Emits the maximum element of the source. With a block, the elements are
# first projected through the block (+map(&block).max+).
#
# Without a block the result of +max_by+ with an identity key is taken and
# its first entry is emitted.
#
# @param block [Proc] Optional selector producing the value to compare.
# @return [Rx::Observable]
def max(&block)
  return map(&block).max if block_given?

  max_by { |x| x }.map { |x| x[0] }
end
Returns the elements in an observable sequence with the maximum key value. @param [Proc] block Key selector function. @return [Rx::Observable] An observable sequence containing a list of zero or more elements that have a maximum key value.
# File lib/rx/operators/aggregates.rb, line 323
#
# Delegates to +extrema_by+ with the given key selector block.
#
# @param block [Proc] Key selector.
def max_by(&block)
  extrema_by(&block)
end
Merges elements from two observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
# File lib/rx/operators/multiple.rb, line 200
#
# Merges this sequence with +other+ by handing both, together with the
# scheduler, to +Observable.merge_all+.
#
# @param other [Observable] The sequence to merge with.
# @param scheduler Scheduler passed on to +Observable.merge_all+.
def merge(other, scheduler = CurrentThreadScheduler.instance)
  Observable.merge_all(scheduler, self, other)
end
Merges all inner observable sequences into a single observable sequence, subscribing to each inner sequence as soon as it is received.
# File lib/rx/operators/multiple.rb, line 265
#
# Flattens a sequence of observables by subscribing to every inner
# observable as soon as it arrives and forwarding all their values.
#
# The result completes once the outer sequence has completed and no inner
# subscription remains. The returned subscription is the composite holding
# the outer and all live inner subscriptions, so unsubscribing releases
# every one of them.
#
# @return [AnonymousObservable]
def merge_all
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    stopped = false
    m = SingleAssignmentSubscription.new
    # +m+ stays in the group; "length == 1" therefore means "no inner left".
    group = CompositeSubscription.new [m]

    new_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        inner_subscription = SingleAssignmentSubscription.new
        group << inner_subscription

        inner_obs = Observer.configure do |io|
          io.on_next { |x| gate.synchronize { observer.on_next x } }
          io.on_error { |err| gate.synchronize { observer.on_error err } }

          io.on_completed do
            group.delete inner_subscription
            gate.synchronize { observer.on_completed } if stopped && group.length == 1
          end
        end

        inner_subscription.subscription = inner_source.subscribe inner_obs
      end

      o.on_error { |err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        gate.synchronize { observer.on_completed } if group.length == 1
      end
    end

    m.subscription = subscribe new_obs
    group
  end
end
Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.
# File lib/rx/operators/multiple.rb, line 205
#
# Flattens a sequence of observables while keeping at most +max_concurrent+
# inner subscriptions active. Inner observables arriving while the limit is
# reached are queued and started, in arrival order, as active ones complete.
#
# The returned subscription is the composite holding the outer and all live
# inner subscriptions.
#
# @param max_concurrent [Integer] Maximum number of simultaneously
#   subscribed inner sequences.
# @return [AnonymousObservable]
def merge_concurrent(max_concurrent = 1)
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    q = []
    stopped = false
    group = CompositeSubscription.new
    active = 0

    subscriber = nil
    subscriber = lambda do |xs|
      subscription = SingleAssignmentSubscription.new
      group << subscription

      new_obs = Observer.configure do |o|
        o.on_next { |x| gate.synchronize { observer.on_next x } }
        o.on_error { |err| gate.synchronize { observer.on_error err } }

        o.on_completed do
          group.delete subscription

          gate.synchronize do
            if q.length > 0
              # Hand the freed slot to the oldest queued inner sequence.
              s = q.shift
              subscriber.call s
            else
              active -= 1
              observer.on_completed if stopped && active == 0
            end
          end
        end
      end

      # Store the inner subscription so the group can release it.
      subscription.subscription = xs.subscribe new_obs
    end

    inner_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        gate.synchronize do
          if active < max_concurrent
            active += 1
            subscriber.call inner_source
          else
            q << inner_source
          end
        end
      end

      o.on_error { |err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        observer.on_completed if active == 0
      end
    end

    group << subscribe(inner_obs)
    group
  end
end
Returns the minimum element in an observable sequence. @param [Proc] block An optional selector function to produce an element. @return [Rx::Observable] The minimum element in an observable sequence.
# File lib/rx/operators/aggregates.rb, line 330
#
# Emits the minimum element of the source. With a block the elements are
# first projected through it; otherwise the first entry of the +min_by+
# result (identity key) is emitted.
def min(&block)
  return map(&block).min if block_given?

  minima = min_by { |item| item }
  minima.map { |entries| entries[0] }
end
Returns the elements in an observable sequence with the minimum key value. @param [Proc] block Key selector function. @return [Rx::Observable] An observable sequence containing a list of zero or more elements that have a minimum key value.
# File lib/rx/operators/aggregates.rb, line 339
#
# Delegates to +extrema_by+, passing +true+ as its first argument together
# with the key selector block.
#
# @param block [Proc] Key selector.
def min_by(&block)
  extrema_by(true, &block)
end
# File lib/rx/linq/observable/multicast.rb, line 3
#
# Multicasts the source through a subject.
#
# * Given a subject, returns a ConnectableObservable over the source and
#   that subject.
# * Given a Proc that creates a subject, returns an observable which, per
#   subscription, builds a fresh connectable sequence, subscribes the
#   observer to +selector.call(connectable)+ and then connects it. The
#   returned subscription covers both the observer's subscription and the
#   connection.
#
# @param subject_or_subject_selector [Object, Proc] Subject, or factory Proc.
# @param selector [Proc, nil] Required with a factory Proc; maps the
#   connectable sequence to the sequence the observer subscribes to.
def multicast(subject_or_subject_selector, selector = nil)
  if Proc === subject_or_subject_selector
    AnonymousObservable.new do |observer|
      connectable = self.multicast(subject_or_subject_selector.call)
      subscription = selector.call(connectable).subscribe(observer)
      # Connect after subscribing so the observer does not miss values.
      CompositeSubscription.new [subscription, connectable.connect]
    end
  else
    ConnectableObservable.new(self, subject_or_subject_selector)
  end
end
Determines whether no element of an observable sequence satisfies the condition if a block is given; otherwise determines whether all elements are false. @param [Proc] block @return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 75
#
# Produces a single boolean telling whether no element of the source
# satisfies the block. Without a block the elements themselves are tested,
# so the result is true when every element is falsy (or the source is
# empty).
#
# @param block [Proc] Optional predicate.
# @return [Rx::Observable]
def none?(&block)
  block ||= lambda { |x| x }

  select { |v| block.call v }.
    any?.
    map { |found| !found }
end
Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
# File lib/rx/operators/synchronization.rb, line 32
#
# Subscribes to the source through an ObserveOnObserver built from the
# scheduler and the downstream observer.
#
# @param scheduler The scheduler handed to ObserveOnObserver.
# @raise [ArgumentError] if +scheduler+ is nil or false.
def observe_on(scheduler)
  raise ArgumentError.new('Scheduler cannot be nil') unless scheduler

  AnonymousObservable.new do |observer|
    scheduled_observer = ObserveOnObserver.new(scheduler, observer)
    subscribe(scheduled_observer)
  end
end
Concatenates the second observable sequence to the first observable sequence upon successful or exceptional termination of the first.
# File lib/rx/operators/multiple.rb, line 304
#
# Delegates to +Observable.on_error_resume_next+ with this sequence
# followed by +other+.
#
# @param other [Observable] The sequence to continue with.
# @raise [ArgumentError] if +other+ is nil or false.
def on_error_resume_next(other)
  raise ArgumentError.new('Other cannot be nil') unless other

  Observable.on_error_resume_next(self, other)
end
# File lib/rx/linq/observable/pluck.rb, line 3
#
# Projects each element to the value obtained by indexing it with +prop+.
#
# @param prop Key or index passed to each element's +[]+.
def pluck(prop)
  self.map { |element| element[prop] }
end
# File lib/rx/linq/observable/publish.rb, line 3
#
# Multicasts the source through a Subject.
#
# Without a block this returns +multicast(Subject.new)+. With a block, the
# block is used as the multicast selector and a new Subject is created for
# each subscription.
#
# @param selector [Proc] Optional selector block passed to +multicast+.
def publish(&selector)
  if selector
    # Pass the captured block on; Proc.new without a block is not
    # available on Ruby 3.
    multicast(lambda { Subject.new }, selector)
  else
    multicast(Subject.new)
  end
end
Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value. For aggregation behavior with incremental intermediate results, see Rx::Observable.scan
@return [Rx::Observable]
# File lib/rx/operators/aggregates.rb, line 47
#
# Aggregates the source into a single element by running +scan+ with the
# same arguments and taking its final value.
#
# Accepted call shapes:
#   (seed, Symbol) or (seed, &block) - the seed is also prepended via
#                                      +start_with+ before taking the final value
#   (Symbol) or (&block)             - unseeded
#
# @raise [ArgumentError] for any other combination of arguments.
def reduce(*args, &block)
  seeded = (args.length == 2 && args[1].is_a?(Symbol)) ||
           (args.length == 1 && block_given?)
  return scan(*args, &block).start_with(args[0]).final if seeded

  unseeded = (args.length == 1 && args[0].is_a?(Symbol)) ||
             (args.length == 0 && block_given?)
  return scan(*args, &block).final if unseeded

  raise ArgumentError.new('Invalid arguments')
end
Repeats the observable sequence a specified number of times.
# File lib/rx/operators/single.rb, line 171
#
# Concatenates +repeat_count+ copies of this sequence.
#
# @param repeat_count [Integer] Number of repetitions.
def repeat(repeat_count)
  copies = enumerator_repeat_times(repeat_count, self)
  Observable.concat(copies)
end
Repeats the observable sequence indefinitely.
# File lib/rx/operators/single.rb, line 166
#
# Concatenates an endless enumeration of this sequence.
def repeat_infinitely
  endless_copies = enumerator_repeat_infinitely(self)
  Observable.concat(endless_copies)
end
Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler or continues an observable sequence that is terminated by an exception with the next observable sequence.
# File lib/rx/operators/multiple.rb, line 81
#
# Continues a failed sequence with another sequence.
#
# * With +other+ and no block: this sequence, and on error +other+, are
#   handed to +Observable.rescue_error+.
# * With a block: on error the block is called with the exception and must
#   return the observable to continue with. If the block itself raises,
#   that exception is sent through +on_error+.
#
# @param other [Observable, nil] Fallback sequence (used when no block is given).
# @param action [Proc] Handler mapping the exception to a fallback sequence.
# @raise [ArgumentError] if neither +other+ nor a block is given.
def rescue_error(other = nil, &action)
  # The source itself must lead the list of candidate sequences.
  return Observable.rescue_error(self, other) if other && !block_given?
  raise ArgumentError.new 'Invalid arguments' if other.nil? && !block_given?

  AnonymousObservable.new do |observer|
    subscription = SerialSubscription.new

    d1 = SingleAssignmentSubscription.new
    subscription.subscription = d1

    new_obs = Observer.configure do |o|
      o.on_next(&observer.method(:on_next))

      o.on_error do |err|
        result = nil
        begin
          result = action.call(err)
        rescue => e
          observer.on_error(e)
          next
        end

        # Replace the source subscription with the fallback's.
        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = result.subscribe observer
      end

      o.on_completed(&observer.method(:on_completed))
    end

    d1.subscription = subscribe new_obs
    subscription
  end
end
Repeats the source observable sequence the specified number of times or until it successfully terminates.
# File lib/rx/operators/single.rb, line 181
#
# Hands +retry_count+ copies of this sequence to +Observable.rescue_error+.
#
# @param retry_count [Integer] Number of attempts.
def retry(retry_count)
  attempts = enumerator_repeat_times(retry_count, self)
  Observable.rescue_error(attempts)
end
Repeats the source observable sequence until it successfully terminates.
# File lib/rx/operators/single.rb, line 176
#
# Hands an endless enumeration of this sequence to
# +Observable.rescue_error+.
def retry_infinitely
  endless_attempts = enumerator_repeat_infinitely(self)
  Observable.rescue_error(endless_attempts)
end
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.reduce.
# File lib/rx/operators/single.rb, line 188
#
# Applies an accumulator over the source and emits every intermediate
# result.
#
# Accepted call shapes:
#   scan(seed, :symbol)   scan(seed) { |acc, x| ... }
#   scan(:symbol)         scan { |acc, x| ... }
#
# Without a seed the first element becomes the initial accumulation and is
# emitted as is. With a seed, an empty source emits just the seed before
# completing. If the accumulator raises, the exception is sent through
# +on_error+ and nothing is emitted for that element.
#
# @raise [ArgumentError] for any other combination of arguments.
# @return [AnonymousObservable]
def scan(*args, &block)
  has_seed = false
  seed = nil
  action = nil

  if args.length == 2 && args[1].is_a?(Symbol)
    seed = args[0]
    action = args[1].to_proc
    has_seed = true
  elsif args.length == 1 && block_given?
    seed = args[0]
    has_seed = true
    action = block
  elsif args.length == 1 && args[0].is_a?(Symbol)
    action = args[0].to_proc
  elsif args.length == 0 && block_given?
    action = block
  else
    raise ArgumentError.new 'Invalid arguments'
  end

  AnonymousObservable.new do |observer|
    has_accumulation = false
    accumulation = nil
    has_value = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        begin
          has_value = true unless has_value

          if has_accumulation
            accumulation = action.call(accumulation, x)
          else
            accumulation = has_seed ? action.call(seed, x) : x
            has_accumulation = true
          end
        rescue => err
          observer.on_error err
          # +next+ leaves the handler; +break+ is not valid in a stored block.
          next
        end

        observer.on_next accumulation
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next seed if !has_value && has_seed
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
Filters the elements of an observable sequence based on a predicate.
# File lib/rx/operators/standard_query_operators.rb, line 244
#
# Keeps the elements for which the block is truthy; implemented on top of
# +select_with_index+ with the index discarded.
def select(&block)
  select_with_index { |item, _index| block.call item }
end
Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.
# File lib/rx/operators/standard_query_operators.rb, line 250
#
# Keeps the elements for which the block, called with the element and a
# zero-based counter, is truthy. The counter advances only after the block
# returned normally. A raising block terminates the result with +on_error+.
def select_with_index(&block)
  AnonymousObservable.new do |observer|
    index = 0

    filter = Observer.configure do |config|
      config.on_next do |item|
        keep = false
        begin
          keep = block.call(item, index)
          index += 1
        rescue => error
          observer.on_error error
          next
        end

        observer.on_next item if keep
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(filter)
  end
end
Determines whether two sequences are equal by comparing the elements pairwise. @param [Rx::Observable] other Other observable sequence to compare. @return [Rx::Observable] An observable sequence that contains a single element which indicates whether both sequences are of equal length and their corresponding elements are equal.
# File lib/rx/operators/aggregates.rb, line 347
#
# Compares this sequence with +other+ element by element and emits a single
# boolean verdict.
#
# Elements of each side are queued until the opposite side supplies a
# counterpart. A mismatch, or an extra element on one side after the other
# has completed, yields false. When both sides have completed with empty
# queues the verdict is true. All handlers run under one Mutex.
#
# @param other [Rx::Observable] The sequence to compare with.
# @return [AnonymousObservable]
def sequence_eql?(other)
  AnonymousObservable.new do |observer|
    lock = Mutex.new
    left_finished = false
    right_finished = false
    left_pending = []
    right_pending = []

    # Emits the verdict and terminates the result sequence.
    conclude = lambda do |verdict|
      observer.on_next verdict
      observer.on_completed
    end

    left_observer = Observer.configure do |config|
      config.on_next do |item|
        lock.synchronize do
          if right_pending.length > 0
            counterpart = right_pending.shift
            conclude.call(false) unless item == counterpart
          elsif right_finished
            conclude.call(false)
          else
            left_pending.push item
          end
        end
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        lock.synchronize do
          left_finished = true

          if left_pending.length == 0
            if right_pending.length > 0
              conclude.call(false)
            elsif right_finished
              conclude.call(true)
            end
          end
        end
      end
    end

    left_subscription = subscribe left_observer

    right_observer = Observer.configure do |config|
      config.on_next do |item|
        lock.synchronize do
          if left_pending.length > 0
            counterpart = left_pending.shift
            conclude.call(false) unless item == counterpart
          elsif left_finished
            conclude.call(false)
          else
            right_pending.push item
          end
        end
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        lock.synchronize do
          right_finished = true

          if right_pending.length == 0
            if left_pending.length > 0
              conclude.call(false)
            elsif left_finished
              conclude.call(true)
            end
          end
        end
      end
    end

    right_subscription = other.subscribe right_observer

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the single element in the observable sequence.
# File lib/rx/operators/aggregates.rb, line 443
#
# Emits the only element of the source when it completes. A second element
# triggers a RuntimeError ('More than one element produced') and an empty
# source a RuntimeError ('Sequence contains no elements'), both sent through
# +on_error+. With a block the work is delegated to +select(&block).single+.
def single(&block)
  return select(&block).single if block_given?

  AnonymousObservable.new do |observer|
    captured = false
    only_item = nil

    single_observer = Observer.configure do |config|
      config.on_next do |item|
        if captured
          observer.on_error(RuntimeError.new('More than one element produced'))
        else
          only_item = item
          captured = true
        end
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        if captured
          observer.on_next only_item
          observer.on_completed
        else
          observer.on_error(RuntimeError.new('Sequence contains no elements'))
        end
      end
    end

    subscribe single_observer
  end
end
Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method reports an exception if there is more than one element in the observable sequence. @param [Object] default_value The default value if no value is provided @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [Rx::Observable] Sequence containing the single element in the observable sequence, or a default value if no such element exists.
# File lib/rx/operators/aggregates.rb, line 481
#
# Emits the only element of the source when it completes, or
# +default_value+ if the source produced nothing. A second element triggers
# a RuntimeError ('More than one element produced') sent through +on_error+.
# With a block the work is delegated to +select(&block).single_or_default+.
def single_or_default(default_value = nil, &block)
  return select(&block).single_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    captured = false
    only_item = nil

    single_observer = Observer.configure do |config|
      config.on_next do |item|
        if captured
          observer.on_error(RuntimeError.new('More than one element produced'))
        else
          only_item = item
          captured = true
        end
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        outcome = captured ? only_item : default_value
        observer.on_next(outcome)
        observer.on_completed
      end
    end

    subscribe single_observer
  end
end
Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
# File lib/rx/operators/standard_query_operators.rb, line 118
#
# Drops the first +count+ elements of the source and forwards the rest.
#
# @param count [Numeric] Number of leading elements to drop.
def skip(count)
  AnonymousObservable.new do |observer|
    left_to_drop = count

    skipper = Observer.configure do |config|
      config.on_next do |item|
        if left_to_drop <= 0
          observer.on_next item
        else
          left_to_drop -= 1
        end
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(skipper)
  end
end
Bypasses a specified number of elements at the end of an observable sequence. @param [Numeric] count The number of elements to bypass at the end of an observable sequence.
# File lib/rx/operators/single.rb, line 253
#
# Holds back the most recent +count+ elements: an element is forwarded only
# once +count+ newer elements have arrived, so the last +count+ elements of
# the source are never emitted.
#
# @param count [Numeric] Number of trailing elements to bypass.
# @raise [ArgumentError] if +count+ is negative.
def skip_last(count)
  raise ArgumentError.new('Count cannot be less than zero') if count < 0

  AnonymousObservable.new do |observer|
    held_back = []

    trimmer = Observer.configure do |config|
      config.on_next do |item|
        held_back.push item
        observer.on_next(held_back.shift) if held_back.length > count
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe trimmer
  end
end
Returns the elements from the source observable sequence only after the other observable sequence produces an element.
# File lib/rx/operators/multiple.rb, line 311
#
# Forwards elements of the source only after +other+ has produced an
# element. Once that happens the subscription to +other+ is released.
# Completion of the source is forwarded only if the gate is already open;
# errors from either sequence are forwarded. Both sequences are
# synchronized on one Monitor.
#
# @param other [Observable] The sequence whose first element opens the gate.
# @raise [ArgumentError] if +other+ is nil or false.
def skip_until(other)
  raise ArgumentError.new('Other cannot be nil') unless other

  AnonymousObservable.new do |observer|
    source_slot = SingleAssignmentSubscription.new
    trigger_slot = SingleAssignmentSubscription.new
    gate_open = false
    gate = Monitor.new

    gated_observer = Observer.configure do |config|
      config.on_next { |item| observer.on_next item if gate_open }
      config.on_error(&observer.method(:on_error))
      config.on_completed { observer.on_completed if gate_open }
    end

    trigger_observer = Observer.configure do |config|
      config.on_next do |_signal|
        gate_open = true
        trigger_slot.unsubscribe
      end
      config.on_error(&observer.method(:on_error))
    end

    source_slot.subscription = synchronize(gate).subscribe(gated_observer)
    trigger_slot.subscription = other.synchronize(gate).subscribe(trigger_observer)

    CompositeSubscription.new [source_slot, trigger_slot]
  end
end
Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
# File lib/rx/operators/standard_query_operators.rb, line 142
#
# Index-free variant of +skip_while_with_index+: the block receives only
# the element.
def skip_while(&block)
  skip_while_with_index { |item, _index| block.call item }
end
Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. The element’s index is used in the logic of the predicate function.
# File lib/rx/operators/standard_query_operators.rb, line 148
#
# Drops elements while the block, called with the element and a zero-based
# counter, is truthy; the first element for which it is falsy and every
# element after it are forwarded. The block is no longer called once
# forwarding has started. A raising block terminates the result with
# +on_error+.
#
# @param block [Proc] Predicate receiving (element, index).
# @return [AnonymousObservable]
def skip_while_with_index(&block)
  AnonymousObservable.new do |observer|
    running = false
    i = 0

    new_observer = Observer.configure do |o|
      o.on_next do |x|
        unless running
          begin
            running = !block.call(x, i)
            i += 1
          rescue => e
            observer.on_error e
            next
          end
        end

        # Must sit outside the check above so later elements pass through.
        observer.on_next x if running
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(new_observer)
  end
end
Prepends a sequence of values to an observable sequence.
# File lib/rx/operators/single.rb, line 273
#
# Prepends the given values to this sequence. If the first argument is a
# Scheduler it is removed from the values and used to enumerate them;
# otherwise CurrentThreadScheduler.instance is used.
def start_with(*args)
  scheduler = CurrentThreadScheduler.instance
  leading_is_scheduler = args.size > 0 && Scheduler === args[0]
  scheduler = args.shift if leading_is_scheduler

  prefix = Observable.from_array(args, scheduler)
  prefix.concat(self)
end
# File lib/rx/core/observable.rb, line 13
#
# Subscribes to the sequence and returns the result of +_subscribe+.
#
# Accepted call shapes:
#   subscribe { |x| ... }                       block used as on_next
#   subscribe                                   observer with no handlers
#   subscribe(observer)                         existing observer
#   subscribe(on_next, on_error, on_completed)  three callables
#
# @raise [ArgumentError] for any other number of positional arguments.
def subscribe(*args, &block)
  case args.size
  when 0
    if block
      # The block is captured explicitly; Proc.new without a block is not
      # available on Ruby 3.
      _subscribe Observer.configure { |o| o.on_next(&block) }
    else
      _subscribe Observer.configure
    end
  when 1
    _subscribe args[0]
  when 3
    _subscribe Observer.configure { |o|
      o.on_next(&args[0])
      o.on_error(&args[1])
      o.on_completed(&args[2])
    }
  else
    raise ArgumentError, "wrong number of arguments (#{args.size} for 0..1 or 3)"
  end
end
Wraps the source sequence in order to run its subscription and unsubscribe logic on the specified scheduler.
# File lib/rx/operators/synchronization.rb, line 15
#
# Defers the subscription to the source by scheduling it on +scheduler+.
# The scheduled action subscribes the observer and stores the resulting
# subscription, wrapped in a ScheduledSubscription for the same scheduler,
# in the serial subscription returned to the caller.
#
# @param scheduler The scheduler used for the subscribe action.
# @raise [ArgumentError] if +scheduler+ is nil or false.
def subscribe_on(scheduler)
  raise ArgumentError.new('Scheduler cannot be nil') unless scheduler

  AnonymousObservable.new do |observer|
    scheduled_slot = SingleAssignmentSubscription.new
    outer = SerialSubscription.new
    outer.subscription = scheduled_slot

    subscribe_action = lambda do
      outer.subscription = ScheduledSubscription.new(scheduler, subscribe(observer))
    end
    scheduled_slot.subscription = scheduler.schedule(subscribe_action)

    outer
  end
end
Subscribes the given block to the on_completed action of the observable sequence.
# File lib/rx/core/observable.rb, line 69
#
# Subscribes with an observer whose only handler is the given block,
# registered for completion.
#
# @raise [ArgumentError] if no block is given.
def subscribe_on_completed(&block)
  raise ArgumentError.new('Block is required') unless block_given?

  completion_observer = Observer.configure { |config| config.on_completed(&block) }
  subscribe(completion_observer)
end
Subscribes the given block to the on_error action of the observable sequence.
# File lib/rx/core/observable.rb, line 63
#
# Subscribes with an observer whose only handler is the given block,
# registered for errors.
#
# @raise [ArgumentError] if no block is given.
def subscribe_on_error(&block)
  raise ArgumentError.new('Block is required') unless block_given?

  error_observer = Observer.configure { |config| config.on_error(&block) }
  subscribe(error_observer)
end
Subscribes the given block to the on_next action of the observable sequence. @param [Object] block @return [Subscription]
# File lib/rx/core/observable.rb, line 57
# Subscribes with an observer whose only configured handler is the given
# on_next block.
#
# @raise [ArgumentError] when no block is supplied
def subscribe_on_next(&block)
  raise ArgumentError, 'Block is required' if block.nil?

  next_only = Observer.configure { |config| config.on_next(&block) }
  subscribe(next_only)
end
Computes the sum of a sequence of values. @param [Proc] block Optional block used to obtain the value to sum. @return [Rx::Observable] An observable sequence containing a single element with the sum of the values in the source sequence.
# File lib/rx/operators/aggregates.rb, line 513
# Adds up the elements of the sequence, starting from 0. With a block, each
# element is first mapped through the block and the mapped values are summed.
def sum(&block)
  if block_given?
    map(&block).sum
  else
    reduce(0) { |total, value| total + value }
  end
end
Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
# File lib/rx/operators/synchronization.rb, line 41
# Subscribes downstream observers through Observer.allow_reentrancy using the
# supplied gate; a fresh Monitor is created when no gate is given.
def synchronize(gate = Monitor.new)
  AnonymousObservable.new do |observer|
    gated_observer = Observer.allow_reentrancy(observer, gate)
    subscribe(gated_observer)
  end
end
Returns a specified number of contiguous elements from the start of an observable sequence.
# File lib/rx/operators/standard_query_operators.rb, line 178
# Forwards at most +count+ elements from the start of the sequence and
# completes right after the last one of them has been delivered.
#
# A count of 0 short-circuits to Observable.empty on the given scheduler.
def take(count, scheduler = ImmediateScheduler.instance)
  return Observable.empty(scheduler) if count == 0

  AnonymousObservable.new do |observer|
    left = count

    limiter = Observer.configure do |o|
      o.on_next do |item|
        # Elements arriving after the quota is used up are ignored.
        next unless left > 0

        left -= 1
        observer.on_next item
        observer.on_completed if left == 0
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(limiter)
  end
end
Returns a specified number of contiguous elements from the end of an observable sequence.
# File lib/rx/operators/single.rb, line 282
# Buffers the last +count+ elements of the sequence and, once the source
# completes, replays them one at a time through +scheduler+ before completing.
#
# @param count [Integer] number of trailing elements to keep
# @param scheduler [Scheduler] scheduler used to drain the buffer
# @raise [ArgumentError] when count is negative
# @return [AnonymousObservable]
def take_last(count, scheduler = CurrentThreadScheduler.instance)
  raise ArgumentError.new 'Count cannot be less than zero' if count < 0
  AnonymousObservable.new do |observer|
    q = []
    g = CompositeSubscription.new

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Keep only the newest `count` elements.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # Drain the buffer recursively: one element per scheduled step.
        g.push(scheduler.schedule_recursive(lambda { |this|
          if q.length > 0
            observer.on_next(q.shift)
            this.call
          else
            observer.on_completed
          end
        }))
      end
    end

    # Subscribing must happen after the observer is fully configured, and the
    # composite subscription is what the observable block has to return.
    g.push(subscribe(new_obs))
    g
  end
end
Returns a list with the specified number of contiguous elements from the end of an observable sequence.
# File lib/rx/operators/single.rb, line 314
# Collects the last +count+ elements of the sequence and, when the source
# completes, emits them as a single array followed by completion.
#
# @param count [Integer] number of trailing elements to keep
# @return [AnonymousObservable] a sequence yielding one array
def take_last_buffer(count)
  AnonymousObservable.new do |observer|
    q = []

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Drop the oldest element once the buffer exceeds `count`.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next q
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
Returns the elements from the source observable sequence until the other observable sequence produces an element.
# File lib/rx/operators/multiple.rb, line 396
# Relays this sequence until +other+ produces an element, at which point the
# downstream observer is completed. An error from +other+ is forwarded as well.
# Both subscriptions are synchronized on one shared Monitor.
#
# @raise [ArgumentError] when other is nil or false
def take_until(other)
  raise ArgumentError, 'other cannot be nil' unless other

  AnonymousObservable.new do |observer|
    source_handle = SingleAssignmentSubscription.new
    stop_handle = SingleAssignmentSubscription.new
    gate = Monitor.new

    # No on_completed handler is configured for the stop signal.
    stop_watcher = Observer.configure do |o|
      o.on_next { |_| observer.on_completed }
      o.on_error(&observer.method(:on_error))
    end

    stop_handle.subscription = other.synchronize(gate).subscribe(stop_watcher)

    guarded_source = synchronize(gate).ensures(&stop_handle.method(:unsubscribe))
    source_handle.subscription = guarded_source.subscribe(observer)

    CompositeSubscription.new [source_handle, stop_handle]
  end
end
Returns elements from an observable sequence as long as a specified condition is true.
# File lib/rx/operators/standard_query_operators.rb, line 204
# Index-free variant of take_while_with_index: the block only receives the
# element.
def take_while(&block)
  take_while_with_index do |item, _index|
    block.call(item)
  end
end
Returns elements from an observable sequence as long as a specified condition is true. The element’s index is used in the logic of the predicate function.
# File lib/rx/operators/standard_query_operators.rb, line 210
# Forwards elements while the block, called with the element and its
# zero-based index, returns a truthy value. The first falsy result completes
# the downstream observer, and later elements are ignored. An exception raised
# by the block is delivered through on_error.
def take_while_with_index(&block)
  AnonymousObservable.new do |observer|
    open = true
    index = 0

    gatekeeper = Observer.configure do |o|
      o.on_next do |item|
        next unless open

        begin
          open = block.call(item, index)
          index += 1
        rescue => err
          observer.on_error err
          next
        end

        open ? observer.on_next(item) : observer.on_completed
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(gatekeeper)
  end
end
Invokes the observer’s methods for each message in the source sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
# File lib/rx/operators/single.rb, line 75
# Mirrors every notification of the sequence to +observer+ before passing it
# on downstream.
#
# If +observer+ raises while handling a notification, the exception is sent
# downstream via on_error and the original notification is NOT forwarded, so
# the handler never emits anything after its own on_error.
#
# @param observer [Observer] the observer invoked for each notification
# @raise [ArgumentError] when observer is nil or false
# @return [AnonymousObservable]
def tap(observer)
  raise ArgumentError.new 'Observer cannot be nil' unless observer
  AnonymousObservable.new do |obs|
    new_obs = Rx::Observer.configure do |o|
      o.on_next do |value|
        begin
          observer.on_next value
        rescue => err
          obs.on_error err
          next # the failure was reported; do not forward the value as well
        end

        obs.on_next value
      end

      o.on_error do |err|
        begin
          observer.on_error err
        rescue => e
          obs.on_error e
          next # avoid a second on_error for the same notification
        end

        obs.on_error err
      end

      o.on_completed do
        begin
          observer.on_completed
        rescue => err
          obs.on_error err
          next # an errored handler must not also signal completion
        end

        obs.on_completed
      end
    end

    subscribe new_obs
  end
end
# File lib/rx/linq/observable/time_interval.rb, line 3
# Maps every element to a TimeInterval built from the time elapsed since the
# previous element (or since subscription, for the first one) and the element
# itself. The clock is read from +scheduler+.
def time_interval(scheduler = DefaultScheduler.instance)
  Observable.defer do
    previous = scheduler.now

    map do |value|
      current = scheduler.now
      elapsed = current - previous
      previous = current
      TimeInterval.new(elapsed, value)
    end
  end
end
# File lib/rx/linq/observable/timestamp.rb, line 3
# Maps every element to a Hash with the element under :value and the
# scheduler's current time under :timestamp.
def timestamp(scheduler = DefaultScheduler.instance)
  map { |item| { value: item, timestamp: scheduler.now } }
end
Creates an array from an observable sequence. @return [Rx::Observable] An observable sequence containing a single element: an array of all the values in the source sequence.
# File lib/rx/operators/aggregates.rb, line 520
# Gathers every element of the sequence into an array and, on completion,
# emits that array as a single element followed by on_completed. Errors are
# passed straight to the downstream observer.
def to_a
  AnonymousObservable.new do |observer|
    collected = []

    flush = lambda do
      observer.on_next collected
      observer.on_completed
    end

    subscribe(collected.method(:push), observer.method(:on_error), flush)
  end
end
Creates a Hash from the observable collection. Note that any duplicate keys will be overwritten. @return [Rx::Observable] An observable sequence containing a single element: a Hash built from the values in the source sequence.
# File lib/rx/operators/aggregates.rb, line 554
# Folds the sequence into a Hash. A HashConfiguration is yielded to the
# optional block so the caller can adjust it; its key and value selector
# blocks are applied to every element. A later element with the same key
# replaces the earlier entry.
def to_h
  config = HashConfiguration.new
  yield config if block_given?

  reduce(Hash.new) do |table, item|
    key = config.key_selector_block.call(item)
    table[key] = config.value_selector_block.call(item)
    table
  end
end
Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
# Splits the sequence into windows (Subjects) based on element counts.
#
# Parameters:
#   count - number of elements after which a window is completed; must be > 0
#   skip  - number of elements between the openings of consecutive windows; must be > 0
# Raises ArgumentError when count or skip is <= 0.
#
# How it works:
#   * The first window is opened immediately on subscription; every window is handed
#     downstream as s.add_ref(ref_count_disposable).
#   * Each incoming element is pushed to all currently open windows in q.
#   * With c = n - count + 1 (n = elements seen before the current one), the oldest
#     window is completed when c >= 0 and c is a multiple of skip.
#   * After n is incremented, a new window is opened whenever n is a multiple of skip.
#   * On error or completion every remaining window is shifted out of q and terminated
#     the same way before the downstream observer is notified.
# The observable block returns the RefCountSubscription that wraps the source subscription.
# File lib/rx/operators/single.rb, line 336 def window_with_count(count, skip) raise ArgumentError.new 'Count must be greater than zero' if count <= 0 raise ArgumentError.new 'Skip must be greater than zero' if skip <= 0 AnonymousObservable.new do |observer| q = [] n = 0 m = SingleAssignmentSubscription.new ref_count_disposable = RefCountSubscription.new m create_window = lambda { s = Subject.new q.push s observer.on_next(s.add_ref(ref_count_disposable)) } create_window.call new_obs = Observer.configure do |o| o.on_next do |x| q.each {|s| s.on_next x} c = n - count + 1 q.shift.on_completed if c >=0 && c % skip == 0 n += 1 create_window.call if n % skip == 0 end o.on_error do |err| q.shift.on_error err while q.length > 0 observer.on_error err end o.on_completed do q.shift.on_completed while q.length > 0 observer.on_completed end end m.subscription = subscribe new_obs ref_count_disposable end end
Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on timing information.
# File lib/rx/operators/time.rb, line 30
# Splits the sequence into windows (Subjects) driven by a timer.
#
# A window is opened every +time_shift+ and completed +time_span+ after it was
# opened; the first window is opened immediately on subscription. Elements are
# pushed to every window that is open when they arrive.
#
# @param time_span [Numeric] lifetime of each window; must be > 0
# @param time_shift [Numeric] interval between window openings; must be > 0
#   (defaults to time_span)
# @param scheduler [Scheduler] scheduler that runs the timers
# @raise [ArgumentError] when time_span or time_shift is <= 0
# @return [AnonymousObservable] a sequence of windows
def window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError.new 'time_span must be greater than zero' if time_span <= 0
  raise ArgumentError.new 'time_shift must be greater than zero' if time_shift <= 0
  AnonymousObservable.new do |observer|
    total_time = 0
    next_shift = time_shift
    next_span = time_span

    gate = Mutex.new
    q = []

    timer_d = SerialSubscription.new
    group_subscription = CompositeSubscription.new [timer_d]
    ref_count_subscription = RefCountSubscription.new(group_subscription)

    create_timer = lambda {
      m = SingleAssignmentSubscription.new
      timer_d.subscription = m

      # Decide whether the next timer tick closes a window (span), opens one
      # (shift), or does both because the two deadlines coincide.
      is_span = false
      is_shift = false
      if next_span == next_shift
        is_span = true
        is_shift = true
      elsif next_span < next_shift
        is_span = true
      else
        is_shift = true
      end

      # ts is the delay from the previous tick to the upcoming one.
      new_total_time = is_span ? next_span : next_shift
      ts = new_total_time - total_time
      total_time = new_total_time

      next_span += time_shift if is_span
      next_shift += time_shift if is_shift

      m.subscription = scheduler.schedule_relative(ts, lambda {
        gate.synchronize do
          if is_shift
            s = Subject.new
            q.push s
            observer.on_next(s.add_ref(ref_count_subscription))
          end

          if is_span
            s = q.shift
            s.on_completed
          end

          create_timer.call
        end
      })
    }

    q.push(Subject.new)
    observer.on_next(q[0].add_ref(ref_count_subscription))
    create_timer.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          q.each { |s| s.on_next x }
        end
      end

      o.on_error do |err|
        gate.synchronize do
          q.each { |s| s.on_error err }
          observer.on_error err
        end
      end

      o.on_completed do
        gate.synchronize do
          q.each { |s| s.on_completed }
          observer.on_completed
        end
      end
    end

    group_subscription.push subscribe(new_obs)

    ref_count_subscription
  end
end
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
# File lib/rx/operators/multiple.rb, line 418
# Instance-level convenience for Observable.zip: this sequence is placed in
# front of the given sequences and the optional result selector is passed on.
def zip(*args, &result_selector)
  sources = [self, *args]
  Observable.zip(*sources, &result_selector)
end
Private Instance Methods
# File lib/rx/linq/observable/concat_map.rb, line 25
# Projects each element (with its index) through +selector+, converts any
# result that responds to :each via Observable.from, and concatenates the
# projected sequences with concat_all.
def _concat_map(selector)
  projected = map_with_index do |item, index|
    inner = selector.call(item, index)
    inner.respond_to?(:each) ? Observable.from(inner) : inner
  end

  projected.concat_all
end
# File lib/rx/linq/observable/delay.rb, line 77
# Turns an absolute due time into a span relative to the scheduler's current
# time and delegates to delay_time_span.
def delay_date(due_time, scheduler)
  remaining = due_time - scheduler.now
  delay_time_span(remaining, scheduler)
end
# Shifts every notification of the sequence by due_time, using scheduler as the clock and timer.
#
# How it works:
#   * The source is materialized and timestamped, so each item received is a Hash with a
#     notification under :value and its arrival time under :timestamp.
#   * A non-error notification is queued with timestamp + due_time as its delivery time;
#     the first one queued while no drain is active triggers scheduling of the drain.
#   * An error notification clears the queue and records the exception; if the drain is not
#     currently running the exception is sent downstream immediately.
#   * The drain (schedule_recursive_relative) delivers every queued notification whose
#     delivery time has been reached, then either reschedules itself for the next queued
#     item (never with a negative delay) or marks the drain inactive. A recorded exception
#     takes precedence and is forwarded via on_error.
#   * The `return` inside the drain only leaves that lambda.
# Returns a CompositeSubscription over the source subscription and the drain timer.
# NOTE(review): relies on the block form of subscribe delivering materialized notifications - confirm.
# File lib/rx/linq/observable/delay.rb, line 13 def delay_time_span(due_time, scheduler) AnonymousObservable.new do |observer| active = false cancelable = SerialSubscription.new exception = nil q = [] running = false subscription = materialize.timestamp(scheduler).subscribe do |notification| if notification[:value].on_error? q = [] q.push notification exception = notification[:value].error should_run = !running else q.push({ value: notification[:value], timestamp: notification[:timestamp] + due_time }) should_run = !active active = true end if should_run if exception != nil observer.on_error exception else d = SingleAssignmentSubscription.new cancelable.subscription = d d.subscription = scheduler.schedule_recursive_relative(due_time, lambda {|this| return if exception != nil running = true begin result = nil if q.length > 0 && q[0][:timestamp] - scheduler.now <= 0 result = q.shift[:value] end if result != nil result.accept observer end end while result != nil should_recurse = false recurse_due_time = 0 if q.length > 0 should_recurse = true recurse_due_time = [0, q[0][:timestamp] - scheduler.now].max else active = false end e = exception running = false if e != nil observer.on_error e elsif should_recurse this.call recurse_due_time end }) end end end CompositeSubscription.new [subscription, cancelable] end end
# File lib/rx/operators/aggregates.rb, line 565
# Collects all elements whose key, as computed by the block, is extreme: the
# largest key by default, the smallest when +is_min+ is true. When the source
# completes, the list of those elements is emitted as a single array.
#
# Keys are compared with <=>. An exception raised by the block is delivered
# through on_error and the offending element is skipped.
#
# @param is_min [Boolean] true to look for the minimum instead of the maximum
# @yieldparam x [Object] element to compute the key for
# @return [AnonymousObservable] a sequence yielding one array
def extrema_by(is_min = false, &block)
  AnonymousObservable.new do |observer|
    has_value = false
    last_key = nil
    list = []

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        key = nil
        begin
          key = block.call(x)
        rescue => e
          observer.on_error e
          # `next`, not `return`: this block runs long after extrema_by has
          # returned, where `return` would raise LocalJumpError.
          next
        end

        comparison = 0
        if has_value
          comparison = key <=> last_key
          comparison = comparison * -1 if is_min
        else
          has_value = true
          last_key = key
        end

        # A strictly better key starts a new list; ties are appended.
        if comparison > 0
          last_key = key
          list = []
        end
        list.push x if comparison >= 0
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next list
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
# File lib/rx/core/observable.rb, line 76
# Runs subscribe_core for the auto-detach observer and stores the result as
# the observer's subscription. An exception is offered to
# auto_detach_observer.fail and re-raised only when that call returns a falsy
# value. Always returns Subscription.empty.
def schedule_subscribe(_, auto_detach_observer)
  begin
    inner = subscribe_core(auto_detach_observer)
    auto_detach_observer.subscription = inner
  rescue => err
    handled = auto_detach_observer.fail(err)
    raise err unless handled
  end

  Subscription.empty
end