class LogStash::Outputs::MQTT

This is Logstash output plugin for the mqtt.org/[MQTT] protocol.

Features:

Example publishing to test.mosquitto.org:

source,ruby

output {

mqtt {
  host => "test.mosquitto.org"
  port => 8883
  topic => "hello"
}

}


Example publishing to aws.amazon.com/iot/[AWS IoT]:

source,ruby

output {

mqtt {
  host => "somehostname.iot.someregion.amazonaws.com"
  port => 8883
  topic => "hello"
  client_id => "clientidfromaws"
  ssl => true
  cert_file => "certificate.pem.crt"
  key_file => "private.pem.key"
  ca_file => "root-CA.crt"
}

}


Topic may also depend on parts of the event using the standard sprintf syntax.

source,ruby

output {

mqtt {
  ...
  topic => "something/%{myfield}"
}

}


Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 153
def close
  @closing = true
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 147
def multi_receive(events)
  events.each { |event| @codec.encode(event) }
  # Handle all events at once to prevent taking a new connection for each event
  handle_events
end
receive(event) click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 142
def receive(event)
  @codec.encode(event)
  handle_events
end
register() click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 105
def register
  @options = {
    :host => @host
  }
  if @port
    @options[:port] = @port
  end
  if @client_id
    @options[:client_id] = @client_id
  end
  if @username
    @options[:username] = @username
  end
  if @password
    @options[:password] = @password
  end
  if @ssl
    @options[:ssl] = @ssl
  end
  if @cert_file
    @options[:cert_file] = @cert_file
  end
  if @key_file
    @options[:key_file] = @key_file
  end
  if @ca_file
    @options[:ca_file] = @ca_file
  end

  # Encode events using the given codec
  # Use an array as a buffer so the multi_receive can handle multiple events with a single connection
  @event_buffer = Array.new
  @codec.on_event do |event, encoded_event|
    @event_buffer.push([event, encoded_event])
  end
end

Private Instance Methods

handle_events() click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 164
def handle_events
  # It is easy to cope with network failures, ie. if connection fails just try it again
  while event = @event_buffer.first do
    @logger.debug("Publishing MQTT event #{event[1]} with topic #{@topic}, retain #{@retain}, qos #{@qos}")
    mqtt_client.publish(event[0].sprintf(@topic), event[1], @retain, @qos)
    @event_buffer.shift
  end
rescue StandardError => e
  @logger.error("Error #{e.message} while publishing to MQTT server. Will retry in #{@connect_retry_interval} seconds.")
  @mqtt_client = nil
  Stud.stoppable_sleep(@connect_retry_interval, 1) { @closing }
  retry
end
mqtt_client() click to toggle source
# File lib/logstash/outputs/mqtt.rb, line 159
def mqtt_client
  @logger.debug("Connecting MQTT with options #{@options}")
  @mqtt_client ||= MQTT::Client.connect(@options)
end