class Hackle::EventProcessor::MessageProcessor
Constants
- DEFAULT_MAX_EVENT_DISPATCH_SIZE
- DEFAULT_MESSAGE_QUEUE_CAPACITY
Public Class Methods
new(config:, event_dispatcher:)
click to toggle source
# File lib/hackle/events/event_processor.rb, line 74
#
# Wires up the processor's collaborators and starts it with an empty buffer.
#
# @param config [Object] only +config.logger+ is read here
# @param event_dispatcher [Object] stored as-is; other methods of this class
#   send it +dispatch(events:)+
def initialize(config:, event_dispatcher:)
  # Collaborators supplied by the caller.
  @logger = config.logger
  @event_dispatcher = event_dispatcher

  # Internal state owned by this instance.
  @random = Random.new
  @message_queue = SizedQueue.new(DEFAULT_MESSAGE_QUEUE_CAPACITY) # bounded queue
  @consumed_events = []
end
Public Instance Methods
consuming_loop()
click to toggle source
# File lib/hackle/events/event_processor.rb, line 92
#
# Pops messages off the queue until a Message::Shutdown arrives, handling
# each one according to its class:
#
# * Message::Event    - buffers the wrapped event via consume_event
# * Message::Flush    - dispatches whatever is buffered
# * Message::Shutdown - leaves the loop
#
# Messages matching none of these are ignored. Any StandardError raised while
# looping is logged as a warning and ends the loop. Buffered events are
# dispatched one final time on every exit path.
#
# NOTE(review): the final dispatch_events runs in +ensure+, so an exception
# it raises is not covered by the +rescue+ above it - confirm that is intended.
def consuming_loop
  loop do
    incoming = @message_queue.pop

    case incoming
    when Message::Event then consume_event(event: incoming.event)
    when Message::Flush then dispatch_events
    when Message::Shutdown then break
    end
  end
rescue => error
  @logger.warn { "Uncaught exception in events message processor: #{error.inspect}" }
ensure
  dispatch_events
end
produce(message:, non_block: true)
click to toggle source
@param message [Message] @param non_block [boolean]
# File lib/hackle/events/event_processor.rb, line 84
#
# Pushes a message onto the message queue. If the push raises ThreadError the
# message is dropped, and a warning is logged for roughly one in a hundred of
# those drops.
#
# @param message [Message] the message to enqueue
# @param non_block [boolean] forwarded as the second argument to the queue's
#   +push+
# @return [Object, nil] the result of +push+ on success; after a ThreadError,
#   the logger's return value when a warning was emitted, otherwise nil
def produce(message:, non_block: true)
  begin
    @message_queue.push(message, non_block)
  rescue ThreadError
    # log only 1% of the time
    sampled = @random.rand(1..100) == 1
    @logger.warn { 'Events are produced faster than can be consumed. Some events will be dropped.' } if sampled
  end
end
Private Instance Methods
consume_event(event:)
click to toggle source
@param event [UserEvent]
# File lib/hackle/events/event_processor.rb, line 113
#
# Appends the event to the pending buffer and dispatches the buffer once it
# holds at least DEFAULT_MAX_EVENT_DISPATCH_SIZE events.
#
# @param event [UserEvent] the event to buffer
def consume_event(event:)
  @consumed_events.push(event)
  return if @consumed_events.size < DEFAULT_MAX_EVENT_DISPATCH_SIZE

  dispatch_events
end
dispatch_events()
click to toggle source
# File lib/hackle/events/event_processor.rb, line 118
#
# Hands every buffered event to the dispatcher in a single call, then starts
# a fresh, empty buffer. Does nothing when the buffer is empty.
#
# The buffer is replaced with a new array rather than cleared in place, so
# the array passed to the dispatcher is never mutated by this object later.
#
# NOTE(review): if +dispatch+ raises, the buffer is left untouched, so the
# same events will be offered again on the next call - confirm that is the
# desired failure behaviour.
#
# @return [Array, nil] the new empty buffer after a dispatch, nil when there
#   was nothing to send
def dispatch_events
  pending = @consumed_events
  unless pending.empty?
    @event_dispatcher.dispatch(events: pending)
    @consumed_events = []
  end
end