class Kafka::Sasl::OAuth
Constants
- OAUTH_IDENT
Public Class Methods
new(logger:, token_provider:)
token_provider: THE FOLLOWING INTERFACE MUST BE FULFILLED:
- REQUIRED
  - TokenProvider#token - Returns an ID/Access Token to be sent to the Kafka client. The implementation should ensure token reuse so that multiple calls at connect time do not create multiple tokens. The implementation should also periodically refresh the token in order to guarantee that each call returns an unexpired token. A timeout error should be raised after a short period of inactivity so that the broker can log debugging info and retry.
- OPTIONAL
  - TokenProvider#extensions - Returns a map of key-value pairs that can be sent with the SASL/OAUTHBEARER initial client response. If the token provider does not define this method, no extensions are sent. This feature is only available in Kafka >= 2.1.0.
# File lib/kafka/sasl/oauth.rb, line 21
def initialize(logger:, token_provider:)
  @logger = TaggedLogger.new(logger)
  @token_provider = token_provider
end
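The token provider interface described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name CachingTokenProvider, the fetcher block, the refresh_margin and fetch_timeout parameters, and the injectable clock are all hypothetical and not part of ruby-kafka. The sketch reuses a cached token across calls, refreshes it shortly before expiry, and bounds the fetch with Ruby's standard Timeout module.

```ruby
require "timeout"

# Hypothetical token provider. The class name, the fetcher block, and all
# keyword parameters are illustrative assumptions, not ruby-kafka API.
class CachingTokenProvider
  # The block must return [token_string, expires_at], where expires_at is a Time.
  def initialize(extensions: {}, refresh_margin: 30, fetch_timeout: 5,
                 clock: -> { Time.now }, &fetcher)
    @extensions = extensions
    @refresh_margin = refresh_margin # seconds before expiry at which to refresh
    @fetch_timeout = fetch_timeout   # seconds to wait for the fetcher
    @clock = clock
    @fetcher = fetcher
    @mutex = Mutex.new
    @token = nil
    @expires_at = nil
  end

  # REQUIRED: returns the cached token, fetching a new one only when the
  # cached token is missing or within refresh_margin seconds of expiry.
  def token
    @mutex.synchronize do
      refresh! if stale?
      @token
    end
  end

  # OPTIONAL: key-value pairs to send with the initial client response.
  def extensions
    @extensions
  end

  private

  def stale?
    @token.nil? || @clock.call >= @expires_at - @refresh_margin
  end

  # Raises Timeout::Error if the fetcher takes longer than fetch_timeout.
  def refresh!
    @token, @expires_at = Timeout.timeout(@fetch_timeout) { @fetcher.call }
  end
end

# Usage with a placeholder fetcher; a real one would call an identity provider.
example_provider = CachingTokenProvider.new { ["example-token", Time.now + 300] }
example_provider.token
```

The mutex is there because several connections may ask for a token at the same time at connect time; without it, concurrent callers could each trigger a fetch.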
Public Instance Methods
authenticate!(host, encoder, decoder)
# File lib/kafka/sasl/oauth.rb, line 34
def authenticate!(host, encoder, decoder)
  # Send SASLOauthBearerClientResponse with token
  @logger.debug "Authenticating to #{host} with SASL #{OAUTH_IDENT}"

  encoder.write_bytes(initial_client_response)

  begin
    # receive SASL OAuthBearer Server Response
    msg = decoder.bytes
    raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: unknown error" unless msg
  rescue Errno::ETIMEDOUT, EOFError => e
    raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: #{e.message}"
  end

  @logger.debug "SASL #{OAUTH_IDENT} authentication successful."
end
configured?()
# File lib/kafka/sasl/oauth.rb, line 30
def configured?
  @token_provider
end
ident()
# File lib/kafka/sasl/oauth.rb, line 26
def ident
  OAUTH_IDENT
end
Private Instance Methods
initial_client_response()
# File lib/kafka/sasl/oauth.rb, line 53
def initial_client_response
  raise Kafka::TokenMethodNotImplementedError, "Token provider doesn't define 'token'" unless @token_provider.respond_to? :token
  "n,,\x01auth=Bearer #{@token_provider.token}#{token_extensions}\x01\x01"
end
token_extensions()
# File lib/kafka/sasl/oauth.rb, line 58
def token_extensions
  return nil unless @token_provider.respond_to? :extensions
  "\x01#{@token_provider.extensions.map {|e| e.join("=")}.join("\x01")}"
end
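To make the wire format concrete, the following standalone sketch reproduces the string built by initial_client_response and token_extensions without depending on the library. The helper name build_initial_client_response, the two provider classes, and the token and extension values are hypothetical and exist only for illustration. The string-building logic follows the source shown above, except that a plain ArgumentError stands in for Kafka::TokenMethodNotImplementedError.

```ruby
# Hypothetical stand-in providers; names and values are illustrative only.
class TokenOnlyProvider
  def token
    "abc123"
  end
end

class TokenWithExtensionsProvider < TokenOnlyProvider
  def extensions
    { "key1" => "value1", "key2" => "value2" }
  end
end

# Standalone re-creation of the message format shown in the source above.
# Fields are separated by the \x01 control character, and the message ends
# with two of them.
def build_initial_client_response(provider)
  unless provider.respond_to?(:token)
    # The library raises Kafka::TokenMethodNotImplementedError here.
    raise ArgumentError, "Token provider doesn't define 'token'"
  end

  extensions =
    if provider.respond_to?(:extensions)
      "\x01" + provider.extensions.map { |pair| pair.join("=") }.join("\x01")
    end

  # Interpolating nil yields an empty string, so no extension block is added
  # when the provider does not define extensions.
  "n,,\x01auth=Bearer #{provider.token}#{extensions}\x01\x01"
end

plain_response = build_initial_client_response(TokenOnlyProvider.new)
extended_response = build_initial_client_response(TokenWithExtensionsProvider.new)

# Splitting on the separator makes the individual fields easy to inspect.
p plain_response.split("\x01")
p extended_response.split("\x01")
```

Each extension is serialized as key=value and prefixed with the separator, so the extensions slot in between the auth field and the terminating pair of separators.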