class LosantMqtt::DeviceConnection

Attributes

state[R]

Public Class Methods

connect(options = {}) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 31
# Opens an EventMachine connection that uses this class as its handler.
#
# Caller-supplied options override the defaults of host "localhost" and
# port MQTT::DEFAULT_PORT. The merged hash is also passed along as the
# extra argument to EventMachine.connect.
#
# @param options [Hash] connection options; :host and :port are read here
# @return whatever EventMachine.connect returns
def self.connect(options = {})
  defaults = { host: "localhost", port: MQTT::DEFAULT_PORT }
  settings = defaults.merge(options)
  EventMachine.connect(settings[:host], settings[:port], self, settings)
end
new(options = {}) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 36
# Stores the connection options and starts with no subscriptions.
#
# Defaults (a generated client id, a keep-alive of 15, a clean session,
# no credentials and protocol version "3.1.1") are overridden by any
# matching key in +options+; extra keys are kept as given.
#
# @param options [Hash] overrides and additions for the defaults
def initialize(options = {})
  defaults = {
    client_id:     MQTT::Client.generate_client_id,
    keep_alive:    15,
    clean_session: true,
    username:      nil,
    password:      nil,
    version:       "3.1.1"
  }

  @options       = defaults.merge(options)
  @subscriptions = {}
end

Public Instance Methods

connect_ack(packet) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 118
# Handles the broker's reply to the connect packet.
#
# A non-zero return code records a protocol exception in @ex and closes
# the connection. Otherwise the state becomes :connected, a periodic
# keep-alive timer is started when the :keep_alive option is positive,
# and the new state is emitted.
#
# @param packet the acknowledgement; must respond to return_code and return_msg
# @return the result of close_connection on rejection, otherwise of emit
def connect_ack(packet)
  unless packet.return_code == 0x00
    @ex = MQTT::ProtocolException.new("Authentication Error - " + packet.return_msg)
    return close_connection
  end

  @state = :connected

  interval = @options[:keep_alive]
  if interval > 0
    @timer = EventMachine::PeriodicTimer.new(interval) do
      # Seconds since anything was last received from the broker.
      idle_for = Time.now.to_i - @last_received

      if idle_for > @options[:keep_alive]
        @ex = MQTT::NotConnectedException.new("Keep alive failure, disconnecting")
        close_connection
      else
        send_packet(MQTT::Packet::Pingreq.new)
      end
    end
  end

  emit(@state)
end
connected?() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 48
# Reports whether the connection state is :connected.
#
# @return [Boolean]
def connected?
  current = state
  current == :connected
end
connection_completed() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 190
# Runs once the connection has been established.
#
# Without the :secure option the connect packet is sent straight away.
# With it, a certificate store is loaded from CA_FILE_PATH and TLS is
# started with peer verification enabled.
def connection_completed
  return send_connect_packet unless @options[:secure]

  @last_seen_cert    = nil
  @certificate_store = OpenSSL::X509::Store.new
  @certificate_store.add_file(CA_FILE_PATH)
  start_tls(verify_peer: true)
end
disconnect(send_msg: true) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 71
# Starts an orderly shutdown of the connection.
#
# Does nothing when a disconnect is already in progress or finished.
# Otherwise the state becomes :disconnecting and is emitted. If the
# connection was established and +send_msg+ is true, a Disconnect packet
# is sent and the socket is closed once that packet has been written;
# in every other case the socket is closed immediately.
#
# @param send_msg [Boolean] whether to send a Disconnect packet first
def disconnect(send_msg: true)
  return if @state == :disconnecting || @state == :disconnected

  # Decide before the state changes: connected? is false once we are
  # :disconnecting.
  notify_broker = connected? && send_msg

  @state = :disconnecting
  emit(@state)

  if notify_broker
    send_packet(MQTT::Packet::Disconnect.new)
    # Close from our side rather than waiting for the broker to hang up;
    # MQTT 3.1.1 requires the client to close after sending DISCONNECT.
    close_connection_after_writing
  else
    close_connection
  end
