class Hackle::EventProcessor
Constants
- DEFAULT_FLUSH_INTERVAL
Public Class Methods
new(config:, event_dispatcher:)
click to toggle source
@param config [Config]
@param event_dispatcher [EventDispatcher]
# File lib/hackle/events/event_processor.rb, line 11
# Builds an event processor in the stopped state: the message processor and
# the periodic flush timer are created here but nothing is started yet.
#
# @param config [Config] supplies the logger and is forwarded to MessageProcessor
# @param event_dispatcher [EventDispatcher] kept on the instance and forwarded to MessageProcessor
def initialize(config:, event_dispatcher:)
  @logger = config.logger
  @event_dispatcher = event_dispatcher
  @message_processor = MessageProcessor.new(
    config: config,
    event_dispatcher: event_dispatcher
  )

  # Timer that invokes #flush on every tick; it is only constructed here,
  # not executed.
  @flush_task = Concurrent::TimerTask.new(execution_interval: DEFAULT_FLUSH_INTERVAL) do
    flush
  end

  # No consumer thread exists until the processor is started.
  @consume_task = nil
  @running = false
end
Public Instance Methods
flush()
click to toggle source
# File lib/hackle/events/event_processor.rb, line 46
# Hands a Flush message to the message processor.
def flush
  flush_message = Message::Flush.new
  @message_processor.produce(message: flush_message)
end
process(event:)
click to toggle source
@param event [UserEvent]
# File lib/hackle/events/event_processor.rb, line 42
# Wraps the given event in an Event message and hands it to the message
# processor.
#
# @param event [UserEvent] the event to enqueue
def process(event:)
  event_message = Message::Event.new(event)
  @message_processor.produce(message: event_message)
end
start!()
click to toggle source
# File lib/hackle/events/event_processor.rb, line 20
# Starts the processor: spawns the thread that runs the message processor's
# consuming loop and kicks off the periodic flush task. Does nothing when the
# processor is already marked as running.
def start!
  unless @running
    @consume_task = Thread.new do
      @message_processor.consuming_loop
    end
    @flush_task.execute
    @running = true
  end
end
stop!()
click to toggle source
# File lib/hackle/events/event_processor.rb, line 28
# Stops the processor. Does nothing unless it is currently marked as running.
#
# Sequence: enqueue a Shutdown message, wait for the consumer thread, then
# shut down the flush task and the event dispatcher.
def stop!
  return unless @running

  @logger.info { 'Shutting down Hackle event_processor' }

  shutdown_message = Message::Shutdown.new
  @message_processor.produce(message: shutdown_message, non_block: false)

  # Wait at most 10 seconds for the consumer thread to finish.
  @consume_task.join(10)

  @flush_task.shutdown
  @event_dispatcher.shutdown

  @running = false
end