class Bricolage::SQSDataSource

Attributes

access_key_id[R]
max_number_of_messages[R]
region[R]
secret_access_key[R]
url[R]
visibility_timeout[R]
wait_time_seconds[R]

Public Class Methods

new(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 13
# Records the queue settings on the instance. Nothing is contacted here; the
# SQS client is only built later, on the first call to #client.
#
# region::                  region handed to the SQS client
# url::                     queue URL used by the receive/send/delete calls
# access_key_id::           credential for the SQS client (nil by default)
# secret_access_key::       credential for the SQS client (nil by default)
# visibility_timeout::      passed to receive_message (required)
# max_number_of_messages::  passed to receive_message (default 10)
# wait_time_seconds::       passed to receive_message (default 20)
# noop::                    when true, #client uses DummySQSClient
def initialize(region:, url:, access_key_id: nil, secret_access_key: nil,
    visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false)
  # Queue location
  @region, @url = region, url
  # Credentials
  @access_key_id, @secret_access_key = access_key_id, secret_access_key
  # receive_message parameters
  @visibility_timeout = visibility_timeout
  @max_number_of_messages = max_number_of_messages
  @wait_time_seconds = wait_time_seconds
  # Client selection
  @noop = noop
end
new_mock(**args) click to toggle source
# File lib/bricolage/sqsmock.rb, line 6
# Builds a data source whose client is an SQSMock::Client rather than the
# client #client would otherwise construct.
#
# The connection settings are fixed placeholder values. +args+ is forwarded
# as-is to SQSMock::Client.new. Returns the prepared data source.
def SQSDataSource.new_mock(**args)
  mock_source = SQSDataSource.new(
    region: 'ap-northeast-1',
    url: 'http://sqs/000000000000/queue-name',
    access_key_id: 'access_key_id_1',
    secret_access_key: 'secret_access_key_1',
    visibility_timeout: 30
  )
  # initialize_base is invoked through __send__ (presumably because it is not
  # public -- confirm against the base class). A NullLogger is used here;
  # Bricolage::Logger.default is the alternative left by the original author:
  #logger = Bricolage::Logger.default
  quiet_logger = NullLogger.new
  mock_source.__send__(:initialize_base, 'name', nil, quiet_logger)
  # Pre-seed the memoized client so #client never builds another one.
  mock_source.instance_variable_set(:@client, SQSMock::Client.new(**args))
  mock_source
end

Public Instance Methods

client() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 34
# Returns the memoized SQS client, building it on first use.
#
# The underlying client is a DummySQSClient when @noop is set, otherwise an
# Aws::SQS::Client configured from the stored region and credentials. Either
# way it is wrapped in SQSClientWrapper together with the logger.
def client
  return @client if @client

  raw_client =
    if @noop
      DummySQSClient.new
    else
      Aws::SQS::Client.new(region: @region, access_key_id: @access_key_id, secret_access_key: @secret_access_key)
    end
  @client = SQSClientWrapper.new(raw_client, logger: logger)
end
delete_message(msg) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 184
# Deletes +msg+ from the queue right away.
#
# msg:: any object responding to receipt_handle
#
# Returns whatever the client's delete_message call returns.
def delete_message(msg)
  sqs = client
  sqs.delete_message(queue_url: @url, receipt_handle: msg.receipt_handle)
end
delete_message_async(msg) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 191
# Queues +msg+ for deletion by handing it to the delete-message buffer rather
# than calling the client directly. Returns the buffer's put result.
def delete_message_async(msg)
  buffer = delete_message_buffer
  buffer.put(msg)
end
handle_messages(handler:, message_class:) click to toggle source

High-Level Polling Interface

# File lib/bricolage/sqsdatasource.rb, line 45
# High-level polling entry point: installs the TERM/INT handlers, then polls
# the queue repeatedly inside polling_loop until termination is requested.
#
# handler::       receives handle(msg) for every message and
#                 after_message_batch once per received batch
# message_class:: must respond to for_sqs_result(result) and return the
#                 batch's messages (each and empty? are called on it)
#
# The block's value tells polling_loop whether to back off: true when the
# poll failed or the batch was empty, false otherwise.
def handle_messages(handler:, message_class:)
  trap_signals
  polling_loop do
    result = poll
    unless result
      # A failed poll must still honour a termination request; otherwise a
      # TERM/INT received while every poll fails would never stop the loop.
      break if terminating?
      next true
    end
    msgs = message_class.for_sqs_result(result)
    msgs.each do |msg|
      handler.handle(msg)
    end
    handler.after_message_batch
    # Stop only between batches so a received batch is always fully handled.
    break if terminating?
    msgs.empty?
  end
end
initiate_terminate() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 70
# Marks this data source as terminating by setting @terminating to true.
# trap_signals installs this as the TERM and INT handler, so it runs in
# signal-handler context; the body is deliberately a single assignment.
def initiate_terminate
  # No I/O allowed in this method
  @terminating = true
end
poll() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 100
# Receives one batch from the queue and logs the outcome.
#
# Returns the receive result when it reports success. Returns nil, after
# logging an error, when the result is missing or unsuccessful.
def poll
  response = receive_messages
  if response && response.successful?
    logger.info "receive #{response.messages.size} messages"
    return response
  end

  # A missing response carries no error object, so use a fixed placeholder.
  reason = response ? response.error.message : '(result=nil)'
  logger.error "ReceiveMessage failed: #{reason}"
  nil
end
process_async_delete(now = Time.now) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 195
# Asks the delete-message buffer to flush, passing it +now+ (the current time
# unless the caller supplies one). Returns the buffer's flush result.
def process_async_delete(now = Time.now)
  buffer = delete_message_buffer
  buffer.flush(now)
end
process_async_delete_force() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 199
# Calls flush_force on the delete-message buffer and returns its result.
def process_async_delete_force
  buffer = delete_message_buffer
  buffer.flush_force
end
put(msg) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 161
# Forwards +msg+ unchanged to send_message and returns its result.
def put(msg)
  send_message(msg)
end
receive_messages() click to toggle source

API-Level Interface

# File lib/bricolage/sqsdatasource.rb, line 151
# Issues one receive_message call against the configured queue, using the
# batch size, visibility timeout and wait time stored on this instance.
# Returns the client's result untouched.
def receive_messages
  request = {
    queue_url: @url,
    max_number_of_messages: @max_number_of_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: @wait_time_seconds
  }
  client.receive_message(**request)
end
send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 177
# Sends an event object to the queue.
#
# name::   stored under the 'eventName' key
# source:: stored under 'eventSource' (SQSMessage::SQS_EVENT_SOURCE by default)
# time::   stored under 'eventTime' as an ISO 8601 string (defaults to now)
# attrs::  any further keyword arguments, sent along in the same hash
#
# Returns the result of send_object.
def send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs)
  event = attrs.merge(
    'eventName' => name,
    'eventSource' => source,
    'eventTime' => time.iso8601
  )
  send_object(event)
