class MqttRails::Client

Attributes

ack_timeout[RW]
blocking[RW]
clean_session[RW]
client_id[RW]
connection_state[R]

Read Only attribute

host[RW]

Connection related attributes:

keep_alive[RW]

Timeout attributes:

mqtt_version[RW]
password[RW]
persistent[RW]
port[RW]
reconnect_delay[RW]
reconnect_limit[RW]
ssl[RW]
ssl_context[R]
username[RW]
will_payload[RW]
will_qos[RW]
will_retain[RW]
will_topic[RW]

Last will attributes:

Public Class Methods

new(*args) click to toggle source
# File lib/mqtt_rails/client.rb, line 52
def initialize(*args)
  @last_packet_id         = 0
  @ssl_context            = nil
  @sender                 = nil
  @handler                = Handler.new
  @connection_helper      = nil
  @connection_state       = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @mqtt_thread            = nil
  @reconnect_thread       = nil
  @id_mutex               = Mutex.new
  @reconnect_limit        = 3
  @reconnect_delay        = 5

  if args.last.is_a?(Hash)
    attr = args.pop
  else
    attr = {}
  end

  CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |k,v|
    self.send("#{k}=", v)
  end

  if @ssl
    @ssl_context = OpenSSL::SSL::SSLContext.new
  end

  if @port.nil?
    if @ssl
      @port = DEFAULT_SSL_PORT
    else
      @port = DEFAULT_PORT
    end
  end

  if  @client_id.nil? || @client_id == ""
    @client_id = generate_client_id
  end
end

Public Instance Methods

add_topic_callback(topic, callback=nil, &block) click to toggle source
# File lib/mqtt_rails/client.rb, line 268
def add_topic_callback(topic, callback=nil, &block)
  @handler.register_topic_callback(topic, callback, &block)
end
config_ssl_context(cert_path, key_path, ca_path=nil) click to toggle source
# File lib/mqtt_rails/client.rb, line 98
def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl ||= true
  @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path)
end
connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking) click to toggle source
# File lib/mqtt_rails/client.rb, line 103
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent = persistent
  @blocking   = blocking
  @host       = host
  @port       = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end
  @mqtt_thread.kill unless @mqtt_thread.nil?

  init_connection unless reconnect?
  @connection_helper.send_connect(session_params)
  begin
    init_pubsub
    @connection_state = @connection_helper.do_connect(reconnect?)
    if connected?
      build_pubsub
      daemon_mode unless @blocking
    end
  rescue LowVersionException
    downgrade_version
  end
end
connected?() click to toggle source
# File lib/mqtt_rails/client.rb, line 145
def connected?
  @connection_state == MQTT_CS_CONNECTED
end
daemon_mode() click to toggle source
# File lib/mqtt_rails/client.rb, line 128
def daemon_mode
  @mqtt_thread = Thread.new do
    @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive?
    begin
      while connected? do
        mqtt_loop
      end
    rescue SystemCallError => e
      if @persistent
        reconnect
      else
        raise e
      end
    end
  end
end
disconnect(explicit=true) click to toggle source
# File lib/mqtt_rails/client.rb, line 217
def disconnect(explicit=true)
  @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread)
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_DISCONNECT
  end
  if explicit && @clean_session
    @last_packet_id = 0
    @subscriber.clear_queue
  end
  MQTT_ERR_SUCCESS
end
generate_client_id(prefix='paho_ruby', lenght=16) click to toggle source
# File lib/mqtt_rails/client.rb, line 93
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  @client_id = prefix << Array.new(lenght) { charset.sample }.join
end
loop_misc() click to toggle source
# File lib/mqtt_rails/client.rb, line 188
def loop_misc
  if @connection_helper.check_keep_alive(@persistent, @keep_alive) == MQTT_CS_DISCONNECT
    reconnect if check_persistence
  end
  @publisher.check_waiting_publisher
  @subscriber.check_waiting_subscriber
  sleep SELECT_TIMEOUT
