class Fluent::Plugin::ZookeeperOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 36
# Builds the plugin instance.
#
# Superclass initialization runs first; afterwards the Zookeeper client
# handle (@zk) is explicitly reset to nil, meaning "no client created yet".
def initialize
  super
  @zk = nil
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 74
# Validates the plugin configuration once the superclass has processed it.
#
# When @type is not one of 'persistent', 'ephemeral' or 'sequence', a
# warning is logged and @type falls back to 'persistent'. The formatter
# returned by setup_json_formatter is then stored in @formatter_proc.
#
# @param conf the configuration object; forwarded unchanged to super
# @return the formatter proc assigned to @formatter_proc
def configure(conf)
  super

  unless %w[persistent ephemeral sequence].include?(@type)
    log.warn "'type' parameter value is wrong (#{@type}). Will use default value (persistent)"
    @type = 'persistent'
  end

  @formatter_proc = setup_json_formatter
end
init_client(raise_exception = true)
click to toggle source
# File lib/fluent/plugin/out_zookeeper.rb, line 41
# Opens the Zookeeper connection (or reopens the existing client) and
# creates the node at @path.
#
# A new client is built from @servers when @zk is nil; otherwise the
# existing client is reopened. Once connected, the node is created with the
# option matching @type ('ephemeral' or 'sequence'; any other value creates
# a plain node) and @con_lost_msg is armed so a later connection loss can be
# reported. If the client is not connected after opening, only a warning is
# logged.
#
# Only StandardError is handled here; signals, SystemExit and similar
# non-standard exceptions always propagate so the process can still be
# interrupted or shut down.
#
# @param raise_exception [Boolean] when true, errors are re-raised to the
#   caller; when false, they are logged and swallowed
# @raise [StandardError] whatever the client raised, if raise_exception is true
def init_client(raise_exception = true)
  log.info "Initializing connection to Zookeeper"

  if @zk.nil?
    @zk = Zookeeper.new(@servers)
  else
    @zk.reopen
  end

  if @zk.connected?
    node = { path: @path }
    case @type
    when 'ephemeral' then node[:ephemeral] = true
    when 'sequence' then node[:sequence] = true
    end
    # Persistent (or any unrecognized type) falls through with no extra flag.
    @zk.create(node)

    log.info "Connection to Zookeeper service [#{@servers}] has been initialized"
    @con_lost_msg = "Connection to Zookeeper was lost"
  else
    log.warn "Cannot establish connection to Zookeeper"
  end
rescue StandardError => e
  # Bare raise re-raises the current exception with its original backtrace.
  raise if raise_exception

  log.error e
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_zookeeper.rb, line 103
# Writes every record of an event stream to the Zookeeper node at @path.
#
# Behaviour depends on the client state:
# * connected: each record is formatted with @formatter_proc and written
#   with @zk.set. Records that fail to format are logged and skipped; when
#   @ignore_empty_msg is set, records that format to "{}" are skipped too.
#   A StandardError while sending is logged and the client is closed, so
#   the connection is reinitialized on a later call.
# * neither connected nor connecting: the client is closed (if it is not
#   already) and init_client(false) is called to reconnect.
# * connecting: the pending @con_lost_msg is logged once and then cleared.
#
# Only StandardError is rescued; signals and SystemExit propagate.
#
# @param tag the event tag (not used by this method)
# @param es the event stream; must yield (time, record) pairs from #each
def process(tag, es)
  if @zk.connected?
    begin
      es.each do |_time, record|
        begin
          data = @formatter_proc.call(record)
        rescue StandardError => e
          log.warn "Failed to format record:", :error => e.to_s, :record => record
          next
        end

        if @ignore_empty_msg && data == "{}"
          log.debug "Skipping empty record"
          next
        end

        @zk.set({path: @path, data: data})
      end
    rescue StandardError => e
      log.error "Exception occurred while sending data: #{e}"
      # Connection will be reinitialized on next call
      @zk.close
    end
  elsif !@zk.connecting?
    # We are not connected and not connecting; it's time to reinit the client
    @zk.close unless @zk.closed?
    init_client(false)
  elsif !@con_lost_msg.nil?
    # Report the loss once; the message is re-armed by init_client.
    log.warn @con_lost_msg
    @con_lost_msg = nil
  end
end
setup_json_formatter()
click to toggle source
# File lib/fluent/plugin/out_zookeeper.rb, line 85
# Builds the formatter used to serialize records.
#
# Side effect: assigns Fluent::DEFAULT_OJ_OPTIONS to Oj.default_options,
# which is a setting on the Oj module itself rather than on this instance.
#
# @return [Proc] a proc that takes one record and returns Oj.dump(record)
def setup_json_formatter
  Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS

  formatter = proc do |record|
    Oj.dump(record)
  end
  formatter
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 95
# Removes the node at @path, closes the Zookeeper client and then runs the
# superclass shutdown.
#
# The cleanup is best-effort: when no client exists (@zk is nil) it is
# skipped, and a StandardError raised while deleting the node or closing
# the client is logged as a warning instead of aborting shutdown. In every
# case @zk is reset to nil and super is still invoked, so the superclass
# shutdown is never skipped because of a Zookeeper failure.
def shutdown
  unless @zk.nil?
    begin
      @zk.delete({path: @path})
      @zk.close
      log.info "Connection to Zookeeper service has been gracefully closed"
    rescue StandardError => e
      log.warn "Failed to close connection to Zookeeper service cleanly: #{e}"
    ensure
      # Drop the handle even on failure so a stale client is never reused.
      @zk = nil
    end
  end

  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zookeeper.rb, line 90
# Starts the plugin: the superclass start runs first, then the Zookeeper
# client is initialized through init_client.
#
# init_client is called with its default argument (raise_exception = true),
# so a connection error raised during startup propagates to the caller.
def start
  super
  init_client
end