class Bumbleworks::Worker
Attributes
hostname[R]
id[R]
ip[R]
launched_at[R]
pid[R]
system[R]
Public Class Methods
active_worker_states()
click to toggle source
# File lib/bumbleworks/worker.rb, line 31 def active_worker_states info.inject({}) { |hsh, info| id, state = info.id, info.state unless info.in_stopped_state? hsh[id] = state end hsh } end
change_worker_state(new_state, options = {})
click to toggle source
# File lib/bumbleworks/worker.rb, line 41 def change_worker_state(new_state, options = {}) with_worker_state_enabled do Bumbleworks.dashboard.worker_state = new_state Bumbleworks::Support.wait_until(options) do active_worker_states.values.all? { |state| state == new_state } end end return true rescue Bumbleworks::Support::WaitTimeout raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}" end
control_document()
click to toggle source
# File lib/bumbleworks/worker.rb, line 82 def control_document doc = Bumbleworks.dashboard.storage.get('variables', 'worker_control') || {} doc['type'] = 'variables' doc['_id'] = 'worker_control' doc['workers'] ||= {} doc end
info()
click to toggle source
# File lib/bumbleworks/worker.rb, line 10 def info Bumbleworks::Worker::Info || {} end
info_document()
click to toggle source
# File lib/bumbleworks/worker.rb, line 90 def info_document doc = Bumbleworks.dashboard.storage.get('variables', 'workers') || {} doc['type'] = 'variables' doc['_id'] = 'workers' doc['workers'] ||= {} doc end
new(*args, &block)
click to toggle source
Calls superclass method
# File lib/bumbleworks/worker.rb, line 99 def initialize(*args, &block) super @pid = Process.pid @id = SecureRandom.uuid @launched_at = Time.now @ip = Ruote.local_ip @hostname = Socket.gethostname @system = `uname -a`.strip rescue nil if @info @info = Info.new(self) save_info end end
pause_all(options = {})
click to toggle source
# File lib/bumbleworks/worker.rb, line 23 def pause_all(options = {}) change_worker_state('paused', options) end
refresh_worker_info(options = {})
click to toggle source
# File lib/bumbleworks/worker.rb, line 53 def refresh_worker_info(options = {}) with_worker_state_enabled do info.each do |worker_info| if !worker_info.in_stopped_state? && worker_info.stalling? worker_info.record_new_state("stalled") end end end end
shutdown_all(options = {})
click to toggle source
# File lib/bumbleworks/worker.rb, line 14 def shutdown_all(options = {}) # First, send all running workers a message to stop change_worker_state('stopped', options) ensure # Now ensure that future started workers will be started # in "running" mode instead of automatically stopped change_worker_state('running', options) end
toggle_worker_state_enabled(switch)
click to toggle source
# File lib/bumbleworks/worker.rb, line 63 def toggle_worker_state_enabled(switch) unless [true, false].include?(switch) raise ArgumentError, "Must call with true or false" end Bumbleworks.dashboard.context['worker_state_enabled'] = switch end
unpause_all(options = {})
click to toggle source
# File lib/bumbleworks/worker.rb, line 27 def unpause_all(options = {}) change_worker_state('running', options) end
with_worker_state_enabled() { || ... }
click to toggle source
# File lib/bumbleworks/worker.rb, line 74 def with_worker_state_enabled was_already_enabled = worker_state_enabled? toggle_worker_state_enabled(true) unless was_already_enabled yield ensure toggle_worker_state_enabled(false) unless was_already_enabled end
worker_state_enabled?()
click to toggle source
# File lib/bumbleworks/worker.rb, line 70 def worker_state_enabled? !!Bumbleworks.dashboard.context['worker_state_enabled'] end
Public Instance Methods
class_name()
click to toggle source
# File lib/bumbleworks/worker.rb, line 115 def class_name self.class.to_s end
desired_state()
click to toggle source
# File lib/bumbleworks/worker.rb, line 132 def desired_state control_hash = worker_control_variable || @storage.get("variables", "worker") || { "state" => "running" } control_hash["state"] end
determine_state()
click to toggle source
# File lib/bumbleworks/worker.rb, line 139 def determine_state @state_mutex.synchronize do if @state != "stopped" && @context["worker_state_enabled"] @state = desired_state save_info end end end
info()
click to toggle source
# File lib/bumbleworks/worker.rb, line 148 def info self.class.info[id] end
save_info()
click to toggle source
# File lib/bumbleworks/worker.rb, line 119 def save_info @info.save if @info end
shutdown()
click to toggle source
Calls superclass method
# File lib/bumbleworks/worker.rb, line 123 def shutdown super save_info end
worker_control_variable()
click to toggle source
# File lib/bumbleworks/worker.rb, line 128 def worker_control_variable self.class.control_document["workers"][id] end