Source code for arkimet.scan.bufr

from typing import Callable
from collections import defaultdict
import arkimet
import dballe
import math
import logging

log = logging.getLogger("arkimet.scan.bufr")


[docs]class Scanner: by_type = defaultdict(list) def __init__(self): for type, scanners in self.by_type.items(): scanners.sort(key=lambda p: p[0])
[docs] def scan(self, msg: dballe.Message, md: arkimet.Metadata): # Find the formatter list for this style scanners = self.by_type.get(msg.type) if scanners is None: return None # Try all scanner functions in the list, in priority order, stopping at # the first one that returns False. # Note that False is explicitly required: returning None will not stop # the scanner chain. for prio, scan in scanners: try: if scan(msg, md) is False: break except Exception: log.warning("scanner function failed", exc_info=True)
[docs] @classmethod def register(cls, type: str, scanner: Callable[[dballe.Message, arkimet.Metadata], None], priority=0): if scanner not in cls.by_type[type]: cls.by_type[type].append((priority, scanner))
[docs]def read_area_fixed(msg): """ Read the area from a BUFR containing a fixed station """ lat = msg.get_named('latitude') lon = msg.get_named('longitude') if lat is not None and lon is not None: return {"lat": lat.enqi(), "lon": lon.enqi()} else: return None
[docs]def read_area_mobile(msg): """ Read the area from a BUFR containing a mobile station """ lat = msg.get_named('latitude') lon = msg.get_named('longitude') if lat is not None and lon is not None: # Approximate to 1 degree to allow arkimet to perform grouping return { "type": "mob", "x": math.floor(lon.enqd()), "y": math.floor(lat.enqd()) } else: return None
[docs]def read_proddef(msg): """ Read a default proddef value for a message """ res = {} blo = msg.get_named('block') sta = msg.get_named('station') id = msg.get_named('ident') found = False if blo: res["blo"] = blo.enqi() found = True if sta: res["sta"] = sta.enqi() found = True if id: res["id"] = id.enqc() found = True if found: return res else: return None
# function bufr_scan_forecast(msg) # local forecast = nil # msg:foreach(function(v) # if v.pind == 254 then # -- Verify that all p1 are the same: if they differ, error() # local p1 = v.p1 # if forecast ~= nil and forecast ~= p1 then # error("BUFR message has contradictory forecasts"..tostring(forecast).." and "..tostring(p1)) # end # forecast = p1 # end # end) # if forecast == nil then forecast = 0 end # # if forecast == 0 then # return arki_timerange.timedef(0) # elseif math.floor(forecast / 3600) * 3600 == forecast then # -- If it is a multiple of one hour, represent it in hours # return arki_timerange.timedef(forecast / 3600, "h") # elseif math.floor(forecast / 60) * 60 == forecast then # -- If it is a multiple of one minute, represent it in minutes # return arki_timerange.timedef(forecast / 60, "m") # else # return arki_timerange.timedef(forecast, "s") # end # end