class Phobos::Producer::ClassMethods::PublicAPI
Constants
- ASYNC_PRODUCER_PARAMS
- INTERNAL_PRODUCER_PARAMS
- NAMESPACE
Public Instance Methods
async_configs()
click to toggle source
# File lib/phobos/producer.rb, line 143
#
# Producer settings to pass to the async producer: everything in
# Phobos.config.producer_hash except the keys listed in
# INTERNAL_PRODUCER_PARAMS.
#
# @return the filtered settings (a Hash when producer_hash is a Hash)
def async_configs
  settings = Phobos.config.producer_hash
  settings.reject { |name, _value| INTERNAL_PRODUCER_PARAMS.include?(name) }
end
async_producer()
click to toggle source
# File lib/phobos/producer.rb, line 116
#
# Looks up the async producer currently held in the producer store.
#
# @return the stored async producer, or nil when none has been stored
def async_producer
  store = producer_store
  store[:async_producer]
end
async_producer_shutdown()
click to toggle source
# File lib/phobos/producer.rb, line 131
#
# Flushes and shuts down the stored async producer (when there is one) and
# then clears its slot in the producer store.
#
# @return [nil]
def async_producer_shutdown
  # Deliver whatever is still pending before shutting the producer down.
  async_producer.deliver_messages unless async_producer.nil?
  async_producer.shutdown unless async_producer.nil?

  # The slot is cleared even when no producer was stored.
  producer_store[:async_producer] = nil
end
async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
click to toggle source
# File lib/phobos/producer.rb, line 120
#
# Publishes a single message through the async producer by wrapping it in a
# one-element list and delegating to async_publish_list.
#
# @param topic the topic to publish to
# @param payload the message payload
# @param key optional message key
# @param partition_key optional partition key
# @param headers optional message headers
# @return whatever async_publish_list returns
def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  async_publish_list([message])
end
async_publish_list(messages)
click to toggle source
# File lib/phobos/producer.rb, line 125
#
# Hands a list of messages to the async producer, creating the producer
# first when none is stored yet.
#
# @param messages list of message hashes (see produce_messages for the keys read)
# @return nil when automatic delivery is configured, otherwise the result of
#   the producer's deliver_messages call
def async_publish_list(messages)
  active_producer = async_producer
  active_producer ||= create_async_producer
  produce_messages(active_producer, messages)

  # With a delivery threshold/interval configured, no explicit delivery is
  # triggered here.
  return if async_automatic_delivery?

  active_producer.deliver_messages
end
configure_kafka_client(kafka_client)
click to toggle source
Configures the Kafka client used for publish operations performed by the host class.
Any existing async producer is shut down before the new client is stored.
@param kafka_client [Kafka::Client]
# File lib/phobos/producer.rb, line 70
#
# Stores the Kafka client to use for publishing. Any existing async producer
# is shut down first.
#
# @param kafka_client the client to store
# @return the same kafka_client that was passed in
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  producer_store[:kafka_client] = kafka_client
  # Returned explicitly: create_sync_producer and create_async_producer use
  # this method's return value as their client.
  kafka_client
end
create_async_producer()
click to toggle source
# File lib/phobos/producer.rb, line 110
#
# Builds an async producer from the configured Kafka client and records it
# in the producer store. When no client is configured yet, one is created via
# Phobos.create_kafka_client(:producer) and configured first.
#
# @return the newly created async producer
def create_async_producer
  client = kafka_client
  client ||= configure_kafka_client(Phobos.create_kafka_client(:producer))

  new_producer = client.async_producer(**async_configs)
  producer_store[:async_producer] = new_producer
end
create_sync_producer()
click to toggle source
# File lib/phobos/producer.rb, line 79
#
# Builds a sync producer from the configured Kafka client. When no client is
# configured yet, one is created via Phobos.create_kafka_client(:producer)
# and configured first.
#
# @return the newly created sync producer
def create_sync_producer
  client = kafka_client
  client ||= configure_kafka_client(Phobos.create_kafka_client(:producer))

  new_producer = client.producer(**regular_configs)

  # The producer is only kept in the store when persistent connections are
  # enabled in the producer configuration.
  persistent = Phobos.config.producer_hash[:persistent_connections]
  producer_store[:sync_producer] = new_producer if persistent

  new_producer
