class Kafka::Fetcher
Attributes
current_reset_counter [R] — read-only accessor
queue [R] — read-only accessor
Public Class Methods
new(cluster:, logger:, instrumenter:, max_queue_size:, group:)
click to toggle source
# File lib/kafka/fetcher.rb, line 9
def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
  @cluster = cluster
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @max_queue_size = max_queue_size
  @group = group

  # Fetched batches flow to the consumer through @queue; control
  # messages flow to the background fetcher thread through @commands.
  @queue = Queue.new
  @commands = Queue.new

  # topic => { partition => next offset to fetch }
  @next_offsets = Hash.new { |offsets, topic| offsets[topic] = {} }

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request (10 MiB).
  @max_bytes = 10_485_760

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  # An incrementing counter used to synchronize resets between the
  # foreground and background thread.
  @current_reset_counter = 0
end
Public Instance Methods
configure(min_bytes:, max_bytes:, max_wait_time:)
click to toggle source
# File lib/kafka/fetcher.rb, line 45
# Queue a :configure command so the settings are applied on the
# fetcher thread rather than mutated from the caller's thread.
def configure(min_bytes:, max_bytes:, max_wait_time:)
  settings = [min_bytes, max_bytes, max_wait_time]
  @commands << [:configure, settings]
end
data?()
click to toggle source
# File lib/kafka/fetcher.rb, line 74
# True when at least one entry is buffered for the consumer.
def data?
  @queue.size.positive?
end
poll()
click to toggle source
# File lib/kafka/fetcher.rb, line 78
# Dequeue the next entry for the consumer, discarding batch payloads
# that were fetched before the most recent reset.
def poll
  tag, payload, reset_counter = @queue.deq

  # Batches are tagged with the reset counter that was current when they
  # were fetched; a smaller value means they predate the latest reset.
  stale = tag == :batches && payload.any? && reset_counter < current_reset_counter

  if stale
    @logger.warn "Skipping stale messages buffered prior to reset"
    [tag, []]
  else
    [tag, payload]
  end
end
reset()
click to toggle source
# File lib/kafka/fetcher.rb, line 69
# Bump the counter first so batches already in flight are recognised as
# stale by #poll, then tell the background thread to clear its state.
def reset
  @current_reset_counter += 1
  @commands << [:reset]
end
seek(topic, partition, offset)
click to toggle source
# File lib/kafka/fetcher.rb, line 41
# Queue a :seek command; the fetcher thread moves the partition cursor.
def seek(topic, partition, offset)
  command = [:seek, [topic, partition, offset]]
  @commands << command
end
start()
click to toggle source
# File lib/kafka/fetcher.rb, line 49 def start return if @running @running = true @thread = Thread.new do while @running loop end @logger.info "#{@group} Fetcher thread exited." end @thread.abort_on_exception = true end
stop()
click to toggle source
# File lib/kafka/fetcher.rb, line 63
# Ask the fetcher thread to stop and wait for it to exit. No-op when the
# thread is not running.
def stop
  if @running
    # The :stop command is drained by the thread, which then exits and is
    # joined here so the caller returns only after shutdown completes.
    @commands << [:stop, []]
    @thread.join
  end
end
subscribe(topic, max_bytes_per_partition:)
click to toggle source
# File lib/kafka/fetcher.rb, line 37
# Queue a :subscribe command; the fetcher thread records the per-partition
# byte cap and starts including the topic in fetch requests.
def subscribe(topic, max_bytes_per_partition:)
  payload = [topic, max_bytes_per_partition]
  @commands << [:subscribe, payload]
