class MqttRails::Subscriber

Attributes

subscribed_topics[R]

Public Class Methods

new(sender)
# File lib/mqtt_rails/subscriber.rb, line 20
# Builds a subscriber with empty bookkeeping state.
#
# @param sender [Object] object stored in @sender; other methods of this
#   class call `append_to_writing` and `check_ack_alive` on it.
def initialize(sender)
  # Topics currently recorded as subscribed, with the mutex guarding them.
  @subscribed_topics = []
  @subscribed_mutex = Mutex.new

  # SUBSCRIBE packets still waiting for their SUBACK.
  @waiting_suback = []
  @suback_mutex = Mutex.new

  # UNSUBSCRIBE packets still waiting for their UNSUBACK.
  @waiting_unsuback = []
  @unsuback_mutex = Mutex.new

  @sender = sender
end

Public Instance Methods

add_subscription(max_qos, packet_id, adjust_qos)
# File lib/mqtt_rails/subscriber.rb, line 55
# Records the outcome of a SUBACK for a previously sent SUBSCRIBE packet.
#
# The pending entry whose :id equals +packet_id+ is removed from the SUBACK
# waiting queue. Its topics are then paired, in order, with the return codes
# in +max_qos+: codes 0, 1 and 2 are written into the topic's second slot,
# code 128 drops the topic, and any other value (including a missing code)
# is rejected.
#
# @param max_qos [Array<Integer>] one return code per topic of the SUBSCRIBE
#   packet; one code is shifted off this array for every topic processed.
# @param packet_id [Integer] id of the acknowledged SUBSCRIBE packet.
# @param adjust_qos [Object] ignored on input; kept for interface
#   compatibility.
# @return [Array] the topics that were accepted, which are also appended to
#   @subscribed_topics.
# @raise [PacketException] when there is not exactly one pending entry for
#   +packet_id+, or when a return code is not 0, 1, 2 or 128.
def add_subscription(max_qos, packet_id, adjust_qos)
  @suback_mutex.synchronize do
    adjust_qos, @waiting_suback = @waiting_suback.partition { |pck| pck[:id] == packet_id }
  end

  unless adjust_qos.length == 1
    Rails.logger.error("[MQTT RAILS][ERROR] The packet id is invalid, already used.")
    raise PacketException, "Invalid suback packet id: #{packet_id}"
  end

  # Build a fresh list instead of deleting from the array being iterated:
  # deleting while iterating skips the element after each rejected topic.
  # Exactly one return code is consumed per topic so codes stay aligned.
  granted = adjust_qos.first[:packet].topics.select do |topic|
    case max_qos.first
    when 0, 1, 2
      topic[1] = max_qos.shift
      true
    when 128
      max_qos.shift
      false
    else
      Rails.logger.error("[MQTT RAILS][ERROR] The QoS value is invalid in subscribe.")
      raise PacketException, 'Invalid suback QoS value'
    end
  end

  @subscribed_mutex.synchronize do
    @subscribed_topics.concat(granted)
  end
  granted
end
check_waiting_subscriber()
# File lib/mqtt_rails/subscriber.rb, line 141
# Hands both pending-acknowledgement queues to the sender, each together
# with the mutex that guards it: first the SUBACK queue, then the UNSUBACK
# queue. Returns whatever the second `check_ack_alive` call returns.
#
# NOTE(review): what `check_ack_alive` does with the entries is not visible
# here; presumably it inspects their :timestamp and re-sends or expires
# stale packets. Confirm against the sender implementation.
def check_waiting_subscriber
  @sender.check_ack_alive(@waiting_suback, @suback_mutex)
  @sender.check_ack_alive(@waiting_unsuback, @unsuback_mutex)
end
clear_queue()
# File lib/mqtt_rails/subscriber.rb, line 146
# Discards every SUBSCRIBE packet still waiting for a SUBACK by replacing
# the queue with a new empty array.
#
# NOTE(review): the assignment is not wrapped in @suback_mutex, unlike the
# other accesses to this queue, and the UNSUBACK queue is left untouched.
# Confirm both are intentional.
def clear_queue
  @waiting_suback = []
