class Bricolage::StreamingLoad::Dispatcher
Attributes
logger[R]
Public Class Methods
_main()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 30 def Dispatcher._main opts = DispatcherOptions.new(ARGV) opts.parse unless opts.rest_arguments.size == 1 $stderr.puts opts.usage exit 1 end config_path, * = opts.rest_arguments config = YAML.load(File.read(config_path)) log = opts.log_file_path ? new_logger(File.expand_path(opts.log_file_path), config) : nil ctx = Context.for_application('.', environment: opts.environment, logger: log) logger = raw_logger = ctx.logger event_queue = ctx.get_data_source('sqs', config.fetch('event-queue-ds', 'sqs_event')) task_queue = ctx.get_data_source('sqs', config.fetch('task-queue-ds', 'sqs_task')) if config['alert-level'] logger = AlertingLogger.new( logger: raw_logger, sns_datasource: ctx.get_data_source('sns', config.fetch('sns-ds', 'sns')), alert_level: config.fetch('alert-level', 'warn') ) end chunk_buffer = ChunkBuffer.new( control_data_source: ctx.get_data_source('sql', config.fetch('ctl-postgres-ds', 'db_ctl')), logger: logger ) chunk_router = ChunkRouter.for_config(config.fetch('url_patterns')) task_logger = LoadTaskLogger.new( ctx.get_data_source('s3', config.fetch('ctl-s3-ds', 's3_ctl')) ) dispatcher = Dispatcher.new( event_queue: event_queue, task_queue: task_queue, chunk_router: chunk_router, chunk_buffer: chunk_buffer, task_logger: task_logger, dispatch_interval: config.fetch('dispatch-interval', 60), logger: logger ) if opts.task_id dispatcher.dispatch_tasks chunk_buffer.load_tasks_by_id([opts.task_id]) exit 0 end Process.daemon(true) if opts.daemon? create_pid_file opts.pid_file_path if opts.pid_file_path Dir.chdir '/' begin dispatcher.event_loop rescue Exception => e logger.exception e logger.error "dispatcher abort: pid=#{$$}" raise end end
create_pid_file(path)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 98 def Dispatcher.create_pid_file(path) File.open(path, 'w') {|f| f.puts $$ } rescue # ignore end
main()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 23 def Dispatcher.main Raven.capture_message("dispatcher start") Raven.capture { _main } end
new(event_queue:, task_queue:, chunk_router:, chunk_buffer:, task_logger:, dispatch_interval:, logger:)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 106 def initialize(event_queue:, task_queue:, chunk_router:, chunk_buffer:, task_logger:, dispatch_interval:, logger:) @event_queue = event_queue @task_queue = task_queue @chunk_router = chunk_router @chunk_buffer = chunk_buffer @task_logger = task_logger @dispatch_interval = dispatch_interval @dispatch_message_id = nil @logger = logger @dispatch_requested = false @checkpoint_requested = false end
new_logger(path, config)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 90 def Dispatcher.new_logger(path, config) Logger.new( device: path, rotation_period: config.fetch('log-rotation-period', 'daily'), rotation_size: config.fetch('log-rotation-size', nil) ) end
Public Instance Methods
after_message_batch()
click to toggle source
override
# File lib/bricolage/streamingload/dispatcher.rb, line 130 def after_message_batch # must be processed first @event_queue.process_async_delete if @dispatch_requested logger.info "*** dispatch requested" do_handle_dispatch @dispatch_requested = false end if @checkpoint_requested do_handle_checkpoint @checkpoint_requested = false # is needless, but reset it just in case end end
dispatch_tasks(tasks)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 216 def dispatch_tasks(tasks) tasks.each do |task| msg = StreamingLoadV3LoaderMessage.for_load_task(task) @task_queue.put msg @task_logger.log task end end
do_handle_checkpoint()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 166 def do_handle_checkpoint logger.info "*** checkpoint requested" logger.info "Force-flushing all objects..." tasks = @chunk_buffer.flush_all dispatch_tasks tasks logger.info "All objects flushed; shutting down..." @event_queue.initiate_terminate end
do_handle_dispatch()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 194 def do_handle_dispatch tasks = @chunk_buffer.flush_partial dispatch_tasks tasks set_dispatch_timer end
event_loop()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 121 def event_loop logger.info "*** dispatcher started: pid=#{$$}" set_dispatch_timer @event_queue.handle_messages(handler: self, message_class: DispatcherMessage) @event_queue.process_async_delete_force logger.info "*** shutdown gracefully: pid=#{$$}" end
handle_checkpoint(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 158 def handle_checkpoint(msg) # Delay creating CHECKPOINT after the current message batch, # because any other extra events are already received. @checkpoint_requested = true # Delete this event immediately @event_queue.delete_message(msg) end
handle_data(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 175 def handle_data(msg) unless msg.created_event? @event_queue.delete_message_async(msg) return end chunk = @chunk_router.route(msg) @chunk_buffer.save(chunk) @event_queue.delete_message_async(msg) end
handle_dispatch(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 185 def handle_dispatch(msg) # Dispatching tasks may takes 10 minutes or more, it can exceeds visibility timeout. # To avoid this, delay dispatching until all events of current message batch are processed. if @dispatch_message_id == msg.message_id @dispatch_requested = true end @event_queue.delete_message_async(msg) end
handle_flushtable(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 205 def handle_flushtable(msg) # FIXME: badly named attribute. table_name is really stream_name, which is called as data_source_id, too. stream_name = msg.table_name logger.info "*** flushtable requested: stream_name=#{stream_name}" tasks = @chunk_buffer.flush_stream(stream_name) dispatch_tasks tasks # Delete this event immediately @event_queue.delete_message(msg) end
handle_shutdown(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 151 def handle_shutdown(msg) logger.info "*** shutdown requested" @event_queue.initiate_terminate # Delete this event immediately @event_queue.delete_message(msg) end
handle_unknown(msg)
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 146 def handle_unknown(msg) logger.warn "unknown event: #{msg.message_body}" @event_queue.delete_message_async(msg) end
set_dispatch_timer()
click to toggle source
# File lib/bricolage/streamingload/dispatcher.rb, line 200 def set_dispatch_timer res = @event_queue.send_message(DispatchDispatcherMessage.create(delay_seconds: @dispatch_interval)) @dispatch_message_id = res.message_id end