class MatrixCreator::Comm

Class: Comm

This class is used to communicate to the chipset by using ZeroMQ sockets

Constants

LOG_LEVEL

Current logger level to store

MATRIX_CREATOR_IP

Contains the IP address to be used to connect to the Matrix Creator chipset

MAX_LOG_SIZE

Maximum size for each log file

MAX_OLD_FILES

Maximum number of old log files to keep

PING_SPEED

Pinging speed

TIMEOUT_VERIFICATION_SPEED

Speed to check for timeout

Attributes

context[R]

Contains the ZMQ::Context instance used

device_port[R]

Contains device base port

Public Class Methods

new(device_port) click to toggle source

Creates an instance of Comm to be used as communication with chipset’s device

@param device_port [Integer] port of the device to communicate with

# File lib/matrix_creator/comm.rb, line 38
def initialize(device_port)
  initialize_logger

  # Creating instance variables
  print_log(:debug, 'Initializing instance')
  @context = ::ZMQ::Context.new
  @device_port = device_port
end

Public Instance Methods

destroy() click to toggle source

Destroy the ZMQ::Context instance, since there can only be one running per proccess

# File lib/matrix_creator/comm.rb, line 240
def destroy
  print_log(:info, 'Destroying ZMQ context')
  @context.destroy
end
perform(decoder, options = {}, block = nil) click to toggle source

Start the listening proccess on a driver.

@param decoder [MatrixMalos] module to be used to decode data received @param options [Hash] contains the options that can be specified for a max_resp and/or max_secs @yield callback used to process data received from the driver @return an array with a list of all the messages received

# File lib/matrix_creator/comm.rb, line 217
def perform(decoder, options = {}, block = nil)
  # Start running threads
  error_thread = start_error_listener
  data_thread = start_data_listener(decoder, options[:max_resp], error_thread, block)
  ping_thread = start_pinging(data_thread)

  # Verify timeout if that option is specified
  if options[:max_secs]
    verify_timeout(options[:max_secs], data_thread, error_thread, ping_thread)
  end

  # Wait for threads to finish
  data_thread.join
  error_thread.join
  ping_thread.join

  # Return data captured from the driver
  print_log(:debug, "Data Result: #{data_thread[:result].to_json}")
  data_thread[:result]
end
send_configuration(driver_config) click to toggle source

Sends configuration data to the driver

@param driver_config [MatrixMalos::DriverConfig] data message

# File lib/matrix_creator/comm.rb, line 51
def send_configuration(driver_config)
  # Connecting to the configuration port
  socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port}"
  config_socket = @context.socket(:PUSH)
  config_socket.connect(socket_address)
  print_log(:debug, "config_socket connected to #{socket_address}")

  # Sending Encoded Data
  config_data = MatrixMalos::DriverConfig.encode(driver_config)
  config_socket.send(config_data)
  print_log(:info, 'Configuration sent to driver')
  print_log(:debug, "Data: #{driver_config.to_json}")
end
start_data_listener(decoder, max_resp, error_thread, block = nil) click to toggle source

Main thread that listens for data reported by the driver data port. It will listen for any errors until the maximum number of messages expected to be received is reached or until it is killed by the timeout verification.

@param decoder [MatrixMalos] module to be used to decode data received @param max_resp [Integer] maximum number of messages to receive @param error_thread [Thread] instance that logs errors @param block callback method to be executed when a message has been received @return [Thread] instance

