class ActionManager

begin rdoc

This class provides support to handle actions. Class methods, or actions, can be registered in the action manager. The manager will wait for actions to be triggered (thread-safe), and will execute them concurrently. The action manager can be used to synchronize different objects in different threads.

Example

class Sample
    attr_reader :am

    def initialize
        @am = ActionManager.new(15,true)

        @am.register_action("SLEEP",method("sleep_action"))
    end

    def sleep_action(secs)
        sleep(secs)
    end

    def finalize_action
        p "Exiting..."
        @am.stop_listener
    end
end

s = Sample.new

s.am.start_listener

# Objects in other threads can trigger actions like this
# (the second argument is the action id, which can later be passed to cancel_action):
#   s.am.trigger_action("SLEEP", 1, rand(3)+1)
#   s.am.trigger_action(:FINALIZE, 0)

end

Public Class Methods

new(concurrency=10, threaded=true) click to toggle source

Creates a new Action Manager

concurrency: the maximum number of actions that can run at the same time.
threaded: if true, actions are executed in a different thread by default.

# File lib/ActionManager.rb, line 64
# Builds an idle manager: nothing registered, nothing queued, nothing running.
#
# concurrency:: maximum number of actions allowed to run at the same time
# threaded::    default execution mode stored for actions registered later
def initialize(concurrency=10, threaded=true)
    # Configuration supplied by the caller
    @concurrency    = concurrency
    @threaded       = threaded

    # Registry of known actions, keyed by action name
    @actions        = {}

    # Runtime bookkeeping: pending actions, actions in flight and their count
    @action_queue   = []
    @action_running = {}
    @num_running    = 0
    @finalize       = false

    # Lock and condition variable shared by the other methods of the manager
    @threads_mutex  = Mutex.new
    @threads_cond   = ConditionVariable.new
end

Public Instance Methods

cancel_action(action_id) click to toggle source
# File lib/ActionManager.rb, line 140
# Cancels the action identified by +action_id+.
#
# When the action is in the running table with a thread attached, that thread
# is killed, the running counter is decremented, the entry is removed and the
# condition variable is signalled. Otherwise the first queued action carrying
# that id (if any) is removed from the pending queue.
#
# action_id:: the id that was passed to trigger_action
#
# NOTE(review): worker threads started by run_action also decrement
# @num_running inside an ensure block; if that block runs when the thread is
# killed here, the counter would be decremented twice -- confirm.
def cancel_action(action_id)
    @threads_mutex.synchronize do
        entry  = @action_running[action_id]
        worker = entry ? entry[:thread] : nil

        if worker
            worker.kill

            # Release the slot and drop the bookkeeping entry
            @num_running -= 1
            delete_running_action(action_id)

            @threads_cond.signal
        else
            # Not running: look for it among the pending actions instead
            pending = @action_queue.find { |queued| queued[:id] == action_id }
            @action_queue.delete(pending) if pending
        end
    end
end
register_action(aname, method, threaded=nil) click to toggle source

Registers a new action in the manager. An action is defined by:

