class ActiveMessaging::Adapters::Synch::Connection

Attributes

configuration[RW]

configurable params

max_process[RW]

configurable params

processing_pids[RW]

configurable params

use_fork[RW]

configurable params

Public Class Methods

new(cfg) click to toggle source

generic init method needed by a13g

# File lib/activemessaging/adapters/synch.rb, line 16
def initialize cfg
  ActiveMessaging.logger.debug "ActiveMessaging::Adapters::Synch::Connection.initialize: #{cfg.inspect}"
  @configuration = cfg
  
  @use_fork = !!@configuration[:use_fork]

  # max at once
  @max_process = 10
  # keep track of the processes running
  @processing_pids = {}

  if use_fork
    Thread.new {
      watch_processes
    }
  end
end

Public Instance Methods

send(destination_name, message_body, message_headers={}) click to toggle source
# File lib/activemessaging/adapters/synch.rb, line 47
def send destination_name, message_body, message_headers={}
  message = Message.new(message_body, 'id', message_headers, destination_name, 'MESSAGE')
  
  if use_fork

    if processing_pids.size > max_process
      ActiveMessaging.logger.debug "ActiveMessaging:synch too many processes: #{processing_pids.size} > #{max_process}"
      sleep(0.5)
    end

    pid = fork {
      ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch start fork dispath (#{Process.pid}) --------------------"
      ActiveMessaging::Gateway.prepare_application
      ActiveMessaging::Gateway._dispatch(message)
      ActiveMessaging::Gateway.reset_application
      ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch end fork dispath (#{Process.pid})--------------------\n"
    }

    Process.detach(pid)
    processing_pids[pid] = "Destination: #{destination_name}, Message: #{message_body}"

  else

    ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch before dispath --------------------"
    ActiveMessaging::Gateway.prepare_application
    ActiveMessaging::Gateway._dispatch(message)
    ActiveMessaging::Gateway.reset_application
    ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch after dispath --------------------\n"

  end
  
end
watch_processes() click to toggle source
# File lib/activemessaging/adapters/synch.rb, line 34
def watch_processes
  while true
    begin
      pid = Process.wait(0, Process::WNOHANG)
      if m = processing_pids.delete(pid)
        ActiveMessaging.logger.debug "ActiveMessaging:synch - processing complete for pid (#{pid}):\n\t#{m}"
      end
      sleep(0.5)
    rescue
    end
  end
end