class Fluent::ZmqPubOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 13 def initialize super require 'ffi-rzmq' @mutex = Mutex.new end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 19 def configure(conf) super end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_zmq_pub.rb, line 31 def format(tag, time, record) [tag,time,record].to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 66 def shutdown @publisher.close @context.terminate super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 23 def start super @context = ZMQ::Context.new() @publisher = @context.socket(ZMQ::PUB) @publisher.setsockopt(ZMQ::SNDHWM, @highwatermark) @publisher.bind(@bindaddr) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_zmq_pub.rb, line 35 def write(chunk) records = { } # to_msgpack in format, unpack in write, then to_msgpack again... better way? chunk.msgpack_each{ |record| pubkey_replaced = @pubkey.gsub(/\${(.*?)}/){ |s| case $1 when 'tag' record[0] else record[2][$1] end } if @bulk_send records[pubkey_replaced] ||= [] records[pubkey_replaced] << record else @mutex.synchronize { @publisher.send_string(pubkey_replaced + " " + record.to_msgpack,ZMQ::DONTWAIT) } end } if @bulk_send @mutex.synchronize { records.each{ |k,v| @publisher.send_string(k + " " + v.to_msgpack,ZMQ::DONTWAIT) } } end end