class GemeraldBeanstalk::Beanstalk

Attributes

address[R]
max_job_size[R]

Public Class Methods

new(address, maximum_job_size = 2**16) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 12
# Sets up the in-memory state for a beanstalk server instance.
#
# address          - stored unchanged and exposed through the +address+ reader.
# maximum_job_size - stored as +max_job_size+; defaults to 2**16.
#
# Finishes by calling tube('default', :create_if_missing).
def initialize(address, maximum_job_size = 2**16)
  # Caller-supplied configuration
  @address = address
  @max_job_size = maximum_job_size

  # Identity and bookkeeping
  @id = SecureRandom.base64(16)
  @up_at = Time.now.to_f
  @mutex = Mutex.new
  @stats = ThreadSafe::Hash.new(0)

  # Registries for connections, tubes and jobs
  @connections = ThreadSafe::Array.new
  @tubes = ThreadSafe::Cache.new
  @jobs = GemeraldBeanstalk::Jobs.new
  @delayed = ThreadSafe::Array.new
  @paused = ThreadSafe::Array.new

  # Looking up a key that is not present stores and returns a new empty Array
  @reserved = ThreadSafe::Cache.new do |cache, connection|
    cache[connection] = []
  end

  tube('default', :create_if_missing)
end

Protected Instance Methods

