module GemeraldBeanstalk::BeanstalkHelper
Constants
- BAD_FORMAT
- BURIED
- CRLF
- DEADLINE_SOON
- DELETED
- EXPECTED_CRLF
- JOB_INACTIVE_STATES
- JOB_RESERVED_STATES
- JOB_TOO_BIG
- KICKED
- NOT_FOUND
- NOT_IGNORED
- PAUSED
- RELEASED
- TIMED_OUT
- TOUCHED
- UNKNOWN_COMMAND
Public Class Methods
included(beanstalk)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 23 def self.included(beanstalk) beanstalk.extend(ClassMethods) end
Public Instance Methods
adjust_stats_cmd_put()
click to toggle source
ease handling of odd case where put can return BAD_FORMAT
but increment stats
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 38 def adjust_stats_cmd_put adjust_stats_key(:'cmd-put') end
connect(connection = nil)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 43 def connect(connection = nil) beanstalk_connection = GemeraldBeanstalk::Connection.new(self, connection) @connections << beanstalk_connection adjust_stats_key(:'total-connections') return beanstalk_connection end
disconnect(connection)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 51 def disconnect(connection) connection.close_connection tube(connection.tube_used).stop_use connection.tubes_watched.each do |watched_tube| tube(watched_tube).ignore connection.ignore(watched_tube, :force) end @reserved[connection].each do |job| job.release(connection, job.priority, 0, false) end @reserved.delete(connection) @connections.delete(connection) end
execute(command)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 66 def execute(command) return send(command.method_name, *command.arguments) end
register_job_timeout(connection, job)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 71 def register_job_timeout(connection, job) @reserved[connection].delete(job) adjust_stats_key(:'job-timeouts') honor_reservations(job) end
Private Instance Methods
active_tubes()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 79 def active_tubes tubes = {} @tubes.each_pair { |tube_name, tube| tubes[tube_name] = tube if tube.active? } return tubes end
adjust_stats_key(key, adjustment = 1)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 86 def adjust_stats_key(key, adjustment = 1) @stats[key] += adjustment end
cancel_reservations(connection)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 91 def cancel_reservations(connection) connection.tubes_watched.each do |tube_name| tube(tube_name).cancel_reservation(connection) end return connection end
deadline_pending?(connection)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 99 def deadline_pending?(connection) return @reserved[connection].any?(&:deadline_pending?) end
find_job(job_id, options = {})
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 104 def find_job(job_id, options = {}) return unless (job_id = job_id.to_i) > 0 only = Array(options[:only]) except = Array(options[:except]).unshift(:deleted) job = @jobs[job_id - 1] return nil if job.nil? || except.include?(job.state) return (only.empty? || only.include?(job.state)) ? job : nil end
honor_reservations(job_or_tube)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 116 def honor_reservations(job_or_tube) if job_or_tube.is_a?(GemeraldBeanstalk::Job) job = job_or_tube tube = tube(job.tube_name) elsif job_or_tube.is_a?(GemeraldBeanstalk::Tube) tube = job_or_tube job = tube.next_job end while job && (next_reservation = tube.next_reservation) next unless try_dispatch(next_reservation, job) job = tube.next_job end end
next_job(connection, state = :ready)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 132 def next_job(connection, state = :ready) best_candidate = nil connection.tubes_watched.each do |tube_name| candidate = tube(tube_name).next_job(state) next if candidate.nil? best_candidate = candidate if best_candidate.nil? || candidate < best_candidate end return best_candidate end
peek_by_state(connection, state)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 145 def peek_by_state(connection, state) adjust_stats_key(:"cmd-peek-#{state}") return peek_message(tube(connection.tube_used).next_job(state, :peek)) end
peek_message(job)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 151 def peek_message(job) job.nil? ? NOT_FOUND : "FOUND #{job.id} #{job.bytes}\r\n#{job.body}\r\n" end
reserve_job(connection, timeout = 0)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 156 def reserve_job(connection, timeout = 0) connection.worker = true if deadline_pending?(connection) connection.transmit(DEADLINE_SOON) return true end connection.tubes_watched.each do |tube_name| tube(tube_name).reserve(connection) end connection.wait(timeout <= 0 ? nil : Time.now.to_f + timeout) dispatched = false while !dispatched break if (job = next_job(connection)).nil? dispatched = try_dispatch(connection, job) end return dispatched end
stats_commands()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 179 def stats_commands return { 'cmd-put' => @stats[:'cmd-put'], 'cmd-peek' => @stats[:'cmd-peek'], 'cmd-peek-ready' => @stats[:'cmd-peek-ready'], 'cmd-peek-delayed' => @stats[:'cmd-peek-delayed'], 'cmd-peek-buried' => @stats[:'cmd-peek-buried'], 'cmd-reserve' => @stats[:'cmd-reserve'], 'cmd-reserve-with-timeout' => @stats[:'cmd-reserve-with-timeout'], 'cmd-delete' => @stats[:'cmd-delete'], 'cmd-release' => @stats[:'cmd-release'], 'cmd-use' => @stats[:'cmd-use'], 'cmd-watch' => @stats[:'cmd-watch'], 'cmd-ignore' => @stats[:'cmd-ignore'], 'cmd-bury' => @stats[:'cmd-bury'], 'cmd-kick' => @stats[:'cmd-kick'], 'cmd-touch' => @stats[:'cmd-touch'], 'cmd-stats' => @stats[:'cmd-stats'], 'cmd-stats-job' => @stats[:'cmd-stats-job'], 'cmd-stats-tube' => @stats[:'cmd-stats-tube'], 'cmd-list-tubes' => @stats[:'cmd-list-tubes'], 'cmd-list-tube-used' => @stats[:'cmd-list-tube-used'], 'cmd-list-tubes-watched' => @stats[:'cmd-list-tubes-watched'], 'cmd-pause-tube' => @stats[:'cmd-pause-tube'], } end
stats_connections()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 207 def stats_connections conn_stats = { 'current-connections' => @connections.length, 'current-producers' => 0, 'current-workers' => 0, 'current-waiting' => 0, 'total-connections' => @stats[:'total-connections'] } @connections.each do |connection| conn_stats['current-producers'] += 1 if connection.producer? conn_stats['current-waiting'] += 1 if connection.waiting? conn_stats['current-workers'] += 1 if connection.worker? end return conn_stats end
try_dispatch(connection, job)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 224 def try_dispatch(connection, job) connection.mutex.synchronize do # Make sure connection still waiting and job not claimed return false unless connection.waiting? && job.reserve(connection) connection.transmit("RESERVED #{job.id} #{job.bytes}\r\n#{job.body}\r\n") cancel_reservations(connection) end @reserved[connection] << job return true end
tube(tube_name, create_if_missing = false)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 236 def tube(tube_name, create_if_missing = false) tube = @tubes[tube_name] return tube unless tube.nil? || tube.deactivated? return @tubes[tube_name] = GemeraldBeanstalk::Tube.new(tube_name) if create_if_missing @tubes.delete(tube_name) unless tube.nil? return nil end
tube_list(tube_list)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 248 def tube_list(tube_list) return yaml_response(tube_list.map { |key| "- #{key}" }) end
update_state()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 253 def update_state update_waiting update_timeouts end
update_timeouts()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 259 def update_timeouts @reserved.values.flatten.each(&:state) @delayed.keep_if do |job| case job.state when :delayed true when :ready honor_reservations(job) false else false end end @paused.keep_if do |tube| if tube.paused? true else honor_reservations(tube) false end end end
update_waiting()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 283 def update_waiting waiting_connections.each do |connection| if connection.waiting? && deadline_pending?(connection) message_for_connection = DEADLINE_SOON elsif connection.timed_out? message_for_connection = TIMED_OUT else next end cancel_reservations(connection) connection.transmit(message_for_connection) end end
uptime()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 299 def uptime (Time.now.to_f - @up_at).to_i end
waiting_connections()
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 304 def waiting_connections return @connections.select {|connection| connection.waiting? || connection.timed_out? } end
yaml_response(data)
click to toggle source
# File lib/gemerald_beanstalk/beanstalk_helper.rb, line 309 def yaml_response(data) response = %w[---].concat(data).join("\n") return "OK #{response.bytesize}\r\n#{response}\r\n" end