aname: name of the action; it identifies the action.
method: the code to run; it is invoked with call, so it should be a Proc or Method object.
threaded: whether to execute the action in a new thread (when omitted, the manager's default is used).

# File lib/ActionManager.rb, line 84
# Registers a new action in the manager.
#
# aname::    name that identifies the action
# method::   object invoked with +call+ when the action runs (Proc or Method)
# threaded:: true to run the action in a new thread, false to run it in the
#            listener itself; when nil (the default) the manager-wide setting
#            given to the constructor is used
#
# Returns the hash stored for the action.
def register_action(aname, method, threaded=nil)
    # Only fall back to the default when no value was given. Using ||= here
    # would also discard an explicit false, making it impossible to register
    # a non-threaded action on a manager whose default is threaded.
    threaded = @threaded if threaded.nil?

    @actions[aname] = {
        :method     => method,
        :threaded   => threaded
    }
end
start_listener() click to toggle source
# File lib/ActionManager.rb, line 163
# Runs the dispatch loop in the calling thread.
#
# While holding the manager lock, the loop waits on the condition variable as
# long as no slot is free or the queue is empty, and calls run_action
# otherwise. It returns once finalization has been requested and no action is
# running.
def start_listener
    while true
        @threads_mutex.synchronize {
            while ((@concurrency - @num_running)==0) || empty_queue
                # Test the exit condition before sleeping as well: if
                # finalization was requested while nobody was waiting (for
                # instance before the listener started), the signal is gone
                # and waiting here would block forever.
                return if (@finalize && @num_running == 0)

                @threads_cond.wait(@threads_mutex)

                return if (@finalize && @num_running == 0)
            end

            run_action
        }
    end
end
trigger_action(aname, action_id, *aargs) click to toggle source

Triggers the execution of the action.

aname: name of the action.
action_id: an id to identify the action (to cancel it later).
aargs: arguments to call the action with.

# File lib/ActionManager.rb, line 98
# Queues the action registered as +aname+ for execution.
#
# aname::     name of a registered action, or :FINALIZE to request shutdown
# action_id:: id stored with the queued action (used by cancel_action)
# aargs::     arguments for the action's method
#
# The request is silently dropped when finalization was already requested,
# when the name is not registered, or when the number of arguments does not
# fit the arity of the registered method.
def trigger_action(aname, action_id, *aargs)
    @threads_mutex.synchronize do
        return if @finalize

        if aname == :FINALIZE
            finalize if respond_to?(:finalize)
            @finalize = true
            # Signal only when nothing is running
            @threads_cond.signal if @num_running == 0
            return
        end

        return unless @actions.key?(aname)

        registered = @actions[aname]
        declared   = registered[:method].arity

        if declared >= 0
            # Fixed arity: the argument count must match exactly
            return unless declared == aargs.length
        else
            # Negative arity: -declared - 1 arguments are mandatory
            required = -declared - 1
            return if required > aargs.length

            # Everything past the mandatory arguments is packed into a
            # single trailing array
            aargs[required..-1] = [aargs[required..-1]]
        end

        @action_queue.push(registered.merge(:args => aargs, :id => action_id))

        @threads_cond.signal if @num_running < @concurrency
    end
end

Protected Instance Methods

delete_running_action(action_id) click to toggle source
# File lib/ActionManager.rb, line 179
# Removes the entry stored under +action_id+ from the table of running
# actions and returns it (nil when there is no such entry).
def delete_running_action(action_id)
    @action_running.delete(action_id)
end
empty_queue() click to toggle source
# File lib/ActionManager.rb, line 187
# Tells whether there are no pending actions waiting in the queue.
def empty_queue
    @action_queue.empty?
end
get_runable_action() click to toggle source
# File lib/ActionManager.rb, line 183
# Takes the next action to run: removes and returns the first element of the
# pending queue, or nil when the queue is empty.
def get_runable_action
    @action_queue.shift
end
run_action() click to toggle source
# File lib/ActionManager.rb, line 191
# Takes the next runnable action (if any) and executes it.
#
# The running counter is incremented before the action starts. Threaded
# actions run in a new thread that, when it ends, re-acquires the manager
# lock to decrement the counter, remove the action from the running table and
# signal the condition variable. Non-threaded actions run in the calling
# thread.
#
# Any exception raised by a non-threaded action propagates to the caller.
def run_action
    action = get_runable_action
    return unless action

    @num_running += 1

    if action[:threaded]
        thread = Thread.new {
            begin
                action[:method].call(*action[:args])
            ensure
                @threads_mutex.synchronize {
                    @num_running -= 1
                    delete_running_action(action[:id])

                    @threads_cond.signal
                }
            end
        }

        # Keep the thread with the action so it can be looked up by id
        action[:thread] = thread
        @action_running[action[:id]] = action
    else
        begin
            action[:method].call(*action[:args])
        ensure
            # Give the slot back even when the action raises; otherwise the
            # counter stays inflated and the slot is lost for good.
            @num_running -= 1
        end
    end
end