class Hackle::EventProcessor::MessageProcessor

Constants

DEFAULT_MAX_EVENT_DISPATCH_SIZE
DEFAULT_MESSAGE_QUEUE_CAPACITY

Public Class Methods

new(config:, event_dispatcher:) click to toggle source
# File lib/hackle/events/event_processor.rb, line 74
# Sets up the processor's collaborators and its internal buffers.
#
# @param config [Object] must respond to +logger+; the returned logger is
#   kept for warning output
# @param event_dispatcher [Object] receives batches of buffered events via
#   +dispatch(events:)+
def initialize(config:, event_dispatcher:)
  @event_dispatcher = event_dispatcher
  @logger = config.logger

  # Used by #produce to sample the "queue full" warning.
  @random = Random.new

  # Events accumulated by the consumer until the next dispatch.
  @consumed_events = []

  # Bounded queue between producers and the consuming loop.
  @message_queue = SizedQueue.new(DEFAULT_MESSAGE_QUEUE_CAPACITY)
end

Public Instance Methods

consuming_loop() click to toggle source
# File lib/hackle/events/event_processor.rb, line 92
# Drains the message queue until a shutdown message arrives.
#
# Each popped message is handled by type: an event is buffered, a flush
# dispatches whatever is buffered, and a shutdown ends the loop. Messages
# matching none of these are ignored. Any StandardError raised while
# processing is logged as a warning and ends the loop. On every exit path
# the buffered events are dispatched one final time.
def consuming_loop
  loop do
    msg = @message_queue.pop
    case msg
    when Message::Event then consume_event(event: msg.event)
    when Message::Flush then dispatch_events
    when Message::Shutdown then break
    end
  end
rescue StandardError => error
  @logger.warn { "Uncaught exception in events message processor: #{error.inspect}" }
ensure
  # Runs after a normal shutdown and after the rescue above.
  dispatch_events
end
produce(message:, non_block: true) click to toggle source

@param message [Message]
@param non_block [boolean]

# File lib/hackle/events/event_processor.rb, line 84
# Enqueues a message for the consuming loop.
#
# When the push raises ThreadError (which a non-blocking push on a full
# SizedQueue does), the message is dropped and a warning is emitted for
# roughly one in a hundred such failures.
#
# @param message [Message] the message to enqueue
# @param non_block [boolean] forwarded to the queue's +push+ as its
#   non-blocking flag
def produce(message:, non_block: true)
  @message_queue.push(message, non_block)
rescue ThreadError
  # Sample the warning (1 in 100) so a sustained overflow does not flood the log.
  sampled = @random.rand(1..100) == 1
  @logger.warn { 'Events are produced faster than can be consumed. Some events will be dropped.' } if sampled
end

Private Instance Methods

consume_event(event:) click to toggle source

@param event [UserEvent]

# File lib/hackle/events/event_processor.rb, line 113
# Buffers one event and dispatches the buffer once it reaches
# DEFAULT_MAX_EVENT_DISPATCH_SIZE entries.
#
# @param event [UserEvent] the event to buffer
def consume_event(event:)
  @consumed_events.push(event)
  return if @consumed_events.size < DEFAULT_MAX_EVENT_DISPATCH_SIZE

  dispatch_events
end
dispatch_events() click to toggle source
# File lib/hackle/events/event_processor.rb, line 118
# Hands the buffered events to the dispatcher and starts a fresh buffer.
# Does nothing when the buffer is empty.
def dispatch_events
  unless @consumed_events.empty?
    @event_dispatcher.dispatch(events: @consumed_events)
    # Assign a new array rather than clearing in place, so the array just
    # passed to the dispatcher is left untouched.
    @consumed_events = []
  end
end