class BufferItem
Attributes
buffer_state[RW]
url_send_count[RW]
Public Class Methods
new(option = {})
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 232
# Builds a buffer item from an options hash.
#
# Keys read from +option+:
#   :client              - stored in @client
#   :url_list            - URLs; each one gets a send counter starting at 0
#   :index               - start index handed to init_url_list
#   :logger              - stored in @logger and passed on to the buffer
#   :flush_batch_size    - forwarded as the buffer's :max_items
#   :flush_interval_sec  - forwarded as the buffer's :max_interval
#
# @param option [Hash] configuration values listed above
def initialize(option = {})
  @client = option[:client]

  urls = option[:url_list]
  # One per-URL counter, all starting at zero.
  @url_send_count = urls.each_with_object({}) { |url, counts| counts[url] = 0 }
  init_url_list(urls, option[:index])

  @logger = option[:logger]
  buffer_options = {
    :max_items => option[:flush_batch_size],
    :max_interval => option[:flush_interval_sec],
    :logger => @logger
  }
  buffer_initialize(buffer_options)
end
Public Instance Methods
do_send(form_data, url)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 250
# Posts +form_data+ to +url+ through @client and reports whether it succeeded.
#
# A response whose code is not 200, or any StandardError raised along the way,
# is logged as a warning and reported as a failure.
#
# @param form_data [Hash] passed to the client as :params
# @param url [String] destination address
# @return [Boolean] true when the response code is 200, false otherwise
def do_send(form_data, url)
  response = @client.post(url, :params => form_data).call
  return true unless response.code != 200

  @logger.warn("Send failed, code: #{response.code}, body: #{response.body}")
  false
rescue => e
  @logger.warn("Send failed", :exception => e.class.name, :backtrace => e.backtrace)
  false
end
flush(events, final)
click to toggle source
数据先经 Gzip 压缩、再经 Base64 编码后尝试发送。如果当前 url 发送失败, 会尝试获取列表中下一个地址进行发送; 发送失败的 url 在 3 秒内不会再尝试发送。如果所有的 url 都被标记为发送失败, sleep 5 秒后重新获取。
# File lib/logstash/outputs/sensors_analytics.rb, line 269
# Serializes +events+ to JSON, gzips and Base64-encodes the result, and sends it.
#
# If sending to the current URL fails, that URL is marked unavailable and the
# next one returned by obtain_url is tried; the loop repeats until a send
# succeeds. On success the send counter of the URL that accepted the data is
# increased by the number of events.
#
# @param events [Array] events to send; must respond to to_json and length
# @param final [Boolean] not used by this method (presumably supplied by the
#   buffering code that invokes flush; confirm against the caller)
# @return [Integer] the updated send count for the URL that accepted the data
def flush(events, final)
  # Start from an empty, mutable binary buffer. StringIO.new("w") would treat
  # "w" as the initial buffer *content* rather than an open mode, and would
  # fail on write if that literal were frozen.
  buffer = StringIO.new(String.new)
  gzip_io = Zlib::GzipWriter.new(buffer)
  gzip_io.write(events.to_json)
  gzip_io.close

  data = Base64.strict_encode64(buffer.string)
  form_data = {"data_list" => data, "gzip" => 1}

  url_item = obtain_url
  until do_send(form_data, url_item[:url])
    last_url = url_item[:url]
    # Mark the URL that just failed as unavailable before picking another one.
    disable_url(url_item)
    url_item = obtain_url
    @logger.warn("Send failed, retry send data to another url", :last_url => last_url, :retry_url => url_item[:url])
  end

  @url_send_count[url_item[:url]] += events.length
end
Private Instance Methods
disable_url(url_item)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 325
# Marks a URL entry as unavailable and records when the failure happened.
#
# @param url_item [Hash] entry whose :ok? and :fail_time keys are overwritten
# @return [Time] the failure timestamp that was stored
def disable_url(url_item)
  url_item.store(:ok?, false)
  url_item.store(:fail_time, Time.now)
end
init_url_list(urls, start_index)
click to toggle source
把当前 buffer 用的 url 从 list 的 0 索引开始依次放入, 方便在 obtain_url 中遍历。
# File lib/logstash/outputs/sensors_analytics.rb, line 292
# Builds @url_list so that the URL at +start_index+ comes first, followed by
# the remaining URLs in their original order (wrapping around to the front).
# This lets obtain_url simply walk the list from index 0.
#
# Every entry starts as available (:ok? => true) with :fail_time set to the
# current time.
#
# Array#rotate is used instead of a hand-rolled wrap-around loop: the loop
# never terminated when +start_index+ was outside 0...urls.length, and raised
# ZeroDivisionError for an empty list. rotate wraps any integer index and
# returns an empty array for empty input.
#
# @param urls [Array<String>] all configured URLs
# @param start_index [Integer] position in +urls+ this buffer should try first
# @return [Array<Hash>] the newly built @url_list
def init_url_list(urls, start_index)
  @url_list = urls.rotate(start_index).map do |url|
    { :url => url, :ok? => true, :fail_time => Time.now }
  end
end
obtain_url()
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 309
# Returns the first usable entry of @url_list, blocking until one exists.
#
# An entry is usable when its :ok? flag is set, or when more than 3 seconds
# have passed since its :fail_time; in the latter case the entry is marked
# available again before being returned. If no entry qualifies, a warning is
# logged and the method sleeps 5 seconds before scanning the list again.
#
# @return [Hash] the selected entry of @url_list
def obtain_url
  loop do
    @url_list.each do |url_item|
      return url_item if url_item[:ok?]
      next unless Time.now - url_item[:fail_time] > 3

      # Re-enable under the same :ok? key that disable_url clears; writing
      # :ok (no question mark) would leave the entry permanently flagged bad.
      url_item[:ok?] = true
      return url_item
    end
    @logger.warn("All url disable, sleep 5 sec")
    sleep 5
  end
end