class ActiveMessaging::Adapters::Adapter::Connection
Connection
class needed by a13g
Public Class Methods
Generic init method needed by a13g
# File lib/activemessaging/adapters/wmq.rb, line 33
# Generic init method needed by a13g.
#
# cfg is the broker configuration hash. It is modified in place and then kept
# as @connection_options (the very same object, not a copy):
# * :poll_interval defaults to 0.1 when absent
# * every String value containing "WMQ::" is replaced by the result of
#   evaluating it, so WMQ constants can be written directly in broker.yml
#
# NOTE(review): instance_eval runs arbitrary Ruby taken from the
# configuration; this is only acceptable while broker.yml is trusted.
def initialize(cfg)
  cfg[:poll_interval] ||= 0.1

  cfg.each_pair do |name, setting|
    next unless setting.instance_of?(String) && setting.match("WMQ::")
    cfg[name] = instance_eval(setting)
  end
  @connection_options = cfg

  # Subscription bookkeeping used by the other methods of this class.
  @queue_names = []
  @current_queue = 0
  @queues = {}
end
Public Instance Methods
Disconnect method needed by a13g. There is no need to disconnect from the queue manager, since connection and disconnection occur inside the send and receive methods. headers is never used.
# File lib/activemessaging/adapters/wmq.rb, line 48
# Disconnect method needed by a13g.
# Deliberately does nothing and returns nil.
# headers is accepted but never used.
def disconnect(headers = {})
  nil
end
Receive method needed by a13g
# File lib/activemessaging/adapters/wmq.rb, line 52
# Receive method needed by a13g.
#
# Polls the subscribed queues in rotation, beginning with the queue that
# follows @current_queue, and returns the first non-nil result of
# retrieve_message. Loops until a message is found, sleeping for
# @connection_options[:poll_interval] each time the rotation comes back to
# the position it started from.
#
# options is never used.
# Raises RuntimeError when no queue is subscribed.
def receive(options={})
  raise "No subscription to receive messages from" if (@queue_names.nil? || @queue_names.empty?)

  # @queue_names can shrink (see unsubscribe) while @current_queue still holds
  # an index past the new end. Clamp the start marker so the rotation can reach
  # it again; otherwise the sleep below would never run and the queues would be
  # polled with no pause at all.
  start = [@current_queue, @queue_names.length - 1].min

  while true
    @current_queue = (@current_queue < @queue_names.length - 1) ? @current_queue + 1 : 0
    sleep(@connection_options[:poll_interval]) if @current_queue == start

    q = @queues[@queue_names[@current_queue]]
    next if q.nil?

    message = retrieve_message(q)
    return message unless message.nil?
  end
end
called after a message is successfully received and processed
# File lib/activemessaging/adapters/wmq.rb, line 117
# Hook called after a message is successfully received and processed.
# This implementation does nothing and returns nil; both arguments are ignored.
def received(message, headers={})
  nil
end
Send method needed by a13g. headers may contain 2 different hashes to give more control over the sending process:
:descriptor => {...} to populate the descriptor of the message :put_options => {...} to specify the put options for that message
# File lib/activemessaging/adapters/wmq.rb, line 70
# Send method needed by a13g.
#
# Connects to the queue manager, opens q_name for output, puts one message
# holding message_data and returns it wrapped in a Message.
#
# headers may carry two optional hashes:
# * :descriptor  - message descriptor (defaults to {:format => WMQ::MQFMT_STRING})
# * :put_options - extra options merged into the put call (a copy is used,
#   so the caller's hash is not modified)
def send(q_name, message_data, headers={})
  WMQ::QueueManager.connect(@connection_options) do |qmgr|
    qmgr.open_queue(:q_name => q_name, :mode => :output) do |queue|
      descriptor = headers[:descriptor] || { :format => WMQ::MQFMT_STRING }

      requested = headers[:put_options]
      put_options = requested.nil? ? {} : requested.dup

      outgoing = WMQ::Message.new(:data => message_data, :descriptor => descriptor)
      # :message and :data are forced here, overriding any caller-supplied values.
      queue.put(put_options.merge(:message => outgoing, :data => nil))

      return Message.new(outgoing, q_name)
    end
  end
end
Subscribe method needed by a13g. headers may contain a hash to give more control over the get operation on the queue:
:get_options => {...} to specify the get options when receiving messages. Warning: get options are set only on the first subscription to a queue and are shared by all of that queue's subscriptions. Any get options passed with a subsequent subscribe on an existing queue are discarded.
subId is never used
# File lib/activemessaging/adapters/wmq.rb, line 90
# Subscribe method needed by a13g.
#
# Registers one more subscription on q_name. The first subscription to a
# queue creates its Queue entry, using headers[:get_options] (default {});
# get options passed on later subscriptions to the same queue are ignored.
#
# subId is never used.
# Returns whatever the queue's add_subscription returns.
def subscribe(q_name, headers={}, subId=nil)
  # Look the queue up first so that an already-known queue is the one that
  # receives add_subscription (previously this path called it on nil).
  q = @queues[q_name]

  if q.nil?
    get_options = headers[:get_options] || {}
    q = Queue.new(q_name, get_options)
    @queues[q_name] = q
    @queue_names << q.name
  end

  q.add_subscription
end
Called after a message is successfully received but unsuccessfully processed. The purpose is to return the message to the destination so that receiving and processing can be attempted again.
# File lib/activemessaging/adapters/wmq.rb, line 122
# Hook called after a message is received but its processing fails.
# This implementation does nothing and returns nil; both arguments are ignored.
def unreceive(message, headers={})
  nil
end
Unsubscribe method needed by a13g. Stops listening to the queue only after the last unsubscription. headers is never used. subId is never used.
# File lib/activemessaging/adapters/wmq.rb, line 105
# Unsubscribe method needed by a13g.
#
# Removes one subscription from q_name. Once the queue reports no remaining
# subscription it is dropped from @queues and @queue_names, so it is no
# longer polled. A q_name that is not subscribed is ignored.
#
# headers and subId are never used. The subId default is the nil literal:
# the NIL constant is deprecated and missing from newer Rubies, where
# evaluating it as a default raises NameError.
def unsubscribe(q_name, headers={}, subId=nil)
  q = @queues[q_name]
  return if q.nil?

  q.remove_subscription
  return if q.has_subscription?

  @queues.delete(q_name)
  @queue_names.delete(q_name)
end
Private Instance Methods
Retrieve the first available message from the specified queue. Returns nil if the queue is empty.
# File lib/activemessaging/adapters/wmq.rb, line 129
# Fetches one message from the queue described by q.
#
# Connects to the queue manager, opens q.name for input and issues a single
# get using a copy of q.get_options. Returns the message wrapped in a Message
# when the get succeeds, nil otherwise.
def retrieve_message(q)
  WMQ::QueueManager.connect(@connection_options) do |qmgr|
    qmgr.open_queue(:q_name => q.name, :mode => :input) do |queue|
      request = q.get_options.dup
      incoming = WMQ::Message.new

      fetched = queue.get(request.merge(:message => incoming))
      # Explicit returns leave the method from inside both blocks.
      return nil unless fetched
      return Message.new(incoming, q.name)
    end
  end
end