class Kafka::Pause

Manages the pause state of a partition.

The processing of messages in a partition can be paused, e.g. if there was an exception during processing. This could be caused by a downstream service not being available. A typical way of solving such an issue is to back off for a little while and then try again. In order to do that, pause the partition.

Public Class Methods

new(clock: Time)
# File lib/kafka/pause.rb, line 12
def initialize(clock: Time)
  @clock = clock
  @started_at = nil
  @pauses = 0
  @timeout = nil
  @max_timeout = nil
  @exponential_backoff = false
end
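
In the listing, `now` is the only method called on the injected clock, so any object that responds to `now` and returns a `Time` should be able to replace the default. The following is a minimal sketch of such a clock for tests; `FakeClock` and its `advance` method are hypothetical names and are not part of the library.

```ruby
# Hypothetical test clock, not part of ruby-kafka. It exposes the same
# `now` method that the default `Time` class provides.
class FakeClock
  def initialize(start = Time.at(0))
    @now = start
  end

  # The interface the listing relies on: `clock.now`.
  def now
    @now
  end

  # Move the clock forward by `seconds` without sleeping.
  def advance(seconds)
    @now += seconds
  end
end

clock = FakeClock.new
before = clock.now
clock.advance(30)
elapsed = clock.now - before # seconds between the two readings
```

Passing an object like this as `clock:` would let a test step past a timeout deterministically instead of waiting for real time to pass.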

Public Instance Methods

expired?()

Whether the pause has expired.

# File lib/kafka/pause.rb, line 66
def expired?
  # We never expire the pause if timeout is nil.
  return false if @timeout.nil?

  # Have we passed the end of the pause duration?
  @clock.now >= ends_at
end
pause!(timeout: nil, max_timeout: nil, exponential_backoff: false)

Mark the partition as paused.

If exponential backoff is enabled, each subsequent pause of a partition doubles the actual timeout: for the n-th pause since the last reset (counting from 1), the actual timeout is _2^(n-1) * timeout_, capped at max_timeout if one is given.

Only when {#reset!} is called is this state cleared.

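@param timeout [nil, Integer] if specified, the partition will automatically resume after this many seconds.

@param max_timeout [nil, Integer] if specified, the actual timeout will never exceed this many seconds, even with exponential backoff.

@param exponential_backoff [Boolean] whether to enable exponential timeouts.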

# File lib/kafka/pause.rb, line 32
def pause!(timeout: nil, max_timeout: nil, exponential_backoff: false)
  @started_at = @clock.now
  @timeout = timeout
  @max_timeout = max_timeout
  @exponential_backoff = exponential_backoff
  @pauses += 1
end
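
The backoff arithmetic can be worked through in isolation. The sketch below follows the calculation in the private `ends_at` listing, where the exponent is the pause count minus one and `max_timeout` caps the result. `effective_timeout` is a hypothetical helper written for illustration, not a method the library provides, and the values 10 and 60 are arbitrary.

```ruby
# Hypothetical helper mirroring the arithmetic in the private `ends_at`
# method. It is not part of the library's API.
def effective_timeout(pauses:, timeout:, max_timeout: nil, exponential_backoff: false)
  # The first pause (pauses == 1) uses the base timeout unchanged.
  backoff_factor = exponential_backoff ? 2**(pauses - 1) : 1
  scaled = backoff_factor * timeout

  # Cap the result when a maximum is configured.
  (max_timeout && scaled > max_timeout) ? max_timeout : scaled
end

# Five consecutive pauses with a 10 second base and a 60 second cap.
timeouts = (1..5).map do |n|
  effective_timeout(pauses: n, timeout: 10, max_timeout: 60, exponential_backoff: true)
end
# timeouts => [10, 20, 40, 60, 60]
```

Without a cap the timeout keeps doubling, so setting `max_timeout` is a sensible choice whenever a partition may fail many times in a row.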
pause_duration()

The number of seconds the partition has been paused, or 0 if it is not currently paused.

# File lib/kafka/pause.rb, line 57
def pause_duration
  if paused?
    @clock.now - @started_at
  else
    0
  end
end
paused?()

Whether the partition is currently paused. A paused partition stays paused after its timeout has elapsed, until {#resume!} is called, so {#expired?} should be checked as well.

# File lib/kafka/pause.rb, line 52
def paused?
  # This is nil if we're not currently paused.
  !@started_at.nil?
end
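
Because `paused?` stays true after the timeout has elapsed, a caller typically needs to combine it with `expired?` before resuming. The sketch below shows one way to do that. `resume_if_expired` and `StubPause` are hypothetical names used only for illustration; the stub stands in for a pause object so the example runs without the gem.

```ruby
# Hypothetical helper, not part of ruby-kafka. `pause` is any object that
# responds to `paused?`, `expired?` and `resume!`, as listed on this page.
def resume_if_expired(pause)
  return false unless pause.paused? && pause.expired?

  pause.resume!
  true
end

# Stub with the same three methods, used only to exercise the helper.
StubPause = Struct.new(:paused, :expired) do
  def paused?
    paused
  end

  def expired?
    expired
  end

  def resume!
    self.paused = false
  end
end

still_waiting = StubPause.new(true, false)
ready = StubPause.new(true, true)

waiting_result = resume_if_expired(still_waiting) # pause is left in place
ready_result = resume_if_expired(ready)           # pause has been cleared
```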
reset!()

Resets the pause count, so that the next pause starts from the base timeout instead of an exponentially increased one. This does not resume the partition.

# File lib/kafka/pause.rb, line 75
def reset!
  @pauses = 0
end
resume!()

Resumes the partition.

The number of pauses is still retained, and if the partition is paused again it may be with an exponential backoff.

# File lib/kafka/pause.rb, line 44
def resume!
  @started_at = nil
  @timeout = nil
  @max_timeout = nil
end
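
The difference between `resume!` and `reset!` is easiest to see over a full pause cycle. The sketch below uses `PauseSketch`, a condensed stand-in written for this example so that it runs without the gem; it copies only the parts of the listed source needed here and omits `max_timeout`. The settable struct clock is likewise an assumption made for illustration.

```ruby
# Condensed stand-in for the class listed on this page (hypothetical name).
class PauseSketch
  def initialize(clock: Time)
    @clock = clock
    @started_at = nil
    @pauses = 0
    @timeout = nil
    @exponential_backoff = false
  end

  def pause!(timeout: nil, exponential_backoff: false)
    @started_at = @clock.now
    @timeout = timeout
    @exponential_backoff = exponential_backoff
    @pauses += 1
  end

  # Ends the current pause but keeps the pause count.
  def resume!
    @started_at = nil
    @timeout = nil
  end

  # Clears the pause count so backoff starts over.
  def reset!
    @pauses = 0
  end

  def expired?
    return false if @timeout.nil?

    factor = @exponential_backoff ? 2**(@pauses - 1) : 1
    @clock.now >= @started_at + factor * @timeout
  end
end

# Hypothetical clock whose time can be set by hand.
clock = Struct.new(:now).new(Time.at(0))
pause = PauseSketch.new(clock: clock)

pause.pause!(timeout: 10, exponential_backoff: true)
clock.now += 10
first_expired = pause.expired? # first pause lasts 10s, so it has expired

pause.resume!
pause.pause!(timeout: 10, exponential_backoff: true)
clock.now += 10
second_expired = pause.expired? # second pause lasts 20s, so not yet

pause.resume!
pause.reset!
pause.pause!(timeout: 10, exponential_backoff: true)
clock.now += 10
after_reset_expired = pause.expired? # count cleared, back to 10s
```

A reasonable pattern, under these assumptions, is to call `resume!` when a pause expires and `reset!` only once processing has succeeded again.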

Private Instance Methods

ends_at()
# File lib/kafka/pause.rb, line 81
def ends_at
  # Apply an exponential backoff to the timeout.
  backoff_factor = @exponential_backoff ? 2**(@pauses - 1) : 1
  timeout = backoff_factor * @timeout

  # If set, don't allow a timeout longer than max_timeout.
  timeout = @max_timeout if @max_timeout && timeout > @max_timeout

  @started_at + timeout
end