class Datapipes::Sink

Build your own sink logic in ‘run` method.

Be careful each sinks are executed concurrently. Avoid race condition in multi sinks.

This is bad:

$shared = []

class A < Datapipes::Sink
  def run(data)
    $shared << data
  end
end

class B < Datapipes::Sink
  def run(data)
    $shared << data
  end
end

On the other hand, a sink is called serially. So you can touch shared object in one sink logic.

This is good:

class A < Datapipes::Source
  def initialize
    @shared = []
  end

  def run(data)
    @shared << data
  end
end

Public Instance Methods

run(data) click to toggle source

Override this in sub class

# File lib/datapipes/sink.rb, line 42
def run(data)
  data
end
run_all(data) click to toggle source

For internal uses.

# File lib/datapipes/sink.rb, line 47
def run_all(data)
  @accumulated ||= [self]
  count = Parallel.processor_count
  Parallel.each(@accumulated, in_threads: count) do |sink|
    sink.run(data)
  end
end