class LogStash::Outputs::MQTT
This is a Logstash output plugin for the mqtt.org/[MQTT] protocol.
Features:
-
Publish messages to a topic
-
TLS/SSL connection to
MQTT
server (optional) -
Message publishing to a topic
-
QoS levels 0 and 1 (note that QoS 2 is not currently supported due to github.com/njh/ruby-mqtt[ruby-mqtt] library limitations)
-
Fault tolerance for network outages; however, it is not optimized for performance, since it takes a new connection for each event (or a bunch of events) to be published
-
MQTT
protocol version 3.1.0
Example publishing to test.mosquitto.org:
- source,ruby
output {
mqtt { host => "test.mosquitto.org" port => 8883 topic => "hello" }
}
Example publishing to aws.amazon.com/iot/[AWS IoT]:
- source,ruby
output {
mqtt { host => "somehostname.iot.someregion.amazonaws.com" port => 8883 topic => "hello" client_id => "clientidfromaws" ssl => true cert_file => "certificate.pem.crt" key_file => "private.pem.key" ca_file => "root-CA.crt" }
}
Topic may also depend on parts of the event using the standard sprintf syntax.
- source,ruby
output {
mqtt { ... topic => "something/%{myfield}" }
}
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 153
# Marks the plugin as closing and releases the cached MQTT connection.
#
# Setting @closing first lets the retry sleep in handle_events (which polls
# @closing) wake up promptly. The memoized client is then disconnected so the
# connection is not left open after the plugin has been closed.
#
# Disconnecting is best-effort: a failure is logged and swallowed, because
# there is nothing useful a caller could do with it during shutdown.
def close
  @closing = true

  # Drop the reference before disconnecting so a half-closed client is never
  # handed out again by the memoizing accessor.
  client = @mqtt_client
  @mqtt_client = nil
  client.disconnect if client
rescue StandardError => e
  @logger.warn("Error #{e.message} while disconnecting from MQTT server.")
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 147
# Batch entry point: runs every event in +events+ through @codec.encode and
# then calls handle_events a single time.
#
# Publishing is deferred until the whole batch has been encoded so that the
# batch is handled at once instead of taking a new connection for each event.
def multi_receive(events)
  events.each do |incoming|
    @codec.encode(incoming)
  end

  handle_events
end
receive(event)
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 142
# Single-event entry point: passes +event+ to @codec.encode, then calls
# handle_events straight away.
def receive(event)
  @codec.encode(event)

  handle_events
end
register()
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 105
# Prepares the plugin for use.
#
# Builds @options, the hash later passed to MQTT::Client.connect: :host is
# always present, every other setting is included only when it holds a truthy
# value (so a nil or false setting is left out entirely).
#
# Also creates @event_buffer and subscribes to the codec so that each encoded
# event is appended to the buffer as an [event, encoded_event] pair.
def register
  optional_settings = {
    :port      => @port,
    :client_id => @client_id,
    :username  => @username,
    :password  => @password,
    :ssl       => @ssl,
    :cert_file => @cert_file,
    :key_file  => @key_file,
    :ca_file   => @ca_file
  }

  # Keep only the settings that were actually provided; nil and false are dropped.
  @options = { :host => @host }.merge(optional_settings.select { |_name, value| value })

  # Encode events using the given codec.
  # Use an array as a buffer so multi_receive can handle multiple events with a single connection.
  @event_buffer = []
  @codec.on_event do |event, encoded_event|
    @event_buffer.push([event, encoded_event])
  end
end
Private Instance Methods
handle_events()
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 164
# Publishes every buffered [event, encoded_event] pair, oldest first.
#
# An entry is removed from @event_buffer only after publish has returned, so
# an entry whose publish raised stays at the head of the buffer and is the
# first one tried again.
#
# On any StandardError the cached client is discarded (forcing a reconnect),
# the method sleeps for @connect_retry_interval seconds and then retries.
# Once the plugin is closing it stops retrying and returns, leaving the
# unpublished entries in the buffer; otherwise the retry sleep is skipped
# (its stop condition is already true) and the method would spin forever
# against an unreachable server.
def handle_events
  # It is easy to cope with network failures, ie. if connection fails just try it again
  while (entry = @event_buffer.first)
    event, encoded_event = entry
    @logger.debug("Publishing MQTT event #{encoded_event} with topic #{@topic}, retain #{@retain}, qos #{@qos}")
    mqtt_client.publish(event.sprintf(@topic), encoded_event, @retain, @qos)
    @event_buffer.shift
  end
rescue StandardError => e
  # Forget the client so the next attempt goes through a fresh connect.
  @mqtt_client = nil

  if @closing
    @logger.error("Error #{e.message} while publishing to MQTT server. Plugin is closing, not retrying; #{@event_buffer.size} event(s) left unpublished.")
    return
  end

  @logger.error("Error #{e.message} while publishing to MQTT server. Will retry in #{@connect_retry_interval} seconds.")
  Stud.stoppable_sleep(@connect_retry_interval, 1) { @closing }
  # The sleep may have been cut short by close; do not retry in that case.
  retry unless @closing
end
mqtt_client()
click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 159
# Returns the cached MQTT client, connecting with @options on first use (or
# after the cache has been cleared).
#
# The "Connecting" debug line is written only when a connection is actually
# being opened, and the :password entry is masked in it so the credential
# does not end up in the log. The real, unmodified @options are what is
# handed to MQTT::Client.connect.
def mqtt_client
  return @mqtt_client if @mqtt_client

  loggable_options = @options.key?(:password) ? @options.merge(:password => "<redacted>") : @options
  @logger.debug("Connecting MQTT with options #{loggable_options}")
  @mqtt_client = MQTT::Client.connect(@options)
end