class LogStash::Inputs::BeatsSupport::CircuitBreaker

Largely inspired by Martin's fowler circuit breaker

Constants

DEFAULT_ERROR_THRESHOLD

Error threshold before opening the breaker, if the breaker is open it wont execute the code.

DEFAULT_EXCEPTION_RESCUED

Exceptions catched by the circuit breaker, too much errors and the breaker will trip.

DEFAULT_TIME_BEFORE_RETRY

Recover time after the breaker is open to start executing the method again.

Public Class Methods

new(name, options = {}, &block) click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 25
def initialize(name, options = {}, &block)
  @exceptions = Array(options.fetch(:exceptions, [StandardError]))
  @error_threshold = options.fetch(:error_threshold, DEFAULT_ERROR_THRESHOLD)
  @time_before_retry = options.fetch(:time_before_retry, DEFAULT_TIME_BEFORE_RETRY)
  @block = block
  @name = name
  @mutex = Mutex.new
  reset
end

Public Instance Methods

closed?() click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 59
def closed?
  current_state = state

  current_state == :close || current_state == :half_open
end
execute(args = nil) { |args| ... } click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 35
def execute(args = nil)
  case state
  when :open
    logger.warn("CircuitBreaker::Open", :name => @name)
    raise OpenBreaker, "for #{@name}"
  when :close, :half_open
    if block_given?
      yield args
    else
      @block.call(args)
    end

    if state == :half_open
      logger.warn("CircuitBreaker::Close", :name => @name)
      reset
    end
  end
rescue *@exceptions => e
  logger.warn("CircuitBreaker::rescuing exceptions", :name => @name, :exception => e.class)
  increment_errors(e)

  raise HalfOpenBreaker
end

Private Instance Methods

increment_errors(exception) click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 77
def increment_errors(exception)
  t = Time.now

  @mutex.synchronize do
    @errors_count += 1
    @last_failure_time = t
  end

  logger.debug("CircuitBreaker increment errors", 
               :errors_count => @errors_count, 
               :error_threshold => @error_threshold,
               :exception => exception.class,
               :message => exception.message) if logger.debug?
end
logger() click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 66
def logger
  @logger ||= Cabin::Channel.get(LogStash)
end
reset() click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 70
def reset
  @mutex.synchronize do
    @errors_count = 0
    @last_failure_time = nil
  end
end
state() click to toggle source
# File lib/logstash/inputs/beats_support/circuit_breaker.rb, line 92
def state
  t = Time.now

  @mutex.synchronize do
    if @errors_count >= @error_threshold
      if t - @last_failure_time > @time_before_retry
        :half_open
      else
        :open
      end
    else
      :close
    end
  end
end