class MetricsCapacitor::Processor::Writer
Public Instance Methods
post_init()
click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 7
#
# Prepares the writer's state: builds the Elasticsearch and Redis clients,
# uploads the 'metrics' index template to Elasticsearch and clears the
# @exit flag.
#
# Returns false (the value assigned to @exit).
def post_init
  elastic_options = {
    url: Config.elasticsearch[:urls],
    reload_connections: 100,
    retry_on_failure: Config.elasticsearch[:retry],
    sniffer_timeout: 5
  }
  @elastic = Elasticsearch::Client.new(**elastic_options)
  logger.debug('Elastic connection set up')

  @redis = Redis.new(url: Config.redis[:url])
  logger.debug('Redis connection set up')

  logger.debug('Setting ES index templates')
  @elastic.indices.put_template(name: 'metrics', body: INDEX_TEMPLATE)
  logger.debug('ES templates set up')

  # Start in the "keep running" state.
  @exit = false
end
process()
click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 26
#
# Writer main loop. After a random start-up delay of up to
# Config.writer[:bulk_wait], it repeats the following until @exit is set:
#
# 1. Pops entries from the Redis list 'writer' (BLPOP with a timeout of
#    Config.writer[:bulk_wait]) and wraps each payload in a Metric, until
#    Config.writer[:bulk_max] metrics are collected, a pop returns nothing,
#    or @exit is set.
# 2. Sends the collected metrics to Elasticsearch with a single bulk call
#    and logs the outcome. An empty bulk only logs a warning.
#
# Redis connection/timeout errors are logged and the pop is retried after
# one second; metrics already collected for the current bulk are kept.
#
# NOTE(review): metrics are already removed from Redis when the bulk call is
# made; this method does not re-queue them if Elasticsearch reports errors
# or the call raises.
#
# Returns nil.
def process
  logger.debug 'Randomizing startup time'
  sleep rand(Config.writer[:bulk_wait])

  until @exit
    logger.debug 'Gathering mertics bulk'
    metrics = Metrics.new

    begin
      while !@exit && metrics.length < Config.writer[:bulk_max]
        popped = @redis.blpop('writer', timeout: Config.writer[:bulk_wait])
        # A falsy result means nothing arrived within the timeout: flush
        # whatever has been gathered so far.
        break unless popped

        # The payload is read from the second element of the BLPOP result.
        metrics << Metric.new(popped[1])
      end
    rescue Redis::CannotConnectError, Redis::TimeoutError => e
      logger.error "Can't connect to redis: #{e.message}"
      sleep 1
      # Re-enters the gathering loop; it stops on its own once @exit is set.
      retry
    end

    if metrics.empty?
      logger.warn 'No metrics to write :-('
      next
    end

    logger.debug "Writing #{metrics.length} metrics"
    # Build the bulk body once and reuse it for both the debug dump and the
    # request, so exactly what is logged is what is sent.
    payload = metrics.to_elastic
    logger.debug payload.to_json

    indexing_result = @elastic.bulk(
      index: Time.now.strftime(Config.elasticsearch[:index]),
      type: Config.writer[:doc_type],
      body: payload
    )

    if indexing_result['errors']
      logger.error 'Failed to write metrics!'
      logger.error indexing_result['items'].to_json
    else
      logger.info "Written #{metrics.length} metrics, took #{indexing_result['took']}ms"
    end
  end
end
shutdown()
click to toggle source
# File lib/metrics-capacitor/processor/writer.rb, line 62
#
# Requests a stop by raising the @exit flag, which #process checks in both
# its outer loop and its gathering loop.
#
# Returns true.
def shutdown
  @exit = true
end