Source code for xbout.tests.test_fastoutput
import numpy as np
import random
from xarray import DataArray, Dataset
from xbout.fastoutput import open_fastoutput
[docs]
class TestFastOutput:
[docs]
def test_open_fastoutput(self, tmp_path):
datapath_list, xinds, yinds, zinds = make_fastoutput_set(tmp_path, 4)
ds = open_fastoutput(tmp_path.joinpath("BOUT.fast.*.nc"))
expected_x = list(set(xinds))
expected_x.sort()
expected_y = list(set(yinds))
expected_y.sort()
expected_z = list(set(zinds))
expected_z.sort()
assert ds.sizes["t"] == 3
assert all(ds["x"] == expected_x)
assert all(ds["y"] == expected_y)
assert all(ds["z"] == expected_z)
for v in ("n", "T"):
assert v in ds
assert "t" in ds[v].dims
assert "x" in ds[v].dims
assert "y" in ds[v].dims
assert "z" in ds[v].dims
[docs]
def make_fastoutput_set(path, n):
"""
Create a set of FastOutput files/Datasets, with random data and location indices.
Parameters
----------
path : pathlib.Path
Directory to save the files to.
n : int
Number of output files/Datasets to create
Returns
-------
result_list : list of pathlib.Path or Dataset
Output files/Datasets in format duplicating files produced by FastOutput
xinds : list of int
x-indices where data exists
yinds : list of int
y-indices where data exists
zinds : list of int
z-indices where data exists
"""
# Choose a fixed seed to ensure reproducibility
random.seed(83)
procinds = random.sample(range(10 * n), n)
npoints = [random.randint(1, 4) for _ in procinds]
total_points = sum(npoints)
xinds = random.sample(range(max(100, total_points)), total_points)
yinds = random.sample(range(max(100, total_points)), total_points)
zinds = random.sample(range(max(100, total_points)), total_points)
# Add some points which share an x-, y- or z-index (but not all three) with other
# points
if total_points < 5:
raise ValueError(f"Not enough points for n={n}. Increase n.")
xinds[1] = xinds[0]
yinds[1] = yinds[0]
zinds[4] = zinds[3]
result_list = []
counter = 0
for i, procind in enumerate(procinds):
locations = {}
for _ in range(npoints[i]):
locations[counter] = (xinds[counter], yinds[counter], zinds[counter])
counter += 1
result_list.append(make_fastoutput(path, procind, locations))
return result_list, xinds, yinds, zinds
[docs]
def make_fastoutput(path, i, locations):
"""
Create a single file in the format produced by
`FastOutput <https://github.com/johnomotani/BoutFastOutput>`_
Parameters
----------
path : pathlib.Path
Directory to save the file to.
i : int
Processor index of the file.
locations : dict of {int: tuple of int}
Keys are the FastOutput indices of the variables to be saved.
Values are 3-tuples of ints giving x, y, and z indices associated with the key.
"""
nt = 3
# Each variable contains unique random noise
np.random.seed(seed=i)
ds = Dataset()
time = DataArray(np.linspace(0.0, 0.5, nt), dims="t")
ds["time"] = time
for j, (xind, yind, zind) in locations.items():
n = DataArray(np.random.randn(nt), dims="t")
T = DataArray(np.random.randn(nt), dims="t")
for v in [n, T]:
v.attrs["ix"] = xind
v.attrs["iy"] = yind
v.attrs["iz"] = zind
ds[f"n{j}"] = n
ds[f"T{j}"] = T
filepath = path.joinpath(f"BOUT.fast.{i}.nc")
ds.to_netcdf(filepath)