# File lib/matrix_creator/comm.rb, line 125
def start_data_listener(decoder, max_resp, error_thread, block = nil)
  Thread.new do
    # Initialize current number of messages received
    count = 0

    begin
      # Thread variable that indicates if this thread has finished
      Thread.current[:finished] = false

      # Thread variable that contains an array of messages received
      # for further processing
      Thread.current[:result] = []

      # Connecting to the data port
      socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 3}"
      data_socket = @context.socket(:SUB)
      data_socket.connect(socket_address)
      print_log(:debug, "data_socket connected to #{socket_address}")
      data_socket.subscribe('')
      print_log(:info, "Listening for data (max_resp: #{max_resp || 'Unlimited'})")

      loop do
        # Receiving data
        data = data_socket.recv
        print_log(:info, 'Data received')
        decoded_data = JSON.parse(decoder.decode(data).to_json, symbolize_names: true)
        print_log(:debug, "Data: #{decoded_data}")

        # Push decoded data into the results array
        Thread.current[:result] << decoded_data

        # Send data to callback method
        block.call(decoded_data) if block

        # Increment count and break loop if max number of
        # messages has been reached
        count += 1
        break if max_resp && count >= max_resp
      end
    rescue => e
      print_log(:fatal, e.message)
    end

    # Mark thread as finished
    Thread.current[:finished] = true
    print_log(:info, 'Finished listening')

    # Kill error thread, no longer need to log errors
    Thread.kill(error_thread)
    print_log(:info, 'Killed error listener thread')
  end
end
start_error_listener() click to toggle source

Connects to the error port to listen for any errors reported

@return [Thread] instance

# File lib/matrix_creator/comm.rb, line 97
def start_error_listener
  Thread.new do
    # Connecting to the error port
    socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 2}"
    error_socket = @context.socket(:SUB)
    error_socket.connect(socket_address)
    error_socket.subscribe('')

    # Infinite loop to listen for errors, this thread will be killed
    # by the main thread when it needs to be stopped
    loop do
      # Read and log error messages
      error_msg = error_socket.recv_message
      print_log(:error, error_msg.data)
    end
  end
end
start_pinging(main_thread) click to toggle source

Pings the driver keep-alive port every 3 seconds until listener finishes running

@param main_thread [Thread] the main listener @return [Thread] instance

# File lib/matrix_creator/comm.rb, line 70
def start_pinging(main_thread)
  Thread.new do
    # Connecting to the keep-alive port
    socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 1}"
    ping_socket = @context.socket(:PUSH)
    ping_socket.connect(socket_address)
    print_log(:debug, "ping_socket connected to #{socket_address}")

    # Infinite loop that breaks when main thread has finished
    loop do
      # Send Ping
      ping_socket.send('')
      print_log(:info, 'Ping sent')

      sleep(PING_SPEED)

      break if main_thread[:finished]
    end

    print_log(:debug, 'Stopped pinging')
  end
end
verify_timeout(max_secs, main_thread, error_thread, ping_thread) click to toggle source

Verifies if there is a timeout according to the max number of seconds specified, if there is then all threads are killed

@param max_secs [Integer] maximum number of seconds to gather data @param main_thread [Thread] instance of the main data listener @param error_thread [Thread] instance of the error listener @param ping_thread [Thread] instance of the ping thread

# File lib/matrix_creator/comm.rb, line 186
def verify_timeout(max_secs, main_thread, error_thread, ping_thread)
  current_time = Time.now

  print_log(:info, "Starting timeout verification (max_secs: #{max_secs})")

  loop do
    # Break if main thread is finished, we no longer need to check for timeout
    break if main_thread[:finished]

    # If there is a timeout, kill all threads and break
    if Time.now >= current_time + max_secs
      print_log(:info, 'Listener timed out, killing all threads')
      Thread.kill(main_thread)
      Thread.kill(error_thread)
      Thread.kill(ping_thread)
      break
    end

    sleep(TIMEOUT_VERIFICATION_SPEED)
  end

  print_log(:info, 'Finishing timeout verification')
end

Private Instance Methods

initialize_logger() click to toggle source

Initialize logger instance

# File lib/matrix_creator/comm.rb, line 249
def initialize_logger
  # Logger initialization
  FileUtils.mkdir_p('log/') unless File.directory?('log/')
  @logger = Logger.new('log/matrix_creator.log', MAX_OLD_FILES, MAX_LOG_SIZE)
  @logger.level = LOG_LEVEL
end
print_log(level, msg) click to toggle source

Send a message to the logger instance

@param level [Symbol] logging level for the message @param msg [String] message to be logged