class TDAnalytics::BatchConsumer

BatchConsumer 批量同步的发送数据. 有数据时,首先会加入本地缓冲区,当条数到达上限后会发起上报

Constants

DEFAULT_LENGTH

默认缓冲区大小

MAX_LENGTH

Public Class Methods

new(server_url, app_id, max_buffer_length = DEFAULT_LENGTH) click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 12
def initialize(server_url, app_id, max_buffer_length = DEFAULT_LENGTH)
  @server_uri = URI.parse(server_url)
  @server_uri.path = '/sync_server'
  @app_id = app_id
  @compress = true
  @max_length = [max_buffer_length, MAX_LENGTH].min
  @buffers = []
end

Public Instance Methods

_set_compress(compress) click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 21
def _set_compress(compress)
  @compress = compress
end
add(message) click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 25
def add(message)
  @buffers << message
  flush if @buffers.length >= @max_length
end
close() click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 30
def close
  flush
end
flush() click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 34
def flush
  begin
    @buffers.each_slice(@max_length) do |chunk|
      if @compress
        wio = StringIO.new("w")
        gzip_io = Zlib::GzipWriter.new(wio)
        gzip_io.write(chunk.to_json)
        gzip_io.close
        data = wio.string
      else
        data = chunk.to_json
      end
      compress_type = @compress ? 'gzip' : 'none'
      headers = {'Content-Type' => 'application/plaintext',
                 'appid' => @app_id,
                 'compress' => compress_type,
                 'TA-Integration-Type'=>'Ruby',
                 'TA-Integration-Version'=>TDAnalytics::VERSION,
                 'TA-Integration-Count'=>@buffers.count,
                 'TA_Integration-Extra'=>'batch'}
      request = CaseSensitivePost.new(@server_uri.request_uri, headers)
      request.body = data

      begin
        response_code, response_body = _request(@server_uri, request)
      rescue => e
        raise ConnectionError.new("Could not connect to TA server, with error \"#{e.message}\".")
      end

      result = {}
      if response_code.to_i == 200
        begin
          result = JSON.parse(response_body.to_s)
        rescue JSON::JSONError
          raise ServerError.new("Could not interpret TA server response: '#{response_body}'")
        end
      end

      if result['code'] != 0
        raise ServerError.new("Could not write to TA, server responded with #{response_code} returning: '#{response_body}'")
      end
    end
  rescue
    raise
  end
  @buffers = []
end

Private Instance Methods

_request(uri, request) click to toggle source
# File lib/thinkingdata-ruby/batch_consumer.rb, line 83
def _request(uri, request)
  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = uri.scheme === 'https' ? true : false
  client.open_timeout = 10
  client.continue_timeout = 10
  client.read_timeout = 10
  client.ssl_timeout = 10

  response = client.request(request)
  [response.code, response.body]
end