class ElasticSearch::BulkStream

Public Class Methods

new(client, queue_size=10, flush_interval=1)

Create a new bulk stream. This lets you send index and other bulk events asynchronously and use the ElasticSearch bulk API in a streaming way.

The 'queue_size' is the maximum number of unflushed requests. If the queue reaches this size, new requests block until a flush frees space.

The 'flush_interval' is the number of seconds the stream runner sleeps before flushing when the queue is not full.

# File lib/jruby-elasticsearch/bulkstream.rb, line 12
def initialize(client, queue_size=10, flush_interval=1)
  @client = client
  @queue_size = queue_size
  @queue = SizedQueue.new(@queue_size)
  @flush_interval = flush_interval
  @stop = false
  # Start the runner last so it never sees uninitialized state.
  @bulkthread = Thread.new { run }
end
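The blocking behaviour described above comes from Ruby's SizedQueue. The following standalone sketch (it does not use BulkStream itself, and the queued values are placeholders) shows what happens when a queue of size 2 fills up. A non-blocking push is used only so the full condition is visible as an exception; a normal push would wait instead.

```ruby
require "thread"

queue = SizedQueue.new(2)   # plays the role of queue_size
queue << [:index, "a"]
queue << [:index, "b"]

# The queue is now full. A blocking push would wait here; a
# non-blocking push raises ThreadError instead.
begin
  queue.push([:index, "c"], true)
  full = false
rescue ThreadError
  full = true
end

queue.pop                        # a consumer frees one slot...
queue.push([:index, "c"], true)  # ...so this push succeeds
```

In BulkStream the consumer is the runner thread, so a blocked producer resumes as soon as the next flush drains the queue.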

Public Instance Methods

flush()

Flush the queue right now. If the queue is empty, this blocks until a request arrives; it then blocks until the bulk request has completed.

# File lib/jruby-elasticsearch/bulkstream.rb, line 61
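def flush
  bulk = @client.bulk

  # Block until at least one request is available.
  request = @queue.pop
  return if request.nil? # nil is the sentinel pushed by #stop.
  method, *args = request
  bulk.send(method, *args)

  # Drain whatever else is queued, up to one queue's worth in total.
  [@queue.size, @queue_size - 1].min.times do
    request = @queue.pop
    break if request.nil? # stopping: still send what we collected.
    method, *args = request
    bulk.send(method, *args)
  end

  # Block until this finishes
  bulk.execute!
end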
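
To illustrate how queued entries are replayed onto a bulk request, here is a minimal sketch with a stand-in object. FakeBulk is hypothetical and only records calls; the arguments passed to index are placeholders, not the real BulkRequest#index signature.

```ruby
require "thread"

# Hypothetical stand-in for the object returned by client.bulk.
class FakeBulk
  attr_reader :calls, :executed

  def initialize
    @calls = []
    @executed = false
  end

  def index(*args)
    @calls << args
  end

  def execute!
    @executed = true
  end
end

queue = SizedQueue.new(10)
queue << [:index, "logs", "event", { "message" => "hello" }]
queue << [:index, "logs", "event", { "message" => "world" }]

bulk = FakeBulk.new
until queue.empty?
  # Each entry is [method_name, *arguments]; send dispatches it.
  name, *args = queue.pop
  bulk.send(name, *args)
end
bulk.execute!
```

Because each entry carries its own method name, the same loop could replay other bulk operations without changes.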

index(*args)

Queue an index request. This blocks if the queue is full. See ElasticSearch::BulkRequest#index for arguments.

# File lib/jruby-elasticsearch/bulkstream.rb, line 22
def index(*args)
  # TODO(sissel): It's not clear I need to queue this up, I could just
  # call BulkRequest#index() and when we have 10 or whatnot, flush, but
  # Queue gives us a nice blocking mechanism anyway.
  @queue << [:index, *args]
end

stop()

Stop the stream. This sets the stop flag and pushes a nil sentinel onto the queue to wake the stream runner.

# File lib/jruby-elasticsearch/bulkstream.rb, line 53
def stop
  # Set the flag before waking the runner so it sees @stop after popping nil.
  @stop = true
  @queue << nil
end
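
One detail of the nil sentinel is worth spelling out: when nil is destructured with a splat, the splat variable becomes an empty array, not nil. The standalone snippet below shows this with plain Ruby (the variable names are illustrative), which suggests a consumer should test the first element, or the popped value itself, to detect the sentinel.

```ruby
# Destructuring the sentinel that stop pushes onto the queue.
name, *args = nil
sentinel_seen = name.nil?   # true: the first element is nil
args_is_nil = args.nil?     # false: the splat yields [] rather than nil

# Destructuring a normal entry, for comparison.
entry_name, *entry_args = [:index, "a", "b"]
```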

Private Instance Methods

run()

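The stream runner. It runs in the background thread started by new, flushing immediately when the queue is full and otherwise after sleeping for flush_interval seconds.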

# File lib/jruby-elasticsearch/bulkstream.rb, line 31
def run
  # TODO(sissel): Make a way to shutdown this thread.
  loop do
    # Flush right away when the queue is full; otherwise sleep first and
    # flush anyway.
    sleep(@flush_interval) unless @queue.size == @queue_size
    flush

    # Queue empty and it's time to stop.
    break if @stop && @queue.size == 0
  end
end
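
The runner pattern as a whole can be sketched without ElasticSearch. MiniStream below is a hypothetical, simplified re-creation, not the library's code: it collects items into batches instead of sending bulk requests, and its stop also joins the thread so the result can be inspected.

```ruby
require "thread"

# Hypothetical miniature of the flush-when-full-or-after-interval loop.
class MiniStream
  attr_reader :batches

  def initialize(queue_size = 3, flush_interval = 0.05)
    @queue = SizedQueue.new(queue_size)
    @queue_size = queue_size
    @flush_interval = flush_interval
    @batches = []   # each flush appends one array of items
    @stop = false
    @thread = Thread.new { run }
  end

  def push(item)
    @queue << item  # blocks while the queue is full
  end

  def stop
    @stop = true
    @queue << nil   # wake the runner if it is blocked in pop
    @thread.join
  end

  private

  def run
    until @stop && @queue.empty?
      sleep(@flush_interval) unless @queue.size == @queue_size
      flush
    end
  end

  def flush
    batch = []
    item = @queue.pop         # blocks while the queue is empty
    while item                # nil is the stop sentinel
      batch << item
      break if @queue.empty?
      item = @queue.pop
    end
    @batches << batch unless batch.empty?
  end
end

stream = MiniStream.new
5.times { |i| stream.push(i) }
stream.stop
```

How the five items are split across batches depends on thread timing, but their order is preserved and none are dropped.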