class FastlyNsq::Feeder

FastlyNsq::Feeder is a queue interface wrapper for the manager's thread pool. This allows a consumer read loop to post a message directly to a processor (FastlyNsq::Listener) with a specified priority.

Attributes

priority[R]
processor[R]

Public Class Methods

new(processor, priority)

Create a FastlyNsq::Feeder

@param processor [FastlyNsq::Listener]
@param priority [Numeric]

# File lib/fastly_nsq/feeder.rb, line 14
def initialize(processor, priority)
  @processor = processor
  @priority = priority
end
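As a rough illustration of the constructor and the two readers, the sketch below mirrors them in a stand-in class. FeederSketch and the lambda processor are hypothetical names used only here; the real class expects a FastlyNsq::Listener, and the sketch assumes nothing beyond an object that responds to +call+.

```ruby
# Stand-in mirroring the constructor and readers shown above.
# FeederSketch is hypothetical; it is not the gem's class.
class FeederSketch
  attr_reader :processor, :priority

  def initialize(processor, priority)
    @processor = processor
    @priority = priority
  end
end

# Hypothetical processor: any object responding to #call works for this sketch.
listener = ->(message) { "handled #{message}" }

feeder = FeederSketch.new(listener, 5)
feeder.priority             # the priority given at construction
feeder.processor.call("m")  # invokes the stand-in listener directly
```

Both attributes are read-only, so a feeder keeps the same processor and priority for its lifetime.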

Public Instance Methods

push(message)

Send a message to the processor with the specified priority.

This posts to FastlyNsq.manager.pool with a queue priority and a block that will be +call+ed. FastlyNsq.manager.pool is a PriorityThreadPool, which is a Concurrent::ThreadPoolExecutor whose @queue is a priority queue that manages job priority.

The ThreadPoolExecutor is what actually works the @queue and sends +call+ to the queued Proc. When that Proc is executed, +processor.call(message)+ is run. The processor in this context is a FastlyNsq::Listener.

The block also logs any exception and reports it to FastlyNsq.tracer before re-raising it, because Concurrent::ThreadPoolExecutor would otherwise swallow the exception.

@param message [Nsq::Message]
@see ruby-concurrency.github.io/concurrent-ruby/1.0.5/Concurrent/ThreadPoolExecutor.html#post-instance_method
@see Nsq::Connection#read_loop

# File lib/fastly_nsq/feeder.rb, line 37
def push(message)
  FastlyNsq.manager.pool.post(priority) do
    begin
      processor.call(message)
    rescue => ex
      FastlyNsq.logger.error ex
      FastlyNsq.tracer.notice_error ex
      raise ex
    end
  end
end
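The flow described above can be sketched without the gem. Everything in this block is a hypothetical stand-in: SketchNsq plays the role of FastlyNsq, SketchPool replaces the PriorityThreadPool with a synchronous queue, and PushingFeeder copies the shape of +push+. The sketch assumes that higher priorities run first and that the pool discards exceptions raised by a job; neither is confirmed here for the real pool.

```ruby
# Hypothetical stand-in for the FastlyNsq module: holds a pool and a log.
module SketchNsq
  class << self
    attr_accessor :pool, :log
  end
end

# Hypothetical stand-in for the priority thread pool. Jobs run synchronously
# in #drain, highest priority first (an assumed ordering).
class SketchPool
  def initialize
    @queue = []
  end

  def post(priority, &block)
    @queue << [priority, block]
  end

  # Work the queue and discard any exception a job raises, imitating an
  # executor that swallows errors.
  def drain
    @queue.sort_by { |priority, _| -priority }.each do |_, job|
      begin
        job.call
      rescue StandardError
        # swallowed: nothing outside the job ever sees this error
      end
    end
    @queue.clear
  end
end

# Same shape as Feeder#push: log inside the block, then re-raise.
class PushingFeeder
  attr_reader :processor, :priority

  def initialize(processor, priority)
    @processor = processor
    @priority = priority
  end

  def push(message)
    SketchNsq.pool.post(priority) do
      begin
        processor.call(message)
      rescue => ex
        SketchNsq.log << ex.message
        raise ex
      end
    end
  end
end

SketchNsq.pool = SketchPool.new
SketchNsq.log = []

handled = []
processor = lambda do |message|
  raise ArgumentError, "bad message" if message == :bad
  handled << message
end

PushingFeeder.new(processor, 1).push(:first_posted)
PushingFeeder.new(processor, 10).push(:second_posted)
PushingFeeder.new(processor, 5).push(:bad)

SketchNsq.pool.drain
# handled now holds :second_posted then :first_posted, and the log holds
# "bad message" even though the pool discarded the exception itself.
```

Logging inside the block is the point of the design: once the job is handed to the pool, the caller of +push+ has no other chance to observe a failure.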