class EventMachine::MQTTSN::GatewayHandler
There is only a single instance of GatewayHandler
which processes UDP packets from all MQTT-SN clients.
Attributes
connections[R]
logger[R]
server_address[R]
server_port[R]
Public Class Methods
new(attr)
click to toggle source
# File lib/em/mqtt-sn/gateway_handler.rb, line 12
# Build the handler.
#
# Starts with an empty connection table, then copies every key/value pair
# of +attr+ into an instance variable named after the key (so a
# :connections key would replace the empty table). Finally registers a
# periodic EventMachine timer with an interval of 10 that calls #cleanup.
#
# attr - object responding to each_pair (e.g. a Hash) of attribute names
#        to values.
def initialize(attr)
  @connections = {}
  attr.each_pair { |name, value| instance_variable_set("@#{name}", value) }

  # Sweep the connection table on a fixed schedule
  EventMachine.add_periodic_timer(10) do
    cleanup
  end
end
Public Instance Methods
cleanup()
click to toggle source
Periodic task to clean up dead connections
# File lib/em/mqtt-sn/gateway_handler.rb, line 208
# Periodic task to clean up dead connections.
#
# Removes every entry of the connection table whose connection reports
# that it is no longer connected, logging each removal at debug level.
# Hash#delete_if is used so that entries are removed by the iterator
# itself instead of deleting from the hash inside an each_pair loop.
#
# Returns the connection table.
def cleanup
  connections.delete_if do |_peername, connection|
    dead = !connection.connected?
    logger.debug("Destroying connection: #{connection.client_id}") if dead
    dead
  end
end
connect(peername, packet)
click to toggle source
CONNECT received from client - establish connection to server
# File lib/em/mqtt-sn/gateway_handler.rb, line 62
# CONNECT received from client - establish connection to server.
#
# peername - packed socket address of the client; also the key under which
#            the connection is stored.
# packet   - the MQTT-SN CONNECT packet; its client_id, keep_alive and
#            clean_session are copied into the MQTT CONNECT sent upstream.
#
# Returns the newly stored server connection.
def connect(peername, packet)
  # A second CONNECT from the same peer replaces the previous session
  if @connections.key?(peername)
    logger.warn("Received CONNECT while already connected")
    @connections[peername].disconnect
  end

  # Open the TCP connection to the server on behalf of this client
  client_port, client_address = Socket.unpack_sockaddr_in(peername)
  server_link = EventMachine.connect(
    server_address, server_port,
    EventMachine::MQTTSN::ServerConnection,
    self, client_address, client_port
  )
  server_link.client_id = packet.client_id

  # Forward the client's session parameters as an MQTT CONNECT
  mqtt_connect = MQTT::Packet::Connect.new(
    :client_id => packet.client_id,
    :keep_alive => packet.keep_alive,
    :clean_session => packet.clean_session
  )
  server_link.send_packet(mqtt_connect)

  # Remember the connection so later packets from this peer can find it
  @connections[peername] = server_link
end
disconnect(connection)
click to toggle source
Disconnect client from server
# File lib/em/mqtt-sn/gateway_handler.rb, line 198
# Disconnect client from server.
#
# Does nothing when the connection already reports itself as not
# connected. Otherwise logs the event, sends an MQTT-SN DISCONNECT to the
# client address/port stored on the connection, and then disconnects the
# connection itself.
#
# connection - the server connection belonging to the client.
def disconnect(connection)
  return unless connection.connected?

  logger.info("Disconnected: #{connection.client_id}")

  # Tell the client first, then drop the upstream connection
  farewell = EventMachine::MQTTSN::Packet::Disconnect.new
  send_datagram(farewell.to_s, connection.client_address, connection.client_port)
  connection.disconnect