bury(connection, job_id, priority, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 32
# Handles the bury command for +connection+.
#
# Looks up +job_id+ among jobs in JOB_RESERVED_STATES and asks the job to bury
# itself with +priority+. On success the job is dropped from the connection's
# reserved list.
#
# Returns BURIED on success, NOT_FOUND when the job is missing or the job's
# own #bury returns a falsy value. Extra +args+ are ignored.
def bury(connection, job_id, priority, *args)
  adjust_stats_key(:'cmd-bury')
  reserved_job = find_job(job_id, only: JOB_RESERVED_STATES)

  if !reserved_job.nil? && reserved_job.bury(connection, priority)
    @reserved[connection].delete(reserved_job)
    BURIED
  else
    NOT_FOUND
  end
end
delete(connection, job_id = nil, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 42
# Handles the delete command for +connection+.
#
# Finds the job by +job_id+ and asks it to delete itself. When that succeeds
# the job is removed from its tube, its slot in @jobs (index id - 1) is set to
# nil, and, if it was in one of JOB_RESERVED_STATES beforehand, it is removed
# from the connection's reserved list.
#
# Returns DELETED on success, NOT_FOUND when the job is missing or its own
# #delete returns a falsy value. Extra +args+ are ignored.
def delete(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-delete')
  target = find_job(job_id)
  return NOT_FOUND if target.nil?

  # Capture before deleting: the state is read prior to the job's #delete call
  was_reserved = JOB_RESERVED_STATES.include?(target.state)
  return NOT_FOUND unless target.delete(connection)

  tube(target.tube_name).delete(target)
  @jobs[target.id - 1] = nil
  @reserved[connection].delete(target) if was_reserved

  DELETED
end
ignore(connection, tube_name) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 58
# Handles the ignore command for +connection+.
#
# Delegates to connection.ignore(tube_name); a nil result means the tube was
# not ignored and NOT_IGNORED is returned. Otherwise the tube (if it can be
# found) is told it has been ignored and the reply carries the value the
# connection returned.
def ignore(connection, tube_name)
  adjust_stats_key(:'cmd-ignore')
  watched_count = connection.ignore(tube_name)
  return NOT_IGNORED if watched_count.nil?

  ignored_tube = tube(tube_name)
  ignored_tube.ignore unless ignored_tube.nil?
  "WATCHING #{watched_count}\r\n"
end
kick(connection, limit, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 67
# Handles the kick command for +connection+.
#
# Walks JOB_INACTIVE_STATES in order and, for the tube the connection is
# using, repeatedly peeks the next job in that state and kicks it. Stops once
# +limit+ (coerced with #to_i) jobs have been kicked, and never moves on to a
# later state once at least one job was kicked.
#
# Returns "KICKED <count>\r\n". Extra +args+ are ignored.
def kick(connection, limit, *args)
  adjust_stats_key(:'cmd-kick')
  max_kicks = limit.to_i
  kick_count = 0

  JOB_INACTIVE_STATES.each do |state|
    # >= rather than == so that zero and negative limits kick nothing
    break if kick_count >= max_kicks

    loop do
      candidate = tube(connection.tube_used).next_job(state, :peek)
      break if candidate.nil?

      kick_count += 1 if candidate.kick
      break if kick_count == max_kicks
    end

    # Only one state is drained per command
    break unless kick_count.zero?
  end

  "KICKED #{kick_count}\r\n"
end
kick_job(connection, job_id = nil, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 85
# Handles the kick-job command.
#
# Looks up +job_id+ among jobs in JOB_INACTIVE_STATES and kicks it.
# Returns KICKED when the job exists and its #kick returns a truthy value,
# NOT_FOUND otherwise. +connection+ and extra +args+ are not used.
def kick_job(connection, job_id = nil, *args)
  inactive_job = find_job(job_id, only: JOB_INACTIVE_STATES)
  return NOT_FOUND if inactive_job.nil?

  inactive_job.kick ? KICKED : NOT_FOUND
end
list_tube_used(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 97
# Handles the list-tube-used command: replies with the value of
# connection.tube_used in the form "USING <name>\r\n".
def list_tube_used(connection)
  adjust_stats_key(:'cmd-list-tube-used')
  used_tube_name = connection.tube_used
  "USING #{used_tube_name}\r\n"
end
list_tubes(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 91
# Handles the list-tubes command: formats the keys of active_tubes with
# tube_list and returns the result. +connection+ is not used.
def list_tubes(connection)
  adjust_stats_key(:'cmd-list-tubes')
  active_names = active_tubes.keys
  tube_list(active_names)
end
list_tubes_watched(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 103
# Handles the list-tubes-watched command: formats connection.tubes_watched
# with tube_list and returns the result.
def list_tubes_watched(connection)
  adjust_stats_key(:'cmd-list-tubes-watched')
  watched_names = connection.tubes_watched
  tube_list(watched_names)
end
pause_tube(connection, tube_name, delay) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 109
# Handles the pause-tube command.
#
# Pauses the tube named +tube_name+ for +delay+ (coerced with #to_i and
# reduced modulo 2**32, so negative values wrap to large positive ones) and
# records the tube in @paused.
#
# Returns PAUSED on success, NOT_FOUND when no such tube exists.
# +connection+ is not used.
def pause_tube(connection, tube_name, delay)
  # Stat key corrected: it was previously misspelled as 'cmd-paue-tube'.
  adjust_stats_key(:'cmd-pause-tube')
  target_tube = tube(tube_name)
  return NOT_FOUND if target_tube.nil?

  target_tube.pause(delay.to_i % 2**32)
  @paused << target_tube
  PAUSED
end
peek(connection, job_id = nil, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 118
# Handles the peek command: looks up +job_id+ with find_job and returns
# whatever peek_message produces for the result (which may be nil when the
# job is not found). +connection+ and extra +args+ are not used.
def peek(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-peek')
  found_job = find_job(job_id)
  peek_message(found_job)
end
peek_buried(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 124
# Handles the peek-buried command by delegating to peek_by_state with the
# :buried state.
def peek_buried(connection)
  peek_by_state(connection, :buried)
end
peek_delayed(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 129
# Handles the peek-delayed command by delegating to peek_by_state with the
# :delayed state.
def peek_delayed(connection)
  peek_by_state(connection, :delayed)
end
peek_ready(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 134
# Handles the peek-ready command by delegating to peek_by_state with the
# :ready state.
def peek_ready(connection)
  peek_by_state(connection, :ready)
end
put(connection, priority, delay, ttr, bytes, body) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 139
# Handles the put command for +connection+.
#
# bytes - declared body size (coerced with #to_i).
# body  - job body followed by a trailing CRLF. NOTE: the last two characters
#         are stripped from +body+ in place.
#
# Returns JOB_TOO_BIG when the declared size exceeds @max_job_size and
# EXPECTED_CRLF when the body does not end in CRLF or its remaining length
# differs from the declared size. Otherwise the job is created in the tube the
# connection is using, "INSERTED <id>\r\n" is sent through
# connection.transmit, the connection is flagged as a producer, and nil is
# returned.
def put(connection, priority, delay, ttr, bytes, body)
  adjust_stats_key(:'cmd-put')
  declared_size = bytes.to_i
  return JOB_TOO_BIG if declared_size > @max_job_size

  terminator = body.slice!(-2, 2)
  return EXPECTED_CRLF unless terminator == CRLF && body.length == declared_size

  # ID allocation and insertion happen under the mutex so IDs follow insertion order
  new_job = @mutex.synchronize do
    created = GemeraldBeanstalk::Job.new(self, @jobs.next_id, connection.tube_used, priority, delay, ttr, declared_size, body)
    @jobs.enqueue(created)
    tube(connection.tube_used).put(created)
    created
  end

  # Reply before attempting dispatch so the client is not kept waiting
  connection.transmit("INSERTED #{new_job.id}\r\n")
  connection.producer = true

  job_state = new_job.state
  if job_state == :ready
    honor_reservations(new_job)
  elsif job_state == :delayed
    @delayed << new_job
  end

  nil
end
quit(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 168
# Handles the quit command: disconnects +connection+ and returns nil
# (no reply is produced).
def quit(connection)
  disconnect(connection)
  nil
end
release(connection, job_id, priority, delay) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 174
# Handles the release command for +connection+.
#
# Looks up +job_id+ among jobs in JOB_RESERVED_STATES and asks the job to
# release itself with +priority+ and +delay+. On success the job leaves the
# connection's reserved list and, if it reports #delayed?, is appended to
# @delayed.
#
# Returns RELEASED on success, NOT_FOUND when the job is missing or its own
# #release returns a falsy value.
def release(connection, job_id, priority, delay)
  adjust_stats_key(:'cmd-release')
  held_job = find_job(job_id, only: JOB_RESERVED_STATES)
  released = !held_job.nil? && held_job.release(connection, priority, delay)
  return NOT_FOUND unless released

  @reserved[connection].delete(held_job)
  @delayed << held_job if held_job.delayed?
  RELEASED
end
reserve(connection, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 185
# Handles the reserve command for +connection+.
#
# The command takes no arguments: any extra +args+ yield BAD_FORMAT without
# attempting a reservation. Otherwise reserve_job is invoked and nil is
# returned (this method itself produces no reply).
def reserve(connection, *args)
  adjust_stats_key(:'cmd-reserve')

  if args.empty?
    reserve_job(connection)
    nil
  else
    BAD_FORMAT
  end
end
reserve_with_timeout(connection, timeout = 0, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 193
# Handles the reserve-with-timeout command for +connection+.
#
# +timeout+ is coerced with #to_i. reserve_job is always attempted. Only when
# it returns a falsy value AND the timeout is zero does this method notify the
# connection via #wait_timed_out and return TIMED_OUT; in every other case it
# returns nil. Extra +args+ are ignored.
def reserve_with_timeout(connection, timeout = 0, *args)
  adjust_stats_key(:'cmd-reserve-with-timeout')
  wait_seconds = timeout.to_i
  immediate_miss = !reserve_job(connection, wait_seconds) && wait_seconds.zero?
  return nil unless immediate_miss

  connection.wait_timed_out
  TIMED_OUT
end
stats(connection) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 202
# Handles the stats command.
#
# Assembles server-wide statistics by merging, in this order: job counts by
# state, per-command counters, job/tube totals, connection statistics, and
# process-level values (several of which are fixed placeholders). Each entry
# is rendered as "name: value" and handed to yaml_response.
# +connection+ is not used.
def stats(connection)
  adjust_stats_key(:'cmd-stats')

  report = @jobs.counts_by_state.merge(stats_commands)

  report = report.merge(
    'job-timeouts' => @stats[:'job-timeouts'],
    'total-jobs' => @jobs.total_jobs,
    'max-job-size' => @max_job_size,
    'current-tubes' => active_tubes.length
  )

  report = report.merge(stats_connections)

  report = report.merge(
    'pid' => Process.pid,
    'version' => GemeraldBeanstalk::VERSION,
    'rusage-utime' => 0,
    'rusage-stime' => 0,
    'uptime' => uptime,
    'binlog-oldest-index' => 0,
    'binlog-current-index' => 0,
    'binlog-records-migrated' => 0,
    'binlog-records-written' => 0,
    'binlog-max-size' => 10485760,
    'id' => @id,
    'hostname' => Socket.gethostname
  )

  lines = report.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end
stats_job(connection, job_id = nil, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 227
# Handles the stats-job command.
#
# Returns NOT_FOUND when +job_id+ cannot be found; otherwise renders each
# entry of the job's #stats as "name: value" and passes the lines to
# yaml_response. +connection+ and extra +args+ are not used.
def stats_job(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-stats-job')
  subject = find_job(job_id)
  return NOT_FOUND if subject.nil?

  lines = subject.stats.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end
stats_tube(connection, tube_name) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 236
# Handles the stats-tube command.
#
# Returns NOT_FOUND when no tube named +tube_name+ exists; otherwise renders
# each entry of the tube's #stats as "name: value" and passes the lines to
# yaml_response. +connection+ is not used.
def stats_tube(connection, tube_name)
  adjust_stats_key(:'cmd-stats-tube')
  subject = tube(tube_name)
  return NOT_FOUND if subject.nil?

  lines = subject.stats.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end
touch(connection, job_id = nil, *args) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 244
# Handles the touch command for +connection+.
#
# Looks up +job_id+ among jobs in JOB_RESERVED_STATES and asks the job to
# touch itself on behalf of the connection.
#
# Returns TOUCHED on success, NOT_FOUND when the job is missing or its own
# #touch returns a falsy value. Extra +args+ are ignored.
def touch(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-touch')
  held_job = find_job(job_id, only: JOB_RESERVED_STATES)

  if !held_job.nil? && held_job.touch(connection)
    TOUCHED
  else
    NOT_FOUND
  end
end
use(connection, tube_name) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 253
# Handles the use command for +connection+.
#
# Tells the tube the connection currently uses to #stop_use, then fetches the
# tube named +tube_name+ (passing :create_if_missing) and calls #use on it,
# and finally switches the connection over.
#
# Returns "USING <tube_name>\r\n".
def use(connection, tube_name)
  adjust_stats_key(:'cmd-use')

  # Order matters: the old tube is released before the new one is looked up
  previous_tube = tube(connection.tube_used)
  previous_tube.stop_use

  next_tube = tube(tube_name, :create_if_missing)
  next_tube.use

  connection.use(tube_name)
  "USING #{tube_name}\r\n"
end
watch(connection, tube_name) click to toggle source
# File lib/gemerald_beanstalk/beanstalk.rb, line 263
# Handles the watch command for +connection+.
#
# When the connection already watches +tube_name+ nothing changes and the
# current number of watched tubes is reported. Otherwise the tube is fetched
# (passing :create_if_missing), told it is being watched, and the connection
# starts watching it; the value returned by connection.watch is reported.
#
# Returns "WATCHING <count>\r\n".
def watch(connection, tube_name)
  adjust_stats_key(:'cmd-watch')

  watched_count =
    if connection.tubes_watched.include?(tube_name)
      connection.tubes_watched.length
    else
      tube(tube_name, :create_if_missing).watch
      connection.watch(tube_name)
    end

  "WATCHING #{watched_count}\r\n"
end