class ElasticSearch::BulkStream
Public Class Methods
new(client, queue_size=10, flush_interval=1)
click to toggle source
Create a new bulk stream. This allows you to send index and other bulk events asynchronously and use the bulk api in ElasticSearch
in a streaming way.
The 'queue_size' is the maximum size of unflushed requests. If the queue reaches this size, new requests will block until there is room to move.
# File lib/jruby-elasticsearch/bulkstream.rb, line 12 def initialize(client, queue_size=10, flush_interval=1) @bulkthread = Thread.new { run } @client = client @queue_size = queue_size @queue = SizedQueue.new(@queue_size) @flush_interval = flush_interval end
Public Instance Methods
flush()
click to toggle source
Flush the queue right now. This will block until the bulk request has completed.
# File lib/jruby-elasticsearch/bulkstream.rb, line 61 def flush bulk = @client.bulk flush_one = proc do # block if no data. method, *args = @queue.pop return if args.nil? # probably we are now stopping. bulk.send(method, *args) end flush_one.call 1.upto([@queue.size, @queue_size - 1].min) do flush_one.call end # Block until this finishes bulk.execute! end
index(*args)
click to toggle source
See ElasticSearch::BulkRequest#index
for arguments.
# File lib/jruby-elasticsearch/bulkstream.rb, line 22 def index(*args) # TODO(sissel): It's not clear I need to queue this up, I could just # call BulkRequest#index() and when we have 10 or whatnot, flush, but # Queue gives us a nice blocking mechanism anyway. @queue << [:index, *args] end
stop()
click to toggle source
Stop the stream
# File lib/jruby-elasticsearch/bulkstream.rb, line 53 def stop @queue << nil @stop = true end
Private Instance Methods
run()
click to toggle source
The stream runner.
# File lib/jruby-elasticsearch/bulkstream.rb, line 31 def run # TODO(sissel): Make a way to shutdown this thread. while true requests = [] if @queue.size == @queue_size # queue full, flush now. flush else # Not full, so sleep and flush anyway. sleep(@flush_interval) flush end if @stop and @queue.size == 0 # Queue empty and it's time to stop. break end end # while true end