class Fluent::DdOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 18
# Builds the plugin instance and loads the libraries it depends on.
#
# The superclass initializer runs first; `dogapi`, `socket` and `thread` are
# then required from inside the constructor rather than at file load time.
def initialize
  super
  %w[dogapi socket thread].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 55
# Validates the plugin settings, fills in defaults and creates the Datadog
# client stored in `@dog`.
#
# @param conf the configuration object; it is handed to the superclass as is
# @raise [Fluent::ConfigError] if `dd_api_key` is not set, or if `concurrency`
#   is set while `emit_in_background` is off
def configure(conf)
  super

  raise Fluent::ConfigError, '`dd_api_key` is required' unless @dd_api_key

  if @concurrency && !@emit_in_background
    raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`'
  end

  @concurrency = 1 unless @concurrency

  unless @host
    # Prefer the output of `hostname -f`; fall back to the socket-level host
    # name when the command prints nothing.
    reported_name = `hostname -f 2> /dev/null`.strip
    @host = reported_name.empty? ? Socket.gethostname : reported_name
  end

  @dog = Dogapi::Client.new(@dd_api_key, @dd_app_key, @host, @device, @silent, @timeout)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 75
# Serializes one event as a `[tag, time, record]` triple via `to_msgpack`.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling `to_msgpack` on the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 42
# Stops the plugin. After the superclass shutdown, background mode pushes one
# `false` onto the queue per worker thread and then waits for every worker
# to finish.
def shutdown
  super
  return unless @emit_in_background

  # One `false` per worker: each worker's `while (job = @queue.pop)` loop
  # ends when it pops a falsy value.
  @threads.each { @queue.push(false) }
  @threads.each(&:join)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 25
# Starts the plugin. In background mode this creates the job queue and
# `@concurrency` worker threads that pop jobs and emit them.
#
# Each job is an argument list for `emit_points`. A worker stops as soon as it
# pops a falsy value from the queue (`shutdown` pushes `false` for that).
#
# A failure while emitting one job is logged and skipped. Without the rescue
# the exception would terminate the worker thread, leaving later jobs
# unprocessed in the queue and making `Thread#join` re-raise during shutdown.
def start
  super
  return unless @emit_in_background

  @queue = Queue.new

  @threads = @concurrency.times.map do
    Thread.start do
      while (job = @queue.pop)
        begin
          emit_points(*job)
        rescue StandardError => e
          # Keep the worker alive; report the failure and move on.
          log.warn("failed to emit points: #{e.class}: #{e.message}")
        end

        Thread.pass
      end
    end
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 79
# Converts a buffer chunk into jobs and dispatches each of them: onto the
# queue when `@emit_in_background` is set, otherwise straight to
# `emit_points`.
#
# @param chunk the buffer chunk handed to `chunk_to_jobs`
def write(chunk)
  jobs = chunk_to_jobs(chunk)

  if @emit_in_background
    jobs.each { |job| @queue.push(job) }
  else
    jobs.each { |job| emit_points(*job) }
  end
end
Private Instance Methods
chunk_to_jobs(chunk)
click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 97
# Turns a buffer chunk into a list of `emit_points` argument lists.
#
# Events are read through `chunk.msgpack_each` as `[tag, time, record]`
# triples. Records without a truthy `'metric'` value are logged with
# `log.warn` and dropped. The remaining events are grouped in *consecutive*
# runs (`Enumerable#chunk`) that share the same Datadog tag, metric, host,
# type and device; each run becomes one job.
#
# The Datadog tag is `record['tag']`; when that is missing and
# `@use_fluentd_tag_for_datadog_tag` is set, the event's own tag is used.
#
# @param chunk an object responding to `msgpack_each`
# @return [Array<Array>] jobs of the form `[metric, points, options]`, where
#   `points` is a list of `[Time, value]` pairs and `options` may contain
#   `:tags`, `:host`, `:type` and `:device`
def chunk_to_jobs(chunk)
  events = chunk.to_enum(:msgpack_each)

  with_metric = events.select do |tag, time, record|
    metric = record['metric']
    log.warn("`metric` key does not exist: #{[tag, time, record].inspect}") unless metric
    metric
  end

  runs = with_metric.chunk do |tag, _time, record|
    dd_tag = record['tag']
    dd_tag ||= tag if @use_fluentd_tag_for_datadog_tag

    [dd_tag, *record.values_at('metric', 'host', 'type', 'device')]
  end

  runs.map do |key, run|
    dd_tag, metric, host, type, device = key

    points = run.map { |_tag, time, record| [Time.at(time), record['value']] }

    options = {}
    # The tag value is a comma-separated list; surrounding blanks are removed.
    options[:tags] = dd_tag.split(',').map(&:strip) if dd_tag
    options[:host] = host if host
    options[:type] = type if type
    options[:device] = device if device

    [metric, points, options]
  end
end
emit_points(metric, points, options)
click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 93
# Hands one metric's points to the Datadog client stored in `@dog`.
#
# @param metric the metric name
# @param points the points to send for that metric
# @param options extra options passed through to the client unchanged
# @return whatever the client's `emit_points` returns
def emit_points(metric, points, options)
  client = @dog
  client.emit_points(metric, points, options)
end