class Kafka::Broker
Public Class Methods
new(connection_builder:, host:, port:, node_id: nil, logger:)
click to toggle source
# File lib/kafka/broker.rb, line 9 def initialize(connection_builder:, host:, port:, node_id: nil, logger:) @connection_builder = connection_builder @connection = nil @host = host @port = port @node_id = node_id @logger = TaggedLogger.new(logger) end
Public Instance Methods
add_partitions_to_txn(**options)
click to toggle source
# File lib/kafka/broker.rb, line 173 def add_partitions_to_txn(**options) request = Protocol::AddPartitionsToTxnRequest.new(**options) send_request(request) end
address_match?(host, port)
click to toggle source
# File lib/kafka/broker.rb, line 18 def address_match?(host, port) host == @host && port == @port end
alter_configs(**options)
click to toggle source
# File lib/kafka/broker.rb, line 137 def alter_configs(**options) request = Protocol::AlterConfigsRequest.new(**options) send_request(request) end
api_versions()
click to toggle source
# File lib/kafka/broker.rb, line 155 def api_versions request = Protocol::ApiVersionsRequest.new send_request(request) end
commit_offsets(**options)
click to toggle source
# File lib/kafka/broker.rb, line 83 def commit_offsets(**options) request = Protocol::OffsetCommitRequest.new(**options) send_request(request) end
connected?()
click to toggle source
@return [Boolean]
# File lib/kafka/broker.rb, line 33 def connected? !@connection.nil? end
create_partitions(**options)
click to toggle source
# File lib/kafka/broker.rb, line 143 def create_partitions(**options) request = Protocol::CreatePartitionsRequest.new(**options) send_request(request) end
create_topics(**options)
click to toggle source
# File lib/kafka/broker.rb, line 119 def create_topics(**options) request = Protocol::CreateTopicsRequest.new(**options) send_request(request) end
delete_topics(**options)
click to toggle source
# File lib/kafka/broker.rb, line 125 def delete_topics(**options) request = Protocol::DeleteTopicsRequest.new(**options) send_request(request) end
describe_configs(**options)
click to toggle source
# File lib/kafka/broker.rb, line 131 def describe_configs(**options) request = Protocol::DescribeConfigsRequest.new(**options) send_request(request) end
describe_groups(**options)
click to toggle source
# File lib/kafka/broker.rb, line 161 def describe_groups(**options) request = Protocol::DescribeGroupsRequest.new(**options) send_request(request) end
disconnect()
click to toggle source
@return [nil]
# File lib/kafka/broker.rb, line 28 def disconnect connection.close if connected? end
end_txn(**options)
click to toggle source
# File lib/kafka/broker.rb, line 179 def end_txn(**options) request = Protocol::EndTxnRequest.new(**options) send_request(request) end
fetch_messages(**options)
click to toggle source
Fetches messages from a specified topic and partition.
@param (see Kafka::Protocol::FetchRequest#initialize) @return [Kafka::Protocol::FetchResponse]
# File lib/kafka/broker.rb, line 51 def fetch_messages(**options) request = Protocol::FetchRequest.new(**options) send_request(request) end
fetch_metadata(**options)
click to toggle source
Fetches cluster metadata from the broker.
@param (see Kafka::Protocol::MetadataRequest#initialize) @return [Kafka::Protocol::MetadataResponse]
# File lib/kafka/broker.rb, line 41 def fetch_metadata(**options) request = Protocol::MetadataRequest.new(**options) send_request(request) end
fetch_offsets(**options)
click to toggle source
# File lib/kafka/broker.rb, line 77 def fetch_offsets(**options) request = Protocol::OffsetFetchRequest.new(**options) send_request(request) end
find_coordinator(**options)
click to toggle source
# File lib/kafka/broker.rb, line 107 def find_coordinator(**options) request = Protocol::FindCoordinatorRequest.new(**options) send_request(request) end
heartbeat(**options)
click to toggle source
# File lib/kafka/broker.rb, line 113 def heartbeat(**options) request = Protocol::HeartbeatRequest.new(**options) send_request(request) end
init_producer_id(**options)
click to toggle source
# File lib/kafka/broker.rb, line 167 def init_producer_id(**options) request = Protocol::InitProducerIDRequest.new(**options) send_request(request) end
join_group(**options)
click to toggle source
# File lib/kafka/broker.rb, line 89 def join_group(**options) request = Protocol::JoinGroupRequest.new(**options) send_request(request) end
leave_group(**options)
click to toggle source
# File lib/kafka/broker.rb, line 101 def leave_group(**options) request = Protocol::LeaveGroupRequest.new(**options) send_request(request) end
list_groups()
click to toggle source
# File lib/kafka/broker.rb, line 149 def list_groups request = Protocol::ListGroupsRequest.new send_request(request) end
list_offsets(**options)
click to toggle source
Lists the offset of the specified topics and partitions.
@param (see Kafka::Protocol::ListOffsetRequest#initialize) @return [Kafka::Protocol::ListOffsetResponse]
# File lib/kafka/broker.rb, line 61 def list_offsets(**options) request = Protocol::ListOffsetRequest.new(**options) send_request(request) end
produce(**options)
click to toggle source
Produces a set of messages to the broker.
@param (see Kafka::Protocol::ProduceRequest#initialize) @return [Kafka::Protocol::ProduceResponse]
# File lib/kafka/broker.rb, line 71 def produce(**options) request = Protocol::ProduceRequest.new(**options) send_request(request) end
sync_group(**options)
click to toggle source
# File lib/kafka/broker.rb, line 95 def sync_group(**options) request = Protocol::SyncGroupRequest.new(**options) send_request(request) end
to_s()
click to toggle source
@return [String]
# File lib/kafka/broker.rb, line 23 def to_s "#{@host}:#{@port} (node_id=#{@node_id.inspect})" end
Private Instance Methods
connection()
click to toggle source
# File lib/kafka/broker.rb, line 201 def connection @connection ||= @connection_builder.build_connection(@host, @port) end
send_request(request)
click to toggle source
# File lib/kafka/broker.rb, line 187 def send_request(request) connection.send_request(request) rescue IdleConnection @logger.warn "Connection has been unused for too long, re-connecting..." @connection.close rescue nil @connection = nil retry rescue ConnectionError @connection.close rescue nil @connection = nil raise end