end
next_packet_id() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 140
# Advances and returns the packet identifier counter.
#
# MQTT packet identifiers are 16-bit and must be non-zero, so the counter
# wraps from 65535 back to 1 instead of growing without bound.
#
# @return [Integer] the next identifier, in 1..65535
def next_packet_id
  @packet_id = @packet_id >= 0xFFFF ? 1 : @packet_id + 1
end
post_init() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 180
# Resets the per-connection bookkeeping and announces the :connecting state.
#
# Clears the pending packet and stored exception, zeroes the
# last-received timestamp and packet id counter, and starts with an
# empty receive buffer.
def post_init
  @ex            = @packet = nil
  @last_received = @packet_id = 0
  @data          = ""
  @state         = :connecting
  emit(@state)
end
process_packet(packet) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 92
# Dispatches one fully parsed packet according to the connection state.
#
# Records the arrival time, then:
# * a Connack while in :connect_sent is handed to connect_ack;
# * a Publish while :connected is passed to the block subscribed to its
#   exact topic, if there is one;
# * Pingresp, Puback, Suback and Unsuback while :connected are accepted
#   and need no further action;
# * anything else stores a protocol exception in @ex and closes the
#   connection.
#
# @param packet the parsed packet (compared by exact class)
def process_packet(packet)
  @last_received = Time.now.to_i
  kind = packet.class

  if state == :connect_sent && kind == MQTT::Packet::Connack
    return connect_ack(packet)
  end

  if state == :connected
    if kind == MQTT::Packet::Publish
      handler = @subscriptions[packet.topic]
      return nil unless handler
      return handler.call(packet.payload)
    end

    # Pong, publish ack, subscribe ack, unsubscribe ack: nothing to do.
    quiet_kinds = [
      MQTT::Packet::Pingresp,
      MQTT::Packet::Puback,
      MQTT::Packet::Suback,
      MQTT::Packet::Unsuback
    ]
    return nil if quiet_kinds.include?(kind)
  end

  # CONNECT, SUBSCRIBE, PINGREQ, UNSUBSCRIBE and DISCONNECT are only ever
  # sent by the client; PUBREC/PUBREL/PUBCOMP belong to QOS2, which is
  # not supported.
  @ex = MQTT::ProtocolException.new("Wasn't expecting packet of type #{packet.class} when in state #{state}")
  close_connection
end
publish(topic, payload) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 52
# Sends a Publish packet for +payload+ on +topic+.
#
# The packet is built with qos 0, retain false and a fresh packet id.
#
# @param topic the topic to publish to
# @param payload the message body
# @return the result of send_packet
def publish(topic, payload)
  attributes = {
    id:      next_packet_id,
    qos:     0,
    retain:  false,
    topic:   topic,
    payload: payload
  }
  message = MQTT::Packet::Publish.new(**attributes)
  send_packet(message)
end
receive_data(data) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 163
# Buffers incoming bytes and processes every complete packet they contain.
#
# +data+ is appended to the receive buffer. While at least two bytes are
# buffered a packet header is parsed, and once the buffer holds the
# packet's whole body the body is parsed and the packet is handed to
# process_packet. A partially received packet is kept in @packet until
# later calls deliver the rest.
#
# Buffered packets are drained iteratively rather than by recursing once
# per packet, so a chunk holding many small packets cannot exhaust the
# stack.
#
# @param data [String] bytes received from the connection
# @return [nil]
def receive_data(data)
  @data << data

  # `while true` rather than `loop`: Kernel#loop rescues StopIteration,
  # which would swallow one raised from inside process_packet.
  while true
    # Are we at the start of a new packet?
    if !@packet && @data.length >= 2
      @packet = MQTT::Packet.parse_header(@data)
    end

    # Do we have the full packet body now? If not, wait for more data.
    break unless @packet && @data.length >= @packet.body_length

    @packet.parse_body(@data.slice!(0...@packet.body_length))
    process_packet(@packet)
    @packet = nil
  end

  nil
