class MqttRails::Subscriber
Attributes
subscribed_topics[R]
Public Class Methods
new(sender)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 20
# Builds a subscriber bound to +sender+, starting with no recorded
# subscriptions and nothing waiting for an acknowledgement.
#
# @param sender [Object] stored in @sender; other methods of this class call
#   +append_to_writing+ and +check_ack_alive+ on it
def initialize(sender)
  @sender = sender

  # Packets sent but not yet acknowledged, each list paired with its own lock.
  @waiting_suback   = []
  @suback_mutex     = Mutex.new
  @waiting_unsuback = []
  @unsuback_mutex   = Mutex.new

  # Topics currently recorded as subscribed, with the lock guarding them.
  @subscribed_topics = []
  @subscribed_mutex  = Mutex.new
end
Public Instance Methods
add_subscription(max_qos, packet_id, adjust_qos)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 55
# Processes the acknowledgement of a subscribe request: finds the pending
# request registered under +packet_id+, applies the granted QoS to each of its
# topics and records the accepted topics in @subscribed_topics.
#
# Return codes are matched to topics by position. Codes 0, 1 and 2 are written
# into the topic entry (index 1) and the topic is kept; code 128 drops the
# topic; anything else (including a missing code) is an error.
#
# The previous implementation deleted entries from the topic list while
# iterating over it and never consumed a 128 code, so the topic following a
# rejected one was skipped and later topics were judged against the wrong
# code. Pairing by index avoids both problems and leaves +max_qos+ unmodified.
#
# @param max_qos [Array<Integer>] return codes, one per requested topic
# @param packet_id [Integer] id of the subscribe request being acknowledged
# @param adjust_qos [Array] ignored on input; kept for signature compatibility
# @return [Array] the accepted topic entries with their granted QoS
# @raise [PacketException] when no single pending request matches +packet_id+
#   or when a return code is not 0, 1, 2 or 128
def add_subscription(max_qos, packet_id, adjust_qos)
  pending = nil
  @suback_mutex.synchronize do
    pending, @waiting_suback = @waiting_suback.partition { |pck| pck[:id] == packet_id }
  end

  unless pending.length == 1
    Rails.logger.error("[MQTT RAILS][ERROR] The packet id is invalid, already used.")
    raise PacketException, "Invalid suback packet id: #{packet_id}"
  end

  adjust_qos = []
  pending.first[:packet].topics.each_with_index do |topic, index|
    granted = max_qos[index]
    if [0, 1, 2].include?(granted)
      topic[1] = granted
      adjust_qos << topic
    elsif granted != 128
      Rails.logger.error("[MQTT RAILS][ERROR] The QoS value is invalid in subscribe.")
      raise PacketException, 'Invalid suback QoS value'
    end
    # granted == 128: the subscription was refused, so the topic is left out.
  end

  @subscribed_mutex.synchronize do
    @subscribed_topics.concat(adjust_qos)
  end
  adjust_qos
end
check_waiting_subscriber()
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 141
# Asks the sender to inspect both acknowledgement queues, SUBACK first and
# UNSUBACK second, passing each queue together with the mutex that guards it.
#
# @return [Object] whatever the sender returns for the UNSUBACK queue
def check_waiting_subscriber
  guarded_queues = [
    [@waiting_suback, @suback_mutex],
    [@waiting_unsuback, @unsuback_mutex]
  ]
  guarded_queues.map { |queue, lock| @sender.check_ack_alive(queue, lock) }.last
end
clear_queue()
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 146
# Discards every pending subscribe acknowledgement by replacing
# @waiting_suback with a fresh empty array. @waiting_unsuback is not touched.
#
# @return [Array] the new, empty SUBACK queue
def clear_queue
  @waiting_suback = Array.new
end
config_subscription(new_id)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 34
# Re-issues a subscribe request for every topic currently recorded in
# @subscribed_topics. The recorded list is emptied, a Subscribe packet carrying
# those topics is registered as awaiting acknowledgement and then handed to the
# sender. Does nothing when no topics are recorded.
#
# Changes from the previous version:
# * the queue-full path returns MQTT_ERR_FAIL, the failure constant used by
#   the rest of this class, instead of MQTT_ERR_FAILURE;
# * the recorded topics are read and reset in one critical section rather
#   than read outside the lock and reset inside it.
#
# NOTE(review): when the SUBACK queue is full the recorded topics have already
# been emptied and are not restored (unchanged from before) - confirm whether
# callers rely on that.
#
# @param new_id [Integer] packet id to use for the subscribe request
# @return MQTT_ERR_SUCCESS when there was nothing to do or the request was
#   queued, MQTT_ERR_FAIL when the SUBACK queue is full
def config_subscription(new_id)
  topics = @subscribed_mutex.synchronize do
    current = @subscribed_topics
    @subscribed_topics = [] unless current.nil? || current.empty?
    current
  end
  return MQTT_ERR_SUCCESS if topics.nil? || topics.empty?

  packet = MqttRails::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] SUBACK queue is full, could not send subscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