end
process_packet(peername, packet)
click to toggle source
Incoming packet received from client
# File lib/em/mqtt-sn/gateway_handler.rb, line 31
# Incoming packet received from client.
#
# A CONNECT packet is always handed to #connect. Every other packet needs
# an entry for +peername+ in the connection table that reports itself as
# connected; otherwise a warning is logged and the packet is dropped.
#
# peername - packed socket address of the sending client (table key).
# packet   - the parsed MQTT-SN packet.
def process_packet(peername, packet)
  logger.debug("Received MQTT-SN: #{packet.class}")

  # CONNECT is the only packet accepted without an existing connection.
  # The exact-class comparison is deliberate: it matches the original
  # behaviour, which does not treat subclasses as CONNECT.
  return connect(peername, packet) if packet.class == EventMachine::MQTTSN::Packet::Connect

  connection = @connections[peername]
  if connection && connection.connected?
    case packet
    when EventMachine::MQTTSN::Packet::Register
      register(connection, packet)
    when EventMachine::MQTTSN::Packet::Publish
      publish(connection, packet)
    when EventMachine::MQTTSN::Packet::Subscribe
      subscribe(connection, packet)
    when EventMachine::MQTTSN::Packet::Pingreq
      # Pings are passed straight through to the server
      connection.send_packet MQTT::Packet::Pingreq.new
    when EventMachine::MQTTSN::Packet::Pingresp
      connection.send_packet MQTT::Packet::Pingresp.new
    when EventMachine::MQTTSN::Packet::Disconnect
      disconnect(connection)
    else
      logger.warn("Unable to handle MQTT-SN packet of type: #{packet.class}")
    end
  else
    logger.warn("Received MQTT-SN packet of type: #{packet.class} while not connected")
  end
end
publish(connection, packet)
click to toggle source
PUBLISH received from client - pass it on to the server
# File lib/em/mqtt-sn/gateway_handler.rb, line 165
# PUBLISH received from client - pass it on to the server.
#
# The topic name is taken directly from the packet's topic_id when the
# topic_id_type is :short, and looked up on the connection when it is
# :normal. For any other type, or when the lookup yields nothing, a
# warning is logged and nothing is forwarded.
#
# connection - the server connection belonging to the client.
# packet     - the MQTT-SN PUBLISH packet.
def publish(connection, packet)
  topic_name =
    if packet.topic_id_type == :short
      packet.topic_id
    elsif packet.topic_id_type == :normal
      connection.get_topic_name(packet.topic_id)
    end

  # FIXME: disconnect?
  return logger.warn("Invalid topic ID: #{packet.topic_id}") unless topic_name

  logger.info("#{connection.client_id} publishing to '#{topic_name}'")
  outgoing = MQTT::Packet::Publish.new(
    :topic => topic_name,
    :payload => packet.data,
    :retain => packet.retain,
    :qos => packet.qos
  )
  connection.send_packet(outgoing)
end
receive_data(data)
click to toggle source
UDP packet received by gateway
# File lib/em/mqtt-sn/gateway_handler.rb, line 23
# UDP packet received by gateway.
#
# Parses the raw datagram as an MQTT-SN packet and, when the parser
# returns a packet, passes it to #process_packet together with the
# sender's peername. A nil parse result is ignored.
#
# data - the raw datagram payload.
def receive_data(data)
  parsed = EventMachine::MQTTSN::Packet.parse(data)
  return if parsed.nil?

  process_packet(get_peername, parsed)
end
register(connection, packet)
click to toggle source
REGISTER received from client
# File lib/em/mqtt-sn/gateway_handler.rb, line 148
# REGISTER received from client.
#
# Asks the connection for the topic ID of the requested topic name and
# answers the client with a REGACK carrying the request's message id:
# return code 0x00 plus the topic ID when one was obtained, or 0x02 when
# the connection returned no topic ID.
#
# The reply is addressed explicitly to the client address/port stored on
# the connection (as the other reply paths in this class do) rather than
# relying on send_data's implicit "sender of the last datagram".
#
# connection - the server connection belonging to the client.
# packet     - the MQTT-SN REGISTER packet.
def register(connection, packet)
  regack = EventMachine::MQTTSN::Packet::Regack.new(
    :topic_id_type => :normal,
    :id => packet.id
  )

  # Only the ID is needed here; the type is fixed to :normal above
  _topic_id_type, topic_id = connection.get_topic_id(packet.topic_name)
  if topic_id.nil?
    regack.return_code = 0x02 # Rejected: invalid topic ID
  else
    regack.return_code = 0x00 # Accepted
    regack.topic_id = topic_id
  end

  send_datagram(regack.to_s, connection.client_address, connection.client_port)
