class Bricolage::SQSDataSource::DeleteMessageBuffer

Constants

BATCH_SIZE_MAX
MAX_RETRY_COUNT

Public Class Methods

new(sqs_client, url, logger) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 214
# Builds an empty buffer of pending delete requests.
#
# sqs_client:: client object later asked to run +delete_message_batch+
# url::        queue URL passed along as +queue_url+ on every batch call
# logger::     receives info/warn/error messages while flushing
def initialize(sqs_client, url, logger)
  @sqs_client, @url, @logger = sqs_client, url, logger
  # Pending entries, keyed by entry id.
  @buf = {}
end

Public Instance Methods

empty?() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 227
# True when no delete request is currently buffered.
def empty?
  @buf.size.zero?
end
flush(now = Time.now) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 245
# Issues the buffered delete requests that are issuable at +now+.
#
# Issuable entries are sent to +delete_message_batch+ in groups of at most
# BATCH_SIZE_MAX. Entries reported as successful are dropped from the
# buffer. Entries reported as failed are marked via +failed!+ and stay
# buffered, unless +too_many_failure?+ says to give up on them, in which
# case they are dropped with a warning.
#
# Returns nil without touching the client when nothing is issuable.
def flush(now = Time.now)
  issuable = @buf.each_value.select { |entry| entry.issuable?(now) }
  return if issuable.empty?

  @logger.info "flushing async delete requests"
  issuable.each_slice(BATCH_SIZE_MAX) do |batch|
    params = batch.map { |entry| entry.request_params }
    res = @sqs_client.delete_message_batch(queue_url: @url, entries: params)
    @logger.info "DeleteMessageBatch executed: #{res.successful.size} succeeded, #{res.failed.size} failed"
    # One timestamp, taken after the call, is shared by every failure of this batch.
    issued_time = Time.now

    res.successful.each { |done| @buf.delete(done.id) }

    res.failed.each do |failure|
      entry = @buf[failure.id]
      unless entry
        # The response named an id that is not in the buffer.
        @logger.error "[BUG] no corrensponding DeleteMessageBuffer entry: id=#{failure.id}"
        next
      end

      entry.failed!(issued_time)
      if entry.too_many_failure?
        @logger.warn "DeleteMessage failure count exceeded the limit; give up: message_id=#{entry.message.message_id}, receipt_handle=#{entry.message.receipt_handle}"
        @buf.delete(failure.id)
      else
        @logger.info "DeleteMessageBatch partially failed (#{entry.n_failure} times): sender_fault=#{failure.sender_fault}, code=#{failure.code}, message=#{failure.message}"
      end
    end
  end
end
flush_force() click to toggle source

Flushes all delayed delete requests, including pending ones whose next issue time has not yet arrived.

# File lib/bricolage/sqsdatasource.rb, line 240
# Flushes every buffered delete request, including ones still waiting for
# their next issue time, by calling +flush+ with a timestamp one hour ahead.
def flush_force
  # Per the original note, retries continue for only about 2 minutes, so
  # now + 1 hour must be later than every entry's @next_issue_time.
  far_future = Time.now + 3600
  flush(far_future)
end
full?() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 231
# True once the number of buffered entries has reached BATCH_SIZE_MAX.
def full?
  BATCH_SIZE_MAX <= @buf.length
end
put(msg) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 221
# Wraps +msg+ in an Entry and buffers it under the entry's id, replacing
# any entry already stored under that id. Triggers +flush+ when the buffer
# reports itself full; otherwise returns nil.
def put(msg)
  entry = Entry.new(msg)
  @buf.store(entry.id, entry)
  return unless full?

  flush
end
size() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 235
# Number of delete requests currently held in the buffer.
def size
  @buf.length
end