class Boxxspring::Worker::Base

Public Class Methods

environment() click to toggle source
# File lib/boxxspring/worker/base.rb, line 46
def environment
  @environment ||= begin
    Worker.env == 'development' ?
      ( ENV[ 'USER' ].underscore || 'development' ) :
      Worker.env
  end
end
process( &block ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 29
        def process( &block )
  self.processor = block
end
queue_interface() click to toggle source
# File lib/boxxspring/worker/base.rb, line 33
def queue_interface
  @queue_interface ||= Aws::SQS::Client.new
end
queue_url() click to toggle source
# File lib/boxxspring/worker/base.rb, line 37
def queue_url
  @queue_url ||= begin
    response = self.queue_interface.create_queue(
      queue_name: self.full_queue_name
    )
    response[ :queue_url ]
  end
end

Protected Class Methods

full_queue_name() click to toggle source
# File lib/boxxspring/worker/base.rb, line 54
           def full_queue_name
  queue_name = self.queue_name ||
               self.name.underscore.gsub( /[\/]/, '-' ).
                 gsub( /_worker\Z/, '' )
  full_name = self.environment + '-' + queue_name
  
  ENV[ 'PRIORITY' ] ? ( full_name + '_priority' ) : full_name
end

Public Instance Methods

process() click to toggle source
# File lib/boxxspring/worker/base.rb, line 68
def process
   metric_defaults  dimensions: { worker_name: self.class.name,
                                  environment: environment } do

    messages = self.receive_messages() || []
    messages.each do | message |
      if message.present?
        payload = self.payload_from_message( message )

        if payload.present?
          begin
            metric :messages do
              result = self.process_payload( payload )

              # note: if an exception is raised the message will be deleted
              self.delete_message( message ) unless result == false
            end
          rescue StandardError => error
            metric :errors

            self.logger.error(
              "The #{ self.human_name } failed to process the payload."
            )
            self.logger.error( error.message )
            self.logger.info( error.backtrace.join( "\n" ) )
          end

        else
          self.delete_message( message )
          self.logger.error(
            "The #{ self.human_name } received an invalid payload."
          )
        end
      end
    end
  end
end

Protected Instance Methods

delegate_payload( queue_name, payload ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 173
           def delegate_payload( queue_name, payload )
  queue_name = self.class.environment + '-' + queue_name

  begin
    response = self.class.queue_interface.create_queue(
      queue_name: queue_name
    )
    queue_url = response[ :queue_url ]

    if queue_url.present?
      self.class.queue_interface.send_message(
        queue_url: queue_url,
        message_body: payload.to_json
      )
    end
  rescue StandardError => error
    raise RuntimeError.new(
      "The #{ self.human_name } was unable to delegate the " +
      "payload to the '#{ queue_name }' queue. #{ error.message }."
    )
  end
end
delete_message( message ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 127
           def delete_message( message )
  begin
    self.class.queue_interface.delete_message(
      queue_url: self.class.queue_url,
      receipt_handle: message[ :receipt_handle ]
    )
  rescue StandardError => error
    raise RuntimeError.new(
      "The #{ self.human_name } is unable to delete the " +
      "message from the queue. #{ error.message }."
    )
  end

  message
end
error?( object, object_class ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 212
           def error?( object, object_class )
  class_name = "Boxxspring::#{ object_class }".constantize
  !object.is_a?( class_name ) || object.send( :errors ).present?
end
human_name() click to toggle source
# File lib/boxxspring/worker/base.rb, line 224
           def human_name
  self.class.name.underscore.gsub('_', ' ')
end
operation( endpoint ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 217
           def operation( endpoint )
  Boxxspring::Operation.new(
    endpoint,
    Boxxspring::Worker.configuration.api_credentials.to_hash
  )
end
payload_from_message( message ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 143
           def payload_from_message( message )

  payload = message.body

  if payload.present?
    payload = JSON.parse( payload ) rescue payload
    if payload.is_a?( Hash ) && payload.include?( 'Type' ) &&
       payload[ 'Type' ] == 'Notification'
      payload = payload[ 'Message' ]
      payload = payload.present? ?
        ( JSON.parse( payload ) rescue payload ) :
        payload
    end
  else
    logger.error( "The message lacks a payload." )
    logger.debug( message.inspect )
  end
  payload
end
process_payload( payload ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 163
           def process_payload( payload )
  if self.class.processor.present?
    self.class.processor.call( payload )
  else
    raise RuntimeError.new(
      "The worker lacks a processor"
    )
  end
end
read_object( property_id=nil, type, id, includes ) click to toggle source
# File lib/boxxspring/worker/base.rb, line 197
           def read_object( property_id=nil, type, id, includes )
  if property_id.present?
    endpoint = "/properties/#{ property_id }/#{ type.pluralize }/#{ id }"
  else
    endpoint = "/properties/#{ id }"
  end

  object = operation( endpoint ).include( *includes ).read
  if error?( object, type.capitalize ) && object.is_a?( Array )
    object.first
  else
    object
  end
end
receive_messages() click to toggle source
# File lib/boxxspring/worker/base.rb, line 109
           def receive_messages
  messages = nil
  begin
    response = self.class.queue_interface.receive_message(
      queue_url: self.class.queue_url,
      max_number_of_messages: QUEUE_MESSAGE_REQUEST_COUNT,
      wait_time_seconds: QUEUE_MESSAGE_WAIT_IN_SECONDS
    )
    messages = response[ :messages ]
  rescue StandardError => error
    raise RuntimeError.new(
      "The #{ self.human_name } is unable to receive a message " +
      "from the queue. #{ error.message }."
    )
  end
  messages
end