class EventQ::Amazon::EventQClient
Implements a general interface to raise an event EventQ::RabbitMq::EventQClient
is the sister-class which does the same for RabbitMq
Public Class Methods
new(options)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 7 def initialize(options) if options[:client] == nil raise ':client (QueueClient) must be specified.'.freeze end @client = options[:client] @serialization_manager = EventQ::SerializationProviders::Manager.new @signature_manager = EventQ::SignatureProviders::Manager.new # this hash is used to record known event types: # key = event_type / name # value = topic arn @known_event_types = {} end
Public Instance Methods
new_message()
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 91 def new_message EventQ::QueueMessage.new end
publish(topic:, event:, context: {}, region: nil)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 51 def publish(topic:, event:, context: {}, region: nil) raise_event(topic, event, context, region) end
raise_event(event_type, event, context = {}, region = nil)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 55 def raise_event(event_type, event, context = {}, region = nil) register_event(event_type, region) with_prepared_message(event_type, event, context) do |message| topic_arn = topic_arn(event_type, region) response = @client.sns(region).publish( topic_arn: topic_arn, message: message, subject: event_type ) EventQ.logger.debug do "[#{self.class} #raise_event] - Published to SNS with topic_arn: #{topic_arn} | event_type: #{event_type} | Message: #{message}" end response end end
raise_event_in_queue(event_type, event, queue, delay, context = {})
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 74 def raise_event_in_queue(event_type, event, queue, delay, context = {}) queue_url = @client.sqs_helper.get_queue_url(queue) with_prepared_message(event_type, event, context) do |message| response = @client.sqs.send_message( queue_url: queue_url, message_body: sqs_message_body_for(message), delay_seconds: delay ) EventQ.logger.debug do "[#{self.class} #raise_event_in_queue] - Raised event to SQS queue: #{queue_url} | event_type: #{event_type} | Message: #{message}" end response end end
register_event(event_type, region = nil)
click to toggle source
Registers the event event_type and returns its topic arn.
@param [String] event_type @param [String] region
@return [String]
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 42 def register_event(event_type, region = nil) topic_key = "#{region}:#{event_type}" return @known_event_types[topic_key] if registered?(event_type, region) topic_arn = @client.sns_helper(region).create_topic_arn(event_type, region) @known_event_types[topic_key] = topic_arn topic_arn end
registered?(event_type, region = nil)
click to toggle source
Returns true if the event has already been registerd, or false otherwise.
@param [String] event_type @param [String] region
@return [Boolean]
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 31 def registered?(event_type, region = nil) topic_key = "#{region}:#{event_type}" @known_event_types.key?(topic_key) end
Private Instance Methods
serialized_message(queue_message)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 118 def serialized_message(queue_message) serialization_provider = @serialization_manager.get_provider(EventQ::Configuration.serialization_provider) serialization_provider.serialize(queue_message) end
sqs_message_body_for(payload_message)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 128 def sqs_message_body_for(payload_message) JSON.dump(EventQ::Amazon::QueueWorker::MESSAGE => payload_message) end
topic_arn(event_type, region = nil)
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 124 def topic_arn(event_type, region = nil) @client.sns_helper(region).get_topic_arn(event_type, region) end
with_prepared_message(event_type, event, context) { |message| ... }
click to toggle source
# File lib/eventq/eventq_aws/aws_eventq_client.rb, line 97 def with_prepared_message(event_type, event, context) qm = new_message qm.content = event qm.type = event_type qm.context = context qm.content_type = event.class.to_s if EventQ::Configuration.signature_secret != nil provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider) qm.signature = provider.write(message: qm, secret: EventQ::Configuration.signature_secret) end message = serialized_message(qm) response = yield(message) EventQ.log(:debug, "[#{self.class}] - Raised event. Message: #{message} | Type: #{event_type}.") response.message_id end