class DispatchRider::QueueServices::AwsSqs
Attributes
visibility_timeout[R]
Public Instance Methods
assign_storage(attrs)
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 16
# Builds the Aws::SQS::Queue this service works against and records the
# queue's visibility timeout (via +set_visibility_timeout+).
#
# The queue is located either by name or by url; name wins when both are
# given.
#
# @param attrs [Hash] options; reads +:name+ and +:url+
# @return [Aws::SQS::Queue] queue resource bound to a fresh SQS client
# @raise [RecordInvalid] when neither +:name+ nor +:url+ is present, or when
#   no queue matches the given name
def assign_storage(attrs)
  sqs = Aws::SQS::Client.new(logger: nil)

  if attrs[:name].present?
    name = attrs[:name].to_s
    candidates = sqs.list_queues({queue_name_prefix: name}).queue_urls
    # list_queues is a *prefix* search, so asking for "jobs" may also return
    # "jobs-dead-letter". Prefer the queue whose name matches exactly and only
    # fall back to the first prefix hit when there is no exact match.
    url = candidates.find { |candidate| candidate.end_with?("/#{name}") } || candidates.first
    # Fail with a clear message instead of handing a nil url to the AWS calls.
    raise RecordInvalid.new(self, ["No SQS queue found for name #{name.inspect}"]) if url.nil?
  elsif attrs[:url].present?
    url = attrs[:url]
  else
    raise RecordInvalid.new(self, ["Either name or url have to be specified"])
  end

  set_visibility_timeout(sqs, url)
  Aws::SQS::Queue.new(url: url, client: sqs)
end
construct_message_from(item)
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 51
# Turns a raw queue item into a message: the body is pulled out of the item
# with MessageBodyExtractor and then passed to +deserialize+.
#
# @param item the raw item received from the queue
# @return whatever +deserialize+ produces for the extracted body
def construct_message_from(item)
  extracted_body = MessageBodyExtractor.new(item).extract
  deserialize(extracted_body)
end
delete(item)
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 55
# Removes an item by delegating to the item's own +delete+.
#
# @param item an object responding to +delete+
# @return the result of +item.delete+
def delete(item)
  item.delete
end
insert(item)
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 47
# Sends an item to the queue by passing it straight to +queue.send_message+.
#
# NOTE(review): item is forwarded untouched; if +queue+ is an Aws::SQS::Queue
# resource it presumably expects an options hash (e.g. message_body: ...) —
# confirm what callers pass.
#
# @param item the payload handed to +send_message+
# @return the result of +queue.send_message+
def insert(item)
  queue.send_message(item)
end
pop() { |obj| ... }
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 30
# Receives at most one message from the queue and yields it to the caller's
# block, wrapped in an SqsReceivedMessage.
#
# The raw message is deleted (up to 3 tries through Retriable) only when the
# block returned a truthy value and +visibility_timeout_shield+ did not raise.
# A falsy block result raises AbortExecution inside the shield; that exception
# is rescued here, so the message is left undeleted and nil is returned.
#
# @yieldparam obj [SqsReceivedMessage] the wrapped message
# @yieldreturn truthy to acknowledge the message, falsy to abort
# @return the result of the Retriable delete call, or nil when nothing was
#   received or the handler aborted
def pop
  received = queue.receive_messages({max_number_of_messages: 1}).first
  return unless received.present?

  wrapped = SqsReceivedMessage.new(construct_message_from(received), received, queue, visibility_timeout)
  visibility_timeout_shield(wrapped) do
    acknowledged = yield(wrapped)
    raise AbortExecution, "false received from handler" unless acknowledged

    wrapped
  end
  Retriable.retriable(tries: 3) { received.delete }
rescue AbortExecution
  # Nothing to do: the abort was already handled, it only serves to break out
  # of pop before the message gets deleted.
  nil
end
size()
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 59
# Reports the queue size by delegating to
# +queue.approximate_number_of_messages+.
#
# NOTE(review): assumes +queue+ responds to approximate_number_of_messages;
# the Aws::SQS::Queue resource may only expose this through its +attributes+
# hash — confirm against the SDK version in use.
#
# @return the value of +queue.approximate_number_of_messages+
def size
  queue.approximate_number_of_messages
end
Private Instance Methods
set_visibility_timeout(client,url)
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 67
# Looks up the "VisibilityTimeout" attribute of the queue at +url+ and stores
# it in @visibility_timeout (exposed through the +visibility_timeout+ reader).
#
# @param client the SQS client used for the +get_queue_attributes+ call
# @param url the queue url to query
# @return the stored attribute value, exactly as the response provides it
def set_visibility_timeout(client, url)
  response = client.get_queue_attributes(
    queue_url: url,
    attribute_names: %w[VisibilityTimeout]
  )
  @visibility_timeout = response.attributes["VisibilityTimeout"]
end
visibility_timeout_shield(message) { || ... }
click to toggle source
# File lib/dispatch-rider/queue_services/aws_sqs.rb, line 72
# Runs the given block and afterwards verifies that handling +message+ did not
# take longer than its total timeout.
#
# The check lives in +ensure+, so it runs whether the block returned or
# raised; when the limit is exceeded, VisibilityTimeoutExceeded is raised and
# takes the place of any exception the block itself raised.
#
# @param message an object responding to +start_time+, +total_timeout+,
#   +subject+ and +body+
# @return the block's result
# @raise [VisibilityTimeoutExceeded] when the elapsed time is greater than
#   +message.total_timeout+
def visibility_timeout_shield(message)
  yield
ensure
  elapsed = Time.now - message.start_time
  limit = message.total_timeout
  if elapsed > limit
    raise VisibilityTimeoutExceeded,
          "message: #{message.subject}, #{message.body.inspect} took #{elapsed} seconds while the timeout was #{limit}"
  end
end