class Cosmos::UnpackingInterface

A custom interface that unpacks aggregate packets (packets with many granules) into many simple packets (packets with a single granule). This way we can use all the cosmos niceties without having to send packets for individual measurements. Essentially we unpack an aggregate packet into many packets that are stored in a queue that is read from. When the queue is empty we look for new aggregate packets

Public Instance Methods

agg_pkt_map() click to toggle source

Maps aggregate packets (packets with many granules) to simple packets (packets with a single granule) should have the form 'AggregatePacket' => 'SimplePacket' For example: 'Science' => 'Science2'

# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 28
def agg_pkt_map
  {}
end
connect() click to toggle source
Calls superclass method
# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 44
def connect
  @derived_queue = []
  super()
end
packet_mapper() click to toggle source

Create and return a new instance of your own custom PacketMapper

# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 40
def packet_mapper
  PacketMapper.new
end
process(packet:, target:, agg_packet:, simple_packet:) click to toggle source

Unpack an aggregate packet if necessary and add resulting packets to the internal FIFO queue

# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 52
def process(packet:, target:, agg_packet:, simple_packet:)   
   agg_pkt = System.telemetry.packet(target, agg_packet)
   if(agg_pkt.identify?(packet.buffer))
     processor = AggregatePacketProcesser.new(packet_mapper, transforms)
     agg_pkt.buffer = packet.buffer.clone
     result = processor.unpack(agg_pkt, target, simple_packet)

     @derived_queue.concat(result)
   end
   return packet
end
read() click to toggle source

This method is called by COSMOS internally, essentially if we have packets on the queue read those otherwise wait for a packet, and unpack if necessary

Calls superclass method
# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 67
def read
  if !@derived_queue.empty?
    p1 = @derived_queue.shift
    return p1
  else

   # This is a blocking call to wait for an actual packet to come over the wire
   packet = super()

   agg_pkt_map.select do |agg_packet, simple_packet|
     agg_pkt = System.telemetry.packet(target, agg_packet)
     if(agg_pkt.identify?(packet.buffer))      
       process(packet: packet, target: target, agg_packet: agg_packet, simple_packet: simple_packet)
     end
   end
 
   return packet
  end 
end
target() click to toggle source

Cosmos target interface is being used with

# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 34
def target
 ""
end
transforms() click to toggle source

Transformations defined on an item in a simple packet. Entries should be of the form: “<TARGET>-<PACKET_NAME>-<ITEM_NAME>” => Proc where the proc can accept the following parameters TargetName, ItemName, key, value, index index is the index of the granule in the aggregate packet that was transformed into the simple packet More info about procs can be found here: ruby-doc.org/core-2.4.1/Proc.html

# File lib/cosmos/unpacking_interface/unpacking_interface.rb, line 20
def transforms
  {}
end