class EstormMessageProcessor::Base

Attributes

channel[R]
conn[R]
consumer[R]
queue[R]

Public Class Methods

logger() click to toggle source
# File lib/estorm-message-processor/base.rb, line 10
def Base.logger
  @@logger
end
logger=(logger) click to toggle source
# File lib/estorm-message-processor/base.rb, line 14
def Base.logger=(logger)
  @@logger = logger
end

Public Instance Methods

logger() click to toggle source
# File lib/estorm-message-processor/base.rb, line 18
def logger
  @@logger
end
queue_creation(config) click to toggle source
# File lib/estorm-message-processor/base.rb, line 69
def queue_creation(config)
   setup_bunny_communications(config[:url],config[:connecturlflag],config[:queuename])
   #@consumer=EstormMessageProcessor::Consumer.new(@channel, @queue, config[:consumer_name], true, false, config)
   @consumer=EstormMessageProcessor::Consumer.new(@channel, @queue)

   @consumer.logger=logger
   raise "consumer creation problem" if @consumer==nil
   queue_mgmt(config)
end
queue_mgmt(config) click to toggle source
# File lib/estorm-message-processor/base.rb, line 52
def queue_mgmt(config)
   msg= "[*] Waiting for messages in #{@queue.name}.  blocking is #{config[:blocking]}"
   logger.info msg
   count=0
   @channel.prefetch(1)  if config[:prefetch_one] # set quality of service to only delivery one message at a time....
   @msg_count,consumer_count = @consumer.queue_statistics  # just to get the stats before entering hte queue
#  @queue.subscribe(:block => config[:blocking]) do |delivery_info, properties, body|
  @consumer.target(@msg_count,config[:exit_when_done]) if config[:exit_when_done]
  @consumer.on_delivery() do |delivery_info, metadata, payload|
     @consumer.process_messages(delivery_info,metadata,payload)
     msg=   "ON DELIVERY: #{@consumer.count}: messages processed"
     logger.info msg
     @channel.close if @consumer.cancelled? 
    
    # @consumer.cancel if msg_count==0 && config[:exit_when_empty]
    end 
end
setup_bunny_communications(url,flag,queuename) click to toggle source
# File lib/estorm-message-processor/base.rb, line 28
def setup_bunny_communications(url,flag,queuename)
  @client=EstormMessageProcessor::Client.new
  @conn,@channel=@client.setup_bunny(url,flag)
  raise "connection problem with #{@client.inspect}" if @conn==nil
  @channel   = @conn.create_channel
  @queue   = @channel.queue(queuename)
  msg= "set up active MQ on #{queuename}"
  logger.info msg
end
start(config) click to toggle source
# File lib/estorm-message-processor/base.rb, line 79
def start(config)
  msg= "Connecting to bunny environment #{config.inspect}"
  logger.info msg
  config[:exit_when_done]=false if  config[:exit_when_done]==nil
  queue_creation(config)
  
  # the block flag shuts down the thread. the timeout values says whether to unsubscriber
  #need to set ack to true to manage the qos parameter
 # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking], :timeout => config[:timeout])
 #  retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking])
 retval ="[did not subscribe as msg count = 0]"
 retval= @queue.subscribe_with(@consumer, :block => config[:blocking])  if !config[:exit_when_done] or @msg_count >0
 # loop do
      #should loop forever if blocking... otherwise needs  a loop
  #   sleep 1
 #  end
  msg= "Ending======about to tear_down_bunny [retval: #{retval}]...."
  logger.info msg
  tear_down_bunny
    
end
tear_down_bunny() click to toggle source
# File lib/estorm-message-processor/base.rb, line 38
def tear_down_bunny
   if @conn!=nil && @conn.open? && @channel!=nil && @channel.open?
     sleep 1
     @consumer.cancel if @consumer!=nil && !@consumer.cancelled?
     sleep 1
   #  @queue.unsubscribe
   
   #  sleep 0.5
   end
   @conn.close if @conn != nil && @conn.open?    #new by soctt
   msg= "closing bunny"
   logger.info msg
end