class CfMessageBus::MessageBus
Attributes
internal_bus[R]
Public Class Methods
new(config)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 9
# Builds the wrapper around an underlying bus created by MessageBusFactory.
#
# @param config [Hash] read for :logger and for the bus endpoint(s); the
#   first truthy value among :servers, :uris and :uri is handed to the factory
def initialize(config)
  @logger = config[:logger]

  endpoints = config[:servers] || config[:uris] || config[:uri]
  @internal_bus = MessageBusFactory.message_bus(endpoints)

  # Keyed by subject; filled in by #subscribe.
  @subscriptions = {}
end
Public Instance Methods
connected?()
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 78
# Delegates the connection check to the underlying bus.
#
# @return whatever +internal_bus.connected?+ returns
def connected?
  internal_bus.connected?
end
publish(subject, message = nil, inbox=nil, &callback)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 28
# Hands a publish to the underlying bus via EM.schedule.
#
# @param subject  subject to publish on
# @param message  payload; passed through #encode inside the scheduled block
# @param inbox    optional reply subject, forwarded unchanged
# @param callback optional block forwarded to the underlying bus
def publish(subject, message = nil, inbox = nil, &callback)
  EM.schedule { internal_bus.publish(subject, encode(message), inbox, &callback) }
end
request(subject, data = nil, options = {}, &block)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 34
# Sends a request on the underlying bus and routes every reply (and an
# optional timeout notification) to the given block through #run_handler.
#
# @param subject [Object] subject to send the request on
# @param data    [Object] payload; passed through #encode before sending
# @param options [Hash] :timeout and :result_count are consumed here
#   (:result_count is forwarded as :max); every other key is forwarded to the
#   underlying bus untouched. The caller's hash is never modified.
# @param block   handler invoked with (parsed_data, inbox) for each reply and
#   with ({timeout: true}, nil) when the timeout fires
# @return the subscription id returned by the underlying bus
def request(subject, data = nil, options = {}, &block)
  # Work on a copy: deleting keys from the caller's hash would silently drop
  # its :timeout / :result_count if the same hash is reused for another call.
  options = options.dup
  response_timeout = options.delete(:timeout)
  result_count = options.delete(:result_count)
  options[:max] = result_count if result_count

  subscription_id = internal_bus.request(subject, encode(data), options) do |payload, inbox|
    process_message(payload, inbox) do |parsed_data, reply_to|
      run_handler(block, parsed_data, reply_to, subject, 'response')
    end
  end

  if response_timeout
    internal_bus.timeout(subscription_id, response_timeout, expected: result_count || 1) do
      run_handler(block, {timeout: true}, nil, subject, 'timeout')
    end
  end

  subscription_id
end
subscribe(subject, options = {}, &block)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 18
# Records the subscription under its subject and subscribes on the
# underlying bus; each parsed message is handed to the block via EM.defer.
#
# @param subject subject to subscribe to
# @param options [Hash] stored with the block and forwarded to
#   #subscribe_on_reactor
# @param block   handler run through #run_handler with (parsed_data, inbox)
def subscribe(subject, options = {}, &block)
  @subscriptions[subject] = [options, block]

  subscribe_on_reactor(subject, options) do |parsed_data, inbox|
    EM.defer { run_handler(block, parsed_data, inbox, subject, 'subscription') }
  end
end
synchronous_request(subject, data = nil, options = {})
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 54
# Issues a request and collects replies inside EM.schedule_sync, delivering
# the collected array once :result_count replies arrived or a timeout
# notification ({timeout: true}) is received.
#
# @param subject [Object] subject to send the request on
# @param data    [Object] payload; passed through #encode
# @param options [Hash] :result_count defaults to 1; all keys are forwarded
#   to #request. The caller's hash is never modified.
# @return [Array] an empty array immediately when :result_count is <= 0,
#   otherwise whatever EM.schedule_sync returns for the delivered results
def synchronous_request(subject, data = nil, options = {})
  # Copy first: the default below (and the key deletions done by #request)
  # must not leak into a hash the caller may reuse.
  options = options.dup
  options[:result_count] ||= 1
  result_count = options[:result_count]
  return [] if result_count <= 0

  EM.schedule_sync do |promise|
    results = []
    request(subject, encode(data), options) do |response|
      # Only a Hash can carry the timeout marker; indexing an Array or scalar
      # JSON payload with a Symbol would raise and the reply would be lost.
      timed_out = response.is_a?(Hash) && response[:timeout]

      if timed_out
        promise.deliver(results)
      else
        results << response
        promise.deliver(results) if results.size == result_count
      end
    end
  end
end
unsubscribe(subscription_id)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 74
# Cancels a subscription on the underlying bus.
#
# @param subscription_id identifier forwarded unchanged to
#   +internal_bus.unsubscribe+
def unsubscribe(subscription_id)
  internal_bus.unsubscribe(subscription_id)
end
Private Instance Methods
encode(message)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 110
# Serializes a payload for the wire.
#
# @param message [Object, nil] payload to send
# @return [String, nil] +message+ itself when it is nil or already a String,
#   otherwise its JSON.dump representation
def encode(message)
  # nil and pre-encoded strings pass through untouched.
  return message if message.nil? || message.is_a?(String)

  JSON.dump(message)
end
process_message(msg, inbox, &block)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 102
# Parses a raw JSON message and yields the result to the block.
#
# When parsing fails the error is logged and the block receives a Hash with
# the keys :error, :exception and :message instead of the parsed payload.
# Exceptions raised by the block itself are not treated as parse failures:
# they propagate to the caller and the block is invoked exactly once.
#
# @param msg   [String] raw JSON text
# @param inbox reply subject, passed through to the block unchanged
# @param block receives (payload, inbox)
# @return whatever the block returns
def process_message(msg, inbox, &block)
  begin
    payload = JSON.parse(msg)
  rescue => e
    # Scope the rescue to parsing only, so a failing handler is neither
    # mislabelled as a JSON error nor called a second time.
    @logger.error "exception parsing json: '#{msg}' '#{e.inspect}'"
    payload = {error: "JSON Parse Error: failed to parse", exception: e, message: msg}
  end

  block.yield(payload, inbox)
end
run_handler(block, parsed_data, inbox, subject, type)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 86
# Invokes a handler with (parsed_data, inbox) and logs, rather than
# propagates, any StandardError it raises.
#
# @param block       object responding to +yield+ (typically a Proc)
# @param parsed_data payload handed to the handler; also included in the log
# @param inbox       reply subject handed to the handler
# @param subject     subject the message belongs to; used in the log only
# @param type        short label for the call site; used in the log only
# @return the handler's return value, or the logger call's result on failure
def run_handler(block, parsed_data, inbox, subject, type)
  block.yield(parsed_data, inbox)
rescue => e
  # Array() keeps the logging path from raising when backtrace is nil.
  trace = Array(e.backtrace).join("\n")
  @logger.error "exception processing #{type} for: '#{subject}' '#{parsed_data.inspect}' \n#{e.inspect}\n #{trace}"
end
subscribe_on_reactor(subject, options = {}, &blk)
click to toggle source
# File lib/cf_message_bus/message_bus.rb, line 94
# Subscribes on the underlying bus from inside EM.schedule; every raw
# message is run through #process_message before reaching the block.
#
# @param subject subject to subscribe to
# @param options [Hash] forwarded unchanged to +internal_bus.subscribe+
# @param blk     receives (parsed payload, inbox) from #process_message
def subscribe_on_reactor(subject, options = {}, &blk)
  EM.schedule do
    internal_bus.subscribe(subject, options) { |msg, inbox| process_message(msg, inbox, &blk) }
  end
end