class Connection

Constants

OPTIONS

Public Class Methods

new(host:, port:, mode:, agent:, **kwargs) click to toggle source
# File lib/ulms_client.rb, line 46
def initialize(host:, port:, mode:, agent:, **kwargs)
  @agent = agent
  @mode = mode

  @mqtt = MQTT::Client.new
  @mqtt.host = host
  @mqtt.port = port
  @mqtt.username = "v2::#{mode}"
  @mqtt.client_id = agent.to_s

  OPTIONS.each do |option|
      @mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
  end
end

Public Instance Methods

connect() click to toggle source

Establish the connection.

# File lib/ulms_client.rb, line 62
def connect
  @mqtt.connect
  LOG.info("#{@agent} connected")
end
disconnect() click to toggle source

Disconnect from the broker.

# File lib/ulms_client.rb, line 68
def disconnect
  @mqtt.disconnect
  LOG.info("#{@agent} disconnected")
end
make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) click to toggle source

A high-level method that makes a request and waits for the response on it.

Options:

- `to`: the destination service `Account` (required).
- `payload`: the publish message payload (required).
- `api_version`: service API version.
- `properties`: additional MQTT properties hash.
- `qos`: Publish QoS. An integer 0..2.
- `timeout`: Timeout for the response awaiting.
# File lib/ulms_client.rb, line 148
def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  properties.merge!({
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
  })

  topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
  publish(topic, payload: payload, properties: properties, qos: qos)

  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end
publish(topic, payload:, properties: {}, retain: false, qos: 0) click to toggle source

Publish a message to the `topic`.

Options:

- `payload`: An object that will be dumped into JSON as the message payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the messages should be retained.
- `qos`: An integer 0..2 that sets the QoS.
# File lib/ulms_client.rb, line 80
  def publish(topic, payload:, properties: {}, retain: false, qos: 0)
    if @mode == 'default' && !properties[:local_timestamp]
      properties = properties.merge(local_timestamp: DateTime.now.strftime('%Q'))
    end

    envelope = {
      payload: JSON.dump(payload),
      properties: properties
    }

    @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

    LOG.info <<~EOF
      #{@agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
      Payload: #{JSON.pretty_generate(payload)}
      Properties: #{JSON.pretty_generate(properties)}
    EOF
  end
receive(timeout=DEFAULT_TIMEOUT) { |message| ... } click to toggle source

Waits for an incoming message. If a block is given it passes the received message to the block. If the block returns falsey value it waits for the next one and so on. Returns the received message. Raises if `timeout` is over.

# File lib/ulms_client.rb, line 113
  def receive(timeout=DEFAULT_TIMEOUT)
    Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
      loop do
        topic, json = @mqtt.get
        envelope = JSON.load(json)
        payload = JSON.load(envelope['payload'])
        message = IncomingMessage.new(topic, payload, envelope['properties'])

        LOG.info <<~EOF
          #{@agent} received a message from topic #{topic}:
          Payload: #{JSON.pretty_generate(message.payload)}
          Properties: #{JSON.pretty_generate(message.properties)}
        EOF

        return message unless block_given?

        if yield(message)
          LOG.info "The message matched the given predicate"
          return message
        else
          LOG.info "The message didn't match the given predicate. Waiting for the next one."
        end
      end
    end
  end
subscribe(topic, qos: 0) click to toggle source

Subscribe to the `topic`.

Options:

- `qos`: Subscriptions QoS. An interger 0..2.
# File lib/ulms_client.rb, line 103
def subscribe(topic, qos: 0)
  @mqtt.subscribe([topic, qos])
  LOG.info("#{@agent} subscribed to #{topic} (q#{qos})")
end