class Bricolage::StreamingLoad::ChunkBuffer

Constants

TASK_GENERATION_TIME_LIMIT

Public Class Methods

new(control_data_source:, logger:)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 15
def initialize(control_data_source:, logger:)
  @ctl_ds = control_data_source
  @logger = logger
  @task_generation_time_limit = TASK_GENERATION_TIME_LIMIT
end

Public Instance Methods

flush_all()

Flushes all chunks of all streams, with no additional conditions, to create a “system checkpoint”.

# File lib/bricolage/streamingload/chunkbuffer.rb, line 58
def flush_all
  all_task_ids = []
  tasks = nil

  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. when there are too many objects for one table), so we must
      # create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks(conn, force: true)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
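
For illustration, a minimal usage sketch of the “system checkpoint” (ctl_ds, logger and dispatch are assumptions, not part of this class):

buffer = Bricolage::StreamingLoad::ChunkBuffer.new(control_data_source: ctl_ds, logger: logger)
tasks = buffer.flush_all                 # every buffered chunk is now assigned to a task
tasks.each {|task| dispatch(task) }      # dispatch is a hypothetical task consumer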
flush_partial()

Flushes chunks of the streams that meet the flush conditions (batch size exceeded, load interval exceeded, or no previous task).

# File lib/bricolage/streamingload/chunkbuffer.rb, line 39
def flush_partial
  task_ids = nil
  tasks = nil

  @ctl_ds.open {|conn|
    warn_slow_task_generation {
      conn.transaction {|txn|
        task_ids = insert_tasks(conn)
        update_task_objects(conn, task_ids) unless task_ids.empty?
      }
    }
    log_task_ids(task_ids)
    tasks = load_tasks(conn, task_ids)
  }
  tasks
end
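
As a sketch, flush_partial is what a periodic scheduler would call; the loop, task_queue and interval below are illustrative assumptions:

loop do
  tasks = buffer.flush_partial               # buffer is a ChunkBuffer instance
  tasks.each {|task| task_queue.put(task) }  # task_queue is a hypothetical queue
  sleep 60
end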
flush_stream(stream_name)

Flushes all chunks of the specified stream, with no additional conditions, to create a “stream checkpoint”.

# File lib/bricolage/streamingload/chunkbuffer.rb, line 80
def flush_stream(stream_name)
  all_task_ids = []
  tasks = nil

  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. when there are too many objects for one table), so we must
      # create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks_for_stream(conn, stream_name)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
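
A hedged example: checkpoint a single stream, e.g. before changing its table definition (the stream name is illustrative):

tasks = buffer.flush_stream('pageviews')   # 'pageviews' stands for a data_source_id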
load_tasks_by_id(ids)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 100
def load_tasks_by_id(ids)
  @ctl_ds.open {|conn|
    return load_tasks(conn, ids)
  }
end
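
For example, previously created tasks can be re-materialized by id (the ids are illustrative):

tasks = buffer.load_tasks_by_id([101, 102])   # returns LoadTask objects with their chunks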
save(chunk)

chunk is an IncomingChunk.

# File lib/bricolage/streamingload/chunkbuffer.rb, line 22
def save(chunk)
  @ctl_ds.open {|conn|
    suppress_sql_logging {
      conn.transaction {
        object_id = insert_object(conn, chunk)
        if object_id
          insert_task_objects(conn, object_id)
        else
          @logger.info "Duplicated object recieved: url=#{chunk.url}"
          insert_dup_object(conn, chunk)
        end
      }
    }
  }
end
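
In short, save records the chunk in strload_objects and marks it unassigned (task_id = -1); a chunk whose object_url already exists is recorded in strload_dup_objects instead. A minimal sketch, assuming chunk is an IncomingChunk received from the message queue:

buffer.save(chunk)   # idempotent per object_url; a duplicate only yields a log line and a dup record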

Private Instance Methods

insert_dup_object(conn, obj)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 134
      def insert_dup_object(conn, obj)
        conn.update(<<-EndSQL)
            insert into strload_dup_objects
                ( object_url
                , object_size
                , data_source_id
                , message_id
                , event_time
                , submit_time
                )
            values
                ( #{s obj.url}
                , #{obj.size}
                , #{s obj.stream_name}
                , #{s obj.message_id}
                , '#{obj.event_time}' AT TIME ZONE 'JST'
                , current_timestamp
                )
            ;
        EndSQL
      end
insert_object(conn, obj)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 108
      def insert_object(conn, obj)
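        # Returns the new object_id, or nil when the object_url already exists
        # ("on conflict ... do nothing" returns no row for a duplicate).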
        object_ids = conn.query_values(<<-EndSQL)
            insert into strload_objects
                ( object_url
                , object_size
                , data_source_id
                , message_id
                , event_time
                , submit_time
                )
            values
                ( #{s obj.url}
                , #{obj.size}
                , #{s obj.stream_name}
                , #{s obj.message_id}
                , '#{obj.event_time}' AT TIME ZONE 'JST'
                , current_timestamp
                )
            on conflict on constraint strload_objects_object_url
            do nothing
            returning object_id
            ;
        EndSQL
        return object_ids.first
      end
insert_task_objects(conn, object_id)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 156
      def insert_task_objects(conn, object_id)
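        # task_id -1 is a sentinel meaning "not yet assigned to a task";
        # update_task_objects later rewrites it to a real task_id.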
        conn.update(<<-EndSQL)
            insert into strload_task_objects
                ( task_id
                , object_id
                )
            values
                ( -1
                , #{object_id}
                )
            ;
        EndSQL
      end
insert_tasks(conn, force: false)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 170
      def insert_tasks(conn, force: false)
        task_ids = conn.query_values(<<-EndSQL)
            insert into strload_tasks
                ( task_class
                , table_id
                , submit_time
                )
            select
                'streaming_load_v3'
                , tbl.table_id
                , current_timestamp
            from
                strload_tables tbl

                -- number of objects not assigned to a task for each schema_name.table_name (> 0)
                inner join (
                    select
                        data_source_id
                        , count(*) as object_count
                    from
                        strload_objects
                    where
                        object_id in (select object_id from strload_task_objects where task_id = -1)
                    group by
                        data_source_id
                ) obj
                using (data_source_id)

                -- preceding task's submit time
                left outer join (
                    select
                        table_id
                        , max(submit_time) as latest_submit_time
                    from
                        strload_tasks
                    group by
                        table_id
                ) task
                using (table_id)
            where
                not tbl.disabled -- not disabled
                and (
                    #{force ? "true or" : ""}                                                      -- Creates tasks with no conditions if forced
                    obj.object_count > tbl.load_batch_size                                         -- batch_size exceeded?
                    or extract(epoch from current_timestamp - task.latest_submit_time) > tbl.load_interval  -- load_interval exceeded?
                    or task.latest_submit_time is null                                             -- no previous tasks?
                )
            returning task_id
            ;
        EndSQL

        task_ids
      end
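
To make the force flag concrete: flush_partial calls insert_tasks(conn), so only the three conditions in the where clause apply, while flush_all passes force: true, which prepends “true or” and matches every enabled table that has unassigned objects. An illustration of the two generated predicates (not actual output):

# force: false -> not tbl.disabled and (batch size exceeded or load interval exceeded or no previous task)
# force: true  -> not tbl.disabled and (true or ...)   # always true for enabled tables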
insert_tasks_for_stream(conn, stream_name)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 224
      def insert_tasks_for_stream(conn, stream_name)
        task_ids = conn.query_values(<<-EndSQL)
            insert into strload_tasks
                ( task_class
                , table_id
                , submit_time
                )
            select
                'streaming_load_v3'
                , tbl.table_id
                , current_timestamp
            from
                strload_tables tbl

                -- The number of objects per table which are not yet assigned to any task (> 0).
                -- This subquery is covered by the index.
                inner join (
                    select
                        data_source_id
                        , count(*) as object_count
                    from
                        strload_objects
                    where
                        object_id in (select object_id from strload_task_objects where task_id = -1)
                    group by
                        data_source_id
                ) obj
                using (data_source_id)
            where
                -- does not check disabled
                data_source_id = #{s stream_name}
            returning task_id
            ;
        EndSQL

        task_ids
      end
load_tasks(conn, task_ids)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 292
      def load_tasks(conn, task_ids)
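        # Materializes one LoadTask per task_id, each holding the Chunk list
        # of its assigned objects.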
        return [] if task_ids.empty?

        records = suppress_sql_logging {
          conn.query_rows(<<-EndSQL)
              select
                  t.task_id
                  , t.object_id
                  , o.object_url
                  , o.object_size
              from
                  strload_task_objects t
                  inner join strload_objects o using (object_id)
              where
                  task_id in (#{task_ids.join(',')})
              ;
          EndSQL
        }

        records.group_by {|row| row['task_id'] }.map {|task_id, rows|
          chunks = rows.map {|row|
            id, url, size = row.values_at('object_id', 'object_url', 'object_size')
            Chunk.new(id: id, url: url, size: size)
          }
          LoadTask.new(id: task_id, chunks: chunks)
        }
      end
log_task_ids(task_ids)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 331
def log_task_ids(task_ids)
  created_task_num = task_ids.size
  @logger.info "Number of task created: #{created_task_num}"
  @logger.info "Created task ids: #{task_ids}" if created_task_num > 0
end
suppress_sql_logging() { || ... }
# File lib/bricolage/streamingload/chunkbuffer.rb, line 320
def suppress_sql_logging
  # KLUDGE: temporarily raise the log level so that SQL statements are not logged.
  orig = @logger.level
  begin
    @logger.level = Logger::ERROR
    yield
  ensure
    @logger.level = orig
  end
end
update_task_objects(conn, task_ids)
# File lib/bricolage/streamingload/chunkbuffer.rb, line 262
      def update_task_objects(conn, task_ids)
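        # Assigns unassigned objects (task_id = -1) to the newly created tasks,
        # capping each task at load_batch_size objects: row_number() numbers the
        # unassigned objects per stream and only the first load_batch_size attach.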
        conn.update(<<-EndSQL)
            update strload_task_objects dst
            set
                task_id = tasks.task_id
            from
                strload_tasks tasks
                inner join strload_tables tables using (table_id)
                inner join (
                    select
                        object_id
                        , data_source_id
                        , row_number() over (partition by data_source_id order by object_id) as object_seq
                    from
                        strload_objects
                    where
                        object_id in (select object_id from strload_task_objects where task_id = -1)
                ) tsk_obj
                using (data_source_id)
            where
                dst.task_id = -1
                and tasks.task_id in (#{task_ids.join(",")})
                and dst.object_id = tsk_obj.object_id
                and tsk_obj.object_seq <= tables.load_batch_size
            ;
        EndSQL
        # UPDATE statement cannot return values
        nil
      end
warn_slow_task_generation() { || ... }
# File lib/bricolage/streamingload/chunkbuffer.rb, line 337
def warn_slow_task_generation(&block)
  start_time = Time.now
  yield
  exec_time = (Time.now - start_time)
  if exec_time > @task_generation_time_limit
    @logger.warn "Long task generation time: #{exec_time}"
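    # Raise the threshold by 10% after each warning, presumably to damp repeated warnings.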
    @task_generation_time_limit = @task_generation_time_limit * 1.1
  end
end