class EventQ::RabbitMq::EventQClient

Implements a general interface to raise an event EventQ::Amazon::EventQClient is the sister-class which does the same for AWS

Public Class Methods

new(options={}) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 8
def initialize(options={})

  if options[:client] == nil
    raise ':client (QueueClient) must be specified.'.freeze
  end

  @client = options[:client]
  @queue_manager = QueueManager.new
  @event_raised_exchange = EventRaisedExchange.new
  @serialization_manager = EventQ::SerializationProviders::Manager.new
  @signature_manager = EventQ::SignatureProviders::Manager.new

  # this array is used to record known event types
  @known_event_types = []

  @connection_pool = ::Queue.new
end

Public Instance Methods

check_in_connection(connection) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 32
def check_in_connection(connection)
  @connection_pool.push(connection)
end
check_out_connection() click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 26
def check_out_connection
  @connection_pool.pop(true)
rescue
  @client.get_connection
end
new_message() click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 99
def new_message
  EventQ::QueueMessage.new
end
publish(topic:, event:, context: {}) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 49
def publish(topic:, event:, context: {})
  raise_event(topic, event, context)
end
raise_event(event_type, event, context = {}) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 53
def raise_event(event_type, event, context = {})
  register_event(event_type)

  _event_type = EventQ.create_event_type(event_type)

  with_connection do |channel|
    exchange = @queue_manager.get_exchange(channel, @event_raised_exchange)

    message = serialized_message(event_type, event, context)

    exchange.publish(message, routing_key: _event_type)

    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Exchange. Routing Key: #{_event_type} | Message: #{message}."
    end
  end
end
raise_event_in_queue(event_type, event, queue, delay, context = {}) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 71
def raise_event_in_queue(event_type, event, queue, delay, context = {})
  register_event(event_type)

  _event_type = EventQ.create_event_type(event_type)

  with_connection do |channel|
    exchange = @queue_manager.get_queue_exchange(channel, queue)

    delay_exchange = @queue_manager.get_delay_exchange(channel, queue, delay)

    delay_queue = @queue_manager.create_delay_queue(channel, queue, exchange.name, delay)
    delay_queue.bind(delay_exchange, routing_key: _event_type)

    _queue_name = EventQ.create_queue_name(queue)

    q = channel.queue(_queue_name, durable: @queue_manager.durable)
    q.bind(exchange, routing_key: _event_type)

    message = serialized_message(event_type, event, context)

    delay_exchange.publish(message, routing_key: _event_type)

    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Queue: #{_queue_name} | Message: #{message} | Delay: #{delay}."
    end
  end
end
register_event(event_type) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 40
def register_event(event_type)
  if registered?(event_type)
    return true
  end

  @known_event_types << event_type
  true
end
registered?(event_type) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 36
def registered?(event_type)
  @known_event_types.include?(event_type)
end

Private Instance Methods

serialized_message(event_type, event, context) click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 121
def serialized_message(event_type, event, context)
  qm = new_message
  qm.content = event
  qm.type = event_type
  qm.context = context
  qm.content_type = event.class.to_s

  if EventQ::Configuration.signature_secret != nil
    provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider)
    qm.signature = provider.write(message: qm, secret: EventQ::Configuration.signature_secret)
  end

  serialization_provider = @serialization_manager.get_provider(EventQ::Configuration.serialization_provider)

  serialization_provider.serialize(qm)
end
with_connection() { |channel| ... } click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_eventq_client.rb, line 105
def with_connection
  connection = check_out_connection

  begin
    channel = connection.create_channel

    yield(channel)

  ensure
    channel&.close if channel.open?
    check_in_connection(connection)
  end

  true
end