end
loop_read() click to toggle source
# File lib/mqtt_rails/client.rb, line 165
def loop_read
  begin
    MAX_QUEUE.times do
      result = @handler.receive_packet
      break if result.nil?
    end
  rescue FullQueueException
    Rails.logger.warn("[MQTT RAILS][WARNING] Early exit in reading loop. The maximum packets have been reach for #{packet.type_name}")
  rescue ReadingException
    if check_persistence
      reconnect
    else
      raise ReadingException
    end
  end
end
loop_write() click to toggle source
# File lib/mqtt_rails/client.rb, line 153
def loop_write
  begin
    @sender.writing_loop
  rescue WritingException
    if check_persistence
      reconnect
    else
      raise WritingException
    end
  end
end
mqtt_loop() click to toggle source
# File lib/mqtt_rails/client.rb, line 182
def mqtt_loop
  loop_read
  loop_write
  loop_misc
end
on_connack(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 276
def on_connack(&block)
  @handler.on_connack = block if block_given?
  @handler.on_connack
end
on_connack=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 316
def on_connack=(callback)
  @handler.on_connack = callback if callback.is_a?(Proc)
end
on_message(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 311
def on_message(&block)
  @handler.on_message = block if block_given?
  @handler.on_message
end
on_message=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 344
def on_message=(callback)
  @handler.on_message = callback if callback.is_a?(Proc)
end
on_puback(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 291
def on_puback(&block)
  @handler.on_puback = block if block_given?
  @handler.on_puback
end
on_puback=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 328
def on_puback=(callback)
  @handler.on_puback = callback if callback.is_a?(Proc)
end
on_pubcomp(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 306
def on_pubcomp(&block)
  @handler.on_pubcomp = block if block_given?
  @handler.on_pubcomp
end
on_pubcomp=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 340
def on_pubcomp=(callback)
  @handler.on_pubcomp = callback if callback.is_a?(Proc)
end
on_pubrec(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 296
def on_pubrec(&block)
  @handler.on_pubrec = block if block_given?
  @handler.on_pubrec
end
on_pubrec=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 332
def on_pubrec=(callback)
  @handler.on_pubrec = callback if callback.is_a?(Proc)
end
on_pubrel(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 301
def on_pubrel(&block)
  @handler.on_pubrel = block if block_given?
  @handler.on_pubrel
end
on_pubrel=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 336
def on_pubrel=(callback)
  @handler.on_pubrel = callback if callback.is_a?(Proc)
end
on_suback(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 281
def on_suback(&block)
  @handler.on_suback = block if block_given?
  @handler.on_suback
end
on_suback=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 320
def on_suback=(callback)
  @handler.on_suback = callback if callback.is_a?(Proc)
end
on_unsuback(&block) click to toggle source
# File lib/mqtt_rails/client.rb, line 286
def on_unsuback(&block)
  @handler.on_unsuback = block if block_given?
  @handler.on_unsuback
end
on_unsuback=(callback) click to toggle source
# File lib/mqtt_rails/client.rb, line 324
def on_unsuback=(callback)
  @handler.on_unsuback = callback if callback.is_a?(Proc)
end
ping_host() click to toggle source
# File lib/mqtt_rails/client.rb, line 264
def ping_host
  @sender.send_pingreq
end
publish(topic, payload="", retain=false, qos=0) click to toggle source
# File lib/mqtt_rails/client.rb, line 229
def publish(topic, payload="", retain=false, qos=0)
  if topic == "" || !topic.is_a?(String)
    Rails.logger.error("[MQTT RAILS][ERROR] Publish topics is invalid, not a string or empty.")
    raise ArgumentError
  end
  id = next_packet_id
  @publisher.send_publish(topic, payload, retain, qos, id)
end
reconnect() click to toggle source
# File lib/mqtt_rails/client.rb, line 197
def reconnect
  @reconnect_thread = Thread.new do
    counter = 0
    while (@reconnect_limit >= counter || @reconnect_limit == -1) do
      counter += 1
      Rails.logger.info("[MQTT RAILS][INFO] New reconnect attempt...")
      connect
      if connected?
        break
      else
        sleep @reconnect_delay
      end
    end
    unless connected?
      Rails.logger.error("[MQTT RAILS][ERROR] Reconnection attempt counter is over. (#{@reconnect_limit} times)")
      disconnect(false)
    end
  end
end
reconnect?() click to toggle source
# File lib/mqtt_rails/client.rb, line 149
def reconnect?
  Thread.current == @reconnect_thread
end
registered_callback() click to toggle source
# File lib/mqtt_rails/client.rb, line 348
def registered_callback
  @handler.registered_callback
end
remove_topic_callback(topic) click to toggle source
# File lib/mqtt_rails/client.rb, line 272
def remove_topic_callback(topic)
  @handler.clear_topic_callback(topic)
end
subscribe(*topics) click to toggle source
# File lib/mqtt_rails/client.rb, line 238
def subscribe(*topics)
  begin
    id = next_packet_id
    unless @subscriber.send_subscribe(topics, id) == MqttRails::MQTT_ERR_SUCCESS
      reconnect if check_persistence
    end
    MQTT_ERR_SUCCESS
  rescue ProtocolViolation
    Rails.logger.error("[MQTT RAILS][ERROR] Subscribe topics need one topic or a list of topics.")
    raise ProtocolViolation
  end
end
subscribed_topics() click to toggle source
# File lib/mqtt_rails/client.rb, line 352
def subscribed_topics
  @subscriber.subscribed_topics
end
unsubscribe(*topics) click to toggle source
# File lib/mqtt_rails/client.rb, line 251
def unsubscribe(*topics)
  begin
    id = next_packet_id
    unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS
      reconnect if check_persistence
    end
    MQTT_ERR_SUCCESS
  rescue ProtocolViolation
    Rails.logger.error("[MQTT RAILS][ERROR] Unsubscribe need at least one topic.")
    raise ProtocolViolation
  end
end

Private Instance Methods

build_pubsub() click to toggle source
# File lib/mqtt_rails/client.rb, line 382
def build_pubsub
  @subscriber.config_subscription(next_packet_id)
  @sender.flush_waiting_packet
  @publisher.config_all_message_queue
end
check_persistence() click to toggle source
# File lib/mqtt_rails/client.rb, line 409
def check_persistence
  disconnect(false)
  @persistent
end
downgrade_version() click to toggle source
# File lib/mqtt_rails/client.rb, line 366
def downgrade_version
  Rails.logger.info("[MQTT RAILS][INFO] Connection refused: unacceptable protocol version #{@mqtt_version}, trying 3.1")
  if @mqtt_version != "3.1"
    @mqtt_version = "3.1"
    connect(@host, @port, @keep_alive)
  else
    raise ProtocolVersionException.new("Unsupported MQTT version")
  end
end
init_connection() click to toggle source
# File lib/mqtt_rails/client.rb, line 388
def init_connection
  @connection_helper         = ConnectionHelper.new(@host, @port, @ssl, @ssl_context, @ack_timeout)
  @connection_helper.handler = @handler
  @sender                    = @connection_helper.sender
end
init_pubsub() click to toggle source
# File lib/mqtt_rails/client.rb, line 376
def init_pubsub
  @subscriber.nil? ? @subscriber = Subscriber.new(@sender) : @subscriber.sender = @sender
  @publisher.nil? ? @publisher = Publisher.new(@sender) : @publisher.sender = @sender
  @handler.config_pubsub(@publisher, @subscriber)
end
next_packet_id() click to toggle source
# File lib/mqtt_rails/client.rb, line 359
def next_packet_id
  @id_mutex.synchronize do
    @last_packet_id = 0 if @last_packet_id >= MAX_PACKET_ID
    @last_packet_id = @last_packet_id.next
  end
end
session_params() click to toggle source
# File lib/mqtt_rails/client.rb, line 394
def session_params
  {
    :version       => @mqtt_version,
    :clean_session => @clean_session,
    :keep_alive    => @keep_alive,
    :client_id     => @client_id,
    :username      => @username,
    :password      => @password,
    :will_topic    => @will_topic,
    :will_payload  => @will_payload,
    :will_qos      => @will_qos,
    :will_retain   => @will_retain
  }
end