class LogStash::Inputs::Mqtt
Generate a repeating message.
This plugin is intended only as an example.
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/mqtt.rb, line 42
# Stores the local machine's hostname, as reported by Socket.gethostname,
# in @logstash_host.
#
# @return [String] the hostname that was stored
def register
  local_hostname = Socket.gethostname
  @logstash_host = local_hostname
end
run(queue)
click to toggle source
# File lib/logstash/inputs/mqtt.rb, line 46
# Builds a PahoMqtt client from the plugin settings, connects it, subscribes
# to @topic with @qos and pushes every decoded message onto +queue+ until
# stop? becomes true.
#
# A failed connection attempt (PahoMqtt::Exception) is logged as a warning and
# retried after a stoppable one-second pause. Once stop? is true no further
# attempt is made and the method returns without subscribing.
#
# @param queue [#<<] receives each decoded and decorated event
# @return [nil]
def run(queue)
  # Only touch the PahoMqtt logger when @logfile was configured.
  unless @logfile.nil?
    PahoMqtt.logger = @logfile
    # levels: DEBUG < INFO < WARN < ERROR < FATAL < UNKNOWN
    PahoMqtt.logger.level = @log_level
  end

  @client = PahoMqtt::Client.new({
    :host => @host,
    :port => @port,
    :persistent => @persistent, # keep connection persistent
    :mqtt_version => @mqtt_version,
    :clean_session => @clean_session,
    :client_id => @client_id,
    :username => @username,
    :password => @password,
    :ssl => @ssl,
    :will_topic => @will_topic,
    :will_payload => @will_payload,
    :will_qos => @will_qos,
    :will_retain => @will_retain,
    :reconnect_limit => @reconnect_retries,
    :reconnect_delay => @reconnect_sleep_time,
  })

  @client.config_ssl_context(@certificate_path, @key_path, @root_ca_path) if @ssl

  # Handler for incoming messages: decode the payload with the codec and
  # enqueue every resulting event.
  @client.on_message do |message|
    @codec.decode(message.payload) do |event|
      # Keep a "host" value the event already carries; otherwise fall back
      # to @logstash_host.
      host = event.get("host") || @logstash_host
      event.set("host", host)
      event.set("topic", message.topic)
      decorate(event)
      queue << event
    end
  end

  is_connected = false
  begin
    @client.connect
    is_connected = true
  rescue PahoMqtt::Exception => e
    @logger.warn("Error while setting up connection for MQTT broker! Retrying.",
      :message => e.message,
      :class => e.class.name,
      :location => e.backtrace.first
    )
    Stud.stoppable_sleep(1) { stop? }
    # Do not retry once a stop was requested: retrying unconditionally would
    # keep this method looping on connection attempts and never return.
    retry unless stop?
  end

  return unless is_connected

  # subscribe to topic
  @client.subscribe([@topic, @qos])
  # Stay inside run until a stop is requested, re-checking about once a second.
  Stud.stoppable_sleep(1) { stop? } until stop?
end