end
send_message(msg) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 165
# Sends +msg+ to the configured queue.
#
# msg:: any object responding to body and delay_seconds
#
# The body is wrapped as the only element of a 'Records' array and serialized
# to JSON. Returns the client's send_message result.
def send_message(msg)
  sqs = client
  payload = { 'Records' => [msg.body] }.to_json
  sqs.send_message(queue_url: @url, message_body: payload, delay_seconds: msg.delay_seconds)
end
send_object(obj) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 173
# Wraps +obj+ in an ObjectMessage and sends it with send_message, returning
# that call's result.
def send_object(obj)
  message = ObjectMessage.new(obj)
  send_message(message)
end
terminating?() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 75
# Returns the @terminating flag. initiate_terminate sets it to true; the
# constructor shown in this file does not assign it, so the value is nil
# (falsy) until then.
def terminating?
  @terminating
end

Private Instance Methods

delete_message_buffer() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 205
# Returns the memoized DeleteMessageBuffer, creating it on first use from the
# client, the queue URL and the logger.
def delete_message_buffer
  return @delete_message_buffer if @delete_message_buffer

  @delete_message_buffer = DeleteMessageBuffer.new(client, @url, logger)
end
insert_handler_wait(n_failure) click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 93
# Sleeps between polling iterations, doubling the delay with each consecutive
# failure: 2 ** n_failure seconds, with the exponent capped at 6 (64 seconds).
# With zero failures this is still a 1-second sleep. The wait is logged only
# when there has been at least one failure. Returns the value of sleep.
def insert_handler_wait(n_failure)
  exponent = [6, n_failure].min   # cap => at most 64s
  delay = 2 ** exponent
  if n_failure > 0
    logger.info "queue wait: sleep #{delay}"
  end
  sleep delay
end
polling_loop() { || ... } click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 79
# Runs the given block forever, waiting between iterations.
#
# A truthy block value counts as a failure and extends the streak of
# consecutive failures; a falsy value resets the streak to zero. The streak is
# passed to insert_handler_wait after every iteration. The loop ends only
# when the block breaks out or raises.
def polling_loop
  streak = 0
  while true
    back_off = yield
    streak = back_off ? streak + 1 : 0
    insert_handler_wait(streak)
  end
end
trap_signals() click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 59
# Installs handlers for TERM and INT that call initiate_terminate, so either
# signal requests a graceful stop instead of ending the process abruptly.
# Returns the result of the last Signal.trap call (the one for INT).
def trap_signals
  # One shared handler: both signals do exactly the same thing.
  graceful_stop = proc { initiate_terminate }
  Signal.trap(:TERM, &graceful_stop)
  Signal.trap(:INT, &graceful_stop)
end