module NSConnector::ChunkedSearching

Provides threaded and non-threaded chunked searching.

Public Instance Methods

grab_chunk(filters, chunk)

Retrieves a single chunk. Each call makes one HTTP connection.

Raises

NSConnector::Errors::EndChunking when there are no more chunks

Returns

Resource objects

# File lib/ns_connector/chunked_searching.rb, line 6
def grab_chunk(filters, chunk)
        NSConnector::Restlet.execute!(
                :action => 'search',
                :type_id => type_id,
                :fields => fields,
                :data => {
                        :filters => filters,
                        :chunk => chunk,
                }
        ).map do |upstream_store|
                self.new(upstream_store)
        end
end
normal_search_by_chunks(filters)

Just keep grabbing successive chunks, starting at chunk 0, until grab_chunk raises NSConnector::Errors::EndChunking.

# File lib/ns_connector/chunked_searching.rb, line 93
def normal_search_by_chunks(filters)
        results = []
        chunk = 0
        while true
                begin
                        results += grab_chunk(filters, chunk)
                        chunk += 1
                end
        end
rescue NSConnector::Errors::EndChunking
        return results
end
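
The loop above can be tried without a NetSuite connection. The following is a minimal sketch, not the library's code: EndChunking, CHUNKS, fetch_chunk and collect_all_chunks are hypothetical stand-ins, and the data is made up for illustration.

```ruby
# Hypothetical stand-in for NSConnector::Errors::EndChunking.
class EndChunking < StandardError; end

# Hypothetical data source: three chunks of two records each.
CHUNKS = [[1, 2], [3, 4], [5, 6]].freeze

# Stand-in for grab_chunk: returns one chunk, or raises when none remain.
def fetch_chunk(index)
  raise EndChunking if index >= CHUNKS.length
  CHUNKS[index]
end

# Request chunk 0, 1, 2, ... until the source signals the end.
def collect_all_chunks
  results = []
  chunk = 0
  loop do
    results.concat(fetch_chunk(chunk))
    chunk += 1
  end
rescue EndChunking
  # The exception is the normal exit path: return what was gathered.
  results
end

collect_all_chunks # => [1, 2, 3, 4, 5, 6]
```

The exception doubles as the loop's exit condition, so the caller does not need to know in advance how many chunks exist.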
search_by_chunks(filters)

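Search by requesting chunks. Uses threaded_search_by_chunks when NSConnector::Config[:use_threads] is set, and normal_search_by_chunks otherwise.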

# File lib/ns_connector/chunked_searching.rb, line 107
def search_by_chunks filters
        if NSConnector::Config[:use_threads] then
                return threaded_search_by_chunks(filters)
        else
                return normal_search_by_chunks(filters)
        end
end
threaded_search_by_chunks(filters)

The basic logic here is: given four threads, we have four workers, and those workers keep eating the chunks of data specified by the master. When a worker receives an EndChunking error, it flags done as true and everyone wraps up their work. Pretty simple. The number of threads comes from NSConnector::Config[:no_threads].

# File lib/ns_connector/chunked_searching.rb, line 24
def threaded_search_by_chunks(filters)
        require 'thread'
        threads = NSConnector::Config[:no_threads].to_i
        if threads < 1 then
                raise NSConnector::Config::ArgumentError, 
                        "Need more than #{threads} threads"
        end

        # We bother pre-populating the queue here because locking is
        # super expensive, on my build of ruby at least.
        queue = Queue.new
        (threads - 1).times do |i|
                queue << i
        end

        mutex = Mutex.new

        workers = []
        results = []
        current_chunk = threads - 1
        done = false

        # Workers
        threads.times do
                workers << Thread.new do
                        until done
                                begin
                                        # Avoid a deadlock by popping
                                        # off -1 to exit
                                        chunk = queue.pop 
                                        break if chunk == -1

                                        result = grab_chunk(
                                                filters, chunk
                                        )
                                rescue NSConnector::Errors::EndChunking
                                        done = true
                                        break
                                rescue Timeout::Error, Errno::ECONNRESET
                                        retry
                                end

                                mutex.synchronize do
                                        results += result
                                end
                        end
                end
        end

        # Master
        until done
                if queue.empty? then
                        queue << current_chunk
                        current_chunk += 1
                end
        end

        threads.times do 
                queue << -1
        end

        workers.each do |worker|
                worker.join
        end

        return results
end
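
For experimenting with the worker idea outside the library, here is a minimal sketch under stated assumptions. It is not the implementation above: it replaces the master-fed queue with a shared counter guarded by a mutex, and it stores results per chunk so the original order can be restored. NoMoreChunks, SAMPLE_CHUNKS, fetch_sample_chunk and threaded_collect are hypothetical names. Note that in the listing above `retry` re-runs the whole `begin` block, including `queue.pop`, so the retried request may be for a different chunk; the sketch scopes `retry` to the fetch alone so the same chunk is requested again.

```ruby
require 'timeout'

# Hypothetical stand-in for NSConnector::Errors::EndChunking.
class NoMoreChunks < StandardError; end

# Hypothetical data source: five chunks of two records each.
SAMPLE_CHUNKS = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]].freeze

# Stand-in for grab_chunk: returns one chunk, or raises when none remain.
def fetch_sample_chunk(index)
  raise NoMoreChunks if index >= SAMPLE_CHUNKS.length
  SAMPLE_CHUNKS[index]
end

def threaded_collect(thread_count)
  raise ArgumentError, 'Need at least one thread' if thread_count < 1

  mutex = Mutex.new
  next_chunk = 0 # next chunk index to hand out
  done = false   # set once any worker runs past the last chunk
  by_chunk = {}  # chunk index => records

  workers = Array.new(thread_count) do
    Thread.new do
      loop do
        # Claim the next index under the lock, or nil once the end was seen.
        chunk = mutex.synchronize do
          if done
            nil
          else
            index = next_chunk
            next_chunk += 1
            index
          end
        end
        break if chunk.nil?

        begin
          records = fetch_sample_chunk(chunk)
        rescue NoMoreChunks
          mutex.synchronize { done = true }
          break
        rescue Timeout::Error, Errno::ECONNRESET
          # Retries the same chunk; there is no retry limit in this sketch.
          retry
        end

        mutex.synchronize { by_chunk[chunk] = records }
      end
    end
  end

  workers.each(&:join)

  # Reassemble the records in chunk order.
  by_chunk.sort.flat_map { |_, records| records }
end

threaded_collect(4) # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Because indices are handed out in increasing order, every chunk before the first missing one has already been claimed by the time a worker sees the end, and claimed chunks are always fetched to completion. Handing out indices from a counter also avoids the master's busy-wait on an empty queue.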