class LogStash::Outputs::ZeroMQ

Write events to a 0MQ PUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this output plugin.

The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 73
def close
  begin
    error_check(@zsocket.close, "while closing the socket")
  rescue RuntimeError => e
    @logger.error("Failed to properly teardown ZeroMQ")
  end
end
multi_receive_encoded(events_and_encoded) click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 81
def multi_receive_encoded(events_and_encoded)
  events_and_encoded.each {|event, encoded| self.publish(event,encoded)}
end
register() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 66
def register
  load_zmq

  connect
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 108
def connect
  # Translate topology shorthand to socket types
  case @topology
  when "pair"
    zmq_const = ZMQ::PAIR
  when "pushpull"
    zmq_const = ZMQ::PUSH
  when "pubsub"
    zmq_const = ZMQ::PUB
  end # case socket_type

  @zsocket = context.socket(zmq_const)

  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1)")

  if @sockopt
    setopts(@zsocket, @sockopt)
  end

  @address.each do |addr|
    setup(@zsocket, addr)
  end
end
load_zmq() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 102
def load_zmq
  require "ffi-rzmq"
  require "logstash/plugin_mixins/zeromq"
  self.class.send(:include, LogStash::PluginMixins::ZeroMQ)
end
publish(event, payload) click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 90
def publish(event, payload)
  if @topology == "pubsub"
    topic = event.sprintf(@topic)
    error_check(@zsocket.send_string(topic, ZMQ::SNDMORE), "in topic send_string")
  end
  @logger.debug? && @logger.debug("0mq: sending", :event => payload)
  error_check(@zsocket.send_string(payload), "in send_string")
rescue => e
  warn e.inspect
  @logger.warn("0mq output exception", :address => @address, :exception => e)
end
server?() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 86
def server?
  @mode == "server"
end