class LosantMqtt::DeviceConnection
Attributes
state[R]
Public Class Methods
connect(options = {})
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 31
# Opens a connection through EventMachine.connect, passing the receiver
# (self) and the merged option hash along with the host and port.
#
# Caller-supplied options are layered over the defaults: host "localhost"
# and port MQTT::DEFAULT_PORT.
#
# @param options [Hash] connection settings; only :host and :port are read here
# @return whatever EventMachine.connect returns
def self.connect(options = {})
  defaults = { host: "localhost", port: MQTT::DEFAULT_PORT }
  settings = defaults.merge(options)
  host, port = settings.values_at(:host, :port)
  EventMachine.connect(host, port, self, settings)
end
new(options = {})
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 36
# Stores the connection options and starts with an empty subscription table.
#
# Defaults applied when a key is absent from +options+: a client id from
# MQTT::Client.generate_client_id, keep_alive 15, clean_session true,
# username nil, password nil and version "3.1.1".
#
# @param options [Hash] overrides for the defaults; extra keys are kept as given
def initialize(options = {})
  defaults = {
    client_id: MQTT::Client.generate_client_id,
    keep_alive: 15,
    clean_session: true,
    username: nil,
    password: nil,
    version: "3.1.1"
  }
  @options = defaults.merge(options)
  @subscriptions = {}
end
Public Instance Methods
connect_ack(packet)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 118
# Handles the reply to our CONNECT packet.
#
# A non-zero return code is recorded in @ex as an MQTT::ProtocolException and
# the connection is closed. Otherwise the state becomes :connected, a
# keep-alive timer is started when the configured interval is positive, and
# the new state is emitted.
#
# @param packet [#return_code, #return_msg] the acknowledgement packet
# @return the result of close_connection on refusal, otherwise of emit
def connect_ack(packet)
  unless packet.return_code == 0x00
    @ex = MQTT::ProtocolException.new("Authentication Error - " + packet.return_msg)
    return close_connection
  end

  @state = :connected

  if @options[:keep_alive] > 0
    @timer = EventMachine::PeriodicTimer.new(@options[:keep_alive]) do
      # Every tick either proves the link is alive by pinging, or gives up
      # when nothing has been received for longer than the interval.
      idle_seconds = Time.now.to_i - @last_received
      if idle_seconds <= @options[:keep_alive]
        send_packet(MQTT::Packet::Pingreq.new)
      else
        @ex = MQTT::NotConnectedException.new("Keep alive failure, disconnecting")
        close_connection
      end
    end
  end

  emit(@state)
end
connected?()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 48
# Reports whether the connection state is currently :connected.
#
# @return [Boolean]
def connected?
  current = state
  current == :connected
end
connection_completed()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 190
# Runs once the socket is connected.
#
# Without the :secure option the CONNECT packet is sent straight away. With
# it, a certificate store is loaded from CA_FILE_PATH, the remembered peer
# certificate is reset, and TLS is started with peer verification enabled.
#
# @return the result of send_connect_packet or start_tls
def connection_completed
  return send_connect_packet unless @options[:secure]

  @last_seen_cert = nil
  store = OpenSSL::X509::Store.new
  @certificate_store = store
  store.add_file(CA_FILE_PATH)
  start_tls(verify_peer: true)
end
disconnect(send_msg: true)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 71
# Starts shutting the connection down.
#
# Does nothing when a disconnect is already in progress or finished.
# Otherwise the state moves to :disconnecting and is emitted; then either a
# DISCONNECT packet is sent (only when currently connected and +send_msg+ is
# truthy) or the connection is closed directly.
#
# @param send_msg [Boolean] whether to send a DISCONNECT packet when connected
# @return nil when already disconnecting/disconnected, otherwise the result
#   of send_packet or close_connection
def disconnect(send_msg: true)
  return if [:disconnecting, :disconnected].include?(@state)

  # Decide this before the state changes: connected? reads the state.
  farewell = (MQTT::Packet::Disconnect.new if connected? && send_msg)

  @state = :disconnecting
  emit(@state)

  if farewell
    send_packet(farewell)
  else
    close_connection
  end
