class ActiveMessaging::Gateway
Attributes
adapters[RW]
connections[RW]
filters[RW]
named_destinations[RW]
processor_groups[RW]
subscriptions[RW]
Public Class Methods
_dispatch(message)
click to toggle source
# File lib/activemessaging/gateway.rb, line 228 def _dispatch(message) abort = false processed = false subscriptions.each do |key, subscription| if message.matches_subscription?(subscription) then processed = true routing = { :receiver => subscription.processor_class, :destination => subscription.destination, :direction => :incoming } begin execute_filter_chain(:incoming, message.dup, routing) do |m| result = subscription.processor_class.new.process!(m) end rescue ActiveMessaging::AbortMessageException abort_message subscription, message abort = true return ensure acknowledge_message subscription, message unless abort end end end ActiveMessaging.logger.error("No-one responded to #{message}") unless processed end
abort_message(subscription, message)
click to toggle source
abort_message
is called when procesing the message raises a ActiveMessaging::AbortMessageException
indicating the message should be returned to the destination so it can be tried again, later
# File lib/activemessaging/gateway.rb, line 265 def abort_message subscription, message connection(subscription.destination.broker_name).unreceive message, subscription.subscribe_headers end
acknowledge_message(subscription, message)
click to toggle source
acknowledge_message
is called when the message has been processed w/o error by at least one processor
# File lib/activemessaging/gateway.rb, line 259 def acknowledge_message subscription, message connection(subscription.destination.broker_name).received message, subscription.subscribe_headers end
apply_filter?(direction, details, options)
click to toggle source
# File lib/activemessaging/gateway.rb, line 167 def apply_filter?(direction, details, options) # check that it is the correct direction result = if direction.to_sym == options[:direction] || options[:direction] == :bidirectional if options.has_key?(:only) && [options[:only]].flatten.include?(details[:destination].name) true elsif options.has_key?(:except) && ![options[:except]].flatten.include?(details[:destination].name) true elsif !options.has_key?(:only) && !options.has_key?(:except) true end end result end
connection(broker_name='default')
click to toggle source
# File lib/activemessaging/gateway.rb, line 122 def connection broker_name='default' return @connections[broker_name] if @connections.has_key?(broker_name) config = load_connection_configuration(broker_name) adapter_class = Gateway.adapters[config[:adapter]] raise "Unknown messaging adapter #{config[:adapter].inspect}!" if adapter_class.nil? @connections[broker_name] = adapter_class.new(config) end
create_filter(filter, options)
click to toggle source
# File lib/activemessaging/gateway.rb, line 181 def create_filter(filter, options) filter_class = if filter.is_a?(String) or filter.is_a?(Symbol) filter.to_s.camelize.constantize elsif filter.is_a?(Class) filter end if filter_class if filter_class.respond_to?(:process) && (filter_class.method(:process).arity.abs > 0) filter_class elsif filter_class.instance_method(:initialize).arity.abs == 1 filter_class.new(options) elsif filter_class.instance_method(:initialize).arity == 0 filter_class.new else raise "Filter #{filter} could not be created, no 'initialize' matched." end else raise "Filter #{filter} could not be loaded, created, or used!" end end
current_processor_group()
click to toggle source
# File lib/activemessaging/gateway.rb, line 348 def current_processor_group if ARGV.length > 0 && !@current_processor_group ARGV.each {|arg| pair = arg.split('=') if pair[0] == 'process-group' group_sym = pair[1].to_sym if processor_groups.has_key? group_sym @current_processor_group = group_sym else ActiveMessaging.logger.error "Unrecognized process-group." ActiveMessaging.logger.error "You specified process-group #{pair[1]}, make sure this is specified in config/messaging.rb" ActiveMessaging.logger.error " ActiveMessaging::Gateway.define do |s|" ActiveMessaging.logger.error " s.processor_groups = { :group1 => [:foo_bar1_processor], :group2 => [:foo_bar2_processor] }" ActiveMessaging.logger.error " end" exit end end } end @current_processor_group end
define() { |self| ... }
click to toggle source
# File lib/activemessaging/gateway.rb, line 269 def define #run the rest of messaging.rb yield self end
destination(destination_name, destination, publish_headers={})
click to toggle source
# File lib/activemessaging/gateway.rb, line 274 def destination destination_name, destination, publish_headers={}, broker='default' raise "You already defined #{destination_name} to #{named_destinations[destination_name].value}" if named_destinations.has_key?(destination_name) named_destinations[destination_name] = Destination.new(destination_name, destination, publish_headers, broker) end
Also aliased as: queue
disconnect()
click to toggle source
# File lib/activemessaging/gateway.rb, line 147 def disconnect @connections.each { |key,connection| connection.disconnect } @connections = {} end
dispatch(message)
click to toggle source
# File lib/activemessaging/gateway.rb, line 216 def dispatch(message) prepare_application _dispatch(message) rescue Object => exc ActiveMessaging.logger.error "Dispatch exception: #{exc}" ActiveMessaging.logger.error exc.backtrace.join("\n\t") raise exc ensure ActiveMessaging.logger.flush rescue nil reset_application end
execute_filter_chain(direction, message, details={}) { |message| ... }
click to toggle source
# File lib/activemessaging/gateway.rb, line 152 def execute_filter_chain(direction, message, details={}) filters.each do |filter, options| if apply_filter?(direction, details, options) begin filter_obj = create_filter(filter, options) filter_obj.process(message, details) rescue ActiveMessaging::StopFilterException => sfe ActiveMessaging.logger.error "Filter: #{filter_obj.inspect} threw StopFilterException: #{sfe.message}" return end end end yield(message) end
filter(filter, options = {})
click to toggle source
# File lib/activemessaging/gateway.rb, line 134 def filter filter, options = {} options[:direction] = :bidirectional if options[:direction].nil? filters << [filter, options] end
find_destination(destination_name)
click to toggle source
# File lib/activemessaging/gateway.rb, line 281 def find_destination destination_name real_destination = named_destinations[destination_name] raise "You have not yet defined a destination named #{destination_name}. Destinations currently defined are [#{named_destinations.keys.join(',')}]" if real_destination.nil? real_destination end
Also aliased as: find_queue
load_connection_configuration(label='default')
click to toggle source
# File lib/activemessaging/gateway.rb, line 370 def load_connection_configuration(label='default') @broker_yml = YAML::load(ERB.new(IO.read(File.join(ActiveMessaging.app_root, 'config', 'broker.yml'))).result) if @broker_yml.nil? if label == 'default' config = @broker_yml[ActiveMessaging.app_env].symbolize_keys else config = @broker_yml[ActiveMessaging.app_env][label].symbolize_keys end config[:adapter] = config[:adapter].to_sym if config[:adapter] config[:adapter] ||= :stomp return config end
prepare_application()
click to toggle source
# File lib/activemessaging/gateway.rb, line 203 def prepare_application return unless defined?(ActiveRecord) if ActiveRecord::VERSION::MAJOR >= 4 ActiveRecord::Base.connection_pool.connections.map(&:verify!) else ActiveRecord::Base.verify_active_connections! end end
processor_group(group_name, *processors)
click to toggle source
# File lib/activemessaging/gateway.rb, line 340 def processor_group group_name, *processors if processor_groups.has_key? group_name processor_groups[group_name] = processor_groups[group_name] + processors else processor_groups[group_name] = processors end end
publish(destination_name, body, publisher=nil, headers={})
click to toggle source
# File lib/activemessaging/gateway.rb, line 297 def publish destination_name, body, publisher=nil, headers={}, timeout=10 raise "You cannot have a nil or empty destination name." if destination_name.nil? raise "You cannot have a nil or empty message body." if (body.nil? || body.empty?) real_destination = find_destination(destination_name) details = { :publisher => publisher, :destination => real_destination, :direction => :outgoing } message = OpenStruct.new(:body => body, :headers => headers.reverse_merge(real_destination.publish_headers)) begin Timeout.timeout timeout do execute_filter_chain(:outgoing, message, details) do |message| connection(real_destination.broker_name).send real_destination.value, message.body, message.headers end end rescue Timeout::Error=>toe ActiveMessaging.logger.error("Timed out trying to send the message #{message} to destination #{destination_name} via broker #{real_destination.broker_name}") raise toe end end
receive(destination_name, receiver=nil, subscribe_headers={})
click to toggle source
# File lib/activemessaging/gateway.rb, line 320 def receive destination_name, receiver=nil, subscribe_headers={}, timeout=10 raise "You cannot have a nil or empty destination name." if destination_name.nil? conn = nil dest = find_destination destination_name config = load_connection_configuration(dest.broker_name) subscribe_headers['id'] = receiver.name.underscore unless (receiver.nil? or subscribe_headers.key? 'id') Timeout.timeout timeout do conn = Gateway.adapters[config[:adapter]].new(config) conn.subscribe(dest.value, subscribe_headers) message = conn.receive conn.received message, subscribe_headers return message end rescue Timeout::Error=>toe ActiveMessaging.logger.error("Timed out trying to receive a message on destination #{destination_name}") raise toe ensure conn.disconnect unless conn.nil? end
register_adapter(adapter_name, adapter_class)
click to toggle source
# File lib/activemessaging/gateway.rb, line 130 def register_adapter adapter_name, adapter_class Gateway.adapters[adapter_name] = adapter_class end
reset()
click to toggle source
# File lib/activemessaging/test_helper.rb, line 17 def self.reset unsubscribe disconnect @filters = [] @subscriptions = {} @named_destinations = {} @processor_groups = {} @current_processor_group = nil @connections = {} end
reset_application()
click to toggle source
# File lib/activemessaging/gateway.rb, line 213 def reset_application end
start()
click to toggle source
Starts up an message listener to start polling for messages on each configured connection, and dispatching processing
# File lib/activemessaging/gateway.rb, line 28 def start # subscribe - creating connections along the way subscribe # for each connection, start a thread @connections.each do |name, conn| @connection_threads[name] = Thread.start do while @running begin Thread.current[:message] = nil Thread.current[:message] = conn.receive #catch these but then stop looping rescue StopProcessingException=>spe ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Processing Stopped - receive interrupted, will process last message if already received" # break #catch all others, but go back and try and recieve again rescue Object=>exception ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Exception from connection.receive: #{exception.message}\n" + exception.backtrace.join("\n\t") ensure if Thread.current[:message] @guard.synchronize { dispatch Thread.current[:message] } Thread.current[:message] = nil else # if there is no message at all, sleep # maybe this should be configurable sleep(1) end end Thread.pass end ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: receive loop terminated" end end while @running trap("TERM", "EXIT") living = false @connection_threads.each { |name, thread| living ||= thread.alive? } @running = living sleep(1) end ActiveMessaging.logger.error "All connection threads have died..." rescue Interrupt ActiveMessaging.logger.error "\n<<Interrupt received>>\n" rescue Object=>exception ActiveMessaging.logger.error "#{exception.class.name}: #{exception.message}\n\t#{exception.backtrace.join("\n\t")}" raise exception ensure ActiveMessaging.logger.error "Cleaning up..." stop ActiveMessaging.logger.error "=> END" end
stop()
click to toggle source
# File lib/activemessaging/gateway.rb, line 84 def stop # first tell the threads to stop their looping, so they'll stop when next complete a receive/dispatch cycle @running = false # if they are dispatching (i.e. !thread[:message].nil?), wait for them to finish # if they are receiving (i.e. thread[:message].nil?), stop them by raising exception dispatching = true while dispatching dispatching = false @connection_threads.each do |name, thread| if thread[:message] dispatching = true # if thread got killed, but dispatch not done, try it again if thread.alive? ActiveMessaging.logger.error "Waiting on thread #{name} to finish processing last message..." else ActiveMessaging.logger.error "Starting thread #{name} to finish processing last message..." msg = thread[:message] thread.exit thread = Thread.start do begin Thread.current[:message] = msg dispatch Thread.current[:message] ensure Thread.current[:message] = nil end end end else thread.raise StopProcessingException, "Time to stop." if thread.alive? end end sleep(1) end unsubscribe disconnect end
subscribe()
click to toggle source
# File lib/activemessaging/gateway.rb, line 139 def subscribe subscriptions.each { |key, subscription| subscription.subscribe } end
subscribe_to(destination_name, processor, headers={})
click to toggle source
# File lib/activemessaging/gateway.rb, line 289 def subscribe_to destination_name, processor, headers={} proc_name = processor.name.underscore proc_sym = processor.name.underscore.to_sym if (!current_processor_group || processor_groups[current_processor_group].include?(proc_sym)) @subscriptions["#{proc_name}:#{destination_name}"]= Subscription.new(find_destination(destination_name), processor, headers) end end
unsubscribe()
click to toggle source
# File lib/activemessaging/gateway.rb, line 143 def unsubscribe subscriptions.each { |key, subscription| subscription.unsubscribe } end