class Bricolage::SQSDataSource::DeleteMessageBuffer
Constants
- BATCH_SIZE_MAX
- MAX_RETRY_COUNT
Public Class Methods
new(sqs_client, url, logger)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 214 def initialize(sqs_client, url, logger) @sqs_client = sqs_client @url = url @logger = logger @buf = {} end
Public Instance Methods
empty?()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 227 def empty? @buf.empty? end
flush(now = Time.now)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 245 def flush(now = Time.now) entries = @buf.values.select {|ent| ent.issuable?(now) } return if entries.empty? @logger.info "flushing async delete requests" entries.each_slice(BATCH_SIZE_MAX) do |ents| res = @sqs_client.delete_message_batch(queue_url: @url, entries: ents.map(&:request_params)) @logger.info "DeleteMessageBatch executed: #{res.successful.size} succeeded, #{res.failed.size} failed" issued_time = Time.now res.successful.each do |s| @buf.delete s.id end res.failed.each do |f| ent = @buf[f.id] unless ent @logger.error "[BUG] no corrensponding DeleteMessageBuffer entry: id=#{f.id}" next end ent.failed!(issued_time) if ent.too_many_failure? @logger.warn "DeleteMessage failure count exceeded the limit; give up: message_id=#{ent.message.message_id}, receipt_handle=#{ent.message.receipt_handle}" @buf.delete f.id next end @logger.info "DeleteMessageBatch partially failed (#{ent.n_failure} times): sender_fault=#{f.sender_fault}, code=#{f.code}, message=#{f.message}" end end end
flush_force()
click to toggle source
Flushes all delayed delete requests, including pending requests
# File lib/bricolage/sqsdatasource.rb, line 240 def flush_force # retry continues in only 2m, now+1h must be after than all @next_issue_time flush(Time.now + 3600) end
full?()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 231 def full? @buf.size >= BATCH_SIZE_MAX end
put(msg)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 221 def put(msg) ent = Entry.new(msg) @buf[ent.id] = ent flush if full? end
size()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 235 def size @buf.size end