class MetricsCapacitor::Processor::Writer

Public Instance Methods

post_init() click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 7
# Prepares the writer's external connections and loop state.
#
# Creates the Elasticsearch client (@elastic) and the Redis client (@redis)
# from Config, installs the 'metrics' index template, and clears the @exit
# flag that the processing loop polls.
def post_init
  es_options = {
    url: Config.elasticsearch[:urls],
    reload_connections: 100,
    retry_on_failure: Config.elasticsearch[:retry],
    sniffer_timeout: 5
  }
  @elastic = Elasticsearch::Client.new(**es_options)
  logger.debug('Elastic connection set up')

  redis_url = Config.redis[:url]
  @redis = Redis.new(url: redis_url)
  logger.debug('Redis connection set up')

  # The template is (re)installed on every start, before any bulk is written.
  logger.debug('Setting ES index templates')
  @elastic.indices.put_template(name: 'metrics', body: INDEX_TEMPLATE)
  logger.debug('ES templates set up')

  @exit = false
end
process() click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 26
# Main writer loop: drains the 'writer' Redis list into bulks and indexes each
# bulk into Elasticsearch, until @exit becomes truthy.
#
# Every pass collects up to Config.writer[:bulk_max] entries, blocking for at
# most Config.writer[:bulk_wait] on each BLPOP, and then sends the collected
# entries in a single bulk request. Redis connection/timeout errors are logged
# and the gathering is retried after one second; entries collected before the
# error are kept. Indexing errors reported by Elasticsearch are logged only.
def process
  logger.debug 'Randomizing startup time'
  # Random initial delay bounded by bulk_wait (presumably to stagger several
  # writers started at the same moment - confirm with the deployment setup).
  sleep rand(Config.writer[:bulk_wait])

  until @exit
    logger.debug 'Gathering mertics bulk'
    metrics = Metrics.new

    begin
      # The popped value is read from index 1 of the BLPOP reply; a falsy
      # reply (nothing arrived within the wait) closes the current bulk.
      while !@exit && metrics.length < Config.writer[:bulk_max] &&
            (entry = @redis.blpop('writer', timeout: Config.writer[:bulk_wait]))
        metrics << Metric.new(entry[1])
      end
    rescue Redis::CannotConnectError, Redis::TimeoutError => e
      logger.error "Can't connect to redis: #{e.message}"
      sleep 1
      # Retrying without a counter is safe here: re-entering the loop above
      # re-checks @exit first, so a shutdown request still ends the retries.
      retry
    end

    if metrics.empty?
      logger.warn 'No metrics to write :-('
      next
    end

    logger.debug "Writing #{metrics.length} metrics"
    # Build the bulk body once so the logged payload is exactly what is sent.
    payload = metrics.to_elastic
    logger.debug payload.to_json

    result = @elastic.bulk(
      index: Time.now.strftime(Config.elasticsearch[:index]),
      type: Config.writer[:doc_type],
      body: payload
    )

    if result['errors']
      logger.error 'Failed to write metrics!'
      logger.error result['items'].to_json
    else
      logger.info "Written #{metrics.length} metrics, took #{result['took']}ms"
    end
  end
end
shutdown() click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 62
# Requests a stop by raising the @exit flag.
#
# Nothing is interrupted here; the method only sets the flag. The loop in
# #process tests @exit in its loop conditions, so it ends once it next
# evaluates one of them.
def shutdown
  @exit = true
end