class Rafka::Producer

A Kafka producer that can produce to different topics. See {#produce} for more info.

@see https://kafka.apache.org/documentation/#producerapi

Attributes

redis[R]

Access the underlying Redis client object

Public Class Methods

new(opts={})

Create a new producer.

@param [Hash] opts
@option opts [String] :host ("localhost") server hostname
@option opts [Fixnum] :port (6380) server port
@option opts [Hash] :redis Configuration options for the underlying Redis client

@return [Producer]

# File lib/rafka/producer.rb, line 21
def initialize(opts={})
  @options = parse_opts(opts)
  @redis = Redis.new(@options)
end

Public Instance Methods

flush(timeout_ms=5000)

Flush any outstanding messages. The server will block until all messages are written or the provided timeout is exceeded. Note, however, that the provided timeout may be cut short by the `read_timeout` setting of the underlying Redis client: if `read_timeout` is less than `timeout_ms`, the client might interrupt the call before `timeout_ms` has elapsed.

@param timeout_ms [Fixnum] Must be smaller than or equal to the underlying Redis client's `read_timeout`, which supersedes this timeout.

@return [Fixnum] The number of unflushed messages

# File lib/rafka/producer.rb, line 59
def flush(timeout_ms=5000)
  Rafka.wrap_errors do
    Integer(@redis.dump(timeout_ms.to_s))
  end
end
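
The interplay between `timeout_ms` and `read_timeout` described above can be sketched with a small calculation. `effective_flush_wait_ms` is a hypothetical helper and not part of Rafka. It assumes that `read_timeout` is given in seconds, as in the redis-rb client options, while `timeout_ms` is in milliseconds.

```ruby
# Hypothetical helper, not part of Rafka: estimates how long a flush call can
# wait before either the server-side timeout or the client read timeout ends it.
# Assumption: read_timeout_s is in seconds, timeout_ms is in milliseconds.
def effective_flush_wait_ms(timeout_ms, read_timeout_s)
  read_timeout_ms = (read_timeout_s * 1000).round
  [timeout_ms, read_timeout_ms].min
end

effective_flush_wait_ms(5000, 10) # the server-side timeout_ms is the limit
effective_flush_wait_ms(5000, 1)  # the client read timeout cuts the call short
```

Under these assumptions, keeping `timeout_ms` at or below the client's read timeout lets the server-side timeout take effect.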

produce(topic, msg, key: nil)

Produce a message to a topic. This is an asynchronous operation.

@param topic [String]
@param msg [#to_s] the message
@param key [#to_s] an optional partition hashing key. Two or more messages with the same key will always be written to the same partition.

@example Simple produce

producer = Rafka::Producer.new
producer.produce("greetings", "Hello there!")

@example Produce two messages with a hashing key. Those messages are guaranteed to be written to the same partition.

producer = Rafka::Producer.new
producer.produce("greetings", "Aloha", key: "abc")
producer.produce("greetings", "Hola", key: "abc")

# File lib/rafka/producer.rb, line 41
def produce(topic, msg, key: nil)
  Rafka.wrap_errors do
    redis_key = "topics:#{topic}"
    redis_key << ":#{key}" if key
    @redis.rpushx(redis_key, msg.to_s)
  end
end
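
To make the key format in the source above concrete, here is a minimal sketch of how the Redis key is derived from the topic and the optional hashing key. `rafka_redis_key` is a hypothetical name used only for illustration; it mirrors the string construction in `produce` but does not talk to any server.

```ruby
# Hypothetical helper, not part of Rafka: builds the same Redis key that
# produce passes to the underlying client.
def rafka_redis_key(topic, key = nil)
  redis_key = "topics:#{topic}"
  # The hashing key, when given, is appended after a colon.
  redis_key += ":#{key}" if key
  redis_key
end

rafka_redis_key("greetings")        # key without a hashing key
rafka_redis_key("greetings", "abc") # key with the hashing key appended
```

Messages produced with the same topic and hashing key therefore share the same Redis key.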

Private Instance Methods

parse_opts(opts)

@return [Hash]

# File lib/rafka/producer.rb, line 68
def parse_opts(opts)
  rafka_opts = opts.reject { |k| k == :redis }
  redis_opts = opts[:redis] || {}
  REDIS_DEFAULTS.dup.merge(opts).merge(redis_opts).merge(rafka_opts)
end
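
The merge order in `parse_opts` determines which value wins when the same option appears in more than one place. The sketch below reproduces that order with a stand-in constant. `EXAMPLE_DEFAULTS` and `example_parse_opts` are hypothetical names, and the default values are assumptions based on the documented `:host` and `:port` defaults, since the real `REDIS_DEFAULTS` is not shown here.

```ruby
# Hypothetical defaults for illustration only; the real REDIS_DEFAULTS
# constant is not shown in this document.
EXAMPLE_DEFAULTS = { host: "localhost", port: 6380 }.freeze

# Same merge order as parse_opts above, using the stand-in defaults.
def example_parse_opts(opts)
  rafka_opts = opts.reject { |k, _v| k == :redis }
  redis_opts = opts[:redis] || {}
  EXAMPLE_DEFAULTS.dup.merge(opts).merge(redis_opts).merge(rafka_opts)
end

merged = example_parse_opts({ port: 7000, redis: { port: 9999, read_timeout: 10 } })
merged[:host]         # taken from the defaults
merged[:port]         # the top-level option wins over the :redis value
merged[:read_timeout] # only given under :redis, so it is carried over
```

With this order, top-level options override values given under `:redis`, which in turn override the defaults. As written, the merged hash also still contains the `:redis` entry itself.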