class Datapipes::Source

Build your own source logic in ‘run` method. Use `produce` method to emitt data to pipe.

def run
  10.times {|i| produce(i) }
end

You can use infinitie stream like:

def run
  twitter_client.userstream do |event|
    produce(event)
  end
end

Attributes

pipe[RW]

For internal uses. Do not touch.

Public Instance Methods

run() click to toggle source

Override in sub class.

# File lib/datapipes/source.rb, line 21
def run
end
run_all() click to toggle source

For internal used.

Run accumulated sources which are set by composition. Each source works in new thread.

# File lib/datapipes/source.rb, line 28
def run_all
  @accumulated ||= [self]
  set_pipe
  @accumulated.map {|s| Thread.new { s.run } }
end

Private Instance Methods

produce(data) click to toggle source
# File lib/datapipes/source.rb, line 39
def produce(data)
  @pipe.pour_in(data)
end
set_pipe() click to toggle source
# File lib/datapipes/source.rb, line 43
def set_pipe
  @accumulated.each {|s| s.pipe = @pipe }
end