class BufferItem

Attributes

buffer_state[RW]
url_send_count[RW]

Public Class Methods

new(option = {}) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 232
# Builds a buffer item from an options hash.
#
# Keys read from +option+: :client, :logger, :url_list, :index,
# :flush_batch_size and :flush_interval_sec.
#
# Every url in :url_list starts with a send count of 0. The url list and
# :index are handed to init_url_list, and the batch size / interval / logger
# are handed to buffer_initialize as :max_items / :max_interval / :logger.
#
# @param option [Hash] construction options (see keys above)
def initialize(option = {})
  @client = option[:client]
  @logger = option[:logger]

  urls = option[:url_list]
  # One counter per configured url, all starting at zero.
  @url_send_count = urls.each_with_object({}) { |url, counts| counts[url] = 0 }
  init_url_list(urls, option[:index])

  settings = {
    :max_items => option[:flush_batch_size],
    :max_interval => option[:flush_interval_sec],
    :logger => @logger
  }
  buffer_initialize(settings)
end

Public Instance Methods

do_send(form_data, url) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 250
# Posts one payload to +url+ through @client and reports whether it succeeded.
#
# The request is built with @client.post(url, :params => form_data) and run
# with #call. Only a response whose code equals 200 counts as success; any
# other code, or any StandardError raised along the way, is logged as a
# warning through @logger and reported as failure.
#
# @param form_data [Hash] form fields passed as the request's :params
# @param url [String] destination url
# @return [Boolean] true when the response code is 200, false otherwise
def do_send(form_data, url)
  response = @client.post(url, :params => form_data).call
  return true if response.code == 200

  @logger.warn("Send failed, code: #{response.code}, body: #{response.body}")
  false
rescue => e
  @logger.warn("Send failed", :exception => e.class.name, :backtrace => e.backtrace)
  false
end
flush(events, final) click to toggle source

数据经 Gzip 压缩并 Base64 编码后尝试发送。如果当前 url 发送失败, 会尝试获取列表中下一个地址进行发送; 发送失败的 url 在 3 秒内不会再尝试发送。如果所有的 url 都被标记为发送失败, sleep 5 秒后重新获取。

# File lib/logstash/outputs/sensors_analytics.rb, line 269
# Compresses a batch of events and posts it, switching urls until a send
# succeeds.
#
# The batch is serialized with #to_json, gzip-compressed and strictly
# Base64-encoded, then sent as the "data_list" form field together with
# "gzip" => 1. The target comes from obtain_url. Each time do_send reports
# failure, the url entry is passed to disable_url and a new entry is
# requested from obtain_url; the loop only ends once a send succeeds. The
# successful url's counter in @url_send_count is then increased by the
# batch size.
#
# @param events [Array] batch to send; must respond to #to_json and #length
# @param final [Boolean] not used by this method
#   (NOTE(review): presumably required by the buffer callback signature - confirm)
# @return [Integer] the updated send count for the url that accepted the batch
def flush(events, final)
  # StringIO.new takes the backing string as its first argument, so the
  # previous StringIO.new("w") supplied "w" as buffer content, not as a mode.
  # Start from an empty, mutable, binary string instead.
  compressed = StringIO.new(String.new)
  gzip_io = Zlib::GzipWriter.new(compressed)
  gzip_io.write(events.to_json)
  gzip_io.close
  data = Base64.strict_encode64(compressed.string)
  form_data = {"data_list" => data, "gzip" => 1}

  url_item = obtain_url

  until do_send(form_data, url_item[:url])
    last_url = url_item[:url]
    # Mark the url that just failed as unavailable before picking another.
    disable_url(url_item)
    url_item = obtain_url
    @logger.warn("Send failed, retry send data to another url", :last_url => last_url, :retry_url => url_item[:url])
  end
  @url_send_count[url_item[:url]] += events.length
end

Private Instance Methods

disable_url(url_item) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 325
# Flags a url entry as unusable and records the time of the failure.
#
# @param url_item [Hash] url entry; its :ok? and :fail_time keys are overwritten
# @return [Time] the failure timestamp that was stored
def disable_url(url_item)
  url_item.store(:ok?, false)
  url_item.store(:fail_time, Time.now)
end
init_url_list(urls, start_index) click to toggle source

把当前 buffer 用的 url 从 list 的 0 索引开始依次放入, 方便在 obtain_url 遍历

# File lib/logstash/outputs/sensors_analytics.rb, line 292
# Builds @url_list so that the url at +start_index+ comes first and the
# remaining urls follow in their original order, wrapping around the end of
# the list. Each entry starts out usable (:ok? => true) with :fail_time set
# to the current time.
#
# +start_index+ is applied modulo the list size (Array#rotate semantics), so
# an index outside 0...urls.length still yields each url exactly once. The
# previous hand-written wrap-around loop never terminated in that case.
#
# @param urls [Array<String>] configured urls; must not be empty
# @param start_index [Integer] position in +urls+ of the entry to place first
# @return [Array<Hash>] the newly built @url_list
# @raise [ArgumentError] if +urls+ is empty
def init_url_list(urls, start_index)
  # An empty list can never produce a send target, so fail at construction.
  raise ArgumentError, "url list must not be empty" if urls.empty?

  @url_list = urls.rotate(start_index).map do |url|
    { :url => url, :ok? => true, :fail_time => Time.now }
  end
end
obtain_url() click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 309
# Picks the url entry the next send attempt should use.
#
# Scans @url_list in order and returns the first entry that is either marked
# usable (:ok? is truthy) or whose recorded :fail_time is more than 3 seconds
# in the past. An entry whose cool-down has elapsed is marked usable again
# before it is returned. When no entry qualifies, a warning is logged, the
# method sleeps 5 seconds and rescans; it does not return until an entry
# becomes available.
#
# @return [Hash] an entry of @url_list (keys :url, :ok?, :fail_time)
def obtain_url
  loop do
    @url_list.each do |url_item|
      return url_item if url_item[:ok?]

      if Time.now - url_item[:fail_time] > 3
        # The availability flag is stored under :ok?. Writing :ok here left
        # the entry permanently flagged as failed and added a stray key.
        url_item[:ok?] = true
        return url_item
      end
    end
    @logger.warn("All url disable, sleep 5 sec")
    sleep 5
  end
end