end
relay_from_server(connection, packet)
click to toggle source
Handle a MQTT packet coming back from the server
# File lib/em/mqtt-sn/gateway_handler.rb, line 92
# Handle a MQTT packet coming back from the server.
#
# Translates the MQTT packet into its MQTT-SN counterpart and sends the
# result to the client address/port stored on the connection. Packets
# that cannot be translated (unknown type, or a SUBACK that matches no
# pending request) are logged and nothing is sent.
#
# connection - the server connection the packet arrived on.
# packet     - the MQTT packet received from the server.
def relay_from_server(connection, packet)
  logger.debug("Received MQTT: #{packet.inspect}")

  mqttsn_packet =
    case packet
    when MQTT::Packet::Connack
      translate_connack(connection, packet)
    when MQTT::Packet::Suback
      translate_suback(connection, packet)
    when MQTT::Packet::Publish
      translate_publish(connection, packet)
    when MQTT::Packet::Pingreq
      EventMachine::MQTTSN::Packet::Pingreq.new
    when MQTT::Packet::Pingresp
      EventMachine::MQTTSN::Packet::Pingresp.new
    else
      logger.warn("Unable to handle MQTT packet of type: #{packet.class}")
      nil # nothing to relay
    end

  return if mqttsn_packet.nil?

  send_datagram(mqttsn_packet.to_s, connection.client_address, connection.client_port)
end

# Build the MQTT-SN CONNACK for a MQTT CONNACK and log the outcome.
def translate_connack(connection, packet)
  # FIXME: re-map the return code
  mqttsn_packet = EventMachine::MQTTSN::Packet::Connack.new(
    :return_code => packet.return_code
  )
  if packet.return_code == 0
    logger.info("#{connection.client_id} is now connected")
  else
    logger.info("#{connection.client_id} failed to connect: #{packet.return_msg}")
  end
  mqttsn_packet
end
private :translate_connack

# Build the MQTT-SN SUBACK for a MQTT SUBACK, or return nil (after logging
# a warning) when the SUBACK does not match a pending request.
def translate_suback(connection, packet)
  # Check that it is a response to a request we made
  request = connection.remove_from_pending(packet.id)
  unless request
    logger.warn("Received Suback from server for something we didn't request: #{packet.inspect}")
    return nil
  end

  logger.debug("#{connection.client_id} now subscribed to '#{request.topic_name}'")
  topic_id_type, topic_id = connection.get_topic_id(request.topic_name)
  EventMachine::MQTTSN::Packet::Suback.new(
    :topic_id_type => topic_id_type,
    :topic_id => topic_id,
    :qos => packet.granted_qos.first,
    :id => packet.id,
    :return_code => 0x00
  )
end
private :translate_suback

# Build the MQTT-SN PUBLISH for a MQTT PUBLISH coming from the server.
def translate_publish(connection, packet)
  logger.info("#{connection.client_id} recieved publish to '#{packet.topic}'")
  # FIXME: send register if this is a new topic
  topic_id_type, topic_id = connection.get_topic_id(packet.topic)
  EventMachine::MQTTSN::Packet::Publish.new(
    :duplicate => packet.duplicate,
    :qos => packet.qos,
    :retain => packet.retain,
    :topic_id_type => topic_id_type,
    :topic_id => topic_id,
    :id => packet.id,
    :data => packet.payload
  )
end
private :translate_publish
subscribe(connection, packet)
click to toggle source
SUBSCRIBE received from client - pass it on to the server
# File lib/em/mqtt-sn/gateway_handler.rb, line 187
# SUBSCRIBE received from client - pass it on to the server.
#
# Builds an MQTT SUBSCRIBE with the same message id and topic name as the
# MQTT-SN request, records the original request on the connection as
# pending, and then sends the MQTT packet to the server.
#
# connection - the server connection belonging to the client.
# packet     - the MQTT-SN SUBSCRIBE packet.
def subscribe(connection, packet)
  logger.info("#{connection.client_id} subscribing to '#{packet.topic_name}'")

  server_request = MQTT::Packet::Subscribe.new(
    :id => packet.id,
    :topics => packet.topic_name
  )

  # Keep the MQTT-SN request so the server's reply can be matched to it
  connection.add_to_pending(packet)
  connection.send_packet(server_request)
end