class Bumbleworks::Worker::Info
Attributes
worker[R]
Public Class Methods
[](worker_id)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 41 def [](worker_id) from_hash(raw_hash[worker_id]) end
all()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 23 def all to_a end
each() { |from_hash(v)| ... }
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 17 def each raw_hash.each { |k, v| yield from_hash(v) } end
filter() { |worker| ... }
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 36 def filter return [] unless block_given? select { |info| yield info.worker } end
forget_worker(id_to_delete)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 49 def forget_worker(id_to_delete) purge_worker_info do |id, info| id == id_to_delete end end
from_hash(hash)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 45 def from_hash(hash) new(Bumbleworks::Worker::Proxy.new(hash)) end
new(worker)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 73 def initialize(worker) @worker = worker @last_save = Time.now - 2 * 60 @msgs = [] unless worker.is_a?(Bumbleworks::Worker::Proxy) end
purge_stale_worker_info()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 55 def purge_stale_worker_info purge_worker_info do |id, info| info['state'].nil? || info['state'] == 'stopped' end end
purge_worker_info(&block)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 61 def purge_worker_info(&block) doc = Bumbleworks.dashboard.storage.get('variables', 'workers') return unless doc doc['workers'] = doc['workers'].reject { |id, info| block.call(id, info) } result = Bumbleworks.dashboard.storage.put(doc) purge_stale_worker_info if result all end
raw_hash()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 13 def raw_hash Bumbleworks.dashboard.worker_info || {} end
where(options)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 27 def where(options) filter_proc = proc { |worker| options.all? { |k, v| worker.send(k.to_sym) == v } } filter(&filter_proc) end
Public Instance Methods
==(other)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 80 def ==(other) other.is_a?(Bumbleworks::Worker::Info) && other.worker == worker end
constant_worker_info_hash()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 195 def constant_worker_info_hash { "id" => @worker.id, "class" => @worker.class_name, "name" => @worker.name, "ip" => @worker.ip, "hostname" => @worker.hostname, "pid" => @worker.pid, "system" => @worker.system, "launched_at" => @worker.launched_at, "state" => @worker.state } end
in_stopped_state?()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 110 def in_stopped_state? worker.state.nil? || ["stopped", "stalled"].include?(worker.state) end
pause(options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 152 def pause(options = {}) send_command("paused", options) end
processed_last_hour()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 187 def processed_last_hour raw_hash["processed_last_hour"] end
processed_last_minute()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 179 def processed_last_minute raw_hash["processed_last_minute"] end
raw_hash()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 85 def raw_hash self.class.raw_hash[worker.id] end
record_new_state(state)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 93 def record_new_state(state) worker.state = state save end
reload()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 89 def reload @worker = Bumbleworks::Worker::Proxy.new(raw_hash) end
responding?(options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 128 def responding?(options = {}) options[:since] ||= Time.now - Bumbleworks.timeout Bumbleworks::Worker.with_worker_state_enabled do Bumbleworks::Support.wait_until(options) do updated_since?(options[:since]) end end true rescue Bumbleworks::Support::WaitTimeout false end
save()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 209 def save doc = Bumbleworks::Worker.info_document worker_info_hash = doc['workers'][@worker.id] || {} worker_info_hash.merge!(constant_worker_info_hash) worker_info_hash.merge!({ 'put_at' => Ruote.now_to_utc_s, 'uptime' => uptime, }) if defined?(@msgs) now = Time.now @msgs = @msgs.drop_while { |msg| Time.parse(msg['processed_at']) < now - 3600 } mm = @msgs.drop_while { |msg| Time.parse(msg['processed_at']) < now - 60 } hour_count = @msgs.size < 1 ? 1 : @msgs.size minute_count = mm.size < 1 ? 1 : mm.size worker_info_hash.merge!({ 'processed_last_minute' => mm.size, 'wait_time_last_minute' => mm.inject(0.0) { |s, m| s + m['wait_time'] } / minute_count.to_f, 'processed_last_hour' => @msgs.size, 'wait_time_last_hour' => @msgs.inject(0.0) { |s, m| s + m['wait_time'] } / hour_count.to_f }) end doc['workers'][@worker.id] = worker_info_hash r = storage.put(doc) @last_save = Time.now save if r != nil end
save_control_message(message)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 172 def save_control_message(message) doc = Bumbleworks::Worker.control_document doc["workers"][id] ||= {} doc["workers"][id]["state"] = message storage.put(doc) end
send_command(command, options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 162 def send_command(command, options = {}) save_control_message(command) Bumbleworks::Worker.with_worker_state_enabled do Bumbleworks::Support.wait_until(options) do raw_hash["state"] == command end end reload end
shutdown(options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 148 def shutdown(options = {}) send_command("stopped", options) end
stalling?()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 140 def stalling? !responding? end
storage()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 144 def storage @worker.storage || Bumbleworks.dashboard.storage end
unpause(options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 156 def unpause(options = {}) send_command("running", options) end
Also aliased as: run
updated_at()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 114 def updated_at Time.parse(raw_hash['put_at']) end
updated_recently?(options = {})
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 122 def updated_recently?(options = {}) options[:seconds_ago] ||= Bumbleworks.timeout latest_time = Time.now - options[:seconds_ago] updated_since?(latest_time) end
updated_since?(latest_time)
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 118 def updated_since?(latest_time) updated_at >= latest_time end
uptime()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 102 def uptime if in_stopped_state? && worker.respond_to?(:uptime) worker.uptime else Time.now - worker.launched_at end end
wait_time_last_hour()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 191 def wait_time_last_hour raw_hash["wait_time_last_hour"] end
wait_time_last_minute()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 183 def wait_time_last_minute raw_hash["wait_time_last_minute"] end
worker_class_name()
click to toggle source
# File lib/bumbleworks/worker/info.rb, line 98 def worker_class_name worker.class_name end