import pytest
import dask.array
import numpy as np
import numpy.testing as npt
import xarray as xr
import xarray.testing as xrt
from xarray.core.utils import dict_equiv
from xbout import open_boutdataset
from xbout.geometries import apply_geometry
from xbout.utils import _1d_coord_from_spacing
[docs]
class TestBoutDataArrayMethods:
[docs]
def test_to_dataset(self, bout_xyt_example_files):
dataset_list = bout_xyt_example_files(None, nxpe=3, nype=4, nt=1)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
da = ds["n"]
new_ds = da.bout.to_dataset()
assert dict_equiv(ds.attrs, new_ds.attrs)
assert dict_equiv(ds.metadata, new_ds.metadata)
[docs]
@pytest.mark.parametrize("mxg", [0, pytest.param(2, marks=pytest.mark.long)])
@pytest.mark.parametrize("myg", [pytest.param(0, marks=pytest.mark.long), 2])
@pytest.mark.parametrize(
"remove_extra_upper", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_remove_yboundaries(
self, bout_xyt_example_files, mxg, myg, remove_extra_upper
):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 4, 3),
nxpe=1,
nype=6,
nt=1,
grid="grid",
guards={"x": mxg, "y": myg},
topology="connected-double-null",
syn_data_type="linear",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
dataset_list_no_yboundaries, grid_ds_no_yboundaries = bout_xyt_example_files(
None,
lengths=(2, 3, 4, 3),
nxpe=1,
nype=6,
nt=1,
grid="grid",
guards={"x": mxg, "y": 0},
topology="connected-double-null",
syn_data_type="linear",
)
ds_no_yboundaries = open_boutdataset(
datapath=dataset_list_no_yboundaries,
gridfilepath=grid_ds_no_yboundaries,
geometry="toroidal",
keep_yboundaries=False,
)
if remove_extra_upper:
ds_no_yboundaries = xr.concat(
[
ds_no_yboundaries.isel(theta=slice(None, 11)),
ds_no_yboundaries.isel(theta=slice(12, -1)),
],
dim="theta",
)
n = ds["n"].bout.remove_yboundaries(remove_extra_upper=remove_extra_upper)
assert n.metadata["keep_yboundaries"] == 0
npt.assert_equal(n.values, ds_no_yboundaries["n"].values)
[docs]
@pytest.mark.parametrize(
"nz",
[
pytest.param(6, marks=pytest.mark.long),
7,
pytest.param(8, marks=pytest.mark.long),
pytest.param(9, marks=pytest.mark.long),
],
)
@pytest.mark.parametrize(
"permute_dims", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_to_field_aligned(self, bout_xyt_example_files, nz, permute_dims):
dataset_list = bout_xyt_example_files(
None, lengths=(3, 3, 4, nz), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["x"]
ds["Rxy"] = ds["x"]
ds["Zxy"] = ds["y"]
ds = apply_geometry(ds, "toroidal")
# set up test variable
n = ds["n"].load()
zShift = ds["zShift"].load()
for t in range(ds.sizes["t"]):
for x in range(ds.sizes["x"]):
for y in range(ds.sizes["theta"]):
zShift[x, y] = (
(x * ds.sizes["theta"] + y) * 2.0 * np.pi / ds.sizes["zeta"]
)
for z in range(nz):
n[t, x, y, z] = 1000.0 * t + 100.0 * x + 10.0 * y + z
n.attrs["direction_y"] = "Standard"
if permute_dims:
n = n.transpose("t", "zeta", "x", "theta").compute()
n_al = n.bout.to_field_aligned()
if permute_dims:
n_al = n_al.transpose("t", "x", "theta", "zeta").compute()
assert n_al.direction_y == "Aligned"
for t in range(ds.sizes["t"]):
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 0, z].values,
1000.0 * t + z % nz,
rtol=1.0e-15,
atol=5.0e-16,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 1, z].values,
1000.0 * t + 10.0 * 1.0 + (z + 1) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 2, z].values,
1000.0 * t + 10.0 * 2.0 + (z + 2) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 3, z].values,
1000.0 * t + 10.0 * 3.0 + (z + 3) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 0, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 0.0 + (z + 4) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 1, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 1.0 + (z + 5) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 2, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 2.0 + (z + 6) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 3, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 3.0 + (z + 7) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
[docs]
@pytest.mark.parametrize(
"permute_dims", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_to_field_aligned_dask(self, bout_xyt_example_files, permute_dims):
nz = 6
dataset_list = bout_xyt_example_files(
None, lengths=(3, 3, 4, nz), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["x"]
ds["Rxy"] = ds["x"]
ds["Zxy"] = ds["y"]
ds = apply_geometry(ds, "toroidal")
# set up test variable
n = ds["n"].load()
zShift = ds["zShift"].load()
for t in range(ds.sizes["t"]):
for x in range(ds.sizes["x"]):
for y in range(ds.sizes["theta"]):
zShift[x, y] = (
(x * ds.sizes["theta"] + y) * 2.0 * np.pi / ds.sizes["zeta"]
)
for z in range(nz):
n[t, x, y, z] = 1000.0 * t + 100.0 * x + 10.0 * y + z
# The above loop required the call to .load(), but that turned the data into a
# numpy array. Now convert back to dask
n = n.chunk({"t": 1})
assert isinstance(n.data, dask.array.Array)
n.attrs["direction_y"] = "Standard"
if permute_dims:
n = n.transpose("t", "zeta", "x", "theta").compute()
n_al = n.bout.to_field_aligned()
if permute_dims:
n_al = n_al.transpose("t", "x", "theta", "zeta").compute()
assert n_al.direction_y == "Aligned"
for t in range(ds.sizes["t"]):
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 0, z].values,
1000.0 * t + z % nz,
rtol=1.0e-15,
atol=5.0e-16,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 1, z].values,
1000.0 * t + 10.0 * 1.0 + (z + 1) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 2, z].values,
1000.0 * t + 10.0 * 2.0 + (z + 2) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 0, 3, z].values,
1000.0 * t + 10.0 * 3.0 + (z + 3) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 0, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 0.0 + (z + 4) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 1, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 1.0 + (z + 5) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 2, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 2.0 + (z + 6) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_al[t, 1, 3, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 3.0 + (z + 7) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
[docs]
@pytest.mark.parametrize(
"nz",
[
pytest.param(6, marks=pytest.mark.long),
7,
pytest.param(8, marks=pytest.mark.long),
pytest.param(9, marks=pytest.mark.long),
],
)
@pytest.mark.parametrize(
"permute_dims", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_from_field_aligned(self, bout_xyt_example_files, nz, permute_dims):
dataset_list = bout_xyt_example_files(
None, lengths=(3, 3, 4, nz), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["x"]
ds["Rxy"] = ds["x"]
ds["Zxy"] = ds["y"]
ds = apply_geometry(ds, "toroidal")
# set up test variable
n = ds["n"].load()
zShift = ds["zShift"].load()
for t in range(ds.sizes["t"]):
for x in range(ds.sizes["x"]):
for y in range(ds.sizes["theta"]):
zShift[x, y] = (
(x * ds.sizes["theta"] + y) * 2.0 * np.pi / ds.sizes["zeta"]
)
for z in range(ds.sizes["zeta"]):
n[t, x, y, z] = 1000.0 * t + 100.0 * x + 10.0 * y + z
n.attrs["direction_y"] = "Aligned"
if permute_dims:
n = n.transpose("t", "zeta", "x", "theta").compute()
n_nal = n.bout.from_field_aligned()
if permute_dims:
n_nal = n_nal.transpose("t", "x", "theta", "zeta").compute()
assert n_nal.direction_y == "Standard"
for t in range(ds.sizes["t"]):
for z in range(nz):
npt.assert_allclose(
n_nal[t, 0, 0, z].values,
1000.0 * t + z % nz,
rtol=1.0e-15,
atol=5.0e-16,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 0, 1, z].values,
1000.0 * t + 10.0 * 1.0 + (z - 1) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 0, 2, z].values,
1000.0 * t + 10.0 * 2.0 + (z - 2) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 0, 3, z].values,
1000.0 * t + 10.0 * 3.0 + (z - 3) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 1, 0, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 0.0 + (z - 4) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 1, 1, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 1.0 + (z - 5) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 1, 2, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 2.0 + (z - 6) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
for z in range(nz):
npt.assert_allclose(
n_nal[t, 1, 3, z].values,
1000.0 * t + 100.0 * 1 + 10.0 * 3.0 + (z - 7) % nz,
rtol=1.0e-15,
atol=0.0,
) # noqa: E501
[docs]
@pytest.mark.parametrize("stag_location", ["CELL_XLOW", "CELL_YLOW", "CELL_ZLOW"])
@pytest.mark.parametrize(
"permute_dims", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_to_field_aligned_staggered(
self, bout_xyt_example_files, stag_location, permute_dims
):
dataset_list = bout_xyt_example_files(
None, lengths=(3, 3, 4, 8), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["x"]
ds["Rxy"] = ds["x"]
ds["Zxy"] = ds["y"]
ds = apply_geometry(ds, "toroidal")
# set up test variable
n = ds["n"].load()
zShift = ds["zShift"].load()
for t in range(ds.sizes["t"]):
for x in range(ds.sizes["x"]):
for y in range(ds.sizes["theta"]):
zShift[x, y] = (
(x * ds.sizes["theta"] + y) * 2.0 * np.pi / ds.sizes["zeta"]
)
for z in range(ds.sizes["zeta"]):
n[t, x, y, z] = 1000.0 * t + 100.0 * x + 10.0 * y + z
if permute_dims:
n = n.transpose("t", "zeta", "x", "theta").compute()
n_al = n.bout.to_field_aligned().copy(deep=True)
if permute_dims:
n_al = n_al.transpose("t", "x", "theta", "zeta").compute()
assert n_al.direction_y == "Aligned"
# make 'n' staggered
ds["n"].attrs["cell_location"] = stag_location
if stag_location != "CELL_ZLOW":
with pytest.raises(ValueError):
# Check exception raised when needed zShift_CELL_*LOW is not present
ds["n"].bout.to_field_aligned()
ds["zShift_" + stag_location] = zShift
ds["zShift_" + stag_location].attrs["cell_location"] = stag_location
ds = ds.set_coords("zShift_" + stag_location)
ds = ds.drop_vars("zShift")
with pytest.raises(ValueError):
# Check shifting non-staggered field fails without zShift
ds["T"].bout.to_field_aligned()
n_stag_al = ds["n"].bout.to_field_aligned()
npt.assert_equal(n_stag_al.values, n_al.values)
[docs]
@pytest.mark.parametrize("stag_location", ["CELL_XLOW", "CELL_YLOW", "CELL_ZLOW"])
@pytest.mark.parametrize(
"permute_dims", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_from_field_aligned_staggered(
self, bout_xyt_example_files, stag_location, permute_dims
):
dataset_list = bout_xyt_example_files(
None, lengths=(3, 3, 4, 8), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["x"]
ds["Rxy"] = ds["x"]
ds["Zxy"] = ds["y"]
ds = apply_geometry(ds, "toroidal")
# set up test variable
n = ds["n"].load()
zShift = ds["zShift"].load()
for t in range(ds.sizes["t"]):
for x in range(ds.sizes["x"]):
for y in range(ds.sizes["theta"]):
zShift[x, y] = (
(x * ds.sizes["theta"] + y) * 2.0 * np.pi / ds.sizes["zeta"]
)
for z in range(ds.sizes["zeta"]):
n[t, x, y, z] = 1000.0 * t + 100.0 * x + 10.0 * y + z
n.attrs["direction_y"] = "Aligned"
ds["T"].attrs["direction_y"] = "Aligned"
if permute_dims:
n = n.transpose("t", "zeta", "x", "theta").compute()
n_nal = n.bout.from_field_aligned().copy(deep=True)
if permute_dims:
n_nal = n_nal.transpose("t", "x", "theta", "zeta").compute()
assert n_nal.direction_y == "Standard"
# make 'n' staggered
ds["n"].attrs["cell_location"] = stag_location
if stag_location != "CELL_ZLOW":
with pytest.raises(ValueError):
# Check exception raised when needed zShift_CELL_*LOW is not present
ds["n"].bout.from_field_aligned()
ds["zShift_" + stag_location] = zShift
ds["zShift_" + stag_location].attrs["cell_location"] = stag_location
ds = ds.set_coords("zShift_" + stag_location)
ds = ds.drop_vars("zShift")
with pytest.raises(ValueError):
# Check shifting non-staggered field fails without zShift
ds["T"].bout.from_field_aligned()
n_stag_nal = ds["n"].bout.from_field_aligned()
npt.assert_equal(n_stag_nal.values, n_nal.values)
[docs]
@pytest.mark.long
def test_interpolate_parallel_region_core(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=1,
nt=1,
grid="grid",
guards={"y": 2},
topology="core",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 16.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 20),
dims="theta",
)
dtheta_fine = thetalength / 128.0
theta_fine = xr.DataArray(
np.linspace(0.0 + dtheta_fine / 2.0, thetalength - dtheta_fine / 2.0, 128),
dims="theta",
)
def f(t):
t = np.sin(t)
return t**3 - t**2 + t - 1.0
n.data = f(theta).broadcast_like(n)
n_highres = n.bout.interpolate_parallel("core")
expected = f(theta_fine).broadcast_like(n_highres)
npt.assert_allclose(n_highres.values, expected.values, rtol=0.0, atol=1.0e-2)
[docs]
@pytest.mark.parametrize(
"res_factor",
[
pytest.param(2, marks=pytest.mark.long),
3,
pytest.param(7, marks=pytest.mark.long),
pytest.param(18, marks=pytest.mark.long),
],
)
def test_interpolate_parallel_region_core_change_n(
self, bout_xyt_example_files, res_factor
):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=1,
nt=1,
grid="grid",
guards={"y": 2},
topology="core",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 16.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 20),
dims="theta",
)
dtheta_fine = thetalength / res_factor / 16.0
theta_fine = xr.DataArray(
np.linspace(
0.0 + dtheta_fine / 2.0,
thetalength - dtheta_fine / 2.0,
res_factor * 16,
),
dims="theta",
)
def f(t):
t = np.sin(t)
return t**3 - t**2 + t - 1.0
n.data = f(theta).broadcast_like(n)
n_highres = n.bout.interpolate_parallel("core", n=res_factor)
expected = f(theta_fine).broadcast_like(n_highres)
npt.assert_allclose(n_highres.values, expected.values, rtol=0.0, atol=1.0e-2)
[docs]
@pytest.mark.long
def test_interpolate_parallel_region_sol(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=1,
nt=1,
grid="grid",
guards={"y": 2},
topology="sol",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 16.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 20),
dims="theta",
)
dtheta_fine = thetalength / 128.0
theta_fine = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta_fine, thetalength + 1.5 * dtheta_fine, 132),
dims="theta",
)
def f(t):
t = np.sin(t)
return t**3 - t**2 + t - 1.0
n.data = f(theta).broadcast_like(n)
n_highres = n.bout.interpolate_parallel("SOL")
expected = f(theta_fine).broadcast_like(n_highres)
npt.assert_allclose(n_highres.values, expected.values, rtol=0.0, atol=1.0e-2)
[docs]
def test_interpolate_parallel_region_singlenull(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=3,
nt=1,
grid="grid",
guards={"y": 2},
topology="single-null",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 48.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 52),
dims="theta",
)
dtheta_fine = thetalength / 3.0 / 128.0
theta_fine = xr.DataArray(
np.linspace(
0.0 + 0.5 * dtheta_fine, thetalength - 0.5 * dtheta_fine, 3 * 128
),
dims="theta",
)
def f(t):
t = np.sin(3.0 * t)
return t**3 - t**2 + t - 1.0
n.data = f(theta).broadcast_like(n)
f_fine = f(theta_fine)[:128]
for region in ["inner_PFR", "inner_SOL"]:
n_highres = n.bout.interpolate_parallel(region).isel(theta=slice(2, None))
expected = f_fine.broadcast_like(n_highres)
npt.assert_allclose(
n_highres.values, expected.values, rtol=0.0, atol=1.0e-2
)
for region in ["core", "SOL"]:
n_highres = n.bout.interpolate_parallel(region)
expected = f_fine.broadcast_like(n_highres)
npt.assert_allclose(
n_highres.values, expected.values, rtol=0.0, atol=1.0e-2
)
for region in ["outer_PFR", "outer_SOL"]:
n_highres = n.bout.interpolate_parallel(region).isel(theta=slice(-2))
expected = f_fine.broadcast_like(n_highres)
npt.assert_allclose(
n_highres.values, expected.values, rtol=0.0, atol=1.0e-2
)
[docs]
def test_interpolate_parallel(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=3,
nt=1,
grid="grid",
guards={"y": 2},
topology="single-null",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 48.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 52),
dims="theta",
)
dtheta_fine = thetalength / 3.0 / 128.0
theta_fine = xr.DataArray(
np.linspace(
0.0 + 0.5 * dtheta_fine, thetalength - 0.5 * dtheta_fine, 3 * 128
),
dims="theta",
)
x = xr.DataArray(np.arange(3), dims="x")
def f_y(t):
t = np.sin(3.0 * t)
return t**3 - t**2 + t - 1.0
f = f_y(theta) * (x + 1.0)
n.data = f.broadcast_like(n)
f_fine = f_y(theta_fine) * (x + 1.0)
n_highres = n.bout.interpolate_parallel().isel(theta=slice(2, -2))
expected = f_fine.broadcast_like(n_highres)
npt.assert_allclose(n_highres.values, expected.values, rtol=0.0, atol=1.1e-2)
[docs]
def test_interpolate_parallel_sol(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=1,
nt=1,
grid="grid",
guards={"y": 2},
topology="sol",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n = ds["n"]
thetalength = 2.0 * np.pi
dtheta = thetalength / 16.0
theta = xr.DataArray(
np.linspace(0.0 - 1.5 * dtheta, thetalength + 1.5 * dtheta, 20),
dims="theta",
)
dtheta_fine = thetalength / 128.0
theta_fine = xr.DataArray(
np.linspace(0.0 + 0.5 * dtheta_fine, thetalength - 0.5 * dtheta_fine, 128),
dims="theta",
)
x = xr.DataArray(np.arange(3), dims="x")
def f_y(t):
t = np.sin(t)
return t**3 - t**2 + t - 1.0
f = f_y(theta) * (x + 1.0)
n.data = f.broadcast_like(n)
f_fine = f_y(theta_fine) * (x + 1.0)
n_highres = n.bout.interpolate_parallel().isel(theta=slice(2, -2))
expected = f_fine.broadcast_like(n_highres)
npt.assert_allclose(n_highres.values, expected.values, rtol=0.0, atol=1.1e-2)
[docs]
def test_interpolate_parallel_toroidal_points(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=3,
nt=1,
grid="grid",
guards={"y": 2},
topology="single-null",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n_highres = ds["n"].bout.interpolate_parallel()
n_highres_truncated = ds["n"].bout.interpolate_parallel(toroidal_points=2)
xrt.assert_identical(n_highres_truncated, n_highres.isel(zeta=[0, 2]))
[docs]
def test_interpolate_parallel_toroidal_points_list(self, bout_xyt_example_files):
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 16, 3),
nxpe=1,
nype=3,
nt=1,
grid="grid",
guards={"y": 2},
topology="single-null",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
n_highres = ds["n"].bout.interpolate_parallel()
points_list = [1, 2]
n_highres_truncated = ds["n"].bout.interpolate_parallel(
toroidal_points=points_list
)
xrt.assert_identical(n_highres_truncated, n_highres.isel(zeta=points_list))
[docs]
def test_interpolate_to_cartesian(self, bout_xyt_example_files):
dataset_list = bout_xyt_example_files(
None, lengths=(2, 16, 17, 18), nxpe=1, nype=1, nt=1
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["g11"].copy(deep=True)
ds["Rxy"] = ds["g11"].copy(deep=True)
ds["Zxy"] = ds["g11"].copy(deep=True)
r = np.linspace(1.0, 2.0, ds.metadata["nx"])
theta = np.linspace(0.0, 2.0 * np.pi, ds.metadata["ny"])
R = r[:, np.newaxis] * np.cos(theta[np.newaxis, :])
Z = r[:, np.newaxis] * np.sin(theta[np.newaxis, :])
ds["Rxy"].values[:] = R
ds["Zxy"].values[:] = Z
ds = apply_geometry(ds, "toroidal")
da = ds["n"]
da.values[:] = 1.0
nX = 30
nY = 30
nZ = 10
da_cartesian = da.bout.interpolate_to_cartesian(nX, nY, nZ)
# Check a point inside the original grid
npt.assert_allclose(
da_cartesian.isel(t=0, X=round(nX * 4 / 5), Y=nY // 2, Z=nZ // 2).item(),
1.0,
rtol=1.0e-15,
atol=1.0e-15,
)
# Check a point outside the original grid
assert np.isnan(da_cartesian.isel(t=0, X=0, Y=0, Z=0).item())
# Check output is float32
assert da_cartesian.dtype == np.float32
[docs]
def test_add_cartesian_coordinates(self, bout_xyt_example_files):
dataset_list = bout_xyt_example_files(None, nxpe=1, nype=1, nt=1)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list, inputfilepath=None, keep_xboundaries=False
)
ds["psixy"] = ds["g11"].copy(deep=True)
ds["Rxy"] = ds["g11"].copy(deep=True)
ds["Zxy"] = ds["g11"].copy(deep=True)
r = np.linspace(1.0, 2.0, ds.metadata["nx"])
theta = np.linspace(0.0, 2.0 * np.pi, ds.metadata["ny"])
R = r[:, np.newaxis] * np.cos(theta[np.newaxis, :])
Z = r[:, np.newaxis] * np.sin(theta[np.newaxis, :])
ds["Rxy"].values[:] = R
ds["Zxy"].values[:] = Z
ds = apply_geometry(ds, "toroidal")
zeta = ds["zeta"].values
da = ds["n"].bout.add_cartesian_coordinates()
npt.assert_allclose(
da["X_cartesian"],
R[:, :, np.newaxis] * np.cos(zeta[np.newaxis, np.newaxis, :]),
)
npt.assert_allclose(
da["Y_cartesian"],
R[:, :, np.newaxis] * np.sin(zeta[np.newaxis, np.newaxis, :]),
)
npt.assert_allclose(
da["Z_cartesian"],
Z[:, :, np.newaxis] * np.ones(ds.metadata["nz"])[np.newaxis, np.newaxis, :],
)
[docs]
def test_ddx(self, bout_xyt_example_files):
nx = 64
dataset_list = bout_xyt_example_files(
None,
lengths=(2, nx, 4, 3),
nxpe=1,
nype=1,
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list,
)
n = ds["n"]
t = ds["t"].broadcast_like(n)
ds["x_1d"] = _1d_coord_from_spacing(ds["dx"], "x")
x = ds["x_1d"].broadcast_like(n)
y = ds["y"].broadcast_like(n)
z = ds["z"].broadcast_like(n)
n.values[:] = (np.sin(12.0 * x / nx) * (1.0 + t + y + z)).values
expected = 12.0 / nx * np.cos(12.0 * x / nx) * (1.0 + t + y + z)
npt.assert_allclose(
n.bout.ddx().isel(x=slice(1, -1)).values,
expected.isel(x=slice(1, -1)).values,
rtol=1.0e-2,
atol=1.0e-13,
)
[docs]
def test_ddy(self, bout_xyt_example_files):
ny = 64
dataset_list, gridfilepath = bout_xyt_example_files(
None,
lengths=(2, 3, ny, 4),
nxpe=1,
nype=1,
grid="grid",
)
ds = open_boutdataset(
datapath=dataset_list, geometry="toroidal", gridfilepath=gridfilepath
)
n = ds["n"]
t = ds["t"].broadcast_like(n)
x = ds["x"].broadcast_like(n)
y = ds["theta"].broadcast_like(n)
z = ds["zeta"].broadcast_like(n)
n.values[:] = (np.sin(3.0 * y / ny) * (1.0 + t + x + z)).values
expected = 3.0 / ny * np.cos(3.0 * y / ny) * (1.0 + t + x + z)
npt.assert_allclose(
n.bout.ddy().isel(theta=slice(1, -1)).values,
expected.isel(theta=slice(1, -1)).values,
rtol=1.0e-2,
atol=1.0e-13,
)
[docs]
def test_ddz(self, bout_xyt_example_files):
dataset_list = bout_xyt_example_files(
None,
lengths=(2, 3, 4, 64),
nxpe=1,
nype=1,
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=dataset_list,
)
n = ds["n"]
t = ds["t"].broadcast_like(n)
x = ds["x"].broadcast_like(n)
y = ds["y"].broadcast_like(n)
z = ds["z"].broadcast_like(n)
n.values[:] = (np.sin(z) * (1.0 + t + x + y)).values
expected = np.cos(z) * (1.0 + t + x + y)
npt.assert_allclose(
n.bout.ddz().values, expected.values, rtol=1.0e-2, atol=1.0e-13
)
[docs]
def test_derivatives_doublenull(self, bout_xyt_example_files):
# Check function does not error on double-null topology
dataset_list, grid_ds = bout_xyt_example_files(
None,
lengths=(2, 3, 4, 3),
nxpe=1,
nype=6,
nt=1,
grid="grid",
guards={"x": 2, "y": 2},
topology="connected-double-null",
)
ds = open_boutdataset(
datapath=dataset_list,
gridfilepath=grid_ds,
geometry="toroidal",
keep_yboundaries=True,
)
ds["n"].bout.ddx()
ds["n"].bout.ddy()
ds["n"].bout.ddz()