class Bricolage::StreamingLoad::TaskHandler

Attributes

job_class[W]
logger[R]

Public Class Methods

create_pid_file(path) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 92
# Writes the current process id, followed by a newline, to +path+.
#
# Best effort: any StandardError raised while opening or writing the
# file is swallowed, and the method returns nil either way.
def TaskHandler.create_pid_file(path)
  File.open(path, 'w') do |pid_file|
    pid_file.puts Process.pid
  end
rescue
  # Deliberately ignored: failing to write the pid file must not abort startup.
end
main() click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 19
# Command entry point.
#
# Parses ARGV, loads the YAML configuration (first rest argument, or
# <working_dir>/config/<environment>/streamingload.yml), builds the
# application context, data sources and logger, then runs either
#   * single task mode, when a task id was given on the command line, or
#   * server mode, which optionally daemonizes, writes a pid file and
#     enters the message event loop.
#
# Prints usage and exits with status 1 when more than one rest argument
# is given.  In server mode any exception escaping the event loop is
# logged and re-raised.
def TaskHandler.main
  Raven.capture_message("loader start")

  opts = TaskHandlerOptions.new(ARGV)
  opts.parse
  if opts.rest_arguments.size > 1
    $stderr.puts opts.usage
    exit 1
  end
  config_path = opts.rest_arguments[0] || "#{opts.working_dir}/config/#{opts.environment}/streamingload.yml"
  config = YAML.load(File.read(config_path))

  # A dedicated log file is optional; without one the context receives nil.
  logger = nil
  logger = new_logger(opts.log_file_path, config) if opts.log_file_path
  ctx = Context.for_application(opts.working_dir, environment: opts.environment, logger: logger)

  ctl_ds = ctx.get_data_source('sql', config.fetch('ctl-postgres-ds', 'db_ctl'))
  data_ds = ctx.get_data_source('sql', config.fetch('redshift-ds', 'db_data'))
  task_queue = ctx.get_data_source('sqs', config.fetch('task-queue-ds', 'sqs_task'))
  log_table = config.fetch('log-table', 'strload_load_logs')
  # Alerting is only enabled when the config names an alert level.
  service_logger = config.key?('alert-level') ? new_alerting_logger(ctx, config) : ctx.logger

  selected_job_class = opts.noop? ? NoopJob : Job
  task_handler = new(
    context: ctx,
    ctl_ds: ctl_ds,
    data_ds: data_ds,
    log_table: log_table,
    task_queue: task_queue,
    working_dir: opts.working_dir,
    logger: service_logger,
    job_class: selected_job_class
  )

  # Single task mode
  return task_handler.execute_task_by_id(opts.task_id, force: opts.force?) if opts.task_id

  # Server mode
  # Process.daemon(true) keeps the current directory; the chdir below then
  # applies to both daemonized and foreground runs.
  Process.daemon(true) if opts.daemon?
  Dir.chdir '/'
  create_pid_file(opts.pid_file_path) if opts.pid_file_path
  begin
    service_logger.info "*** bricolage-streaming-loader started: pid=#{$$}"
    task_handler.event_loop
    service_logger.info "*** bricolage-streaming-loader shutdown gracefully: pid=#{$$}"
  rescue Exception => ex
    # Record the reason for the abort, then let the exception continue upward.
    service_logger.exception(ex)
    service_logger.error "*** bricolage-streaming-loader abort: pid=#{$$}"
    raise
  end
end
new(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 100
# Stores the collaborators used to build and run load jobs.
#
# context::     application context handed to each job
# ctl_ds::      control data source handed to each job
# data_ds::     data data source handed to each job
# log_table::   log table name handed to each job
# task_queue::  queue used to receive and delete task messages
# working_dir:: directory entered while a queued task is handled
# logger::      logger used by the handler and handed to each job
# job_class::   class instantiated for each task (defaults to Job)
def initialize(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job)
  @ctx, @working_dir = context, working_dir
  @ctl_ds, @data_ds = ctl_ds, data_ds
  @log_table, @task_queue = log_table, task_queue
  @logger, @job_class = logger, job_class
end
new_alerting_logger(ctx, config) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 84
# Builds an AlertingLogger wrapping the context's logger.
#
# The SNS data source name is read from config key 'sns-ds' (default
# 'sns') and the alert level from 'alert-level' (default 'warn').
def TaskHandler.new_alerting_logger(ctx, config)
  base_logger = ctx.logger
  sns_ds = ctx.get_data_source('sns', config.fetch('sns-ds', 'sns'))
  level = config.fetch('alert-level', 'warn')
  AlertingLogger.new(logger: base_logger, sns_datasource: sns_ds, alert_level: level)
end
new_logger(path, config) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 76
# Builds a Logger that writes to +path+.
#
# Rotation settings come from config: 'log-rotation-period' (default
# 'daily') and 'log-rotation-size' (default nil).
def TaskHandler.new_logger(path, config)
  period = config.fetch('log-rotation-period', 'daily')
  size = config.fetch('log-rotation-size', nil)
  Logger.new(device: path, rotation_period: period, rotation_size: size)
end

Public Instance Methods

event_loop() click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 118
# Hands control to the task queue, registering this object as the
# handler for LoaderMessage messages.  Returns whatever the queue's
# handle_messages call returns.
def event_loop
  queue = @task_queue
  queue.handle_messages(handler: self, message_class: LoaderMessage)
end
execute_task_by_id(task_id, force: false) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 113
# Builds the job for +task_id+ and executes it with fail_fast enabled.
#
# task_id:: id of the task to run
# force::   passed through to new_job (default false)
#
# Returns the result of the job's execute call.
def execute_task_by_id(task_id, force: false)
  new_job(task_id, force).execute(fail_fast: true)
end
handle_streaming_load_v3(t) click to toggle source

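Message handler for streaming load (v3) task messages: runs the job for the task inside the working directory and deletes the message from the task queue when the job succeeds.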

# File lib/bricolage/streamingload/taskhandler.rb, line 129
# Message handler for streaming load v3 task messages.
#
# Runs the job for the task inside @working_dir and deletes the message
# from the task queue only when the job's execute call returns a truthy
# value; otherwise the message is left on the queue.
#
# t:: task message responding to task_id and force?
#
# Failures raised while building or running the job are logged and
# absorbed so one bad task does not stop message handling.  Process
# control exceptions (SystemExit, SignalException including Interrupt,
# NoMemoryError) are re-raised so the process can still be stopped
# while a task is running.
def handle_streaming_load_v3(t)
  Dir.chdir(@working_dir) {
    job = new_job(t.task_id, t.force?)
    if job.execute
      @task_queue.delete_message(t)
    end
  }
rescue SystemExit, SignalException, NoMemoryError
  # Never swallow shutdown requests or out-of-memory conditions.
  raise
rescue Exception => ex
  # Kept broad on purpose: non-StandardError failures (e.g. ScriptError)
  # coming from a job are still only logged, as before.
  @logger.exception ex
end
handle_unknown(t) click to toggle source

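Message handler for unrecognized messages: logs a warning with the message body and deletes the message from the task queue.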

# File lib/bricolage/streamingload/taskhandler.rb, line 123
# Message handler for messages of an unknown kind.
#
# Logs a warning containing the message body, then removes the message
# from the task queue.  Returns the result of delete_message.
def handle_unknown(t)
  body = t.message_body
  @logger.warn("unknown task: #{body}")
  @task_queue.delete_message(t)
end
job_class() click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 144
# Returns the class used to build jobs, falling back to (and storing)
# Job when none has been set.
def job_class
  @job_class = Job unless @job_class
  @job_class
end
new_job(task_id, force) click to toggle source
# File lib/bricolage/streamingload/taskhandler.rb, line 140
# Builds a job for the given task from the handler's collaborators.
#
# task_id:: id of the task the job will run
# force::   force flag passed through to the job
#
# The class is obtained through the job_class reader rather than the raw
# instance variable, so a job class reset to nil (via the job_class
# writer) falls back to Job instead of raising NoMethodError on nil.
def new_job(task_id, force)
  job_class.new(
    context: @ctx,
    ctl_ds: @ctl_ds,
    data_ds: @data_ds,
    log_table: @log_table,
    task_id: task_id,
    force: force,
    logger: @logger
  )
end