class Warren::Subscriber::Base

A message takes a rabbitMQ message, and handles its acknowledgement or rejection.

Attributes

delivery_info[R]

@return [Bunny::DeliveryInfo] Contains the information necessary for acknowledging the message

fox[R]

@return [Warren::Fox] The fox consumer that provided the message. Used to acknowledge messages

payload[R]

@return [String] The message contents

properties[R]

@return [Bunny::MessageProperties] Contains additional information about the received message

Public Class Methods

new(fox, delivery_info, properties, payload) click to toggle source

Construct a basic subscriber for each received message. Call {#process} to handle to processing of the message

@param fox [Warren::Fox] The fox consumer that provided the message. Used to acknowledge messages @param delivery_info [Bunny::DeliveryInfo] Contains the information necessary for acknowledging the message @param properties [Bunny::MessageProperties] Contains additional information about the received message @param payload [String] The message contents

# File lib/warren/subscriber/base.rb, line 39
def initialize(fox, delivery_info, properties, payload)
  @fox = fox
  @delivery_info = delivery_info
  @properties = properties
  @payload = payload
  @acknowledged = false
end

Public Instance Methods

_process_() click to toggle source

Called by {Warren::Fox} to trigger processing of the message and acknowledgment on success. In most cases the {#process} method should be used to customize behaviour.

@return [Void]

# File lib/warren/subscriber/base.rb, line 51
def _process_
  process
  ack unless @acknowledged
end
dead_letter(exception) click to toggle source

Reject the message without re-queuing Will end up getting dead-lettered

@param exception [StandardError] The exception which triggered message dead-letter

@return [Void]

# File lib/warren/subscriber/base.rb, line 87
def dead_letter(exception)
  error "Dead-letter: #{payload}"
  error "Dead-letter Exception: #{exception.message}"
  raise_if_acknowledged
  subscription.nack(delivery_tag)
  @acknowledged = true
  error 'Dead-letter nacked'
end
delay(exception) click to toggle source

Re-post the message to the delay exchange and acknowledges receipt of the original message. The delay exchange will return the messages to the original queue after a delay.

@param exception [StandardError] The exception that has caused the

message to require a delay

@return [Void]

# File lib/warren/subscriber/base.rb, line 106
def delay(exception)
  return dead_letter(exception) if attempt > max_retries

  warn "Delay: #{payload}"
  warn "Delay Exception: #{exception.message}"
  # Publish the message to the delay queue
  delayed.publish(payload, routing_key: routing_key, headers: { attempts: attempt + 1 })
  # Acknowledge the original message
  ack
end
process() click to toggle source

Triggers processing of the method. Over-ride this in subclasses to customize your handler.

# File lib/warren/subscriber/base.rb, line 58
def process
  true
end
requeue(exception) click to toggle source

Reject the message and re-queue ready for immediate reprocessing.

@param exception [StandardError] The exception which triggered message requeue

@return [Void]

# File lib/warren/subscriber/base.rb, line 69
def requeue(exception)
  warn "Re-queue: #{payload}"
  warn "Re-queue Exception: #{exception.message}"
  raise_if_acknowledged
  # nack arguments: delivery_tag, multiple, requeue
  # http://reference.rubybunny.info/Bunny/Channel.html#nack-instance_method
  subscription.nack(delivery_tag, false, true)
  @acknowledged = true
  warn 'Re-queue nacked'
end

Private Instance Methods

ack() click to toggle source

Acknowledge the message as successfully processed. Will raise {Warren::MultipleAcknowledgements} if the message has been acknowledged or rejected already.

# File lib/warren/subscriber/base.rb, line 136
def ack
  raise_if_acknowledged
  subscription.ack(delivery_tag)
  @acknowledged = true
end
attempt() click to toggle source
# File lib/warren/subscriber/base.rb, line 123
def attempt
  headers.fetch('attempts', 0)
end
headers() click to toggle source
# File lib/warren/subscriber/base.rb, line 127
def headers
  # Annoyingly it appears that a message with no headers
  # returns nil, not an empty hash
  properties.headers || {}
end
max_retries() click to toggle source
# File lib/warren/subscriber/base.rb, line 119
def max_retries
  30
end
raise_if_acknowledged() click to toggle source
# File lib/warren/subscriber/base.rb, line 142
def raise_if_acknowledged
  return unless @acknowledged

  message = "Multiple acks/nacks for: #{payload}"
  error message
  raise Warren::Exceptions::MultipleAcknowledgements, message
end