class Bricolage::StreamingLoad::TaskHandler
Attributes
job_class[W]
logger[R]
Public Class Methods
create_pid_file(path)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 92
# Writes the current process id, followed by a newline, to +path+.
#
# This is best-effort: any StandardError raised while opening or
# writing the file is deliberately swallowed.
def TaskHandler.create_pid_file(path)
  begin
    File.open(path, 'w') do |pid_file|
      pid_file.puts Process.pid
    end
  rescue StandardError
    # ignore
  end
end
main()
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 19
# Command-line entry point.
#
# Parses ARGV, loads the YAML configuration (first positional argument, or
# "<working_dir>/config/<environment>/streamingload.yml" when none is given),
# builds the data sources and a TaskHandler, and then either
#
# * executes one task when a task id option is present, or
# * runs the message event loop (optionally daemonized).
#
# Prints usage to stderr and exits with status 1 when more than one
# positional argument is given. In server mode any exception escaping the
# event loop is logged and re-raised.
def TaskHandler.main
  Raven.capture_message("loader start")

  opts = TaskHandlerOptions.new(ARGV)
  opts.parse
  if opts.rest_arguments.size > 1
    $stderr.puts opts.usage
    exit 1
  end

  config_path = opts.rest_arguments[0] ||
                "#{opts.working_dir}/config/#{opts.environment}/streamingload.yml"
  config = YAML.load(File.read(config_path))

  # Without a log file path the context is handed a nil logger.
  logger = new_logger(opts.log_file_path, config) if opts.log_file_path
  ctx = Context.for_application(opts.working_dir, environment: opts.environment, logger: logger)

  ctl_ds = ctx.get_data_source('sql', config.fetch('ctl-postgres-ds', 'db_ctl'))
  data_ds = ctx.get_data_source('sql', config.fetch('redshift-ds', 'db_data'))
  task_queue = ctx.get_data_source('sqs', config.fetch('task-queue-ds', 'sqs_task'))
  log_table = config.fetch('log-table', 'strload_load_logs')

  # Alerting is enabled only when the config names an alert level.
  service_logger = config.key?('alert-level') ? new_alerting_logger(ctx, config) : ctx.logger

  task_handler = new(
    context: ctx,
    ctl_ds: ctl_ds,
    data_ds: data_ds,
    log_table: log_table,
    task_queue: task_queue,
    working_dir: opts.working_dir,
    logger: service_logger,
    job_class: opts.noop? ? NoopJob : Job
  )

  # Single task mode
  return task_handler.execute_task_by_id(opts.task_id, force: opts.force?) if opts.task_id

  # Server mode
  Process.daemon(true) if opts.daemon?
  Dir.chdir '/'
  create_pid_file opts.pid_file_path if opts.pid_file_path
  begin
    service_logger.info "*** bricolage-streaming-loader started: pid=#{$$}"
    task_handler.event_loop
    service_logger.info "*** bricolage-streaming-loader shutdown gracefully: pid=#{$$}"
  rescue Exception => ex
    # Record the failure, then let it propagate so the process still aborts.
    service_logger.exception(ex)
    service_logger.error "*** bricolage-streaming-loader abort: pid=#{$$}"
    raise
  end
end
new(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 100
# Stores the collaborators used to build and run load jobs.
#
# All arguments are keyword arguments; +job_class+ defaults to Job.
def initialize(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job)
  @ctx, @working_dir, @logger = context, working_dir, logger
  @ctl_ds, @data_ds = ctl_ds, data_ds
  @log_table, @task_queue = log_table, task_queue
  @job_class = job_class
end
new_alerting_logger(ctx, config)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 84
# Builds an AlertingLogger around the context's logger.
#
# The SNS data source name is read from the 'sns-ds' config key (default
# 'sns') and the alert level from 'alert-level' (default 'warn').
def TaskHandler.new_alerting_logger(ctx, config)
  base_logger = ctx.logger
  sns_ds_name = config.fetch('sns-ds', 'sns')
  sns_ds = ctx.get_data_source('sns', sns_ds_name)
  level = config.fetch('alert-level', 'warn')

  AlertingLogger.new(logger: base_logger, sns_datasource: sns_ds, alert_level: level)
end
new_logger(path, config)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 76
# Builds a Logger that writes to +path+.
#
# Rotation settings come from the 'log-rotation-period' config key (default
# 'daily') and 'log-rotation-size' (default nil).
def TaskHandler.new_logger(path, config)
  period = config.fetch('log-rotation-period', 'daily')
  size = config.fetch('log-rotation-size', nil)

  Logger.new(device: path, rotation_period: period, rotation_size: size)
end
Public Instance Methods
event_loop()
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 118
# Hands control to the task queue, registering this object as the handler
# for messages of class LoaderMessage.
def event_loop
  @task_queue.handle_messages(
    handler: self,
    message_class: LoaderMessage
  )
end
execute_task_by_id(task_id, force: false)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 113
# Builds the job for +task_id+ and executes it with fail_fast enabled.
#
# Returns whatever the job's +execute+ returns.
def execute_task_by_id(task_id, force: false)
  new_job(task_id, force).execute(fail_fast: true)
end
handle_streaming_load_v3(t)
click to toggle source
message handler
# File lib/bricolage/streamingload/taskhandler.rb, line 129
# Message handler for streaming_load_v3 tasks.
#
# Runs the job for task +t+ with the working directory as the current
# directory, and deletes the message from the task queue only when the job's
# +execute+ returns a truthy value.
#
# Errors raised while handling the task are logged and swallowed, with one
# exception: SystemExit and SignalException are re-raised so that an exit or
# shutdown signal raised during a job is not silently ignored.
def handle_streaming_load_v3(t)
  Dir.chdir(@working_dir) do
    job = new_job(t.task_id, t.force?)
    @task_queue.delete_message(t) if job.execute
  end
rescue SystemExit, SignalException
  # Termination requests must propagate to the caller instead of being logged away.
  raise
rescue Exception => ex
  @logger.exception ex
end
handle_unknown(t)
click to toggle source
message handler
# File lib/bricolage/streamingload/taskhandler.rb, line 123
# Message handler for unrecognized tasks.
#
# Logs the message body as a warning and removes the message from the queue.
def handle_unknown(t)
  body = t.message_body
  @logger.warn("unknown task: #{body}")
  @task_queue.delete_message(t)
end
job_class()
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 144
# Returns the class used to build jobs, falling back to (and remembering)
# Job when none is currently set.
def job_class
  @job_class = Job unless @job_class
  @job_class
end
new_job(task_id, force)
click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 140
# Instantiates a job for +task_id+ from this handler's collaborators.
#
# The class is obtained through the +job_class+ reader rather than the raw
# instance variable, so the reader's Job fallback also applies here when the
# job class has been set to nil.
def new_job(task_id, force)
  job_class.new(
    context: @ctx,
    ctl_ds: @ctl_ds,
    data_ds: @data_ds,
    log_table: @log_table,
    task_id: task_id,
    force: force,
    logger: @logger
  )
end