class Hackle::EventProcessor

Constants

DEFAULT_FLUSH_INTERVAL

Public Class Methods

new(config:, event_dispatcher:) click to toggle source

@param config [Config]
@param event_dispatcher [EventDispatcher]

# File lib/hackle/events/event_processor.rb, line 11
def initialize(config:, event_dispatcher:)
  @logger = config.logger
  @event_dispatcher = event_dispatcher
  @message_processor = MessageProcessor.new(config: config, event_dispatcher: event_dispatcher)
  @flush_task = Concurrent::TimerTask.new(execution_interval: DEFAULT_FLUSH_INTERVAL) { flush }
  @consume_task = nil
  @running = false
end

Public Instance Methods

flush() click to toggle source
# File lib/hackle/events/event_processor.rb, line 46
def flush
  @message_processor.produce(message: Message::Flush.new)
end
process(event:) click to toggle source

@param event [UserEvent]

# File lib/hackle/events/event_processor.rb, line 42
def process(event:)
  @message_processor.produce(message: Message::Event.new(event))
end
start!() click to toggle source
# File lib/hackle/events/event_processor.rb, line 20
def start!
  return if @running

  @consume_task = Thread.new { @message_processor.consuming_loop }
  @flush_task.execute
  @running = true
end
stop!() click to toggle source
# File lib/hackle/events/event_processor.rb, line 28
def stop!
  return unless @running

  @logger.info { 'Shutting down Hackle event_processor' }

  @message_processor.produce(message: Message::Shutdown.new, non_block: false)
  @consume_task.join(10)
  @flush_task.shutdown
  @event_dispatcher.shutdown

  @running = false
end