module Enumerable

Public Instance Methods

subscribe(observer, scheduler = Rx::ImmediateScheduler.instance) click to toggle source
# File lib/core_ext/enumerable.rb, line 2
def subscribe(observer, scheduler = Rx::ImmediateScheduler.instance)
  begin
    self.each do |e|
      scheduler.schedule lambda {
        observer.on_next(e)
      }
    end
  rescue => ex
    observer.on_error(ex)
    return
  end
  
  observer.on_completed
end
to_observable(scheduler = Rx::ImmediateScheduler.instance) click to toggle source
# File lib/core_ext/enumerable.rb, line 17
def to_observable(scheduler = Rx::ImmediateScheduler.instance)
  Rx::AnonymousObservable.new do |observer|
    self.subscribe(observer, scheduler)
  end
end