class Datapipes
Constants
- Notification
- VERSION
Public Class Methods
new(args = {})
click to toggle source
Pass datapipes components instances. Each component can be composed. See detail in examples.
Pass parameters as a hash:
datapipe = Datapipes.new(source: my_source, sink: my_sink)
Or pass all:
datapipe = Datapipes.new( source: my_source, sink: my_sink, tube: my_tube, pipe: my_pipe )
All arguments are optional. But in most case, you specify source and sink.
# File lib/datapipes.rb, line 30 def initialize(args = {}) @source, @sink, @tube, @pipe = *ArgParser.extract(args) end
Public Instance Methods
run_resource()
click to toggle source
Run sources, data flow via pipe, tubes and sinks work. Everything work with just call this method.
When all sources finished producing, and all sinks did their jobs, this method returns.
# File lib/datapipes.rb, line 39 def run_resource @source.pipe = @pipe runners = @source.run_all sink = run_sink runners.each(&:join) notify_resource_ending sink.join end
Private Instance Methods
notify_resource_ending()
click to toggle source
# File lib/datapipes.rb, line 63 def notify_resource_ending @pipe.pour_in Notification.new end
resource_ended?(data)
click to toggle source
# File lib/datapipes.rb, line 67 def resource_ended?(data) data.is_a? Notification end
run_sink()
click to toggle source
# File lib/datapipes.rb, line 52 def run_sink Thread.new do loop do data = @pipe.pour_out break if resource_ended?(data) @sink.run_all(@tube.run(data)) end end end