Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
Obtain all running actors in the system
# File lib/celluloid/actor.rb, line 49 def all Celluloid.actor_system.running end
Invoke a method asynchronously on an actor via its mailbox
# File lib/celluloid/actor.rb, line 37 def async(mailbox, meth, *args, &block) proxy = Proxy::Async.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
Invoke a method on the given actor via its mailbox
# File lib/celluloid/actor.rb, line 31 def call(mailbox, meth, *args, &block) proxy = Proxy::Sync.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
Obtain the current actor
# File lib/celluloid/actor.rb, line 17 def current actor = Thread.current[:celluloid_actor] fail NotActorError, "not in actor scope" unless actor actor.behavior_proxy end
Call a method asynchronously and retrieve its value later
# File lib/celluloid/actor.rb, line 43 def future(mailbox, meth, *args, &block) proxy = Proxy::Future.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end
Wait for an actor to terminate
# File lib/celluloid/actor.rb, line 96 def join(actor, timeout = nil) actor.thread.join(timeout) actor end
Forcibly kill a given actor
# File lib/celluloid/actor.rb, line 89 def kill(actor) actor.thread.kill actor.mailbox.shutdown if actor.mailbox.alive? end
Link to another actor
# File lib/celluloid/actor.rb, line 66 def link(actor) monitor actor Thread.current[:celluloid_actor].links << actor end
Are we bidirectionally linked to the given actor?
# File lib/celluloid/actor.rb, line 83 def linked_to?(actor) monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor) end
Watch for exit events from another actor
# File lib/celluloid/actor.rb, line 54 def monitor(actor) fail NotActorError, "can't link outside actor context" unless Celluloid.actor? Thread.current[:celluloid_actor].linking_request(actor, :link) end
Are we monitoring the given actor?
# File lib/celluloid/actor.rb, line 78 def monitoring?(actor) actor.links.include? Actor.current end
# File lib/celluloid/actor.rb, line 102 def initialize(behavior, options) @behavior = behavior @actor_system = options.fetch(:actor_system) @mailbox = options.fetch(:mailbox_class, Mailbox).new @mailbox.max_size = options.fetch(:mailbox_size, nil) @task_class = options[:task_class] || Celluloid.task_class @exit_handler = method(:default_exit_handler) @exclusive = options.fetch(:exclusive, false) @timers = Timers::Group.new @tasks = Internals::TaskSet.new @links = Internals::Links.new @handlers = Internals::Handlers.new @receivers = Internals::Receivers.new(@timers) @signals = Internals::Signals.new @running = false @name = nil handle(SystemEvent) do |message| handle_system_event message end end
Obtain the name of the current actor
# File lib/celluloid/actor.rb, line 24 def registered_name actor = Thread.current[:celluloid_actor] fail NotActorError, "not in actor scope" unless actor actor.name end
Unlink from another actor
# File lib/celluloid/actor.rb, line 72 def unlink(actor) unmonitor actor Thread.current[:celluloid_actor].links.delete actor end
Stop waiting for exit events from another actor
# File lib/celluloid/actor.rb, line 60 def unmonitor(actor) fail NotActorError, "can't link outside actor context" unless Celluloid.actor? Thread.current[:celluloid_actor].linking_request(actor, :unlink) end
Schedule a block to run at the given time
# File lib/celluloid/actor.rb, line 238 def after(interval, &block) @timers.after(interval) { task(:timer, &block) } end
# File lib/celluloid/actor.rb, line 139 def behavior_proxy @behavior.proxy end
Clean up after this actor
# File lib/celluloid/actor.rb, line 314 def cleanup(exit_event) Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING @mailbox.shutdown @links.each do |actor| actor.mailbox << exit_event if actor.mailbox.alive? end tasks.to_a.each do |task| begin task.terminate rescue DeadTaskError # TODO: not tested (failed on Travis) end end rescue => ex # TODO: metadata Internals::Logger.crash("CLEANUP CRASHED!", ex) end
# File lib/celluloid/actor.rb, line 291 def default_exit_handler(event) fail event.reason if event.reason end
Schedule a block to run at the given time
# File lib/celluloid/actor.rb, line 243 def every(interval, &block) @timers.every(interval) { task(:timer, &block) } end
Register a new handler for a given pattern
# File lib/celluloid/actor.rb, line 223 def handle(*patterns, &block) @handlers.handle(*patterns, &block) end
Handle any exceptions that occur within a running actor
# File lib/celluloid/actor.rb, line 296 def handle_crash(exception) # TODO: add meta info Internals::Logger.crash("Actor crashed!", exception) shutdown ExitEvent.new(behavior_proxy, exception) rescue => ex Internals::Logger.crash("Actor#handle_crash CRASHED!", ex) end
Handle standard low-priority messages
# File lib/celluloid/actor.rb, line 282 def handle_message(message) unless @handlers.handle_message(message) unless @receivers.handle_message(message) Internals::Logger.debug "Discarded message (unhandled): #{message}" if $CELLULOID_DEBUG end end message end
Handle high-priority system event messages
# File lib/celluloid/system_events.rb, line 4 def handle_system_event(event) if handler = SystemEvent.handle(event.class) send(handler, event) else Internals::Logger.debug "Discarded message (unhandled): #{message}" if $CELLULOID_DEBUG end end
Perform a linking request with another actor
# File lib/celluloid/actor.rb, line 181 def linking_request(receiver, type) Celluloid.exclusive do receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] Timers::Wait.for(LINKING_TIMEOUT) do |remaining| begin message = @mailbox.receive(remaining) do |msg| msg.is_a?(LinkingResponse) && msg.actor.mailbox.address == receiver.mailbox.address && msg.type == type end rescue TaskTimeout next # IO reactor did something, no message in queue yet. end if message.instance_of? LinkingResponse Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING system_events.each { |ev| @mailbox << ev } return elsif message.is_a? SystemEvent # Queue up pending system events to be processed after we've successfully linked system_events << message else fail "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent." end end fail TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}" end end
Receive an asynchronous message
# File lib/celluloid/actor.rb, line 228 def receive(timeout = nil, &block) while true message = @receivers.receive(timeout, &block) return message unless message.is_a?(SystemEvent) handle_system_event(message) end end
Run the actor loop
# File lib/celluloid/actor.rb, line 149 def run while @running begin @timers.wait do |interval| interval = 0 if interval && interval < 0 if message = @mailbox.check(interval) handle_message(message) break unless @running end end rescue MailboxShutdown @running = false rescue MailboxDead # TODO: not tests (but fails occasionally in tests) @running = false end end shutdown rescue ::Exception => ex handle_crash(ex) raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption) end
# File lib/celluloid/actor.rb, line 143 def setup_thread Thread.current[:celluloid_actor] = self Thread.current[:celluloid_mailbox] = @mailbox end
Handle cleaning up this actor after it exits
# File lib/celluloid/actor.rb, line 305 def shutdown(exit_event = ExitEvent.new(behavior_proxy)) @behavior.shutdown cleanup exit_event ensure Thread.current[:celluloid_actor] = nil Thread.current[:celluloid_mailbox] = nil end
Send a signal with the given name to all waiting methods
# File lib/celluloid/actor.rb, line 213 def signal(name, value = nil) @signals.broadcast name, value end
Sleep for the given amount of time
# File lib/celluloid/actor.rb, line 276 def sleep(interval) sleeper = Sleeper.new(@timers, interval) Celluloid.suspend(:sleeping, sleeper) end
# File lib/celluloid/actor.rb, line 127 def start @running = true @thread = Internals::ThreadHandle.new(@actor_system, :actor) do setup_thread run end @proxy = Proxy::Actor.new(@mailbox, @thread) Celluloid::Probe.actor_created(self) if $CELLULOID_MONITORING Celluloid::Actor::Manager.actor_created(self) if $CELLULOID_MANAGED end
Run a method inside a task unless it's exclusive
# File lib/celluloid/actor.rb, line 334 def task(task_type, meta = nil) @task_class.new(task_type, meta) do if @exclusive Celluloid.exclusive { yield } else yield end end.resume end
Terminate this actor
# File lib/celluloid/actor.rb, line 176 def terminate @running = false end
# File lib/celluloid/actor.rb, line 247 def timeout(duration) bt = caller task = Task.current timer = @timers.after(duration) do exception = TaskTimeout.new("execution expired") exception.set_backtrace bt task.resume exception end yield ensure timer.cancel if timer end
Wait for the given signal
# File lib/celluloid/actor.rb, line 218 def wait(name) @signals.wait name end