module Enumerable
Public Instance Methods
subscribe(observer, scheduler = Rx::ImmediateScheduler.instance)
click to toggle source
# File lib/core_ext/enumerable.rb, line 2
#
# Pushes every element of this enumerable to +observer+.
#
# Each element is wrapped in a lambda that calls <tt>observer.on_next</tt>
# and that lambda is handed to <tt>scheduler.schedule</tt>. If iteration or
# scheduling raises a StandardError, the error is passed to
# <tt>observer.on_error</tt> and <tt>on_completed</tt> is not called.
# Otherwise <tt>observer.on_completed</tt> is called once after the last
# element has been scheduled.
#
# observer::  object responding to +on_next+, +on_error+ and +on_completed+.
# scheduler:: object responding to +schedule+ with a callable argument;
#             defaults to <tt>Rx::ImmediateScheduler.instance</tt>.
#
# Returns +nil+ after an error was reported, otherwise the value returned by
# <tt>observer.on_completed</tt>.
def subscribe(observer, scheduler = Rx::ImmediateScheduler.instance)
  # The rescue deliberately covers only iteration/scheduling, so an exception
  # raised by on_completed itself is not redirected to on_error.
  begin
    each do |element|
      scheduler.schedule lambda { observer.on_next(element) }
    end
  rescue => error
    observer.on_error(error)
    return
  end

  # NOTE(review): on_completed is invoked directly rather than through the
  # scheduler; with a scheduler that defers work it may run before the queued
  # on_next calls -- confirm this is intended.
  observer.on_completed
end
to_observable(scheduler = Rx::ImmediateScheduler.instance)
click to toggle source
# File lib/core_ext/enumerable.rb, line 17
#
# Wraps this enumerable in an <tt>Rx::AnonymousObservable</tt>.
#
# Nothing is iterated when this method is called: the block given to
# <tt>Rx::AnonymousObservable.new</tt> forwards the observer it receives,
# together with +scheduler+, to #subscribe.
#
# scheduler:: passed through unchanged to #subscribe; defaults to
#             <tt>Rx::ImmediateScheduler.instance</tt>.
#
# Returns the new <tt>Rx::AnonymousObservable</tt>.
def to_observable(scheduler = Rx::ImmediateScheduler.instance)
  Rx::AnonymousObservable.new do |observer|
    subscribe(observer, scheduler)
  end
end