class MqttRails::Publisher
Public Class Methods
new(sender)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 18 def initialize(sender) @waiting_puback = [] @waiting_pubrec = [] @waiting_pubrel = [] @waiting_pubcomp = [] @puback_mutex = Mutex.new @pubrec_mutex = Mutex.new @pubrel_mutex = Mutex.new @pubcomp_mutex = Mutex.new @sender = sender end
Public Instance Methods
check_waiting_publisher()
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 159 def check_waiting_publisher @sender.check_ack_alive(@waiting_puback, @puback_mutex) @sender.check_ack_alive(@waiting_pubrec, @pubrec_mutex) @sender.check_ack_alive(@waiting_pubrel, @pubrel_mutex) @sender.check_ack_alive(@waiting_pubcomp, @pubcomp_mutex) end
config_all_message_queue()
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 144 def config_all_message_queue config_message_queue(@waiting_puback, @puback_mutex) config_message_queue(@waiting_pubrec, @pubrec_mutex) config_message_queue(@waiting_pubrel, @pubrel_mutex) config_message_queue(@waiting_pubcomp, @pubcomp_mutex) end
config_message_queue(queue, mutex)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 151 def config_message_queue(queue, mutex) mutex.synchronize do queue.each do |pck| pck[:timestamp] = Time.now end end end
do_puback(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 90 def do_puback(packet_id) @puback_mutex.synchronize do @waiting_puback.delete_if { |pck| pck[:id] == packet_id } end MQTT_ERR_SUCCESS end
do_pubcomp(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 137 def do_pubcomp(packet_id) @pubcomp_mutex.synchronize do @waiting_pubcomp.delete_if { |pck| pck[:id] == packet_id } end MQTT_ERR_SUCCESS end
do_publish(qos, packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 68 def do_publish(qos, packet_id) case qos when 0 when 1 send_puback(packet_id) when 2 send_pubrec(packet_id) else Rails.logger.error("[MQTT RAILS][ERROR] The packet QoS value is invalid in publish.") raise PacketException.new('Invalid publish QoS value') end MQTT_ERR_SUCCESS end
do_pubrec(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 106 def do_pubrec(packet_id) @pubrec_mutex.synchronize do @waiting_pubrec.delete_if { |pck| pck[:id] == packet_id } end send_pubrel(packet_id) end
do_pubrel(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 122 def do_pubrel(packet_id) @pubrel_mutex.synchronize do @waiting_pubrel.delete_if { |pck| pck[:id] == packet_id } end send_pubcomp(packet_id) end
flush_publisher()
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 166 def flush_publisher @puback_mutex.synchronize do @waiting_puback = [] end @pubrec_mutex.synchronize do @waiting_pubrec = [] end @pubrel_mutex.synchronize do @waiting_pubrel = [] end @pubcomp_mutex.synchronize do @waiting_pubcomp = [] end end
push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 58 def push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id) if waiting_queue.length >= max_packet raise FullQueueException end queue_mutex.synchronize do waiting_queue.push(:id => new_id, :packet => packet, :timestamp => Time.now) end MQTT_ERR_SUCCESS end
send_puback(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 82 def send_puback(packet_id) packet = MqttRails::Packet::Puback.new( :id => packet_id ) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end
send_pubcomp(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 129 def send_pubcomp(packet_id) packet = MqttRails::Packet::Pubcomp.new( :id => packet_id ) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end
send_publish(topic, payload, retain, qos, new_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 34 def send_publish(topic, payload, retain, qos, new_id) packet = MqttRails::Packet::Publish.new( :id => new_id, :topic => topic, :payload => payload, :retain => retain, :qos => qos ) begin case qos when 1 push_queue(@waiting_puback, @puback_mutex, MAX_QUEUE, packet, new_id) when 2 push_queue(@waiting_pubrec, @pubrec_mutex, MAX_QUEUE, packet, new_id) end rescue FullQueueException Rails.logger.warn("[MQTT RAILS][WARNING] PUBLISH queue is full, waiting for publishing #{packet.inspect}") sleep SELECT_TIMEOUT retry end @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end
send_pubrec(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 97 def send_pubrec(packet_id) packet = MqttRails::Packet::Pubrec.new( :id => packet_id ) push_queue(@waiting_pubrel, @pubrel_mutex, MAX_QUEUE, packet, packet_id) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end
send_pubrel(packet_id)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 113 def send_pubrel(packet_id) packet = MqttRails::Packet::Pubrel.new( :id => packet_id ) push_queue(@waiting_pubcomp, @pubcomp_mutex, MAX_QUEUE, packet, packet_id) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end
sender=(sender)
click to toggle source
# File lib/mqtt_rails/publisher.rb, line 30 def sender=(sender) @sender = sender end