class Fluent::DdOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 18
# Builds the plugin instance, then loads the libraries the plugin relies on.
# The requires run at instantiation time rather than at file load time.
def initialize
  super
  # Load order is kept: dogapi, then socket, then thread.
  %w[dogapi socket thread].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 55
# Validates the plugin settings and builds the Datadog API client.
#
# conf - the configuration object, forwarded unchanged to the superclass.
#
# Raises Fluent::ConfigError when `dd_api_key` is missing, or when
# `concurrency` is set while `emit_in_background` is not enabled.
def configure(conf)
  super

  raise Fluent::ConfigError, '`dd_api_key` is required' unless @dd_api_key

  if @concurrency && !@emit_in_background
    raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`'
  end
  @concurrency ||= 1

  # No explicit host: ask `hostname -f` first and fall back to
  # Socket.gethostname when the command prints nothing.
  @host ||= begin
    reported = `hostname -f 2> /dev/null`.strip
    reported.empty? ? Socket.gethostname : reported
  end

  @dog = Dogapi::Client.new(@dd_api_key, @dd_app_key, @host, @device, @silent, @timeout)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 75
# Serializes a single event for buffering.
#
# Returns the result of calling `to_msgpack` on the `[tag, time, record]` array.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 42
# Stops the plugin. In background mode it also stops the worker threads and
# waits for them to finish.
def shutdown
  super
  return unless @emit_in_background

  # One `false` per worker: a worker leaves its loop when it pops a falsy job.
  @threads.each { @queue.push(false) }
  @threads.each(&:join)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dd.rb, line 25
# Starts the plugin and, when `emit_in_background` is enabled, creates the job
# queue and spawns `@concurrency` worker threads that drain it.
#
# Each worker pops jobs from @queue and hands them to #emit_points until it
# pops a falsy value (#shutdown pushes `false` once per worker to stop them).
#
# An error raised while emitting one job is logged and the worker moves on to
# the next job instead of terminating.
def start
  super

  return unless @emit_in_background

  @queue = Queue.new

  @threads = @concurrency.times.map do
    Thread.start do
      while (job = @queue.pop)
        begin
          emit_points(*job)
        rescue StandardError => e
          # A failed emit must not kill the worker: a dead thread would stop
          # draining @queue, and Thread#join in #shutdown would re-raise.
          log.error("failed to emit points: #{e.class}: #{e.message}")
        end
        Thread.pass
      end
    end
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 79
# Converts a buffered chunk into jobs and dispatches each one: pushed onto
# @queue in background mode, otherwise emitted immediately on the calling
# thread.
def write(chunk)
  chunk_to_jobs(chunk).each do |job|
    @emit_in_background ? @queue.push(job) : emit_points(*job)
  end
end

Private Instance Methods

chunk_to_jobs(chunk) click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 97
# Turns a buffered chunk into a list of `[metric, points, options]` jobs.
#
# Events without a `metric` key are dropped with a warning. Consecutive events
# that share the same Datadog tag, metric, host, type and device are merged
# into one job whose points are `[Time, value]` pairs.
def chunk_to_jobs(chunk)
  events = chunk.to_enum(:msgpack_each)

  with_metric = events.select do |fluent_tag, timestamp, record|
    metric_name = record['metric']
    unless metric_name
      log.warn("`metric` key does not exist: #{[fluent_tag, timestamp, record].inspect}")
    end
    metric_name
  end

  # Enumerable#chunk only groups adjacent events, so input order is kept.
  runs = with_metric.chunk do |fluent_tag, _timestamp, record|
    dd_tag = record['tag']
    dd_tag = fluent_tag if !dd_tag && @use_fluentd_tag_for_datadog_tag

    [dd_tag, *record.values_at('metric', 'host', 'type', 'device')]
  end

  runs.map do |(dd_tag, metric, host, type, device), grouped|
    points = grouped.map do |_fluent_tag, timestamp, record|
      [Time.at(timestamp), record['value']]
    end

    # Only truthy settings are passed on; the tag string is split on commas.
    options = {
      tags: dd_tag && dd_tag.split(',').map(&:strip),
      host: host,
      type: type,
      device: device
    }.select { |_name, setting| setting }

    [metric, points, options]
  end
end
emit_points(metric, points, options) click to toggle source
# File lib/fluent/plugin/out_dd.rb, line 93
# Sends one job through the Datadog client held in @dog and returns whatever
# the client's `emit_points` returns.
def emit_points(metric, points, options)
  client = @dog
  client.emit_points(metric, points, options)
end