class Rafka::Producer
A Kafka producer that can produce to different topics. See {#produce} for more info.
Attributes
Access the underlying Redis client object
Public Class Methods
Create a new producer.
@param [Hash] opts
@option opts [String] :host ("localhost") server hostname
@option opts [Fixnum] :port (6380) server port
@option opts [Hash] :redis Configuration options for the underlying Redis client
@return [Producer]
# File lib/rafka/producer.rb, line 21
def initialize(opts={})
  @options = parse_opts(opts)
  @redis = Redis.new(@options)
end
Public Instance Methods
Flush any outstanding messages. The server will block until all messages are written or the provided timeout is exceeded. Note, however, that the provided timeout may be overridden by the `read_timeout` setting of the underlying Redis client: if `read_timeout` is less than `timeout_ms`, the client might interrupt the call before `timeout_ms` has elapsed.
@param timeout_ms [Fixnum] Must be smaller than or equal to the underlying Redis client's `read_timeout`, which supersedes the current timeout.
@return [Fixnum] The number of unflushed messages
# File lib/rafka/producer.rb, line 59
def flush(timeout_ms=5000)
  Rafka.wrap_errors do
    Integer(@redis.dump(timeout_ms.to_s))
  end
end
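The interplay between `timeout_ms` and `read_timeout` described above can be sketched as a small calculation. The helper name `effective_flush_timeout_ms` is hypothetical and not part of Rafka. The sketch also assumes that the Redis client's `read_timeout` is expressed in seconds, as is the case in redis-rb.

```ruby
# Hypothetical helper, not part of Rafka: estimates how long a flush call
# can wait before either the server-side timeout or the client-side
# read timeout ends it.
# Assumption: read_timeout_s is in seconds, timeout_ms in milliseconds.
def effective_flush_timeout_ms(timeout_ms, read_timeout_s)
  read_timeout_ms = (read_timeout_s * 1000).to_i
  [timeout_ms, read_timeout_ms].min
end

effective_flush_timeout_ms(5000, 10) # => 5000, the server-side timeout applies
effective_flush_timeout_ms(5000, 2)  # => 2000, the client interrupts the call first
```

If the second case is undesirable, raising `read_timeout` through the `:redis` options of the producer is one way to keep the client from cutting the flush short.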
Produce a message to a topic. This is an asynchronous operation.
@param topic [String]
@param msg [#to_s] the message
@param key [#to_s] an optional partition hashing key. Two or more messages with the same key will always be written to the same partition.
@example Simple produce
  producer = Rafka::Producer.new
  producer.produce("greetings", "Hello there!")
@example Produce two messages with a hashing key. Those messages are guaranteed to be written to the same partition
  producer = Rafka::Producer.new
  producer.produce("greetings", "Aloha", key: "abc")
  producer.produce("greetings", "Hola", key: "abc")
# File lib/rafka/producer.rb, line 41
def produce(topic, msg, key: nil)
  Rafka.wrap_errors do
    redis_key = "topics:#{topic}"
    redis_key << ":#{key}" if key
    @redis.rpushx(redis_key, msg.to_s)
  end
end
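The Redis key that `produce` builds from the topic and the optional hashing key can be illustrated in isolation. This is a minimal sketch that mirrors the key-building step from the source listing above. The function name `redis_key_for` is hypothetical and does not exist in Rafka.

```ruby
# Hypothetical standalone version of the key-building step in #produce.
# The topic is prefixed with "topics:" and the optional partition hashing
# key is appended after a second colon.
def redis_key_for(topic, key = nil)
  redis_key = "topics:#{topic}"
  redis_key += ":#{key}" if key
  redis_key
end

redis_key_for("greetings")        # => "topics:greetings"
redis_key_for("greetings", "abc") # => "topics:greetings:abc"
```

Because the hashing key is interpolated into the string, any object that responds to `to_s` can serve as a key, which matches the `[#to_s]` type in the parameter documentation.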
Private Instance Methods
@return [Hash]
# File lib/rafka/producer.rb, line 68
def parse_opts(opts)
  rafka_opts = opts.reject { |k| k == :redis }
  redis_opts = opts[:redis] || {}
  REDIS_DEFAULTS.dup.merge(opts).merge(redis_opts).merge(rafka_opts)
end
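The merge order in `parse_opts` determines which option wins when the same key appears in more than one place. The sketch below reproduces that order with stand-in defaults. `sketch_defaults` and `parse_opts_sketch` are hypothetical names, and the default values are taken from the documented `:host` and `:port` defaults; the actual contents of `REDIS_DEFAULTS` are not shown in this page and may differ.

```ruby
# Assumed stand-in for REDIS_DEFAULTS, based on the documented defaults.
def sketch_defaults
  { host: "localhost", port: 6380 }
end

# Same merge order as parse_opts: defaults first, then the raw options,
# then the nested :redis options, and finally the top-level options again
# so that they take precedence over everything else.
def parse_opts_sketch(opts)
  rafka_opts = opts.reject { |k, _v| k == :redis }
  redis_opts = opts[:redis] || {}
  sketch_defaults.merge(opts).merge(redis_opts).merge(rafka_opts)
end

merged = parse_opts_sketch(host: "kafka-proxy",
                           redis: { host: "ignored", read_timeout: 10 })
merged[:host]         # => "kafka-proxy", top-level option wins
merged[:port]         # => 6380, falls back to the default
merged[:read_timeout] # => 10, taken from the nested :redis options
```

In this sketch a top-level option overrides the same key given under `:redis`, and keys given only under `:redis` are passed through unchanged.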