class Fluent::Plugin::LogzioOutputBuffered
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 28
# Applies the plugin configuration and exports proxy settings to the
# process environment.
#
# After delegating to the superclass and converting buffer parameters, the
# value of @proxy_uri is written to ENV['http_proxy'] when the configuration
# contains 'proxy_uri', and the value of @proxy_cert is written to
# ENV['SSL_CERT_FILE'] when it contains 'proxy_cert'.
#
# @param conf the configuration element; read with String keys
def configure(conf)
  super
  compat_parameters_convert(conf, :buffer)

  # conf key => [environment variable to set, value to export]
  proxy_settings = {
    'proxy_uri' => ['http_proxy', @proxy_uri],
    'proxy_cert' => ['SSL_CERT_FILE', @proxy_cert]
  }

  proxy_settings.each do |conf_key, (env_name, value)|
    next unless conf[conf_key]

    log.debug "Proxy #{value}"
    ENV[env_name] = value
  end
end
encode_chunk(chunk) { |records, bulk_size| ... }
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 92
# Serializes every event of +chunk+ to JSON and groups the results into
# bulks whose newline-joined payload never exceeds @bulk_limit bytes.
#
# Each bulk is handed to the block as (records, bulk_size), where +records+
# is an Array of JSON strings and +bulk_size+ equals
# records.join("\n").bytesize. Nothing is yielded when no record is
# sendable.
#
# Records that fail to serialize are logged and skipped. Records whose JSON
# alone is larger than @bulk_limit are logged (truncated to
# @bulk_limit_warning_limit characters when that is an Integer) and skipped.
#
# @param chunk an enumerable yielding (tag, time, record) triples
# @yieldparam records [Array<String>] JSON-encoded records of one bulk
# @yieldparam bulk_size [Integer] byte size of the bulk, separators included
def encode_chunk(chunk)
  records = []
  bulk_size = 0

  chunk.each do |tag, time, record|
    record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
    record[@output_tags_fieldname] ||= tag.to_s if @output_include_tags

    begin
      json_record = Yajl.dump(record)
      # Bytes, not characters: the limit applies to the payload on the wire.
      record_size = json_record.bytesize
    rescue StandardError => e
      log.error "Adding record #{record} to buffer failed. Exception: #{e}"
      next
    end

    if record_size > @bulk_limit
      if @bulk_limit_warning_limit.is_a?(Integer)
        log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record starts with (truncated at #{@bulk_limit_warning_limit} characters): #{json_record[0,@bulk_limit_warning_limit]}"
        # Send the full message to debug facility
        log.debug "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
      else
        log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
      end
      next
    end

    # A "\n" separator is only needed between records of the same bulk.
    separator_size = records.empty? ? 0 : 1
    if bulk_size + separator_size + record_size > @bulk_limit
      yield(records, bulk_size)
      records = []
      bulk_size = 0
      separator_size = 0 # the record now opens a fresh bulk
    end

    records.push(json_record)
    bulk_size += separator_size + record_size
  end

  # An empty Array is truthy, so test for emptiness explicitly to avoid
  # emitting an empty bulk.
  yield(records, bulk_size) unless records.empty?
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 77
# Packs one event as a msgpack-encoded [tag, time, record] triple.
#
# The time is stored as a Float: a Fluent::EventTime is converted with
# #to_f, any other value is multiplied by 1.0.
#
# @param tag the event tag
# @param time [Fluent::EventTime, Numeric] the event time
# @param record the event record
# @return the result of Array#to_msgpack for the triple
def format(tag, time, record)
  seconds = time.is_a?(Fluent::EventTime) ? time.to_f : time * 1.0
  [tag, seconds, record].to_msgpack
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 69
# Always answers true.
#
# NOTE(review): presumably tells the output plugin base class that #format
# emits msgpack binary — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 73
# Always answers true.
#
# NOTE(review): presumably declares the plugin usable with multiple Fluentd
# workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
send_bulk(bulk_records, bulk_size)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 130
# Posts one bulk of JSON records to @uri through @http.
#
# The request is attempted at most @retry_count times. The wait between
# attempts starts at @retry_sleep seconds and doubles after every failed
# attempt. HTTP 200 ends the method successfully; HTTP 400 and 401 are
# logged and the bulk is dropped without retrying; any other status or a
# StandardError raised while sending triggers a retry. When every attempt
# fails, an error containing the bulk body is logged and the bulk is
# dropped.
#
# @param bulk_records [Array<String>] JSON-encoded records of the bulk
# @param bulk_size the bulk size, used for log messages only
def send_bulk(bulk_records, bulk_size)
  log.debug "Sending a bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"

  # Setting our request
  post = Net::HTTP::Post.new @uri.request_uri

  # Logz.io bulk http endpoint expecting log line with \n delimiter
  post.body = bulk_records.join("\n")

  sleep_interval = @retry_sleep

  begin
    @retry_count.times do |counter|
      should_retry = true
      begin
        response = @http.request @uri, post
        case response.code
        when '200'
          log.debug "Successfully sent bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"
          should_retry = false
        when '401'
          log.error "You are not authorized with Logz.io! Token OK? dropping logs..."
          should_retry = false
        when '400'
          log.info "Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. Response: #{response.body}"
          should_retry = false
        else
          log.debug "Got HTTP #{response.code} from Logz.io, not giving up just yet (Try #{counter + 1}/#{@retry_count})"
        end
      rescue StandardError => e
        log.debug "Error connecting to Logz.io. Got exception: #{e} (Try #{counter + 1}/#{@retry_count})"
      end

      return unless should_retry

      # Last attempt failed: report and give up without sleeping again.
      if counter == @retry_count - 1
        log.error "Could not send your bulk after #{@retry_count} tries Sorry! Your bulk is: #{post.body}"
        break
      end

      sleep(sleep_interval)
      sleep_interval *= 2
    end
  rescue StandardError => e
    # StandardError only: signals and SystemExit must keep propagating.
    log.error "Got unexpected exception! Here: #{e}"
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 65
# Shuts the plugin down by delegating to the superclass implementation;
# no plugin-specific work is done here.
def shutdown
  super()
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logzio_buffered.rb, line 44
# Starts the plugin: builds the Logz.io endpoint URI and prepares the
# persistent HTTP client used for sending bulks.
#
# Sets @logzio_endpoint (from @logzio_url, @token and @log_type), @uri and
# @http. The client sends 'text/plain' bodies, uses @http_idle_timeout as
# its idle timeout and enables SO_KEEPALIVE on its sockets.
#
# A previous conditional on @log_type_key was removed: it read an undefined
# +record+ (raising NameError whenever @log_type_key was set) and only
# assigned locals that were never used.
def start
  super
  # Loaded here rather than at file load time, as in the original code.
  require 'net/http/persistent'

  @logzio_endpoint = "#{@logzio_url}?token=#{@token}&type=#{@log_type}"
  @uri = URI @logzio_endpoint

  # NOTE(review): :ENV presumably makes the client read proxy settings from
  # the environment — confirm against the net-http-persistent version in use.
  @http = Net::HTTP::Persistent.new 'fluent-plugin-logzio', :ENV
  @http.headers['Content-Type'] = 'text/plain'
  @http.idle_timeout = @http_idle_timeout
  @http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]

  log.debug "Started Logz.io shipper.."
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_logzio_buffered.rb, line 86
# Writes a buffer chunk: every bulk produced by #encode_chunk is passed
# straight to #send_bulk.
#
# @param chunk the buffer chunk handed to #encode_chunk
def write(chunk)
  encode_chunk(chunk) do |records, size|
    send_bulk(records, size)
  end
end