end
config_subscription(new_id)
# File lib/mqtt_rails/subscriber.rb, line 34
# Re-sends a single SUBSCRIBE packet covering every topic currently recorded
# in @subscribed_topics.
#
# When there are no recorded topics nothing is sent and the call succeeds.
# Otherwise the packet is registered in the SUBACK waiting queue, the
# recorded topic list is reset to empty, and the packet is handed to the
# sender.
#
# The capacity check runs before the topic list is reset, so a full queue
# leaves the recorded subscriptions intact for a later retry.
#
# @param new_id [Integer] packet id to use for the SUBSCRIBE packet.
# @return [Integer] MQTT_ERR_SUCCESS, or MQTT_ERR_FAIL (the failure code
#   the other methods of this class use) when the SUBACK queue is full.
def config_subscription(new_id)
  topics = @subscribed_topics
  return MQTT_ERR_SUCCESS if topics.nil? || topics.empty?

  packet = MqttRails::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] SUBACK queue is full, could not send subscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  # Only forget the recorded topics once the packet is safely queued.
  @subscribed_mutex.synchronize do
    @subscribed_topics = []
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
remove_subscription(packet_id, to_unsub)
# File lib/mqtt_rails/subscriber.rb, line 81
# Records the outcome of an UNSUBACK for a previously sent UNSUBSCRIBE
# packet: the pending entry with the given id is taken out of the UNSUBACK
# waiting queue and every recorded subscription matching one of its topic
# filters is removed from @subscribed_topics.
#
# @param packet_id [Integer] id of the acknowledged UNSUBSCRIBE packet.
# @param to_unsub [Object] ignored on input; kept for interface
#   compatibility.
# @return [Array] the topic filters of the acknowledged packet.
# @raise [PacketException] when there is not exactly one pending entry for
#   +packet_id+.
def remove_subscription(packet_id, to_unsub)
  acked = nil
  @unsuback_mutex.synchronize do
    acked, still_waiting = @waiting_unsuback.partition { |entry| entry[:id] == packet_id }
    @waiting_unsuback = still_waiting
  end

  if acked.length != 1
    Rails.logger.error("[MQTT RAILS][ERROR] The packet id is invalid, already used.")
    raise PacketException.new("Invalid unsuback packet id: #{packet_id}")
  end

  filters = acked.first[:packet].topics
  @subscribed_mutex.synchronize do
    filters.each do |filter|
      @subscribed_topics.reject! { |subscription| MqttRails.match_filter(subscription.first, filter) }
    end
  end
  filters
end
send_subscribe(topics, new_id)
# File lib/mqtt_rails/subscriber.rb, line 101
# Builds a SUBSCRIBE packet for +topics+, registers it in the SUBACK waiting
# queue and hands it to the sender.
#
# The packet is registered before it is written, and is not written at all
# when the queue is full. Sending first would put an untracked SUBSCRIBE on
# the wire, and its SUBACK could later not be matched to any pending entry.
#
# @param topics [Array] topic list to subscribe to; checked with
#   `valid_topics?`.
# @param new_id [Integer] packet id to use for the SUBSCRIBE packet.
# @return [Integer] MQTT_ERR_SUCCESS, or MQTT_ERR_FAIL (the failure code
#   the other methods of this class use) when the SUBACK queue is full.
# @raise [ProtocolViolation] when `valid_topics?` reports MQTT_ERR_FAIL.
def send_subscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = MqttRails::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] SUBACK queue is full, could not send subscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
send_unsubscribe(topics, new_id)
# File lib/mqtt_rails/subscriber.rb, line 121
# Builds an UNSUBSCRIBE packet for +topics+, registers it in the UNSUBACK
# waiting queue and hands it to the sender.
#
# The capacity check looks at the UNSUBACK queue, which is the queue
# guarded by @unsuback_mutex and compared against MAX_UNSUBACK. The packet
# is registered before it is written, and is not written at all when the
# queue is full.
#
# @param topics [Array] topic filters to unsubscribe from; checked with
#   `valid_topics?`.
# @param new_id [Integer] packet id to use for the UNSUBSCRIBE packet.
# @return [Integer] MQTT_ERR_SUCCESS, or MQTT_ERR_FAIL when the UNSUBACK
#   queue is full.
# @raise [ProtocolViolation] when `valid_topics?` reports MQTT_ERR_FAIL.
def send_unsubscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = MqttRails::Packet::Unsubscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @unsuback_mutex.synchronize do
    if @waiting_unsuback.length >= MAX_UNSUBACK
      Rails.logger.error('[MQTT RAILS][ERROR] UNSUBACK queue is full, could not send unbsubscribe')
      return MQTT_ERR_FAIL
    end
    @waiting_unsuback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
sender=(sender)
# File lib/mqtt_rails/subscriber.rb, line 30
# Replaces the sender object that this subscriber calls to write packets
# (`append_to_writing`) and to check pending acknowledgements
# (`check_ack_alive`).
#
# @param sender [Object] the new sender.
def sender=(sender)
  @sender = sender
end
valid_topics?(topics)
# File lib/mqtt_rails/subscriber.rb, line 150
# Checks that a topic list is usable in a SUBSCRIBE or UNSUBSCRIBE packet.
#
# A list is rejected when it is nil or empty, or when any entry has an empty
# topic string. Entries may be plain Strings or Arrays whose first element
# is the topic string; entries of any other type are not inspected.
#
# Despite the `?` suffix this returns a status code, not a boolean. Both
# codes are truthy in Ruby, so callers must compare the result against
# MQTT_ERR_FAIL or MQTT_ERR_SUCCESS explicitly.
#
# @param topics [Array, nil] the topic list to check.
# @return [Integer] MQTT_ERR_SUCCESS or MQTT_ERR_FAIL.
def valid_topics?(topics)
  # An empty list must fail here; falling through would report success.
  return MQTT_ERR_FAIL if topics.nil? || topics.empty?

  has_blank_topic = topics.any? do |topic|
    case topic
    when Array  then topic.first == ""
    when String then topic == ""
    else false
    end
  end
  has_blank_topic ? MQTT_ERR_FAIL : MQTT_ERR_SUCCESS
end