class Fluent::Plugin::ZookeeperOutput

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 36
# Sets up a fresh plugin instance.
#
# The superclass initializer runs first; afterwards the Zookeeper client
# handle (@zk) is explicitly reset to nil so that init_client knows it has
# to build a new client instead of reopening an existing one.
def initialize
  super()
  @zk = nil
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 74
# Validates the plugin settings once the superclass has handled +conf+.
#
# If @type is not one of 'persistent', 'ephemeral' or 'sequence', a warning
# is logged and @type falls back to 'persistent'. Finally the formatter
# returned by setup_json_formatter is stored in @formatter_proc.
#
# @param conf configuration object; passed through unchanged to the superclass
def configure(conf)
  super

  unless %w[persistent ephemeral sequence].include?(@type)
    log.warn "'type' parameter value is wrong (#{@type}). Will use default value (persistent)"
    @type = 'persistent'
  end

  @formatter_proc = setup_json_formatter
end
init_client(raise_exception = true)
# File lib/fluent/plugin/out_zookeeper.rb, line 41
# Opens (or re-opens) the Zookeeper connection and creates the node at @path.
#
# On the first call (@zk is nil) a new client is built from @servers; on later
# calls the existing client is reopened. When the client reports that it is
# connected, the node is created with a flag derived from @type
# ('ephemeral' => ephemeral: true, 'sequence' => sequence: true, anything
# else => no flag) and @con_lost_msg is re-armed for #process.
#
# @param raise_exception [Boolean] when true, errors are re-raised to the
#   caller; when false, they are logged and swallowed
# @raise [StandardError] the error raised while connecting or creating the
#   node, only when +raise_exception+ is true
def init_client(raise_exception = true)
  log.info "Initializing connection to Zookeeper"
  begin
    if @zk.nil?
      @zk = Zookeeper.new(@servers)
    else
      @zk.reopen
    end

    if @zk.connected?
      # NOTE(review): the result of create is not inspected, and for 'sequence'
      # nodes the server presumably appends a counter to the path, so later
      # calls that use @path may not address the created node - confirm.
      node = {path: @path}
      node[:ephemeral] = true if @type == 'ephemeral'
      node[:sequence] = true if @type == 'sequence'
      @zk.create(node)
      log.info "Connection to Zookeeper service [#{@servers}] has been initialized"
      @con_lost_msg = "Connection to Zookeeper was lost"
    else
      log.warn "Cannot establish connection to Zookeeper"
    end
  rescue StandardError => e
    # Only StandardError is handled here: signals, SystemExit and other
    # non-StandardError exceptions must never be swallowed by the
    # "log and continue" mode.
    raise if raise_exception

    log.error e
  end
end
process(tag, es)
# File lib/fluent/plugin/out_zookeeper.rb, line 103
# Writes every record of the event stream to the Zookeeper node at @path.
#
# Behaviour depends on the client state:
# * connected: each record is formatted with @formatter_proc and stored via
#   @zk.set. Records that fail to format are skipped with a warning; records
#   that serialize to "{}" are skipped when @ignore_empty_msg is set. If
#   sending fails, the error is logged and the client is closed.
# * neither connected nor connecting: the client is closed (if still open)
#   and re-initialized through init_client(false).
# * connecting: @con_lost_msg is logged once and then cleared.
#
# @param tag the event tag (not used by this method)
# @param es an object whose #each yields (time, record) pairs
def process(tag, es)
  if @zk.connected?
    begin
      es.each do |_time, record|
        begin
          data = @formatter_proc.call(record)
        rescue StandardError => e
          log.warn "Failed to format record:", :error => e.to_s, :record => record
          next
        end
        if @ignore_empty_msg && data == "{}"
          log.debug "Skipping empty record"
          next
        end
        @zk.set({path: @path, data: data})
      end
    rescue StandardError => e
      # Only StandardError is handled: signals, SystemExit and similar must
      # propagate instead of being logged and dropped.
      log.error "Exception occurred while sending data: #{e}"
      # Connection will be reinitialized on next call
      @zk.close
    end
  elsif !@zk.connecting?
    # We are not connected and not connecting; it's time to reinit the client
    @zk.close unless @zk.closed?
    init_client(false)
  else
    # Report the lost connection only once; init_client re-arms the message.
    unless @con_lost_msg.nil?
      log.warn "#{@con_lost_msg}"
      @con_lost_msg = nil
    end
  end
end
setup_json_formatter()
# File lib/fluent/plugin/out_zookeeper.rb, line 85
# Builds the record-to-JSON formatter.
#
# Assigns Fluent::DEFAULT_OJ_OPTIONS to Oj.default_options (a process-wide
# setting on Oj) and returns a proc that serializes one record via Oj.dump.
#
# @return [Proc] a non-lambda proc taking a record and returning Oj's output
def setup_json_formatter
  Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
  proc do |rec|
    Oj.dump(rec)
  end
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 95
# Removes the node at @path, closes the Zookeeper client and hands over to
# the superclass shutdown.
#
# Does nothing Zookeeper-related when no client exists (@zk is nil), e.g.
# when shutdown is invoked a second time. If deleting the node raises, the
# client is still closed, @zk is still reset and the superclass shutdown
# still runs; the error then propagates to the caller as before.
def shutdown
  unless @zk.nil?
    begin
      @zk.delete({path: @path})
    ensure
      # Always release the client, even when the delete failed.
      @zk.close
      @zk = nil
    end
    log.info "Connection to Zookeeper service has been gracefully closed"
  end
ensure
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 90
# Starts the plugin: runs the superclass start hook first, then opens the
# Zookeeper connection through init_client using its default argument, so
# any connection error is raised to the caller.
def start
  super()
  init_client
end