class Octo::KafkaBridge
The bridge between Kafka and Ruby.
Constants
- BROKERS
Changes as per environment
- CLIENT_ID
These are hard-wired.
- DELIVERY_INTERVAL
- MAX_BUFFER_SIZE
- MAX_QUEUE_SIZE
- TOPIC
Public Class Methods
new(opts = {})
click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 20
# Builds the Kafka client and its asynchronous producer, and records the
# topic that messages will be produced to.
#
# Every option is optional; a missing key falls back to the class constant
# of the corresponding name.
#
# @param [Hash] opts Configuration hash; keys may be strings or symbols.
#   Recognised keys: :brokers, :client_id, :max_buffer_size,
#   :max_queue_size, :delivery_interval, :topic
def initialize(opts = {})
  # Symbolize into a copy rather than in place, so the caller's hash is
  # never mutated and frozen hashes are accepted.
  opts = opts.deep_symbolize_keys

  @kafka = ::Kafka.new(
    seed_brokers: opts.fetch(:brokers, BROKERS),
    client_id: opts.fetch(:client_id, CLIENT_ID)
  )
  @producer = @kafka.async_producer(
    max_buffer_size: opts.fetch(:max_buffer_size, MAX_BUFFER_SIZE),
    max_queue_size: opts.fetch(:max_queue_size, MAX_QUEUE_SIZE),
    delivery_interval: opts.fetch(:delivery_interval, DELIVERY_INTERVAL)
  )
  # Same lookup as the other options: use the given topic when the key is
  # present, TOPIC otherwise.
  @topic = opts.fetch(:topic, TOPIC)
end
Public Instance Methods
push(params)
click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 37
# Public entry point for sending a message; hands +params+ straight to
# #create_message and returns whatever that call returns.
#
# @param [Hash] params The message to be produced
def push(params)
  create_message(params)
end
teardown()
click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 41
# Shuts down the asynchronous producer created in #initialize.
def teardown
  @producer.shutdown
end
Private Instance Methods
create_message(message)
click to toggle source
Creates a new message. @param [Hash] message The message hash to be produced
# File lib/octocore-cassandra/kafka_bridge.rb, line 49
# Serializes +message+ with JSON.dump and hands it to the producer for the
# configured topic.
#
# When the producer raises Kafka::BufferOverflow, the error is logged, the
# method sleeps for one second and the produce call is attempted again.
# There is no retry limit: the call keeps retrying until the produce
# succeeds.
#
# @param [Hash] message The message hash to be produced
def create_message(message)
  @producer.produce(JSON.dump(message), topic: @topic)
rescue Kafka::BufferOverflow
  Octo.logger.error 'Buffer Overflow. Sleeping for 1s'
  sleep 1
  retry # re-runs the method body from the top
end