class Bricolage::SQSDataSource
Attributes
access_key_id[R]
max_number_of_messages[R]
region[R]
secret_access_key[R]
url[R]
visibility_timeout[R]
wait_time_seconds[R]
Public Class Methods
new(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 13
#
# Records the queue settings on the instance. Only instance variables are
# assigned here; nothing else is called.
#
# region::                 required; stored in @region
# url::                    required; stored in @url
# access_key_id::          optional (default nil); stored in @access_key_id
# secret_access_key::      optional (default nil); stored in @secret_access_key
# visibility_timeout::     required; stored in @visibility_timeout
# max_number_of_messages:: optional (default 10); stored in @max_number_of_messages
# wait_time_seconds::      optional (default 20); stored in @wait_time_seconds
# noop::                   optional (default false); stored in @noop
def initialize(region:, url:, access_key_id: nil, secret_access_key: nil, visibility_timeout:, max_number_of_messages: 10, wait_time_seconds: 20, noop: false)
  # Where the queue lives.
  @region, @url = region, url
  # Credentials (both may be nil).
  @access_key_id, @secret_access_key = access_key_id, secret_access_key
  # Receive tuning values.
  @visibility_timeout, @max_number_of_messages, @wait_time_seconds =
    visibility_timeout, max_number_of_messages, wait_time_seconds
  @noop = noop
end
new_mock(**args)
click to toggle source
# File lib/bricolage/sqsmock.rb, line 6
#
# Builds an SQSDataSource with fixed placeholder settings, runs
# initialize_base on it with a NullLogger, and replaces its @client with an
# SQSMock::Client built from +args+. Returns the data source.
def SQSDataSource.new_mock(**args)
  mock_ds = SQSDataSource.new(
    region: 'ap-northeast-1',
    url: 'http://sqs/000000000000/queue-name',
    access_key_id: 'access_key_id_1',
    secret_access_key: 'secret_access_key_1',
    visibility_timeout: 30
  )
  null_logger = NullLogger.new
  #logger = Bricolage::Logger.default
  # initialize_base is invoked through __send__, presumably because it is not public.
  mock_ds.__send__(:initialize_base, 'name', nil, null_logger)
  mock_ds.instance_variable_set(:@client, SQSMock::Client.new(**args))
  mock_ds
end
Public Instance Methods
client()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 34
#
# Returns the memoized SQS client wrapper, building it on first use.
# When @noop is truthy a DummySQSClient is wrapped; otherwise an
# Aws::SQS::Client is created from the stored region and credentials.
def client
  return @client if @client

  raw_client =
    if @noop
      DummySQSClient.new
    else
      Aws::SQS::Client.new(
        region: @region,
        access_key_id: @access_key_id,
        secret_access_key: @secret_access_key
      )
    end
  @client = SQSClientWrapper.new(raw_client, logger: logger)
end
delete_message(msg)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 184
#
# Deletes +msg+ from the queue at @url, identifying it by msg.receipt_handle.
# Returns whatever client.delete_message returns.
def delete_message(msg)
  sqs = client
  handle = msg.receipt_handle
  sqs.delete_message(queue_url: @url, receipt_handle: handle)
end
delete_message_async(msg)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 191
#
# Hands +msg+ to the delete-message buffer via its +put+ method instead of
# calling delete_message directly.
def delete_message_async(msg)
  buffer = delete_message_buffer
  buffer.put(msg)
end
handle_messages(handler:, message_class:)
click to toggle source
High-Level Polling Interface
# File lib/bricolage/sqsdatasource.rb, line 45
#
# Installs the signal handlers, then polls in a loop. Each poll result is
# converted with message_class.for_sqs_result, every message is passed to
# handler.handle, and handler.after_message_batch is called once per batch.
#
# handler::       object responding to +handle(msg)+ and +after_message_batch+
# message_class:: object responding to +for_sqs_result(result)+
#
# The value the block gives back to polling_loop is true when poll returned
# nothing or the batch was empty.
def handle_messages(handler:, message_class:)
  trap_signals
  polling_loop do
    received = poll
    next true unless received

    batch = message_class.for_sqs_result(received)
    batch.each { |message| handler.handle(message) }
    handler.after_message_batch
    # Leave the loop only after the current batch has been fully handled.
    break if terminating?
    batch.empty?
  end
end
initiate_terminate()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 70
#
# Raises the termination flag read by terminating?. This is what the signal
# handlers installed by trap_signals call.
def initiate_terminate
  # No I/O allowed in this method
  @terminating = true
end
poll()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 100
#
# Performs one receive_messages call and logs the outcome.
# Returns the result when it is present and reports successful?; otherwise
# logs an error and returns nil.
def poll
  response = receive_messages
  if response && response.successful?
    logger.info "receive #{response.messages.size} messages"
    return response
  end

  reason = response ? response.error.message : '(result=nil)'
  logger.error "ReceiveMessage failed: #{reason}"
  nil
end
process_async_delete(now = Time.now)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 195
#
# Calls +flush+ on the delete-message buffer, passing +now+
# (defaults to Time.now at call time).
def process_async_delete(now = Time.now)
  buffer = delete_message_buffer
  buffer.flush(now)
end
process_async_delete_force()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 199
#
# Calls +flush_force+ on the delete-message buffer.
def process_async_delete_force
  buffer = delete_message_buffer
  buffer.flush_force
end
put(msg)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 161
#
# Delegates to send_message with the same +msg+ and returns its result.
def put(msg)
  send_message(msg)
end
receive_messages()
click to toggle source
API-Level Interface
# File lib/bricolage/sqsdatasource.rb, line 151
#
# Issues a single client.receive_message call for the queue at @url using
# the stored max_number_of_messages, visibility_timeout and
# wait_time_seconds, and returns the client's result unchanged.
def receive_messages
  client.receive_message(
    queue_url: @url,
    max_number_of_messages: @max_number_of_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: @wait_time_seconds
  )
end
send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 177
#
# Sends an event object. The extra keyword arguments collected in +attrs+
# are extended with 'eventName', 'eventSource' and 'eventTime' (string keys)
# and the resulting hash is passed to send_object.
#
# name::   value stored under 'eventName'
# source:: value stored under 'eventSource' (default SQSMessage::SQS_EVENT_SOURCE)
# time::   object responding to +iso8601+ (default Time.now); its iso8601
#          string is stored under 'eventTime'
def send_event(name, source: SQSMessage::SQS_EVENT_SOURCE, time: Time.now, **attrs)
  attrs.update(
    'eventName' => name,
    'eventSource' => source,
    'eventTime' => time.iso8601
  )
  send_object(attrs)
end
send_message(msg)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 165
#
# Sends +msg+ to the queue at @url. The message body is the JSON encoding of
# a hash with a single 'Records' key holding a one-element array containing
# msg.body; msg.delay_seconds is passed through as delay_seconds.
def send_message(msg)
  sqs = client
  payload = { 'Records' => [msg.body] }.to_json
  sqs.send_message(
    queue_url: @url,
    message_body: payload,
    delay_seconds: msg.delay_seconds
  )
end
send_object(obj)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 173
#
# Wraps +obj+ in an ObjectMessage and passes it to send_message.
def send_object(obj)
  wrapped = ObjectMessage.new(obj)
  send_message(wrapped)
end
terminating?()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 75
#
# Returns the current value of @terminating: true once initiate_terminate
# has run, nil if the variable has never been assigned.
def terminating?
  @terminating
end
Private Instance Methods
delete_message_buffer()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 205
#
# Returns the memoized DeleteMessageBuffer, creating it on first use from
# the client, the queue URL and the logger.
def delete_message_buffer
  return @delete_message_buffer if @delete_message_buffer

  @delete_message_buffer = DeleteMessageBuffer.new(client, @url, logger)
end
insert_handler_wait(n_failure)
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 93
#
# Sleeps for 2 ** n_failure seconds, with the exponent capped at 6.
# The wait is logged only when n_failure is positive; the sleep itself
# always happens (1 second when n_failure is 0).
def insert_handler_wait(n_failure)
  exponent = n_failure < 6 ? n_failure : 6
  sec = 2 ** exponent   # max 64s
  if n_failure > 0
    logger.info "queue wait: sleep #{sec}"
  end
  sleep sec
end
polling_loop() { || ... }
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 79
#
# Yields to the given block forever. A truthy block result counts as a
# failure and increments the consecutive-failure counter; a falsy result
# resets it to 0. After every iteration insert_handler_wait is called with
# the current counter. The loop has no exit of its own; it ends only when
# the block breaks or raises.
def polling_loop
  failure_streak = 0
  while true
    failed = yield
    failure_streak = failed ? failure_streak + 1 : 0
    insert_handler_wait(failure_streak)
  end
end
trap_signals()
click to toggle source
# File lib/bricolage/sqsdatasource.rb, line 59
#
# Installs handlers for TERM and INT that call initiate_terminate.
def trap_signals
  # Allows graceful stop
  on_stop = proc { initiate_terminate }
  Signal.trap(:TERM, &on_stop)
  Signal.trap(:INT, &on_stop)
end