class EventQ::Amazon::EventQClient

Implements a general interface to raise an event EventQ::RabbitMq::EventQClient is the sister-class which does the same for RabbitMq

Public Class Methods

new(options) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 7
def initialize(options)

  if options[:client] == nil
    raise ':client (QueueClient) must be specified.'.freeze
  end

  @client = options[:client]

  @serialization_manager = EventQ::SerializationProviders::Manager.new
  @signature_manager = EventQ::SignatureProviders::Manager.new

  # this hash is used to record known event types:
  # key = event_type / name
  # value = topic arn
  @known_event_types = {}
end

Public Instance Methods

new_message() click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 91
def new_message
  EventQ::QueueMessage.new
end
publish(topic:, event:, context: {}, region: nil) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 51
def publish(topic:, event:, context: {}, region: nil)
  raise_event(topic, event, context, region)
end
raise_event(event_type, event, context = {}, region = nil) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 55
def raise_event(event_type, event, context = {}, region = nil)
  register_event(event_type, region)

  with_prepared_message(event_type, event, context) do |message|
    topic_arn = topic_arn(event_type, region)
    response = @client.sns(region).publish(
      topic_arn: topic_arn,
      message: message,
      subject: event_type
    )

    EventQ.logger.debug do
      "[#{self.class} #raise_event] - Published to SNS with topic_arn: #{topic_arn} | event_type: #{event_type} | Message: #{message}"
    end

    response
  end
end
raise_event_in_queue(event_type, event, queue, delay, context = {}) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 74
def raise_event_in_queue(event_type, event, queue, delay, context = {})
  queue_url = @client.sqs_helper.get_queue_url(queue)
  with_prepared_message(event_type, event, context) do |message|
    response = @client.sqs.send_message(
      queue_url: queue_url,
      message_body: sqs_message_body_for(message),
      delay_seconds: delay
    )

    EventQ.logger.debug do
      "[#{self.class} #raise_event_in_queue] - Raised event to SQS queue: #{queue_url} | event_type: #{event_type} | Message: #{message}"
    end

    response
  end
end
register_event(event_type, region = nil) click to toggle source

Registers the event event_type and returns its topic arn.

@param [String] event_type @param [String] region

@return [String]

# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 42
def register_event(event_type, region = nil)
  topic_key = "#{region}:#{event_type}"
  return @known_event_types[topic_key] if registered?(event_type, region)

  topic_arn = @client.sns_helper(region).create_topic_arn(event_type, region)
  @known_event_types[topic_key] = topic_arn
  topic_arn
end
registered?(event_type, region = nil) click to toggle source

Returns true if the event has already been registerd, or false otherwise.

@param [String] event_type @param [String] region

@return [Boolean]

# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 31
def registered?(event_type, region = nil)
  topic_key = "#{region}:#{event_type}"
  @known_event_types.key?(topic_key)
end

Private Instance Methods

serialized_message(queue_message) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 118
def serialized_message(queue_message)
  serialization_provider = @serialization_manager.get_provider(EventQ::Configuration.serialization_provider)

  serialization_provider.serialize(queue_message)
end
sqs_message_body_for(payload_message) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 128
def sqs_message_body_for(payload_message)
  JSON.dump(EventQ::Amazon::QueueWorker::MESSAGE => payload_message)
end
topic_arn(event_type, region = nil) click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 124
def topic_arn(event_type, region = nil)
  @client.sns_helper(region).get_topic_arn(event_type, region)
end
with_prepared_message(event_type, event, context) { |message| ... } click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 97
def with_prepared_message(event_type, event, context)
  qm = new_message
  qm.content = event
  qm.type = event_type
  qm.context = context
  qm.content_type = event.class.to_s

  if EventQ::Configuration.signature_secret != nil
    provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider)
    qm.signature = provider.write(message: qm, secret: EventQ::Configuration.signature_secret)
  end

  message = serialized_message(qm)

  response = yield(message)

  EventQ.log(:debug, "[#{self.class}] - Raised event. Message: #{message} | Type: #{event_type}.")

  response.message_id
end