class Libhoney::TransmissionClient
@api private
Constants
- REQUIRED_EVENT_FIELDS
Public Class Methods
new(max_batch_size: 50, send_frequency: 100, max_concurrent_batches: 10, pending_work_capacity: 1000, send_timeout: 10, responses: nil, block_on_send: false, block_on_responses: false, user_agent_addition: nil, proxy_config: nil)
click to toggle source
# File lib/libhoney/transmission.rb, line 13 def initialize(max_batch_size: 50, send_frequency: 100, max_concurrent_batches: 10, pending_work_capacity: 1000, send_timeout: 10, responses: nil, block_on_send: false, block_on_responses: false, user_agent_addition: nil, proxy_config: nil) @responses = responses || SizedQueue.new(pending_work_capacity * 2) @block_on_send = block_on_send @block_on_responses = block_on_responses @max_batch_size = max_batch_size # convert to seconds @send_frequency = send_frequency.fdiv(1000) @max_concurrent_batches = max_concurrent_batches @pending_work_capacity = pending_work_capacity @send_timeout = send_timeout @user_agent = build_user_agent(user_agent_addition).freeze @proxy_config = proxy_config @send_queue = Queue.new @threads = [] @lock = Mutex.new @batch_thread = nil setup_batch_queue end
Public Instance Methods
add(event)
click to toggle source
# File lib/libhoney/transmission.rb, line 44 def add(event) return unless event_valid(event) begin @batch_queue.enq(event, !@block_on_send) rescue ThreadError # happens if the queue was full and block_on_send = false. end ensure_threads_running end
batch_loop()
click to toggle source
# File lib/libhoney/transmission.rb, line 144 def batch_loop next_send_time = Time.now + @send_frequency batched_events = Hash.new do |h, key| h[key] = [] end loop do begin # a timeout expiration waiting for an event # 1. interrupts only when thread is in a blocking state (waiting for pop) # 2. exception skips the break and is rescued # 3. triggers the ensure to flush the current batch # 3. begins the loop again with an updated next_send_time Thread.handle_interrupt(Timeout::Error => :on_blocking) do # an event on the batch_queue # 1. pops out and is truthy # 2. gets included in the current batch # 3. while waits for another event while (event = Timeout.timeout(@send_frequency) { @batch_queue.pop }) key = [event.api_host, event.writekey, event.dataset] batched_events[key] << event end end # a nil on the batch_queue # 1. pops out and is falsy # 2. ends the event-popping while do..end # 3. breaks the loop # 4. flushes the current batch # 5. ends the batch_loop break rescue Timeout::Error # Timeout::Error happens when there is nothing to pop from the batch_queue. # We rescue it here to avoid spamming the logs with "execution expired" errors. rescue Exception => e warn "#{self.class.name}: 💥 " + e.message if %w[debug trace].include?(ENV['LOG_LEVEL']) warn e.backtrace.join("\n").to_s if ['trace'].include?(ENV['LOG_LEVEL']) # regardless of the exception, figure out whether enough time has passed to # send the current batched events, if so, send them and figure out the next send time # before going back to the top of the loop ensure next_send_time = flush_batched_events(batched_events) if Time.now > next_send_time end end # don't need to capture the next_send_time here because the batch_loop is exiting # for some reason (probably transmission.close) flush_batched_events(batched_events) end
close(drain)
click to toggle source
# File lib/libhoney/transmission.rb, line 115 def close(drain) @lock.synchronize do # if drain is false, clear the remaining unprocessed events from the queue if drain warn "#{self.class.name} - close: draining events" if %w[debug trace].include?(ENV['LOG_LEVEL']) else warn "#{self.class.name} - close: deleting unsent events" if %w[debug trace].include?(ENV['LOG_LEVEL']) @batch_queue.clear @send_queue.clear end @batch_queue.enq(nil) if @batch_thread.nil? else @batch_thread.join(1.0) # limit the amount of time we'll wait for the thread to end end # send @threads.length number of nils so each thread will fall out of send_loop @threads.length.times { @send_queue << nil } @threads.each(&:join) @threads = [] end enqueue_response(nil) warn "#{self.class.name} - close: close complete" if %w[debug trace].include?(ENV['LOG_LEVEL']) 0 end
send_loop()
click to toggle source
# File lib/libhoney/transmission.rb, line 56 def send_loop http_clients = build_http_clients # eat events until we run out loop do api_host, writekey, dataset, batch = @send_queue.pop break if batch.nil? before = Time.now begin http = http_clients[api_host] body = serialize_batch(batch) next if body.nil? headers = { 'Content-Type' => 'application/json', 'X-Honeycomb-Team' => writekey } response = http.post( path: "/1/batch/#{Addressable::URI.escape(dataset)}", body: body, headers: headers ) process_response(response, before, batch) rescue Exception => e # catch a broader swath of exceptions than is usually good practice, # because this is effectively the top-level exception handler for the # sender threads, and we don't want those threads to die (leaving # nothing consuming the queue). warn "#{self.class.name}: 💥 " + e.message if %w[debug trace].include?(ENV['LOG_LEVEL']) warn e.backtrace.join("\n").to_s if ['trace'].include?(ENV['LOG_LEVEL']) begin batch.each do |event| # nil events in the batch should already have had an error # response enqueued in #serialize_batch next if event.nil? Response.new(error: e).tap do |error_response| error_response.metadata = event.metadata enqueue_response(error_response) end end rescue ThreadError end end end ensure http_clients.each do |_, http| begin http.close rescue StandardError nil end end end
Private Instance Methods
build_http_clients()
click to toggle source
# File lib/libhoney/transmission.rb, line 324 def build_http_clients Hash.new do |h, api_host| client = ::Excon.new( api_host, persistent: true, read_timeout: @send_timeout, write_timeout: @send_timeout, connect_timeout: @send_timeout, proxy: @proxy_config, headers: { 'User-Agent' => @user_agent, 'Content-Type' => 'application/json' } ) h[api_host] = client end end
build_user_agent(user_agent_addition)
click to toggle source
# File lib/libhoney/transmission.rb, line 299 def build_user_agent(user_agent_addition) ua = "libhoney-rb/#{VERSION}" ua << " #{user_agent_addition}" if user_agent_addition ua end
enqueue_response(response)
click to toggle source
Enqueues a response to the responses queue suppressing ThreadError when there is no space left on the queue and we are not blocking on response
# File lib/libhoney/transmission.rb, line 229 def enqueue_response(response) @responses.enq(response, !@block_on_responses) rescue ThreadError end
ensure_threads_running()
click to toggle source
# File lib/libhoney/transmission.rb, line 305 def ensure_threads_running @lock.synchronize do @batch_thread = Thread.new { batch_loop } unless @batch_thread && @batch_thread.alive? @threads.select!(&:alive?) @threads << Thread.new { send_loop } while @threads.length < @max_concurrent_batches end end
event_valid(event)
click to toggle source
# File lib/libhoney/transmission.rb, line 204 def event_valid(event) missing_required_fields = REQUIRED_EVENT_FIELDS.select do |required_field| event.public_send(required_field).nil? || event.public_send(required_field).empty? end if missing_required_fields.empty? true else enqueue_response( Response.new( metadata: event.metadata, error: StandardError.new( "#{self.class.name}: nil or empty required fields (#{missing_required_fields.join(', ')})"\ '. Will not attempt to send.' ) ) ) false end end
flush_batched_events(batched_events)
click to toggle source
# File lib/libhoney/transmission.rb, line 313 def flush_batched_events(batched_events) batched_events.each do |(api_host, writekey, dataset), events| events.each_slice(@max_batch_size) do |batch| @send_queue << [api_host, writekey, dataset, batch] end end batched_events.clear Time.now + @send_frequency end
process_response(http_response, before, batch)
click to toggle source
# File lib/libhoney/transmission.rb, line 234 def process_response(http_response, before, batch) if http_response.status == 200 index = 0 JSON.parse(http_response.body).each do |event| index += 1 while batch[index].nil? && index < batch.size break unless (batched_event = batch[index]) enqueue_response( Response.new( status_code: event['status'], duration: (Time.now - before), metadata: batched_event.metadata ) ) end else error = JSON.parse(http_response.body)['error'] if %w[debug trace].include?(ENV['LOG_LEVEL']) warn "#{self.class.name}: error sending data to Honeycomb - #{http_response.status} #{error}" end batch.each do |batched_event| next unless batched_event # skip nils enqueued from serialization errors enqueue_response( Response.new( status_code: http_response.status, # single error from API applied to all events sent in batch duration: (Time.now - before), metadata: batched_event.metadata, error: RuntimeError.new(error) ) ) end end end
serialize_batch(batch)
click to toggle source
# File lib/libhoney/transmission.rb, line 269 def serialize_batch(batch) payload = [] batch.map! do |event| begin data = clean_data(event.data) e = { time: event.timestamp.iso8601(3), samplerate: event.sample_rate, data: data } payload << JSON.generate(e) event rescue StandardError => e Response.new(error: e).tap do |response| response.metadata = event.metadata enqueue_response(response) end nil end end return if payload.empty? "[#{payload.join(',')}]" end
setup_batch_queue()
click to toggle source
# File lib/libhoney/transmission.rb, line 197 def setup_batch_queue # use a SizedQueue so the producer will block on adding to the batch_queue when @block_on_send is true @batch_queue = SizedQueue.new(@pending_work_capacity) end