class GemeraldBeanstalk::Job

Constants

INACTIVE_STATES
MAX_JOB_PRIORITY
RESERVED_STATES
UPDATE_STATES

Attributes

beanstalk[R]
body[RW]
buried_at[RW]
bytes[RW]
created_at[RW]
delay[RW]
id[RW]
priority[RW]
ready_at[RW]
reserved_at[R]
reserved_by[R]
timeout_at[R]
ttr[RW]
tube_name[RW]

Public Class Methods

new(beanstalk, id, tube_name, priority, delay, ttr, bytes, body) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 82
def initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body)
  priority, delay, ttr = priority.to_i, delay.to_i, ttr.to_i
  @beanstalk = beanstalk
  @stats_hash = Hash.new(0)
  self.id = id
  self.tube_name = tube_name
  self.priority = priority % MAX_JOB_PRIORITY
  self.delay = delay
  self.ttr = ttr == 0 ? 1 : ttr
  self.bytes = bytes
  self.body = body
  self.created_at = Time.now.to_f
  self.ready_at = self.created_at + delay

  @state = delay > 0 ? :delayed : :ready
end

Public Instance Methods

<(other_job) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 14
def <(other_job)
  return (self <=> other_job) == -1
end
<=>(other_job) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 19
def <=>(other_job)
  raise 'Cannot compare job with nil' if other_job.nil?
  current_state = state
  raise 'Cannot compare jobs with different states' if current_state != other_job.state

  case current_state
  when :ready
    return -1 if self.priority < other_job.priority ||
      self.priority == other_job.priority && self.created_at < other_job.created_at
  when :delayed
    return -1 if self.ready_at < other_job.ready_at
  when :buried
    return -1 if self.buried_at < other_job.buried_at
  else
    raise "Cannot compare job with state of #{current_state}"
  end
  return 1
end
buried?() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 39
def buried?
  return state == :buried
end
bury(connection, priority, *args) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 44
def bury(connection, priority, *args)
  return false unless reserved_by_connection?(connection)

  reset_reserve_state
  @state = :buried
  @stats_hash[:'buries'] += 1
  self.priority = priority.to_i
  self.buried_at = Time.now.to_f
  self.ready_at = nil
  return true
end
deadline_approaching(*args) click to toggle source

Must look at @state to avoid infinite recursion

# File lib/gemerald_beanstalk/job.rb, line 58
def deadline_approaching(*args)
  return false unless @state == :reserved
  @state = :deadline_pending
  return true
end
deadline_pending?() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 65
def deadline_pending?
  return state == :deadline_pending
end
delayed?() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 70
def delayed?
  return state == :delayed
end
delete(connection, *args) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 75
def delete(connection, *args)
  return false if RESERVED_STATES.include?(state) && !reserved_by_connection?(connection)
  @state = :deleted
  return true
end
kick(*args) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 100
def kick(*args)
  return false unless INACTIVE_STATES.include?(state)

  @state = :ready
  @stats_hash[:'kicks'] += 1
  self.ready_at = Time.now.to_f
  self.buried_at = nil
  return true
end
ready?() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 111
def ready?
  return state == :ready
end
release(connection, priority, delay, increment_stats = true, *args) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 116
def release(connection, priority, delay, increment_stats = true, *args)
  return false unless reserved_by_connection?(connection)

  delay = delay.to_i
  reset_reserve_state
  @state = delay > 0 ? :delayed : :ready
  @stats_hash[:'releases'] += 1 if increment_stats
  self.priority = priority.to_i
  self.delay = delay
  self.ready_at = Time.now.to_f + delay
  return true
end
reserve(connection, *args) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 130
def reserve(connection, *args)
  return false unless ready?

  @state = :reserved
  @stats_hash[:'reserves'] += 1
  @reserved_by = connection
  @reserved_at = Time.now.to_f
  @timeout_at = @reserved_at + self.ttr
  return true
end
reserved_by_connection?(connection) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 142
def reserved_by_connection?(connection)
  return RESERVED_STATES.include?(state) && self.reserved_by == connection ? true : false
end
reset_reserve_state() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 147
def reset_reserve_state
  @timeout_at = nil
  @reserved_at = nil
  @reserved_by = nil
end
state() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 154
def state
  return @state unless UPDATE_STATES.include?(@state)

  now = Time.now.to_f
  if @state == :delayed && self.ready_at <= now
    @state = :ready
  elsif RESERVED_STATES.include?(@state)
    # Rescue from timeout being reset by other thread
    if (now > self.timeout_at rescue false)
      timed_out
    elsif (@state == :reserved && now + 1 > self.timeout_at rescue false)
      deadline_approaching
    end
  end

  return @state
end
stats() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 173
def stats
  now = Time.now.to_f
  current_state = state
  return {
    'id' => self.id,
    'tube' => self.tube_name,
    'state' => current_state == :deadline_pending ? 'reserved' : current_state.to_s,
    'pri' => self.priority,
    'age' => (now - self.created_at).to_i,
    'delay' => self.delay.to_i,
    'ttr' => self.ttr,
    'time-left' => time_left(now),
    'file' => 0,
    'reserves' => @stats_hash[:'reserves'],
    'timeouts' => @stats_hash[:'timeouts'],
    'releases' => @stats_hash[:'releases'],
    'buries' => @stats_hash[:'buries'],
    'kicks' => @stats_hash[:'kicks'],
  }
end
time_left(current_time = Time.now.to_f) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 195
def time_left(current_time = Time.now.to_f)
  if self.timeout_at
    time_left = self.timeout_at - current_time
  elsif self.ready_at
    time_left = self.ready_at - current_time
  end
  return time_left.to_i
end
timed_out(*args) click to toggle source

Must reference @state to avoid infinite recursion

# File lib/gemerald_beanstalk/job.rb, line 206
def timed_out(*args)
  return false unless RESERVED_STATES.include?(@state)
  @state = :ready
  @stats_hash[:'timeouts'] += 1
  connection = self.reserved_by
  reset_reserve_state
  self.beanstalk.register_job_timeout(connection, self)
  return true
end
timed_out?() click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 217
def timed_out?
  return state == :timed_out
end
touch(connection) click to toggle source
# File lib/gemerald_beanstalk/job.rb, line 222
def touch(connection)
  return false unless reserved_by_connection?(connection)
  @state = :reserved
  @timeout_at = Time.now.to_f + self.ttr
  return true
end