class LogStash::Outputs::ZeroMQ
Write events to a 0MQ PUB socket.
You need to have the 0MQ 2.1.x library installed to use this output plugin.
The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 73
# Closes the ZeroMQ socket held in @zsocket.
#
# The result of @zsocket.close is handed to error_check. A RuntimeError raised
# while closing is logged (together with the exception itself) rather than
# propagated to the caller.
def close
  error_check(@zsocket.close, "while closing the socket")
rescue RuntimeError => e
  # Attach the exception so the log shows why the teardown failed.
  @logger.error("Failed to properly teardown ZeroMQ", :exception => e)
end
multi_receive_encoded(events_and_encoded)
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 81
# Publishes every [event, encoded payload] pair of the batch.
#
# events_and_encoded - an enumerable yielding [event, encoded] pairs.
#
# Returns the result of events_and_encoded.each.
def multi_receive_encoded(events_and_encoded)
  # publish is a private method: call it without an explicit receiver, since
  # `self.publish` raises NoMethodError on Ruby versions before 2.7.
  events_and_encoded.each { |event, encoded| publish(event, encoded) }
end
register()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 66
# Prepares the output for use: first loads the ZeroMQ bindings (load_zmq),
# then creates and configures the socket (connect).
def register
  load_zmq
  connect
end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 108
# Creates the ZeroMQ socket matching @topology, sets ZMQ::LINGER to 1, applies
# the optional @sockopt settings and runs setup for every entry of @address.
#
# Raises ArgumentError when @topology is not "pair", "pushpull" or "pubsub".
def connect
  # Translate topology shorthand to socket types
  zmq_const =
    case @topology
    when "pair" then ZMQ::PAIR
    when "pushpull" then ZMQ::PUSH
    when "pubsub" then ZMQ::PUB
    else
      # Fail early with a clear message instead of handing nil to the context.
      raise ArgumentError, "Unsupported topology: #{@topology.inspect}"
    end

  @zsocket = context.socket(zmq_const)
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1")
  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |addr| setup(@zsocket, addr) }
end
load_zmq()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 102
# Requires the ffi-rzmq bindings and the shared ZeroMQ plugin mixin at call
# time, then mixes that module into this object's class.
def load_zmq
  require "ffi-rzmq"
  require "logstash/plugin_mixins/zeromq"

  mixin = LogStash::PluginMixins::ZeroMQ
  self.class.send(:include, mixin)
end
publish(event, payload)
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 90
# Sends one payload over @zsocket.
#
# For the "pubsub" topology the topic (@topic expanded through event.sprintf)
# is sent first with ZMQ::SNDMORE, followed by the payload. Any StandardError
# raised along the way is written to stderr and logged as a warning instead of
# being re-raised.
#
# event   - the event used to expand the topic.
# payload - the encoded string to send.
def publish(event, payload)
  begin
    if @topology == "pubsub"
      rendered_topic = event.sprintf(@topic)
      topic_rc = @zsocket.send_string(rendered_topic, ZMQ::SNDMORE)
      error_check(topic_rc, "in topic send_string")
    end

    @logger.debug("0mq: sending", :event => payload) if @logger.debug?

    error_check(@zsocket.send_string(payload), "in send_string")
  rescue StandardError => error
    warn error.inspect
    @logger.warn("0mq output exception", :address => @address, :exception => error)
  end
end
server?()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 86
# Tells whether this output is configured in server mode.
#
# Returns true when @mode equals "server", false otherwise.
def server?
  @mode == "server"
end