class PortAuthority::Agent

Scaffolding class for agents

Public Class Methods

new()

Common agent process initialization. Loads the configuration, installs the common signal handlers, and initializes the runtime variables. Executes the actual agent through the +run+ method, which the child class implements, and handles any uncaught exceptions.

# File lib/port-authority/agent.rb, line 16
def initialize
  Thread.current[:name] = 'main'                        # name main thread
  @@_exit = false                                       # prepare exit flag
  @@_semaphores = { log: Mutex.new }                    # init semaphores
  @@_threads = {}                                       # init threads
  Signal.trap('INT') { exit!(1) }                       # end immediately
  Signal.trap('TERM') { end! }                          # end gracefully
  Config.load! || exit!(1)                              # load config or die
  begin                                                 # all-wrapping exception ;)
    run                                                 # hook to child class
  rescue StandardError => e
    Logger.alert "UNCAUGHT EXCEPTION IN THREAD main! Dying!  X.X"
    Logger.alert [' ', "#{e.class}:", e.message].join(' ')
    e.backtrace.each {|line| Logger.debug "  #{line}"}
    exit! 1
  end
end
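
Because +initialize+ calls +run+ itself, constructing a child agent also executes it, and any +StandardError+ escaping +run+ is handled in one place. The following is a minimal standalone sketch of that pattern, not the library code: +MiniAgent+, +GoodAgent+ and +BadAgent+ are hypothetical names, and the sketch stores the error instead of logging it and exiting as the real class does.

```ruby
# Hypothetical miniature of the pattern; not the port-authority class itself.
class MiniAgent
  attr_reader :error

  def initialize
    run                        # hook implemented by the child class
  rescue StandardError => e
    @error = e                 # the real agent logs the error and exits here
  end
end

class GoodAgent < MiniAgent
  attr_reader :ran

  def run
    @ran = true
  end
end

class BadAgent < MiniAgent
  def run
    raise ArgumentError, 'boom'
  end
end

good = GoodAgent.new           # run executes during construction
bad  = BadAgent.new            # the exception is caught by the base class
```

In this sketch +good.ran+ is set as soon as +new+ returns, and +bad.error+ holds the +ArgumentError+ raised inside +run+.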

Public Instance Methods

end!()

Raise the exit flag

# File lib/port-authority/agent.rb, line 81
def end!
  @@_exit = true
end

exit?()

Has the exit flag been raised?

# File lib/port-authority/agent.rb, line 76
def exit?
  @@_exit
end
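
Together, +end!+ and +exit?+ form a cooperative shutdown flag: one party raises it, and loops poll it to decide when to stop. A small standalone sketch of that idea is shown here, assuming a hypothetical +FlagDemo+ class in place of the agent. Like the original, it keeps the flag in a class variable, so every instance sees the same value.

```ruby
# Hypothetical stand-in for the agent's exit flag; not part of port-authority.
class FlagDemo
  @@_exit = false

  def end!
    @@_exit = true
  end

  def exit?
    @@_exit
  end
end

demo = FlagDemo.new
iterations = 0
until demo.exit?
  iterations += 1
  demo.end! if iterations == 3   # e.g. what a TERM handler would do
end
```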

hostname()

Return hostname.

# File lib/port-authority/agent.rb, line 149
def hostname
  @hostname ||= Socket.gethostname
end

sem_create(name)

Create a named +Mutex+ semaphore

# File lib/port-authority/agent.rb, line 86
def sem_create(name)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])  # register a new Mutex under the symbolized name
end
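
Registering a semaphore amounts to adding one +Symbol+ => +Mutex+ pair to a shared hash. A standalone sketch of that step is given below, using a local +semaphores+ hash in place of the class variable; the name +'etcd'+ is only an illustrative assumption.

```ruby
semaphores = { log: Mutex.new }   # the registry starts with the log mutex
name = 'etcd'                     # hypothetical semaphore name

# Hash[key, value] builds a one-entry hash that is merged into the registry.
semaphores.merge!(Hash[name.to_sym, Mutex.new])

# A plain assignment has the same effect:
#   semaphores[name.to_sym] = Mutex.new
```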

setup(args = {})

Set up the agent process. Initializes logging, sets the process title, checks for root privileges if required, sets the nice level, and optionally daemonizes the process.

There are 4 optional parameters:

:name

+String+ Agent name. Defaults to the downcased class name of the child agent, without its namespace.

:root

+Bool+ Require the process to run as root. Defaults to +false+.

:daemonize

+Bool+ Daemonize the process. Defaults to +false+.

:nice

+Integer+ Nice level of the process. Defaults to +0+.

# File lib/port-authority/agent.rb, line 43
def setup(args = {})
  name = args[:name] || self.class.to_s.downcase.split('::').last
  args[:root] ||= false
  args[:daemonize] ||= false
  args[:nice] ||= 0
  Logger.init! @@_semaphores[:log], name
  Logger.info 'Starting main thread'
  Logger.debug 'Setting process name'
  if RUBY_VERSION >= '2.1'
    Process.setproctitle("pa-#{name}-agent")
  else
    $0 = "pa-#{name}-agent"
  end
  if args[:root] && Process.uid != 0
    Logger.alert 'Must run under root user!'
    exit! 1
  end
  Logger.debug 'Setting CPU nice level'
  Process.setpriority(Process::PRIO_PROCESS, 0, args[:nice])
  if args[:daemonize]
    Logger.info 'Daemonizing process'
    if RUBY_VERSION < '1.9'
      exit if fork
      Process.setsid
      exit if fork
      Dir.chdir('/')
    else
      Process.daemon
    end
  end
end
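
The default name and the option defaults can be reproduced outside the agent. In the sketch below, +Demo::Agents::Watcher+ and the helper +default_agent_name+ are hypothetical and exist only to show what the expressions in +setup+ evaluate to.

```ruby
# Hypothetical namespaced class, standing in for a real child agent.
module Demo
  module Agents
    class Watcher; end
  end
end

# Same expression setup uses when :name is not given.
def default_agent_name(klass)
  klass.to_s.downcase.split('::').last
end

name  = default_agent_name(Demo::Agents::Watcher)
title = "pa-#{name}-agent"        # the process title setup would apply

# Missing options fall back to their defaults; given ones are kept.
args = { nice: 5 }
args[:root]      ||= false
args[:daemonize] ||= false
args[:nice]      ||= 0
```

Here +name+ is +"watcher"+ and +title+ is +"pa-watcher-agent"+, while +args[:nice]+ keeps the supplied value of +5+.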

thr_create(name, interval, &block)

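Create a named +Thread+ together with a +Mutex+ semaphore of the same name. The thread runs the given +&block+ repeatedly, sleeping for +interval+ between runs, until the exit flag is raised. An uncaught exception inside the thread is logged and raises the exit flag.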

The method requires 3 parameters:

name

+Symbol+ Thread/Mutex name.

interval

+Integer+ Thread loop interval in seconds.

+&block+

+Proc+ Block of code to run.

# File lib/port-authority/agent.rb, line 98
def thr_create(name, interval, &block)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])
  @@_threads.merge!(Hash[name.to_sym, Thread.new do
      Thread.current[:name] = name.to_s
      Logger.info "Starting thread #{Thread.current[:name]}"
      begin
        until exit?
          yield block
          sleep interval
        end
        Logger.info "Ending thread #{Thread.current[:name]}"
      rescue StandardError => e
        Logger.alert "UNCAUGHT EXCEPTION IN THREAD #{Thread.current[:name]}"
        Logger.alert [' ', "#{e.class}:", e.message].join(' ')
        e.backtrace.each {|line| Logger.debug "  #{line}"}
        end!
      end
    end
  ])
end
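
The core of +thr_create+ is a loop that runs a piece of work, sleeps, and checks a shared flag. A standalone sketch of that loop follows, with a local +exit_flag+ variable standing in for the agent's exit flag and a counter standing in for the block. The thread name and the short interval are assumptions chosen for the demo.

```ruby
exit_flag = false   # stands in for the agent's shared exit flag
ticks = 0

worker = Thread.new do
  Thread.current[:name] = 'ticker'   # hypothetical thread name
  until exit_flag
    ticks += 1        # the work the block would do
    sleep 0.01        # the interval, shortened for the demo
  end
end

sleep 0.005 until ticks >= 3   # let the loop run a few times
exit_flag = true               # equivalent of calling end!
worker.join                    # returns once the loop notices the flag
```

Because the flag is only checked between iterations, a sleeping thread can take up to one full interval to notice a shutdown request.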

thr_safe(name=Thread.current[:name].to_sym) { |block| ... }

Run thread-safe code. The +name+ parameter can be omitted when the method is called from within a block of thread code. In that case, the Mutex named after the current thread is used.

The method accepts the following parameters:

name

+Symbol+ Mutex name.

+&block+

+Proc+ Block of code to run.

# File lib/port-authority/agent.rb, line 127
def thr_safe(name=Thread.current[:name].to_sym, &block)
  @@_semaphores[name.to_sym].synchronize do
    yield block
  end
end
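
The default argument looks up the mutex by the current thread's own name. A standalone sketch of that lookup is shown below, with a local +semaphores+ hash instead of the class variable; the +counter+ name and the thread count are illustrative assumptions.

```ruby
semaphores = { counter: Mutex.new }
total = 0

threads = 4.times.map do
  Thread.new do
    Thread.current[:name] = 'counter'          # all four share one mutex name
    1000.times do
      key = Thread.current[:name].to_sym       # same lookup as the default arg
      semaphores[key].synchronize { total += 1 }
    end
  end
end
threads.each(&:join)
```

A Ruby +Mutex+ is not reentrant, so code that already holds a given mutex raises +ThreadError+ if it tries to lock the same one again.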

thr_start(name=nil)

Start named thread. If the name is omitted, applies to all spawned threads ;)

# File lib/port-authority/agent.rb, line 135
def thr_start(name=nil)
  return @@_threads[name].run if name
  @@_threads.each_value(&:run)
end

thr_wait(name=nil)

Wait for named thread to finish. If the name is omitted, applies to all spawned threads ;)

# File lib/port-authority/agent.rb, line 143
def thr_wait(name=nil)
  return @@_threads[name].join if name
  @@_threads.each_value(&:join)
end