class AwsIotDevice::MqttShadowClient::ShadowActionManager
Attributes
This the main AWS action manager It enables the AWS IoT actions (get, update, delete) It enables the time control the time out after an action have been start Actions requests are send on the general actions topic and answer is retreived from accepted/refused/delta topics
Public Class Methods
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 18 def initialize(shadow_name, mqtt_client, persistent_subscribe=false) @shadow_name = shadow_name @topic_manager = ShadowTopicManager.new(mqtt_client, shadow_name) @payload_parser = JSONPayloadParser.new @is_subscribed = {} @is_subscribed[:get] = false @is_subscribed[:update] = false @is_subscribed[:delete] = false @token_handler = TokenCreator.new(shadow_name, mqtt_client.client_id) @persistent_subscribe = persistent_subscribe @last_stable_version = -1 #Mean no currentely stable @topic_subscribed_callback = {} @topic_subscribed_callback[:get] = nil @topic_subscribed_callback[:update] = nil @topic_subscribed_callback[:delta] = nil @topic_subscribed_task_count = {} @topic_subscribed_task_count[:get] = 0 @topic_subscribed_task_count[:update] = 0 @topic_subscribed_task_count[:delete] = 0 @token_pool = {} @token_callback = {} @task_count_mutex = Mutex.new @token_mutex = Mutex.new @parser_mutex = Mutex.new set_basic_callback end
Public Instance Methods
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 116 def logger? !@logger.nil? && @logger.is_a?(Logger) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 86 def register_delete_callback(callback, &block) register_action_callback(:delete, callback, &block) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 78 def register_get_callback(callback, &block) register_action_callback(:get, callback, &block) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 90 def register_shadow_delta_callback(callback, &block) if callback.is_a?(Proc) @topic_subscribed_callback[:delta] = callback elsif block_given? @topic_subscribed_callback[:delta] = block end @topic_manager.shadow_topic_subscribe("delta", @default_callback) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 82 def register_update_callback(callback, &block) register_action_callback(:update, callback, &block) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 107 def remove_delete_callback remove_action_callback(:delete) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 99 def remove_get_callback remove_action_callback(:get) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 111 def remove_shadow_delta_callback @topic_subscribe_callback.delete[:delta] @topic_manager.shadow_topic_unsubscribe("delta") end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 103 def remove_update_callback remove_action_callback(:update) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 74 def shadow_delete(timeout=5, callback=nil, &block) shadow_action(:delete, "", timeout, callback, &block) end
Send and publish packet with an empty payload contains in a valid JSON format. A unique token is generate and send in the packet in order to trace the action. Subscribe to the two get/accepted and get/rejected of the coresponding shadow. If the request is accpeted, the answer would be send on the get/accepted topic. It contains all the details of the shadow state in JSON document. A specific callback in Proc could be send parameter. Before exit, the function start a timer count down in the separate thread. If the time ran out, the timer_handler function is called and the get action is cancelled using the token.
Parameter:
> callback: the Proc to execute when the answer to th get request would be received. It should accept three different paramter: - payload : the answer content - response_status : among ['accepted', 'refused', 'delta'] - token : the token assoicate to the get request > timeout: the period after which the request should be canceled and timer_handler should be call
Returns :
> the token associate to the current action (which also store in @token_pool)
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 66 def shadow_get(timeout=5, callback=nil, &block) shadow_action(:get, "", timeout, callback, &block) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 70 def shadow_update(payload, timeout=5, callback=nil, &block) shadow_action(:update, payload, timeout, callback, &block) end
Private Instance Methods
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 260 def accepted_tasks(message, action, token) @topic_subscribed_callback[action].call(message) unless @topic_subscribed_callback[action].nil? @token_callback[token].call(message) if @token_callback.has_key?(token) @token_callback.delete(token) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 204 def decresase_task_count(action) @topic_subscribed_task_count[action] -= 1 if @topic_subscribed_task_count[action] <= 0 @topic_subscribed_task_count[action] = 0 unless @persistent_subscribe @topic_manager.shadow_topic_unsubscribe(action.to_s) @is_subscribed[action] = false end end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 248 def do_accepted(message, action, token, type, new_version) if new_version && new_version >= @last_stable_version @logger.info("The #{action} action with the token #{token} have been accepted.") if logger? type.eql?("delete") ? @last_stable_version = -1 : @last_stable_version = new_version Thread.new do accepted_tasks(message, action, token) end else @logger.warn("CATCH AN ACCEPTED #{action} BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger? end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 275 def do_delta(message, new_version) if new_version && new_version >= @last_stable_version @logger.info("A delta action have been accepted.") if logger? @last_stable_version = new_version Thread.new { @topic_subscribed_callback[:delta].call(message) } unless @topic_subscribed_callback[:delta].nil? else @logger.warn("CATCH A DELTA BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger? end end
The default callback that is called by every actions It acknowledge the accepted status if action success Call a specific callback for each actions if it defined have been register previously
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 218 def do_message_callback(message) topic = message.topic action = parse_action(topic) type = parse_type(topic) payload = message.payload token = nil new_version = -1 @parser_mutex.synchronize() { @payload_parser.set_message(payload) new_version = @payload_parser.get_attribute_value("version") token = @payload_parser.get_attribute_value("clientToken") } if %w(get update delete).include?(action) if @token_pool.has_key?(token) @token_pool[token].cancel @token_pool.delete(token) if type.eql?("accepted") do_accepted(message, action.to_sym, token, type, new_version) else do_rejected(token, action, new_version) end @task_count_mutex.synchronize { decresase_task_count(action.to_sym) } end elsif %w(delta).include?(action) do_delta(message, new_version) end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 266 def do_rejected(token, action, new_version) if new_version && new_version >= @last_stable_version @logger.info("The #{action} action with the token #{token} have been rejected.") if logger? @token_callback.delete(token) else @logger.warn("CATCH AN REJECTED #{action} BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger? end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 285 def handle_subscription(action, timeout) @topic_manager.shadow_topic_subscribe(action.to_s, @default_callback) if @topic_manager.paho_client? ref = Time.now + timeout while !@is_subscribed[action] && handle_timeout(ref) do sleep 0.0001 end else sleep 2 end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 297 def handle_timeout(ref) Time.now <= ref end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 305 def parse_action(topic) if topic.split('/')[5] == "delta" topic.split('/')[5] else topic.split('/')[4] end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 301 def parse_shadow_name(topic) topic.split('/')[2] end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 313 def parse_type(topic) topic.split('/')[5] end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 192 def register_action_callback(action, callback, &block) if callback.is_a?(Proc) @topic_subscribed_callback[action] = callback elsif block_given? @topic_subscribed_callback[action] = block end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 180 def register_token_callback(token, callback, &block) if callback.is_a?(Proc) @token_callback[token] = callback elsif block_given? @token_callback[token] = block end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 200 def remove_action_callback(action) @topic_subscribed_callback[action] = nil end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 188 def remove_token_callback(token) @token_callback.delete(token) end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 166 def set_basic_callback @default_callback = proc { |message| do_message_callback(message) } @topic_manager.on_suback = lambda do |topics| action = @topic_manager.retrieve_action(topics[0]) @is_subscribed[action] ||= true unless action.nil? end @topic_manager.on_unsuback = lambda do |topics| action = @topic_manager.retrieve_action(topics[0]) @is_subscribed[action] = false if action.nil? end end
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 122 def shadow_action(action, payload="", timeout=5, callback=nil, &block) current_token = Symbol timer = Timers::Group.new json_payload = "" @token_mutex.synchronize(){ current_token = @token_handler.create_next_token } timer.after(timeout){ timeout_manager(action, current_token) } @parser_mutex.synchronize { @payload_parser.set_message(payload) unless payload == "" @payload_parser.set_attribute_value("clientToken", current_token) json_payload = @payload_parser.get_json } handle_subscription(action, timeout) unless @is_subscribed[action] @topic_manager.shadow_topic_publish(action.to_s, json_payload) @task_count_mutex.synchronize { @topic_subscribed_task_count[action] += 1 } @token_pool[current_token] = timer register_token_callback(current_token, callback, &block) Thread.new{ timer.wait } current_token end
Should cancel the token after a preset time interval
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 147 def timeout_manager(action_name, token) if @token_pool.has_key?(token) action = action_name.to_sym @token_pool.delete(token) @token_callback.delete(token) @logger.warn("The #{action_name} request with the token #{token} has timed out!\n") if logger? @task_count_mutex.synchronize { @topic_subscribed_task_count[action] -= 1 unless @topic_subscribed_task_count[action] <= 0 @topic_subscribed_task_count[action] = 0 unless @persistent_subscribe @topic_manager.shadow_topic_unsubscribe(action) @is_subscribed[action.to_sym] = false end end } end end