remove_subscription(packet_id, to_unsub)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 81
# Processes the acknowledgement of an unsubscribe request: takes the pending
# request registered under +packet_id+ out of @waiting_unsuback and removes
# from @subscribed_topics every entry whose first element matches one of the
# request's topics according to MqttRails.match_filter.
#
# @param packet_id [Integer] id of the unsubscribe request being acknowledged
# @param to_unsub [Array] ignored on input; kept for signature compatibility
# @return [Array] the topics carried by the acknowledged unsubscribe request
# @raise [PacketException] when no single pending request matches +packet_id+
def remove_subscription(packet_id, to_unsub)
  acknowledged = nil
  @unsuback_mutex.synchronize do
    acknowledged, @waiting_unsuback = @waiting_unsuback.partition { |pck| pck[:id] == packet_id }
  end

  unless acknowledged.length == 1
    Rails.logger.error("[MQTT RAILS][ERROR] The packet id is invalid, already used.")
    raise PacketException, "Invalid unsuback packet id: #{packet_id}"
  end

  to_unsub = acknowledged.first[:packet].topics
  @subscribed_mutex.synchronize do
    to_unsub.each do |filter|
      @subscribed_topics.reject! { |entry| MqttRails.match_filter(entry.first, filter) }
    end
  end
  to_unsub
end
send_subscribe(topics, new_id)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 101
# Builds a Subscribe packet for +topics+, registers it as awaiting
# acknowledgement and hands it to the sender.
#
# Changes from the previous version:
# * the pending entry is registered before the packet is given to the sender,
#   so a full SUBACK queue no longer results in a packet being sent that this
#   object does not track;
# * the queue-full path returns MQTT_ERR_FAIL, the failure constant used by
#   the rest of this class, instead of MQTT_ERR_FAILURE.
#
# @param topics [Array] topics to subscribe to, checked with valid_topics?
# @param new_id [Integer] packet id to use for the request
# @return MQTT_ERR_SUCCESS when the request was queued, MQTT_ERR_FAIL when the
#   SUBACK queue is full
# @raise [ProtocolViolation] when valid_topics? rejects +topics+
def send_subscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = MqttRails::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] SUBACK queue is full, could not send subscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
send_unsubscribe(topics, new_id)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 121
# Builds an Unsubscribe packet for +topics+, registers it as awaiting
# acknowledgement and hands it to the sender.
#
# Changes from the previous version:
# * the capacity check now looks at @waiting_unsuback; it used to compare the
#   length of the SUBACK queue against MAX_UNSUBACK, so the UNSUBACK queue
#   was never bounded and a busy SUBACK queue blocked unsubscribes;
# * the pending entry is registered before the packet is given to the sender,
#   so a full queue no longer results in an untracked packet being sent.
#
# @param topics [Array] topics to unsubscribe from, checked with valid_topics?
# @param new_id [Integer] packet id to use for the request
# @return MQTT_ERR_SUCCESS when the request was queued, MQTT_ERR_FAIL when the
#   UNSUBACK queue is full
# @raise [ProtocolViolation] when valid_topics? rejects +topics+
def send_unsubscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = MqttRails::Packet::Unsubscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @unsuback_mutex.synchronize do
    if @waiting_unsuback.length >= MAX_UNSUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] UNSUBACK queue is full, could not send unbsubscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_unsuback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
sender=(sender)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 30
# Replaces the object this subscriber hands packets to and asks to check
# pending acknowledgements.
#
# @param sender [Object] the new value for @sender
def sender=(sender)
  @sender = sender
end
valid_topics?(topics)
click to toggle source
# File lib/mqtt_rails/subscriber.rb, line 150
# Checks a list of topics before it is put into a subscribe or unsubscribe
# request. The list is rejected when it is empty, when it contains an empty
# String, or when it contains an Array whose first element is an empty
# String. Entries of any other type are not inspected.
#
# The previous version evaluated MQTT_ERR_FAIL for an empty list but did not
# return it, so an empty list fell through and was reported as valid.
#
# Despite the trailing "?", the result is a status constant, not a boolean;
# callers compare it against MQTT_ERR_FAIL.
#
# @param topics [Array<String, Array>] topic names or [name, qos] entries
# @return MQTT_ERR_FAIL when the list is rejected, MQTT_ERR_SUCCESS otherwise
def valid_topics?(topics)
  return MQTT_ERR_FAIL if topics.empty?

  has_blank_name = topics.any? do |topic|
    case topic
    when Array  then topic.first == ""
    when String then topic == ""
    end
  end
  has_blank_name ? MQTT_ERR_FAIL : MQTT_ERR_SUCCESS
end