class Connection
Constants
- OPTIONS
Public Class Methods
new(host:, port:, mode:, agent:, **kwargs)
click to toggle source
# File lib/ulms_client.rb, line 46
# Builds the underlying MQTT client for an agent. Does not connect.
#
# Options:
#
# - `host`, `port`: assigned to the MQTT client as the broker address.
# - `mode`: stored for later use and interpolated into the username
#   as "v2::<mode>".
# - `agent`: stored for logging/topic building; its `to_s` becomes the
#   MQTT client id.
# - `kwargs`: extra settings. Only keys listed in OPTIONS are forwarded,
#   each through the MQTT client's matching `<option>=` setter.
def initialize(host:, port:, mode:, agent:, **kwargs)
  @agent = agent
  @mode = mode

  @mqtt = MQTT::Client.new
  @mqtt.host = host
  @mqtt.port = port
  @mqtt.username = "v2::#{mode}"
  @mqtt.client_id = agent.to_s

  OPTIONS.each do |option|
    value = kwargs[option]
    # Skip only nil (option not given) so an explicit `false` is still applied.
    next if value.nil?

    # public_send: option setters are part of the client's public API, so
    # there is no need to bypass visibility with `send`.
    @mqtt.public_send("#{option}=", value)
  end
end
Public Instance Methods
connect()
click to toggle source
Establish the connection.
# File lib/ulms_client.rb, line 62
# Opens the connection on the underlying MQTT client, then logs that the
# agent has connected.
def connect
  @mqtt.connect
  LOG.info "#{@agent} connected"
end
disconnect()
click to toggle source
Disconnect from the broker.
# File lib/ulms_client.rb, line 68
# Closes the connection on the underlying MQTT client, then logs that the
# agent has disconnected.
def disconnect
  @mqtt.disconnect
  LOG.info "#{@agent} disconnected"
end
make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
click to toggle source
A high-level method that makes a request and waits for the response to it.
Options:
- `to`: the destination service `Account` (required). - `payload`: the publish message payload (required). - `api_version`: service API version. - `properties`: additional MQTT properties hash. - `qos`: Publish QoS. An integer 0..2. - `timeout`: Timeout for the response awaiting.
# File lib/ulms_client.rb, line 148
# Publishes a request and blocks until the matching response arrives.
#
# Options:
#
# - `to`: the destination service (required); used in both topics.
# - `payload`: the publish message payload (required).
# - `api_version`: service API version segment of the topics.
# - `properties`: additional MQTT properties hash. It is NOT modified;
#   the request fields are merged into a copy.
# - `qos`: publish QoS, passed through to `publish`.
# - `timeout`: passed through to `receive` as the response wait limit.
#
# Returns whatever `receive` returns for the first message whose
# properties have type 'response' and the generated correlation data.
def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  # Non-destructive merge: a destructive merge would leak request metadata
  # into the caller's hash and raise on a frozen one.
  request_properties = properties.merge(
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
  )

  topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
  publish(topic, payload: payload, properties: request_properties, qos: qos)

  # Incoming properties are read with string keys, unlike the symbol keys
  # used when building the outgoing request above.
  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end
publish(topic, payload:, properties: {}, retain: false, qos: 0)
click to toggle source
Publish a message to the `topic`.
Options:
- `payload`: An object that will be dumped into JSON as the message payload (required). - `properties`: MQTT publish properties hash. - `retain`: A boolean indicating whether the messages should be retained. - `qos`: An integer 0..2 that sets the QoS.
# File lib/ulms_client.rb, line 80
# Publishes a JSON envelope `{payload:, properties:}` to `topic` and logs it.
#
# Options:
#
# - `payload`: an object dumped to JSON as the message payload (required).
#   It is embedded in the envelope as a JSON string, so it ends up
#   JSON-encoded twice on the wire.
# - `properties`: MQTT publish properties hash. Not modified; in 'default'
#   mode a `local_timestamp` (milliseconds since the Unix epoch, as a
#   string) is added to a copy unless the caller already supplied one.
# - `retain`: a boolean indicating whether the message should be retained.
# - `qos`: an integer 0..2 that sets the QoS.
def publish(topic, payload:, properties: {}, retain: false, qos: 0)
  # Accept a caller-supplied timestamp under either key form: symbol and
  # string keys serialize to the same JSON property, so checking only the
  # symbol would emit a duplicate key.
  has_timestamp = properties[:local_timestamp] || properties['local_timestamp']

  if @mode == 'default' && !has_timestamp
    # Same value DateTime#strftime('%Q') yields, without the legacy DateTime.
    millis = Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
    properties = properties.merge(local_timestamp: millis.to_s)
  end

  envelope = { payload: JSON.dump(payload), properties: properties }
  @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

  LOG.info <<~EOF
    #{@agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
    Payload: #{JSON.pretty_generate(payload)}
    Properties: #{JSON.pretty_generate(properties)}
  EOF
end
receive(timeout=DEFAULT_TIMEOUT) { |message| ... }
click to toggle source
Waits for an incoming message. If a block is given, it passes the received message to the block. If the block returns a falsey value, it waits for the next message, and so on. Returns the received message. Raises if the `timeout` expires.
# File lib/ulms_client.rb, line 113
# Blocks until an acceptable incoming message arrives and returns it.
#
# Each message is read from the MQTT client, decoded from its JSON envelope
# (the envelope's 'payload' field is itself JSON and is decoded again) and
# wrapped in an IncomingMessage. Without a block the first message is
# returned. With a block, messages are yielded one by one until the block
# returns a truthy value; that message is then returned.
#
# The whole wait is wrapped in Timeout.timeout with `timeout` as the limit
# and "Timed out waiting for the message" as the error message.
def receive(timeout=DEFAULT_TIMEOUT)
  Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
    loop do
      topic, raw = @mqtt.get

      # NOTE(review): JSON.load is documented as intended for trusted input;
      # this data comes from the broker. Consider JSON.parse, but confirm
      # nil/blank handling first, since the two differ there.
      decoded = JSON.load(raw)
      inner_payload = JSON.load(decoded['payload'])
      message = IncomingMessage.new(topic, inner_payload, decoded['properties'])

      LOG.info <<~EOF
        #{@agent} received a message from topic #{topic}:
        Payload: #{JSON.pretty_generate(message.payload)}
        Properties: #{JSON.pretty_generate(message.properties)}
      EOF

      # No predicate given: the first message is the answer.
      return message unless block_given?

      unless yield(message)
        LOG.info "The message didn't match the given predicate. Waiting for the next one."
        next
      end

      LOG.info "The message matched the given predicate"
      return message
    end
  end
end
subscribe(topic, qos: 0)
click to toggle source
Subscribe to the `topic`.
Options:
- `qos`: Subscription QoS. An integer 0..2.
# File lib/ulms_client.rb, line 103
# Subscribes the underlying MQTT client to `topic` and logs the subscription.
#
# Options:
#
# - `qos`: subscription QoS, passed to the client paired with the topic.
def subscribe(topic, qos: 0)
  subscription = [topic, qos]
  @mqtt.subscribe(subscription)
  LOG.info "#{@agent} subscribed to #{topic} (q#{qos})"
end