end
kafka_client()
click to toggle source
# File lib/phobos/producer.rb, line 75
#
# Looks up the Kafka client currently held in the producer store.
#
# @return the stored client, or nil when none has been configured
def kafka_client
  store = producer_store
  store[:kafka_client]
end
publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
click to toggle source
# File lib/phobos/producer.rb, line 97
#
# Publishes a single message through the sync producer by wrapping it in a
# one-element list and delegating to publish_list.
#
# @param topic the topic to publish to
# @param payload the message payload
# @param key optional message key
# @param partition_key optional partition key
# @param headers optional message headers
# @return whatever publish_list returns
def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  publish_list([message])
end
publish_list(messages)
click to toggle source
# File lib/phobos/producer.rb, line 102
#
# Publishes a list of messages with the sync producer and delivers them
# immediately. A stored sync producer is reused when present; otherwise a
# new one is created.
#
# @param messages list of message hashes (see produce_messages for the keys read)
# @return the result of the producer's deliver_messages call
def publish_list(messages)
  sender = sync_producer
  sender ||= create_sync_producer
  produce_messages(sender, messages)
  sender.deliver_messages
ensure
  # Runs on success and on error alike. Without persistent connections the
  # producer is shut down after every call; sender is nil when obtaining the
  # producer itself raised.
  keep_open = Phobos.config.producer_hash[:persistent_connections]
  sender.shutdown unless keep_open || sender.nil?
end
regular_configs()
click to toggle source
# File lib/phobos/producer.rb, line 137
#
# Producer settings to pass to the sync producer: everything in
# Phobos.config.producer_hash except the keys listed in
# ASYNC_PRODUCER_PARAMS or INTERNAL_PRODUCER_PARAMS.
#
# @return the filtered settings (a Hash when producer_hash is a Hash)
def regular_configs
  settings = Phobos.config.producer_hash
  settings.reject do |name, _value|
    ASYNC_PRODUCER_PARAMS.include?(name) || INTERNAL_PRODUCER_PARAMS.include?(name)
  end
end
sync_producer()
click to toggle source
# File lib/phobos/producer.rb, line 88
#
# Looks up the sync producer currently held in the producer store.
#
# @return the stored sync producer, or nil when none has been stored
def sync_producer
  store = producer_store
  store[:sync_producer]
end
sync_producer_shutdown()
click to toggle source
# File lib/phobos/producer.rb, line 92
#
# Shuts down the stored sync producer (when there is one) and clears its
# slot in the producer store.
#
# @return [nil]
def sync_producer_shutdown
  current = sync_producer
  current.shutdown unless current.nil?

  # The slot is cleared even when no producer was stored.
  producer_store[:sync_producer] = nil
end
Private Instance Methods
async_automatic_delivery?()
click to toggle source
# File lib/phobos/producer.rb, line 160
#
# Tells whether the async settings enable automatic delivery, i.e. whether
# :delivery_threshold or :delivery_interval is configured with a positive
# value. A missing key is treated as 0.
#
# @return [Boolean]
def async_automatic_delivery?
  return true if async_configs.fetch(:delivery_threshold, 0).positive?

  async_configs.fetch(:delivery_interval, 0).positive?
end
produce_messages(producer, messages)
click to toggle source
# File lib/phobos/producer.rb, line 150
#
# Passes every message to the given producer's #produce method.
#
# Each message is read via the keys :payload, :topic, :key, :headers and
# :partition_key. When :partition_key is missing (or nil/false), the message
# key is used as the partition key instead.
#
# @param producer object responding to produce(payload, topic:, key:, headers:, partition_key:)
# @param messages list of message hashes
# @return the messages collection that was passed in
def produce_messages(producer, messages)
  messages.each do |msg|
    message_key = msg[:key]
    producer.produce(
      msg[:payload],
      topic: msg[:topic],
      key: message_key,
      headers: msg[:headers],
      partition_key: msg[:partition_key] || message_key
    )
  end
end
producer_store()
click to toggle source
# File lib/phobos/producer.rb, line 165
#
# Returns the hash that holds the Kafka client and producers, creating an
# empty one on first access. The hash lives in Thread.current under the
# NAMESPACE key, so it is not shared between threads.
#
# @return [Hash]
def producer_store
  current_thread = Thread.current
  current_thread[NAMESPACE] ||= {}
end