end
next_packet_id()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 140
# Advances the packet-id counter and returns the new value.
#
# MQTT packet identifiers are a non-zero 16-bit field, so the counter wraps
# from 65535 back to 1 instead of growing without bound. Previously a
# long-lived connection would eventually produce an id that cannot be
# encoded.
#
# @return [Integer] the next identifier, always within 1..65535
def next_packet_id
  @packet_id = (@packet_id % 0xFFFF) + 1
end
post_init()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 180
# Resets all per-connection bookkeeping and announces the :connecting state.
#
# Clears the packet being assembled and any stored exception, zeroes the
# last-received timestamp and the packet-id counter, and starts with an
# empty receive buffer.
def post_init
  @packet = @ex = nil
  @last_received = @packet_id = 0
  @data = ""
  @state = :connecting
  emit(@state)
end
process_packet(packet)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 92
# Dispatches one fully parsed packet and refreshes the last-received time.
#
# * Connack while in :connect_sent is handed to connect_ack.
# * Publish while :connected invokes the block subscribed to its topic, if any.
# * Pingresp, Puback, Suback and Unsuback while :connected need no action.
# * Anything else is a protocol violation: the error is stored in @ex and the
#   connection is closed. That covers packet types only a client sends
#   (CONNECT, SUBSCRIBE, PINGREQ, UNSUBSCRIBE, DISCONNECT) and the QoS 2
#   packets (PUBREC/PUBREL/PUBCOMP), which are not supported.
#
# @param packet [MQTT::Packet] the packet to handle
def process_packet(packet)
  @last_received = Time.now.to_i
  kind = packet.class

  return connect_ack(packet) if state == :connect_sent && kind == MQTT::Packet::Connack

  if state == :connected
    if kind == MQTT::Packet::Publish
      callback = @subscriptions[packet.topic]
      return (callback.call(packet.payload) if callback)
    end

    # Pong / publish acked / subscribed / unsubscribed: nothing to do.
    acknowledgements = [
      MQTT::Packet::Pingresp,
      MQTT::Packet::Puback,
      MQTT::Packet::Suback,
      MQTT::Packet::Unsuback
    ]
    return nil if acknowledgements.include?(kind)
  end

  @ex = MQTT::ProtocolException.new("Wasn't expecting packet of type #{packet.class} when in state #{state}")
  close_connection
end
publish(topic, payload)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 52
# Sends a PUBLISH packet for +topic+ carrying +payload+.
#
# The packet is built with qos 0, retain false and a freshly allocated
# packet id.
#
# @param topic the topic to publish to
# @param payload the message body
# @return the result of send_packet
def publish(topic, payload)
  message = MQTT::Packet::Publish.new(
    id: next_packet_id,
    qos: 0,
    retain: false,
    topic: topic,
    payload: payload
  )
  send_packet(message)
end
receive_data(data)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 163
# Appends incoming bytes to the receive buffer and processes every complete
# packet the buffer now holds.
#
# Two things differ from a naive "parse as soon as two bytes arrive":
#
# * Packets are drained in a loop rather than by calling receive_data
#   recursively, so a chunk holding thousands of small packets cannot
#   exhaust the stack.
# * The header is only parsed once the whole fixed header is buffered. The
#   MQTT remaining-length field occupies one to four bytes, each using its
#   high bit as a "more bytes follow" flag, and a TCP read may end in the
#   middle of it.
#
# NOTE(review): as in the original code, this relies on
# MQTT::Packet.parse_header removing the header bytes from the buffer it is
# given - confirm against the MQTT gem in use.
#
# @param data [String] bytes just read from the socket (may be empty)
# @return [nil]
def receive_data(data)
  @data << data

  loop do
    # Are we at the start of a new packet?
    unless @packet
      break if @data.bytesize < 2

      # Bytes 1..4 can belong to the remaining-length field; it is complete
      # once a byte without the continuation bit shows up (or all four
      # possible bytes are present).
      length_bytes = @data.byteslice(1, 4).bytes
      header_complete = length_bytes.size == 4 ||
                        length_bytes.any? { |byte| (byte & 0x80).zero? }
      break unless header_complete

      @packet = MQTT::Packet.parse_header(@data)
    end

    # Do we have the full packet body now?
    break if @data.bytesize < @packet.body_length

    @packet.parse_body(@data.slice!(0...@packet.body_length))
    process_packet(@packet)
    @packet = nil
  end
