class CfMessageBus::MessageBus

Attributes

internal_bus[R]

Public Class Methods

new(config) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 9
def initialize(config)
  @logger = config[:logger]

  @internal_bus = MessageBusFactory.message_bus(
    config[:servers] || config[:uris] || config[:uri])

  @subscriptions = {}
end

Public Instance Methods

connected?() click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 78
def connected?
  internal_bus.connected?
end
publish(subject, message = nil, inbox=nil, &callback) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 28
def publish(subject, message = nil, inbox=nil, &callback)
  EM.schedule do
    internal_bus.publish(subject, encode(message), inbox, &callback)
  end
end
request(subject, data = nil, options = {}, &block) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 34
def request(subject, data = nil, options = {}, &block)
  response_timeout = options.delete(:timeout)
  result_count = options.delete(:result_count)
  options[:max] = result_count if result_count

  subscription_id = internal_bus.request(subject, encode(data), options) do |payload, inbox|
    process_message(payload, inbox) do |parsed_data, inbox|
      run_handler(block, parsed_data, inbox, subject, 'response')
    end
  end

  if response_timeout
    internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do
      run_handler(block, {timeout: true}, nil, subject, 'timeout')
    end
  end

  subscription_id
end
subscribe(subject, options = {}, &block) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 18
def subscribe(subject, options = {}, &block)
  @subscriptions[subject] = [options, block]

  subscribe_on_reactor(subject, options) do |parsed_data, inbox|
    EM.defer do
      run_handler(block, parsed_data, inbox, subject, 'subscription')
    end
  end
end
synchronous_request(subject, data = nil, options = {}) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 54
def synchronous_request(subject, data = nil, options = {})
  options[:result_count] ||= 1
  result_count = options[:result_count]

  return [] if result_count <= 0

  EM.schedule_sync do |promise|
    results = []

    request(subject, encode(data), options) do |response|
      if response[:timeout]
        promise.deliver(results)
      else
        results << response
        promise.deliver(results) if results.size == result_count
      end
    end
  end
end
unsubscribe(subscription_id) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 74
def unsubscribe(subscription_id)
  internal_bus.unsubscribe(subscription_id)
end

Private Instance Methods

encode(message) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 110
def encode(message)
  unless message.nil? || message.is_a?(String)
    message = JSON.dump(message)
  end
  message
end
process_message(msg, inbox, &block) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 102
def process_message(msg, inbox, &block)
  payload = JSON.parse(msg)
  block.yield(payload, inbox)
rescue => e
  @logger.error "exception parsing json: '#{msg}' '#{e.inspect}'"
  block.yield({error: "JSON Parse Error: failed to parse", exception: e, message: msg}, inbox)
end
run_handler(block, parsed_data, inbox, subject, type) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 86
def run_handler(block, parsed_data, inbox, subject, type)
  begin
    block.yield(parsed_data, inbox)
  rescue => e
    @logger.error "exception processing #{type} for: '#{subject}' '#{parsed_data.inspect}' \n#{e.inspect}\n #{e.backtrace.join("\n")}"
  end
end
subscribe_on_reactor(subject, options = {}, &blk) click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 94
def subscribe_on_reactor(subject, options = {}, &blk)
  EM.schedule do
    internal_bus.subscribe(subject, options) do |msg, inbox|
      process_message(msg, inbox, &blk)
    end
  end
end