class EventQ::RabbitMq::EventQClient
Implements a general interface to raise an event EventQ::Amazon::EventQClient is the sister-class which does the same for AWS
Public Class Methods
new(options={})
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 8 def initialize(options={}) if options[:client] == nil raise ':client (QueueClient) must be specified.'.freeze end @client = options[:client] @queue_manager = QueueManager.new @event_raised_exchange = EventRaisedExchange.new @serialization_manager = EventQ::SerializationProviders::Manager.new @signature_manager = EventQ::SignatureProviders::Manager.new # this array is used to record known event types @known_event_types = [] @connection_pool = ::Queue.new end
Public Instance Methods
check_in_connection(connection)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 32 def check_in_connection(connection) @connection_pool.push(connection) end
check_out_connection()
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 26 def check_out_connection @connection_pool.pop(true) rescue @client.get_connection end
new_message()
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 99 def new_message EventQ::QueueMessage.new end
publish(topic:, event:, context: {})
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 49 def publish(topic:, event:, context: {}) raise_event(topic, event, context) end
raise_event(event_type, event, context = {})
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 53 def raise_event(event_type, event, context = {}) register_event(event_type) _event_type = EventQ.create_event_type(event_type) with_connection do |channel| exchange = @queue_manager.get_exchange(channel, @event_raised_exchange) message = serialized_message(event_type, event, context) exchange.publish(message, routing_key: _event_type) EventQ.logger.debug do "[#{self.class}] - Raised event to Exchange. Routing Key: #{_event_type} | Message: #{message}." end end end
raise_event_in_queue(event_type, event, queue, delay, context = {})
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 71 def raise_event_in_queue(event_type, event, queue, delay, context = {}) register_event(event_type) _event_type = EventQ.create_event_type(event_type) with_connection do |channel| exchange = @queue_manager.get_queue_exchange(channel, queue) delay_exchange = @queue_manager.get_delay_exchange(channel, queue, delay) delay_queue = @queue_manager.create_delay_queue(channel, queue, exchange.name, delay) delay_queue.bind(delay_exchange, routing_key: _event_type) _queue_name = EventQ.create_queue_name(queue.name) q = channel.queue(_queue_name, durable: @queue_manager.durable) q.bind(exchange, routing_key: _event_type) message = serialized_message(event_type, event, context) delay_exchange.publish(message, routing_key: _event_type) EventQ.logger.debug do "[#{self.class}] - Raised event to Queue: #{_queue_name} | Message: #{message} | Delay: #{delay}." end end end
register_event(event_type)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 40 def register_event(event_type) if registered?(event_type) return true end @known_event_types << event_type true end
registered?(event_type)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 36 def registered?(event_type) @known_event_types.include?(event_type) end
Private Instance Methods
serialized_message(event_type, event, context)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 121 def serialized_message(event_type, event, context) qm = new_message qm.content = event qm.type = event_type qm.context = context qm.content_type = event.class.to_s if EventQ::Configuration.signature_secret != nil provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider) qm.signature = provider.write(message: qm, secret: EventQ::Configuration.signature_secret) end serialization_provider = @serialization_manager.get_provider(EventQ::Configuration.serialization_provider) serialization_provider.serialize(qm) end
with_connection() { |channel| ... }
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 105 def with_connection connection = check_out_connection begin channel = connection.create_channel yield(channel) ensure channel&.close if channel.open? check_in_connection(connection) end true end