end
send_connect_packet()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 79
# Sends the MQTT CONNECT packet and moves to the :connect_sent state.
#
# The packet is built from the client_id, clean_session, keep_alive,
# username, password and version options; the new state is emitted after the
# packet has been handed to send_packet.
def send_connect_packet
  connect_keys = [:client_id, :clean_session, :keep_alive, :username, :password, :version]
  settings = connect_keys.each_with_object({}) do |key, fields|
    fields[key] = @options[key]
  end

  send_packet(MQTT::Packet::Connect.new(**settings))
  @state = :connect_sent
  emit(@state)
end
send_packet(packet)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 144
# Converts +packet+ to a string with to_s and passes it to send_data.
#
# @param packet [#to_s] the packet to transmit
# @return the result of send_data
def send_packet(packet)
  wire_bytes = packet.to_s
  send_data(wire_bytes)
end
ssl_handshake_completed()
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 220
# Runs after the TLS handshake: checks that the last certificate seen matches
# the configured host name before continuing with the MQTT CONNECT packet.
#
# On a mismatch the error is stored in @ex and the connection is closed.
#
# @return the result of send_connect_packet or close_connection
def ssl_handshake_completed
  host = @options[:host]

  if OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, host)
    send_connect_packet
  else
    @ex = OpenSSL::OpenSSLError.new("The hostname #{host} does not match the server certificate")
    close_connection
  end
end
ssl_verify_peer(cert_string)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 201
# Checks one peer certificate against the certificate store.
#
# The certificate is remembered in @last_seen_cert. If the store cannot
# verify it, an error is recorded in @ex and false is returned. A verified
# certificate is then added to the store; a StoreError whose message is
# exactly "cert already in hash table" is ignored, any other StoreError is
# recorded in @ex and yields false.
#
# @param cert_string [String] certificate data accepted by
#   OpenSSL::X509::Certificate.new
# @return [Boolean] true when the certificate is accepted
def ssl_verify_peer(cert_string)
  peer_cert = OpenSSL::X509::Certificate.new(cert_string)
  @last_seen_cert = peer_cert

  if !@certificate_store.verify(peer_cert)
    @ex = OpenSSL::OpenSSLError.new("Unable to verify the certificate for #{@options[:host]}")
    false
  else
    begin
      @certificate_store.add_cert(peer_cert)
      true
    rescue OpenSSL::X509::StoreError => store_error
      if store_error.message == "cert already in hash table"
        true
      else
        @ex = store_error
        false
      end
    end
  end
end
subscribe(topic, &block)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 61
# Registers +block+ as the handler for +topic+ and sends a SUBSCRIBE packet
# for that topic with a freshly allocated packet id.
#
# Any handler previously stored for the same topic is replaced.
#
# @param topic the topic to subscribe to
# @param block handler stored for the topic (nil when no block is given)
# @return the result of send_packet
def subscribe(topic, &block)
  @subscriptions.store(topic, block)
  request = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end
unbind(msg)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 148
# Runs when the connection has gone away: stops the keep-alive timer, moves
# to :disconnected and emits that state together with the reason, if any.
#
# Unless a disconnect was already in progress, the reason is the first of:
# the stored exception, the exception currently in $!, or a new
# MQTT::NotConnectedException. The stored exception is cleared afterwards.
#
# @param msg not used by this method
def unbind(msg)
  if (keep_alive_timer = @timer)
    keep_alive_timer.cancel
    @timer = nil
  end

  if @state != :disconnecting
    @ex = @ex || $! || MQTT::NotConnectedException.new("Connection to server lost")
  end

  @state = :disconnected
  emit(@state, @ex)
  @ex = nil
end
unsubscribe(topic)
click to toggle source
# File lib/losant_mqtt/device_connection.rb, line 66
# Drops the handler stored for +topic+ and sends an UNSUBSCRIBE packet for
# that topic with a freshly allocated packet id.
#
# @param topic the topic to unsubscribe from
# @return the result of send_packet
def unsubscribe(topic)
  @subscriptions.delete(topic)
  request = MQTT::Packet::Unsubscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end