class MqttRails::Client
Attributes
ack_timeout[RW]
blocking[RW]
clean_session[RW]
client_id[RW]
connection_state[R]
Read Only attribute
host[RW]
Connection related attributes:
keep_alive[RW]
Timeout attributes:
mqtt_version[RW]
password[RW]
persistent[RW]
port[RW]
reconnect_delay[RW]
reconnect_limit[RW]
ssl[RW]
ssl_context[R]
username[RW]
will_payload[RW]
will_qos[RW]
will_retain[RW]
will_topic[RW]
Last will attributes:
Public Class Methods
new(*args)
click to toggle source
# File lib/mqtt_rails/client.rb, line 52 def initialize(*args) @last_packet_id = 0 @ssl_context = nil @sender = nil @handler = Handler.new @connection_helper = nil @connection_state = MQTT_CS_DISCONNECT @connection_state_mutex = Mutex.new @mqtt_thread = nil @reconnect_thread = nil @id_mutex = Mutex.new @reconnect_limit = 3 @reconnect_delay = 5 if args.last.is_a?(Hash) attr = args.pop else attr = {} end CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end if @ssl @ssl_context = OpenSSL::SSL::SSLContext.new end if @port.nil? if @ssl @port = DEFAULT_SSL_PORT else @port = DEFAULT_PORT end end if @client_id.nil? || @client_id == "" @client_id = generate_client_id end end
Public Instance Methods
add_topic_callback(topic, callback=nil, &block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 268 def add_topic_callback(topic, callback=nil, &block) @handler.register_topic_callback(topic, callback, &block) end
config_ssl_context(cert_path, key_path, ca_path=nil)
click to toggle source
# File lib/mqtt_rails/client.rb, line 98 def config_ssl_context(cert_path, key_path, ca_path=nil) @ssl ||= true @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path) end
connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
click to toggle source
# File lib/mqtt_rails/client.rb, line 103 def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking) @persistent = persistent @blocking = blocking @host = host @port = port.to_i @keep_alive = keep_alive @connection_state_mutex.synchronize do @connection_state = MQTT_CS_NEW end @mqtt_thread.kill unless @mqtt_thread.nil? init_connection unless reconnect? @connection_helper.send_connect(session_params) begin init_pubsub @connection_state = @connection_helper.do_connect(reconnect?) if connected? build_pubsub daemon_mode unless @blocking end rescue LowVersionException downgrade_version end end
connected?()
click to toggle source
# File lib/mqtt_rails/client.rb, line 145 def connected? @connection_state == MQTT_CS_CONNECTED end
daemon_mode()
click to toggle source
# File lib/mqtt_rails/client.rb, line 128 def daemon_mode @mqtt_thread = Thread.new do @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive? begin while connected? do mqtt_loop end rescue SystemCallError => e if @persistent reconnect else raise e end end end end
disconnect(explicit=true)
click to toggle source
# File lib/mqtt_rails/client.rb, line 217 def disconnect(explicit=true) @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread) @connection_state_mutex.synchronize do @connection_state = MQTT_CS_DISCONNECT end if explicit && @clean_session @last_packet_id = 0 @subscriber.clear_queue end MQTT_ERR_SUCCESS end
generate_client_id(prefix='paho_ruby', lenght=16)
click to toggle source
# File lib/mqtt_rails/client.rb, line 93 def generate_client_id(prefix='paho_ruby', lenght=16) charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9') @client_id = prefix << Array.new(lenght) { charset.sample }.join end
loop_misc()
click to toggle source
# File lib/mqtt_rails/client.rb, line 188 def loop_misc if @connection_helper.check_keep_alive(@persistent, @keep_alive) == MQTT_CS_DISCONNECT reconnect if check_persistence end @publisher.check_waiting_publisher @subscriber.check_waiting_subscriber sleep SELECT_TIMEOUT end
loop_read()
click to toggle source
# File lib/mqtt_rails/client.rb, line 165 def loop_read begin MAX_QUEUE.times do result = @handler.receive_packet break if result.nil? end rescue FullQueueException Rails.logger.warn("[MQTT RAILS][WARNING] Early exit in reading loop. The maximum packets have been reach for #{packet.type_name}") rescue ReadingException if check_persistence reconnect else raise ReadingException end end end
loop_write()
click to toggle source
# File lib/mqtt_rails/client.rb, line 153 def loop_write begin @sender.writing_loop rescue WritingException if check_persistence reconnect else raise WritingException end end end
mqtt_loop()
click to toggle source
# File lib/mqtt_rails/client.rb, line 182 def mqtt_loop loop_read loop_write loop_misc end
on_connack(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 276 def on_connack(&block) @handler.on_connack = block if block_given? @handler.on_connack end
on_connack=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 316 def on_connack=(callback) @handler.on_connack = callback if callback.is_a?(Proc) end
on_message(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 311 def on_message(&block) @handler.on_message = block if block_given? @handler.on_message end
on_message=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 344 def on_message=(callback) @handler.on_message = callback if callback.is_a?(Proc) end
on_puback(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 291 def on_puback(&block) @handler.on_puback = block if block_given? @handler.on_puback end
on_puback=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 328 def on_puback=(callback) @handler.on_puback = callback if callback.is_a?(Proc) end
on_pubcomp(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 306 def on_pubcomp(&block) @handler.on_pubcomp = block if block_given? @handler.on_pubcomp end
on_pubcomp=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 340 def on_pubcomp=(callback) @handler.on_pubcomp = callback if callback.is_a?(Proc) end
on_pubrec(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 296 def on_pubrec(&block) @handler.on_pubrec = block if block_given? @handler.on_pubrec end
on_pubrec=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 332 def on_pubrec=(callback) @handler.on_pubrec = callback if callback.is_a?(Proc) end
on_pubrel(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 301 def on_pubrel(&block) @handler.on_pubrel = block if block_given? @handler.on_pubrel end
on_pubrel=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 336 def on_pubrel=(callback) @handler.on_pubrel = callback if callback.is_a?(Proc) end
on_suback(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 281 def on_suback(&block) @handler.on_suback = block if block_given? @handler.on_suback end
on_suback=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 320 def on_suback=(callback) @handler.on_suback = callback if callback.is_a?(Proc) end
on_unsuback(&block)
click to toggle source
# File lib/mqtt_rails/client.rb, line 286 def on_unsuback(&block) @handler.on_unsuback = block if block_given? @handler.on_unsuback end
on_unsuback=(callback)
click to toggle source
# File lib/mqtt_rails/client.rb, line 324 def on_unsuback=(callback) @handler.on_unsuback = callback if callback.is_a?(Proc) end
ping_host()
click to toggle source
# File lib/mqtt_rails/client.rb, line 264 def ping_host @sender.send_pingreq end
publish(topic, payload="", retain=false, qos=0)
click to toggle source
# File lib/mqtt_rails/client.rb, line 229 def publish(topic, payload="", retain=false, qos=0) if topic == "" || !topic.is_a?(String) Rails.logger.error("[MQTT RAILS][ERROR] Publish topics is invalid, not a string or empty.") raise ArgumentError end id = next_packet_id @publisher.send_publish(topic, payload, retain, qos, id) end
reconnect()
click to toggle source
# File lib/mqtt_rails/client.rb, line 197 def reconnect @reconnect_thread = Thread.new do counter = 0 while (@reconnect_limit >= counter || @reconnect_limit == -1) do counter += 1 Rails.logger.info("[MQTT RAILS][INFO] New reconnect attempt...") connect if connected? break else sleep @reconnect_delay end end unless connected? Rails.logger.error("[MQTT RAILS][ERROR] Reconnection attempt counter is over. (#{@reconnect_limit} times)") disconnect(false) end end end
reconnect?()
click to toggle source
# File lib/mqtt_rails/client.rb, line 149 def reconnect? Thread.current == @reconnect_thread end
registered_callback()
click to toggle source
# File lib/mqtt_rails/client.rb, line 348 def registered_callback @handler.registered_callback end
remove_topic_callback(topic)
click to toggle source
# File lib/mqtt_rails/client.rb, line 272 def remove_topic_callback(topic) @handler.clear_topic_callback(topic) end
subscribe(*topics)
click to toggle source
# File lib/mqtt_rails/client.rb, line 238 def subscribe(*topics) begin id = next_packet_id unless @subscriber.send_subscribe(topics, id) == MqttRails::MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation Rails.logger.error("[MQTT RAILS][ERROR] Subscribe topics need one topic or a list of topics.") raise ProtocolViolation end end
subscribed_topics()
click to toggle source
# File lib/mqtt_rails/client.rb, line 352 def subscribed_topics @subscriber.subscribed_topics end
unsubscribe(*topics)
click to toggle source
# File lib/mqtt_rails/client.rb, line 251 def unsubscribe(*topics) begin id = next_packet_id unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation Rails.logger.error("[MQTT RAILS][ERROR] Unsubscribe need at least one topic.") raise ProtocolViolation end end
Private Instance Methods
build_pubsub()
click to toggle source
# File lib/mqtt_rails/client.rb, line 382 def build_pubsub @subscriber.config_subscription(next_packet_id) @sender.flush_waiting_packet @publisher.config_all_message_queue end
check_persistence()
click to toggle source
# File lib/mqtt_rails/client.rb, line 409 def check_persistence disconnect(false) @persistent end
downgrade_version()
click to toggle source
# File lib/mqtt_rails/client.rb, line 366 def downgrade_version Rails.logger.info("[MQTT RAILS][INFO] Connection refused: unacceptable protocol version #{@mqtt_version}, trying 3.1") if @mqtt_version != "3.1" @mqtt_version = "3.1" connect(@host, @port, @keep_alive) else raise ProtocolVersionException.new("Unsupported MQTT version") end end
init_connection()
click to toggle source
# File lib/mqtt_rails/client.rb, line 388 def init_connection @connection_helper = ConnectionHelper.new(@host, @port, @ssl, @ssl_context, @ack_timeout) @connection_helper.handler = @handler @sender = @connection_helper.sender end
init_pubsub()
click to toggle source
# File lib/mqtt_rails/client.rb, line 376 def init_pubsub @subscriber.nil? ? @subscriber = Subscriber.new(@sender) : @subscriber.sender = @sender @publisher.nil? ? @publisher = Publisher.new(@sender) : @publisher.sender = @sender @handler.config_pubsub(@publisher, @subscriber) end
next_packet_id()
click to toggle source
# File lib/mqtt_rails/client.rb, line 359 def next_packet_id @id_mutex.synchronize do @last_packet_id = 0 if @last_packet_id >= MAX_PACKET_ID @last_packet_id = @last_packet_id.next end end
session_params()
click to toggle source
# File lib/mqtt_rails/client.rb, line 394 def session_params { :version => @mqtt_version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain } end