class Datapipes

Constants

Notification
VERSION

Public Class Methods

new(args = {})

Pass instances of the datapipes components. Each component can be composed; see the examples for details.

Pass parameters as a hash:

datapipe = Datapipes.new(source: my_source, sink: my_sink)

Or pass all four components:

datapipe = Datapipes.new(
  source: my_source,
  sink: my_sink,
  tube: my_tube,
  pipe: my_pipe
)

All arguments are optional, but in most cases you will specify source and sink.

# File lib/datapipes.rb, line 30
def initialize(args = {})
  @source, @sink, @tube, @pipe = *ArgParser.extract(args)
end
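
The constructor above destructures four components in a fixed order from an options hash. As a rough illustration of that idea only, the sketch below uses a hypothetical helper named extract_components; it is not the library's ArgParser, and returning nil for omitted components is an assumption, since the real defaults are not shown here.

```ruby
# Hypothetical stand-in for ArgParser.extract; the name, the key order, and
# the nil fallback are assumptions made for illustration.
COMPONENT_KEYS = %i[source sink tube pipe].freeze

def extract_components(args = {}, defaults = {})
  # Return the components in a fixed order so the caller can destructure them.
  COMPONENT_KEYS.map { |key| args.fetch(key) { defaults[key] } }
end

# Symbols stand in for real component instances.
source, sink, tube, pipe = extract_components(source: :my_source, sink: :my_sink)
```

Because every key has a fallback, any subset of the four components can be passed, which matches the "all arguments are optional" behavior described above.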

Public Instance Methods

run_resource()

Runs the sources, passes their data through the pipe, and lets the tubes and sinks process it. Calling this method is all it takes to run the whole pipeline.

This method returns once all sources have finished producing and all sinks have done their jobs.

# File lib/datapipes.rb, line 39
def run_resource
  @source.pipe = @pipe
  runners = @source.run_all

  sink = run_sink
  runners.each(&:join)

  notify_resource_ending
  sink.join
end
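
The shutdown sequence in run_resource (join the producers, push an end marker, then join the consumer) can be tried in isolation. The sketch below is a minimal, self-contained approximation that uses Ruby's built-in Queue in place of the pipe. The names EndOfStream and run_pipeline are hypothetical, and plain arrays and a lambda stand in for real sources, tubes, and sinks.

```ruby
require 'thread' # Queue is built in on modern Ruby; the require is harmless

# Hypothetical end marker, playing the role Notification plays in the source.
class EndOfStream; end

def run_pipeline(producers, transform, results)
  queue = Queue.new # thread-safe FIFO standing in for the pipe

  # One thread per producer, each pushing its items into the queue.
  runners = producers.map do |items|
    Thread.new { items.each { |item| queue << item } }
  end

  # A single consumer thread drains the queue until it sees the end marker.
  consumer = Thread.new do
    loop do
      data = queue.pop
      break if data.is_a?(EndOfStream)

      results << transform.call(data)
    end
  end

  runners.each(&:join)    # wait until every producer has finished
  queue << EndOfStream.new # only then signal the end of the stream
  consumer.join            # wait for the consumer to drain the queue
  results
end

results = run_pipeline([[1, 2], [3, 4]], ->(n) { n * 10 }, [])
```

Pushing the marker only after all producers have been joined guarantees it is the last item in the queue, so the consumer never stops early. The order of results across producers depends on thread scheduling.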

Private Instance Methods

notify_resource_ending()

# File lib/datapipes.rb, line 63
def notify_resource_ending
  @pipe.pour_in Notification.new
end

resource_ended?(data)

# File lib/datapipes.rb, line 67
def resource_ended?(data)
  data.is_a? Notification
end

run_sink()

# File lib/datapipes.rb, line 52
def run_sink
  Thread.new do
    loop do
      data = @pipe.pour_out
      break if resource_ended?(data)

      @sink.run_all(@tube.run(data))
    end
  end
end