class Freddy

Constants

DEFAULT_MAX_CONCURRENCY
FREDDY_TOPIC_EXCHANGE_NAME

Public Class Methods

build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) click to toggle source

Creates a new freddy instance

@param [Logger] logger

instance of a logger, defaults to the STDOUT logger

@param [Hash] config

rabbitmq connection information

@option config [String] :host ('localhost') @option config [Integer] :port (5672) @option config [String] :user ('guest') @option config [String] :pass ('guest') @option config [Integer] :max_concurrency (4)

@return [Freddy]

@example

Freddy.build(Logger.new(STDOUT), user: 'thumper', pass: 'howdy')
# File lib/freddy.rb, line 28
def self.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  OpenTracing.global_tracer ||= OpenTracing::Tracer.new

  connection = Adapters.determine.connect(config)
  new(connection, logger, max_concurrency)
end
new(connection, logger, max_concurrency) click to toggle source
# File lib/freddy.rb, line 43
def initialize(connection, logger, max_concurrency)
  @connection = connection
  @logger = logger
  @prefetch_buffer_size = max_concurrency

  @send_and_forget_producer = Producers::SendAndForgetProducer.new(
    connection.create_channel, logger
  )
  @send_and_wait_response_producer = Producers::SendAndWaitResponseProducer.new(
    connection.create_channel, logger
  )
end
trace() click to toggle source
# File lib/freddy.rb, line 35
def self.trace
  Thread.current[:freddy_trace]
end
trace=(trace) click to toggle source
# File lib/freddy.rb, line 39
def self.trace=(trace)
  Thread.current[:freddy_trace] = trace
end

Public Instance Methods

close() click to toggle source

Closes the connection with message queue

@return [void]

@example

freddy.close
# File lib/freddy.rb, line 204
def close
  @connection.close
end
deliver(destination, payload, options = {}) click to toggle source

Sends a message to given destination

This is *send and forget* type of delivery. It sends a message to given destination and does not wait for response. This is useful when there are multiple consumers that are using tap_into or you just do not care about the response.

@param [String] destination

the queue name

@param [Hash] payload

the payload that can be serialized to json

@param [Hash] options

the options for delivery

@option options [Integer] :timeout (0)

discards the message after given seconds if nobody consumes it. Message
won't be discarded if timeout it set to 0 (default).

@return [void]

@example

freddy.deliver 'Metrics', user_id: 5, metric: 'signed_in'
# File lib/freddy.rb, line 152
def deliver(destination, payload, options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0

  @send_and_forget_producer.produce(destination, payload, opts)
end
deliver_with_response(destination, payload, options = {}) click to toggle source

Sends a message and waits for the response

@param [String] destination

the queue name

@param [Hash] payload

the payload that can be serialized to json

@param [Hash] options

the options for delivery

@option options [Integer] :timeout (3)

throws a time out exception after given seconds when there is no response

@option options [Boolean] :delete_on_timeout (true)

discards the message when timeout error is raised

@raise [Freddy::TimeoutError]

if nobody responded to the request

@raise [Freddy::InvalidRequestError]

if the responder responded with an error response

@return [Hash] the response

@example

begin
  response = freddy.deliver_with_response 'Users', type: 'fetch_all'
  puts "Got response #{response}"
rescue Freddy::TimeoutError
  puts "Service unavailable"
rescue Freddy::InvalidRequestError => e
  puts "Got error response: #{e.response}"
end
# File lib/freddy.rb, line 189
def deliver_with_response(destination, payload, options = {})
  timeout = options.fetch(:timeout, 3)
  delete_on_timeout = options.fetch(:delete_on_timeout, true)

  @send_and_wait_response_producer.produce destination, payload, {
    timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
  }
end
respond_to(destination, &callback) click to toggle source

Listens and responds to messages

This consumes messages on a given destination. It is useful for messages that have to be processed once and then a result must be sent.

@param [String] destination

the queue name

@yieldparam [Hash<Symbol => Object>] message

Received message as a ruby hash with symbolized keys

@yieldparam [#success, error] handler

Handler for responding to messages. Use handler#success for successful
respone and handler#error for error response.

@return [#shutdown]

@example

freddy.respond_to 'RegistrationService' do |attributes, handler|
  if id = register(attributes)
    handler.success(id: id)
  else
    handler.error(message: 'Can not do')
  end
end
# File lib/freddy.rb, line 81
def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  producer = Producers::ReplyProducer.new(channel, @logger)
  handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

  Consumers::RespondToConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    destination: destination,
    channel: channel,
    handler_adapter_factory: handler_adapter_factory,
    &callback
  )
end
tap_into(pattern, options = {}, &callback) click to toggle source

Listens for messages without consuming them

This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested.

@param [String] pattern

the destination pattern. Use `#` wildcard for matching 0 or more words.
Use `*` to match exactly one word.

@param [Hash] options @option options [String] :group

only one of the listeners in given group will receive a message. All
listeners will receive a message if the group is not specified.

@yield [message] Yields received message to the block

@return [#shutdown]

@example

freddy.tap_into 'notifications.*' do |message|
  puts "Notification showed #{message.inspect}"
end
# File lib/freddy.rb, line 119
def tap_into(pattern, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern}"

  Consumers::TapIntoConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    pattern: pattern,
    channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
    options: options,
    &callback
  )
end