end
send_connect_packet() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 79
# Sends the Connect packet and moves the connection to :connect_sent.
#
# The packet attributes (client id, clean-session flag, keep-alive,
# credentials and protocol version) are taken from the stored options.
# The new state is emitted after the packet has been sent.
#
# @return the result of emit
def send_connect_packet
  option_names = %i[client_id clean_session keep_alive username password version]
  attributes = option_names.each_with_object({}) do |name, collected|
    collected[name] = @options[name]
  end

  send_packet(MQTT::Packet::Connect.new(**attributes))

  @state = :connect_sent
  emit(@state)
end
send_packet(packet) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 144
# Writes a packet to the connection in its string form.
#
# @param packet any object whose to_s yields the data to send
# @return the result of send_data
def send_packet(packet)
  wire_data = packet.to_s
  send_data(wire_data)
end
ssl_handshake_completed() click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 220
# Runs once the TLS handshake has finished.
#
# Checks that the last certificate seen during verification matches the
# configured host. On a match the connect packet is sent; otherwise an
# OpenSSL error is stored in @ex and the connection is closed.
def ssl_handshake_completed
  if OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, @options[:host])
    send_connect_packet
  else
    @ex = OpenSSL::OpenSSLError.new("The hostname #{@options[:host]} does not match the server certificate")
    close_connection
  end
end
ssl_verify_peer(cert_string) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 201
# Verifies one certificate presented by the peer against the certificate store.
#
# The parsed certificate is remembered in @last_seen_cert. When the
# store cannot verify it, an error is recorded in @ex and false is
# returned. A verified certificate is added to the store; a StoreError
# from that step is tolerated only when its message is
# "cert already in hash table", otherwise it is recorded in @ex.
#
# @param cert_string [String] the encoded certificate
# @return [Boolean] true when the certificate is accepted
def ssl_verify_peer(cert_string)
  certificate = OpenSSL::X509::Certificate.new(cert_string)
  @last_seen_cert = certificate

  if !@certificate_store.verify(certificate)
    @ex = OpenSSL::OpenSSLError.new("Unable to verify the certificate for #{@options[:host]}")
    return false
  end

  begin
    @certificate_store.add_cert(certificate)
  rescue OpenSSL::X509::StoreError => e
    already_known = e.message == "cert already in hash table"
    unless already_known
      @ex = e
      return false
    end
  end

  true
end
subscribe(topic, &block) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 61
# Registers +block+ as the handler for +topic+ and sends a Subscribe packet.
#
# The handler replaces any block previously stored for the same topic.
#
# @param topic the topic to subscribe to
# @param block the handler stored for that topic (may be nil)
# @return the result of send_packet
def subscribe(topic, &block)
  @subscriptions.store(topic, block)
  request = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end
unbind(msg) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 148
# Runs when the connection has been closed.
#
# Cancels the keep-alive timer if one is running. Unless a disconnect
# was already in progress, makes sure @ex holds a reason: an exception
# stored earlier, else the exception currently in $!, else a generic
# "connection lost" error. The :disconnected state is then emitted with
# that reason, and the stored exception is cleared.
#
# @param msg unused
# @return [nil]
def unbind(msg)
  running_timer = @timer
  if running_timer
    running_timer.cancel
    @timer = nil
  end

  if @state != :disconnecting
    @ex = @ex || $! || MQTT::NotConnectedException.new("Connection to server lost")
  end

  @state = :disconnected
  emit(@state, @ex)
  @ex = nil
end
unsubscribe(topic) click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 66
# Drops the handler stored for +topic+ and sends an Unsubscribe packet.
#
# @param topic the topic to unsubscribe from
# @return the result of send_packet
def unsubscribe(topic)
  @subscriptions.delete(topic)
  request = MQTT::Packet::Unsubscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end