class Octo::KafkaBridge

The bridge between Kafka and Ruby.

Constants

BROKERS

Changes as per environment

CLIENT_ID

These are hard-wired.

DELIVERY_INTERVAL
MAX_BUFFER_SIZE
MAX_QUEUE_SIZE
TOPIC

Public Class Methods

new(opts = {}) click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 20
# Builds the Kafka client and the asynchronous producer used by this bridge.
#
# Every option is optional; a missing one falls back to the class constant
# of the same name.
#
# @param opts [Hash] configuration; keys may be strings or symbols
# @option opts [Object] :brokers seed brokers (default: BROKERS)
# @option opts [Object] :client_id client identifier (default: CLIENT_ID)
# @option opts [Object] :max_buffer_size producer buffer limit
#   (default: MAX_BUFFER_SIZE)
# @option opts [Object] :max_queue_size producer queue limit
#   (default: MAX_QUEUE_SIZE)
# @option opts [Object] :delivery_interval producer delivery interval
#   (default: DELIVERY_INTERVAL)
# @option opts [Object] :topic topic messages are produced to (default: TOPIC)
def initialize(opts = {})
  # Symbolize into a copy: the bang variant rewrote the caller's hash in
  # place, which leaked a side effect out of the constructor.
  opts = opts.deep_symbolize_keys
  @kafka = ::Kafka.new(
    seed_brokers: opts.fetch(:brokers, BROKERS),
    client_id: opts.fetch(:client_id, CLIENT_ID)
  )
  @producer = @kafka.async_producer(
    max_buffer_size: opts.fetch(:max_buffer_size, MAX_BUFFER_SIZE),
    max_queue_size: opts.fetch(:max_queue_size, MAX_QUEUE_SIZE),
    delivery_interval: opts.fetch(:delivery_interval, DELIVERY_INTERVAL)
  )
  # Same "use the key if present, else the constant" rule as the other options.
  @topic = opts.fetch(:topic, TOPIC)
end

Public Instance Methods

push(params) click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 37
# Hands a message to the bridge by delegating to #create_message.
#
# @param params [Hash] the message to produce
# @return [Object] whatever #create_message returns
def push(params)
  create_message(params)
end
teardown() click to toggle source
# File lib/octocore-cassandra/kafka_bridge.rb, line 41
# Shuts down the asynchronous producer held in @producer.
#
# Only the producer is shut down here; the Kafka client stored in @kafka
# is left untouched.
# NOTE(review): presumably pending messages are flushed by the producer's
# shutdown — confirm against the ruby-kafka documentation.
def teardown
  @producer.shutdown
end

Private Instance Methods

create_message(message) click to toggle source

Creates a new message. @param [Hash] message The message hash to be produced

# File lib/octocore-cassandra/kafka_bridge.rb, line 49
# Serializes the message to JSON and queues it on the producer for @topic.
#
# When the producer reports a buffer overflow, the error is logged, the
# method sleeps for one second and then tries again. The retry is unbounded,
# so this call blocks for as long as the overflow persists.
#
# @param message [Hash] the message hash to be produced
# @return [Object] the result of the producer's #produce call
def create_message(message)
  payload = JSON.dump(message)
  @producer.produce(payload, topic: @topic)
rescue Kafka::BufferOverflow
  Octo.logger.error 'Buffer Overflow. Sleeping for 1s'
  sleep 1
  retry
end