end
Private Instance Methods
fetch_batches()
click to toggle source
# File lib/kafka/fetcher.rb, line 181 def fetch_batches @logger.debug "Fetching batches" operation = FetchOperation.new( cluster: @cluster, logger: @logger, min_bytes: @min_bytes, max_bytes: @max_bytes, max_wait_time: @max_wait_time, ) @next_offsets.each do |topic, partitions| # Fetch at most this many bytes from any single partition. max_bytes = @max_bytes_per_partition[topic] partitions.each do |partition, offset| operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes) end end operation.execute rescue UnknownTopicOrPartition @logger.error "Failed to fetch from some partitions. Maybe a rebalance has happened? Refreshing cluster info." # Our cluster information has become stale, we need to refresh it. @cluster.refresh_metadata! # Don't overwhelm the brokers in case this keeps happening. sleep 10 retry rescue NoPartitionsToFetchFrom backoff = @max_wait_time > 0 ? @max_wait_time : 1 @logger.info "There are no partitions to fetch from, sleeping for #{backoff}s" sleep backoff [] end
handle_configure(min_bytes, max_bytes, max_wait_time)
click to toggle source
# File lib/kafka/fetcher.rb, line 120
# Applied on the fetcher thread in response to a :configure command.
def handle_configure(min_bytes, max_bytes, max_wait_time)
  @min_bytes, @max_bytes, @max_wait_time = min_bytes, max_bytes, max_wait_time
end
handle_reset()
click to toggle source
# File lib/kafka/fetcher.rb, line 126
# Drop all tracked offsets and any buffered batches; fresh cursors are
# seeded by subsequent :seek commands.
def handle_reset
  [@next_offsets, @queue].each(&:clear)
end
handle_seek(topic, partition, offset)
click to toggle source
# File lib/kafka/fetcher.rb, line 146
# Move the fetch cursor for a partition, emitting an instrumentation
# event before updating the offset table.
def handle_seek(topic, partition, offset)
  @instrumenter.instrument('seek.consumer',
    group_id: @group.group_id,
    topic: topic,
    partition: partition,
    offset: offset)

  @logger.info "Seeking #{topic}/#{partition} to offset #{offset}"

  @next_offsets[topic][partition] = offset
end
handle_stop(*)
click to toggle source
# File lib/kafka/fetcher.rb, line 131
# Shut the fetch loop down: flip the running flag, drop any queued
# commands, and clear fetch state.
def handle_stop(*)
  @running = false

  # Pending commands are meaningless once stopped.
  @commands.clear

  # After stopping, the topics and partitions to fetch from must be
  # reconfigured; otherwise we'd keep fetching from a bunch of partitions
  # we may no longer be assigned.
  handle_reset
end
handle_subscribe(topic, max_bytes_per_partition)
click to toggle source
# File lib/kafka/fetcher.rb, line 141
# Record the per-partition byte cap for a topic; fetch_batches consults
# this table when building fetch requests.
def handle_subscribe(topic, max_bytes_per_partition)
  @logger.info "Will fetch at most #{max_bytes_per_partition} bytes at a time per partition from #{topic}"
  @max_bytes_per_partition.store(topic, max_bytes_per_partition)
end
loop()
click to toggle source
# File lib/kafka/fetcher.rb, line 96 def loop @logger.push_tags(@group.to_s) @instrumenter.instrument("loop.fetcher", { queue_size: @queue.size, }) return unless @running if !@commands.empty? cmd, args = @commands.deq @logger.debug "Handling fetcher command: #{cmd}" send("handle_#{cmd}", *args) elsif @queue.size < @max_queue_size step else @logger.warn "Reached max fetcher queue size (#{@max_queue_size}), sleeping 1s" sleep 1 end ensure @logger.pop_tags end
step()
click to toggle source
# File lib/kafka/fetcher.rb, line 156 def step batches = fetch_batches batches.each do |batch| unless batch.empty? @instrumenter.instrument("fetch_batch.consumer", { topic: batch.topic, partition: batch.partition, offset_lag: batch.offset_lag, highwater_mark_offset: batch.highwater_mark_offset, message_count: batch.messages.count, }) end @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 unless batch.unknown_last_offset? end @queue << [:batches, batches, current_reset_counter] rescue Kafka::NoPartitionsToFetchFrom @logger.warn "No partitions to fetch from, sleeping for 1s" sleep 1 rescue Kafka::Error => e @queue << [:exception, e] end