class Kafka::Statsd::ConsumerSubscriber

Public Instance Methods

fetch_batch(event)
# File lib/kafka/statsd.rb, line 124
# Handles a batch-fetch event for one topic partition.
#
# Reads :offset_lag, :message_count, :client_id, :group_id, :topic and
# :partition from the event payload (each via `fetch`, so a missing key
# raises KeyError), then passes the message count to `count` under
# "...batch_size" and the offset lag to `gauge` under "...lag".
#
# @param event [#payload] instrumentation event; its payload must respond to `fetch`
def fetch_batch(event)
  payload = event.payload
  offset_lag = payload.fetch(:offset_lag)
  message_count = payload.fetch(:message_count)

  # Every metric for this event shares the same per-partition prefix.
  prefix = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
           ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  count("#{prefix}.batch_size", message_count)
  gauge("#{prefix}.lag", offset_lag)
end
join_group(event)
# File lib/kafka/statsd.rb, line 136
# Handles a join-group event.
#
# Passes the event duration to `timing` under
# "consumer.<client_id>.<group_id>.join_group" and, when the payload has an
# :exception key, additionally calls `increment` on the ".errors" metric.
#
# @param event [#payload, #duration] instrumentation event
def join_group(event)
  payload = event.payload
  metric = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.join_group"

  timing(metric, event.duration)
  increment("#{metric}.errors") if payload.key?(:exception)
end
leave_group(event)
# File lib/kafka/statsd.rb, line 158
# Handles a leave-group event.
#
# Always passes the event duration to `timing`; when the payload has an
# :exception key it also calls `increment` on the matching ".errors" metric.
#
# @param event [#payload, #duration] instrumentation event
def leave_group(event)
  details = event.payload
  client_id = details.fetch(:client_id)
  consumer_group = details.fetch(:group_id)
  metric = "consumer.#{client_id}.#{consumer_group}.leave_group"

  timing(metric, event.duration)
  return unless details.key?(:exception)

  increment("#{metric}.errors")
end
pause_status(event)
# File lib/kafka/statsd.rb, line 169
# Handles a pause-status event for one topic partition.
#
# The duration is read from the payload's :duration key (not from
# `event.duration`) and passed to `gauge` under "...pause.duration".
#
# @param event [#payload] instrumentation event; its payload must respond to `fetch`
def pause_status(event)
  payload = event.payload
  prefix = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
           ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  gauge("#{prefix}.pause.duration", payload.fetch(:duration))
end
process_batch(event)
# File lib/kafka/statsd.rb, line 109
# Handles a batch-processing event for one topic partition.
#
# When the payload has an :exception key, only the "...process_batch.errors"
# metric is passed to `increment`. Otherwise the event duration goes to
# `timing` under "...process_batch.latency" and the message count goes to
# `count` under "...messages".
#
# @param event [#payload, #duration] instrumentation event
def process_batch(event)
  payload = event.payload
  message_count = payload.fetch(:message_count)
  prefix = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
           ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  # A failed batch reports the error only; latency and message count are skipped.
  return increment("#{prefix}.process_batch.errors") if payload.key?(:exception)

  timing("#{prefix}.process_batch.latency", event.duration)
  count("#{prefix}.messages", message_count)
end
process_message(event)
# File lib/kafka/statsd.rb, line 84
# Handles a single-message processing event for one topic partition.
#
# Outcome metrics:
# - payload has :exception -> `increment` on "...process_message.errors"
# - otherwise              -> `timing` on "...process_message.latency" with the
#                             event duration, and `increment` on "...messages"
#
# Regardless of outcome, the offset lag is passed to `gauge` under "...lag".
# When :create_time is truthy, the elapsed time since it (in whole
# milliseconds, measured against Time.now) goes to `gauge` under "...time_lag".
#
# @param event [#payload, #duration] instrumentation event
def process_message(event)
  payload = event.payload
  offset_lag = payload.fetch(:offset_lag)
  created_at = payload.fetch(:create_time)
  prefix = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
           ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  # Sample the clock before emitting anything. Not all messages have timestamps.
  time_lag_ms = ((Time.now - created_at) * 1000).to_i if created_at

  if payload.key?(:exception)
    increment("#{prefix}.process_message.errors")
  else
    timing("#{prefix}.process_message.latency", event.duration)
    increment("#{prefix}.messages")
  end

  gauge("#{prefix}.lag", offset_lag)
  gauge("#{prefix}.time_lag", time_lag_ms) if time_lag_ms
end
sync_group(event)
# File lib/kafka/statsd.rb, line 147
# Handles a sync-group event.
#
# Passes the event duration to `timing` under
# "consumer.<client_id>.<group_id>.sync_group"; if the payload has an
# :exception key, `increment` is also called on the ".errors" metric.
#
# @param event [#payload, #duration] instrumentation event
def sync_group(event)
  info = event.payload
  base = "consumer.#{info.fetch(:client_id)}.#{info.fetch(:group_id)}"

  timing("#{base}.sync_group", event.duration)
  increment("#{base}.sync_group.errors") if info.key?(:exception)
end