class Fluent::ZmqPubOutput

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 13
# Builds the plugin instance.
#
# After the superclass initializer has run, the ffi-rzmq binding is loaded
# (at construction time rather than at file load) and a mutex is created;
# #write takes that mutex around its socket sends.
def initialize
  super()
  require 'ffi-rzmq'
  @mutex = Mutex.new
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 19
# Hands the configuration straight to the superclass.
#
# No plugin-specific validation or processing happens in this method.
#
# @param conf the configuration object received from the caller
# @return whatever the superclass implementation returns
def configure(conf)
  super(conf)
end
format(tag, time, record)
# File lib/fluent/plugin/out_zmq_pub.rb, line 31
# Serializes a single event.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event payload
# @return the result of calling to_msgpack on the [tag, time, record] array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 66
# Releases the ZeroMQ resources and then runs the superclass shutdown.
#
# The socket is closed before the context is terminated. Both steps are
# skipped when the corresponding object was never created (for example when
# #start raised before assigning it), so that a partially started plugin can
# still terminate its context and reach the superclass shutdown instead of
# failing with NoMethodError on nil.
def shutdown
  @publisher.close if @publisher
  @context.terminate if @context
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_zmq_pub.rb, line 23
# Starts the output: runs the superclass start, then opens the PUB socket.
#
# A new ZeroMQ context and a PUB socket are stored in @context and
# @publisher. The socket gets @highwatermark as its SNDHWM option and is
# bound to @bindaddr (both presumably populated from configuration; they are
# not assigned in this method). Results of setsockopt and bind are not
# inspected here.
#
# @return the result of the bind call
def start
  super()
  @context = ZMQ::Context.new
  @publisher = @context.socket(ZMQ::PUB)
  @publisher.setsockopt(ZMQ::SNDHWM, @highwatermark)
  @publisher.bind(@bindaddr)
end
write(chunk)
# File lib/fluent/plugin/out_zmq_pub.rb, line 35
# Publishes every event of +chunk+ on the PUB socket.
#
# Each message has the form "<key> <msgpack payload>". The key is @pubkey
# with every ${...} placeholder expanded: ${tag} becomes the event's first
# element, any other ${name} becomes the value stored under that name in the
# event's third element (a missing value expands to an empty string, since
# gsub stringifies the block result).
#
# When @bulk_send is set, events are grouped by expanded key and each group
# is sent as one packed array after the whole chunk has been read; otherwise
# every event is sent individually as soon as it is read. All sends use
# ZMQ::DONTWAIT and happen while holding @mutex.
#
# @param chunk an object whose msgpack_each yields one event array at a time
def write(chunk)
  grouped = {}
  # NOTE: events are packed in #format, unpacked here, and packed again
  # below; a cheaper path may exist.
  chunk.msgpack_each do |entry|
    key = @pubkey.gsub(/\${(.*?)}/) do
      placeholder = Regexp.last_match(1)
      placeholder == 'tag' ? entry[0] : entry[2][placeholder]
    end
    # The payload is binary. Joining it to a text key that contains non-ASCII
    # characters raises Encoding::CompatibilityError, so treat the key as raw
    # bytes. gsub returned a fresh string, so @pubkey itself is untouched.
    key.force_encoding(Encoding::BINARY)

    if @bulk_send
      (grouped[key] ||= []) << entry
    else
      @mutex.synchronize do
        @publisher.send_string(key + ' ' + entry.to_msgpack, ZMQ::DONTWAIT)
      end
    end
  end

  return unless @bulk_send

  @mutex.synchronize do
    grouped.each do |key, entries|
      @publisher.send_string(key + ' ' + entries.to_msgpack, ZMQ::DONTWAIT)
    end
  end
end