"""
The **harmo** module provides functions for transforming spherical harmonics datasets into spatial grid representations
and vice versa.
This module includes functions to:
* Convert spherical harmonics datasets into spatial grid DataArray.
* Convert spatial grid DataArray into spherical harmonics datasets.
* Compute associated Legendre functions.
* Calculate mid-month estimates for GRACE data products.
* Change the associated normalization to a spherical harmonics dataset.
* Compute scaling factors for unit conversions between spherical harmonics and grid data.
* Validate spherical harmonics and grid datasets.
"""
import datetime
import inspect
import pathlib
import warnings
from typing import Literal
import cf_xarray as cfxr
import numpy as np
import pandas as pd
import scipy as sc
import xarray as xr
from lenapy.constants import *
from lenapy.utils.gravity import estimate_normal_gravity
def _compute_factors(
lmax: int, normalization: Literal["4pi", "ortho", "schmidt"]
) -> tuple[np.ndarray, np.ndarray, float, float]:
"""
Compute recurrence coefficients f1 and f2 for the Legendre recursion.
Parameters
----------
lmax : int
Maximum degree.
normalization : {'4pi', 'ortho', 'schmidt'}
Normalization scheme.
Returns
-------
f1 : np.ndarray
Recurrence factor f1.
f2 : np.ndarray
Recurrence factor f2.
norm_p10 : float
Normalization for P(1,0).
norm_4pi : float
Overall normalization factor.
"""
size = (lmax + 1) * (lmax + 2) // 2
f1 = np.zeros(size)
f2 = np.zeros(size)
k = 2
if normalization in ("4pi", "ortho"):
norm_p10 = np.sqrt(3)
norm_4pi = 1 if normalization == "4pi" else 4 * np.pi
for l in range(2, lmax + 1):
k += 1
f1[k] = np.sqrt(2 * l - 1) * np.sqrt(2 * l + 1) / l
f2[k] = (l - 1) * np.sqrt(2 * l + 1) / (np.sqrt(2 * l - 3) * l)
for m in range(1, l - 1):
k += 1
f1[k] = (
np.sqrt(2 * l + 1)
* np.sqrt(2 * l - 1)
/ (np.sqrt(l + m) * np.sqrt(l - m))
)
f2[k] = (
np.sqrt(2 * l + 1)
* np.sqrt(l - m - 1)
* np.sqrt(l + m - 1)
/ (np.sqrt(2 * l - 3) * np.sqrt(l + m) * np.sqrt(l - m))
)
k += 2
elif normalization == "schmidt":
norm_p10 = 1
norm_4pi = 1
for l in range(2, lmax + 1):
k += 1
f1[k] = (2 * l - 1) / l
f2[k] = (l - 1) / l
for m in range(1, l - 1):
k += 1
f1[k] = (2 * l - 1) / (np.sqrt(l + m) * np.sqrt(l - m))
f2[k] = (
np.sqrt(l - m - 1)
* np.sqrt(l + m - 1)
/ (np.sqrt(l + m) * np.sqrt(l - m))
)
k += 2
else:
raise ValueError(
(
f"Unknown normalization '{normalization}'. "
"It should be either "
"'4pi', 'ortho' or 'schmidt'"
)
)
return f1, f2, norm_p10, norm_4pi
def _generate_grid(
bounds: list[float],
dlon: float,
dlat: float,
longitude: np.ndarray | None,
latitude: np.ndarray | None,
radians_in: bool,
) -> tuple[np.ndarray, np.ndarray]:
"""
Generate longitude and latitude arrays for the grid.
load longitude and latitude if given
if not : compute longitude and latitude in degrees between given or defaults bounds
Parameters
----------
bounds : list of float
[lonmin, lonmax, latmin, latmax]
dlon, dlat : float
Grid spacing in degrees.
longitude, latitude : np.ndarray or None
Optionally provided coordinate arrays.
radians_in : bool
Whether the input arrays are in radians.
Returns
-------
longitude : np.ndarray
Array of longitudes in degrees.
latitude : np.ndarray
Array of latitudes in degrees.
"""
if longitude is None:
longitude = np.arange(bounds[0] + dlon / 2.0, bounds[1] + dlon / 2.0, dlon)
elif radians_in:
longitude = np.rad2deg(longitude)
if latitude is None:
latitude = np.arange(bounds[2] + dlat / 2.0, bounds[3] + dlat / 2.0, dlat)
elif radians_in:
latitude = np.rad2deg(latitude)
return longitude, latitude
def _init_bounds_and_grid(
bounds: list[float] | None,
lonmin: float,
lonmax: float,
latmin: float,
latmax: float,
dlon: float,
dlat: float,
radians_in: bool,
) -> tuple[list[float], float, float]:
"""
Initialize bounds and grid resolution.
Parameters
----------
bounds: list of float or None
[lonmin, lonmax, latmin, latmax]
lonmin, lonmax, latmin, latmax: float
Default bounds if `bounds` is None.
dlon, dlat: float
Grid resolution in degrees or radians.
radians_in: bool
Whether the inputs are in radians.
Returns
-------
bounds: list of float
Bounds in degrees.
dlon: float
Longitude resolution in degrees.
dlat: float
Latitude resolution in degrees.
"""
if bounds is None:
bounds = [lonmin, lonmax, latmin, latmax]
if not isinstance(bounds, list) or len(bounds) != 4:
raise TypeError('"bounds" must be a list of 4 elements')
if radians_in:
bounds = [np.rad2deg(b) for b in bounds]
if dlon != 1:
dlon = np.rad2deg(dlon)
if dlat != 1:
dlat = np.rad2deg(dlat)
return bounds, dlon, dlat
def _handle_mass_conservation(
used_l: np.ndarray,
force_mass_conservation: bool,
) -> tuple[np.ndarray, bool, bool]:
"""
Test if mass conservation has to be forced to remove mass induced by the projection of C2n,0 coefficients
Parameters
----------
used_l: np.ndarray
Degrees used in the computation.
force_mass_conservation: bool
Whether to enforce mass conservation.
Returns
-------
used_l: np.ndarray
Possibly modified degrees list.
use_czero_coef: bool
Whether to re-add degree 0 later.
force_mass_conservation: bool
Updated flag (can be False if nothing to conserve).
"""
if force_mass_conservation and 0 in used_l:
if len(used_l) > 1:
return used_l[1:], True, force_mass_conservation
elif len(used_l) == 1:
return used_l, False, False
return used_l, False, force_mass_conservation
def _init_degrees(
data: xr.Dataset,
lmin: int = None,
lmax: int = None,
mmin: int = None,
mmax: int = None,
used_l: np.ndarray = None,
used_m: np.ndarray = None,
) -> tuple[np.ndarray, np.ndarray, int, int, int, int]:
"""
Initialize spherical harmonic degrees and orders to be used.
It prioritizes used_l and used_m if given over lmin, lmax, mmin and mmax
Parameters
----------
data : xr.Dataset
Spherical harmonics dataset with 'l' and 'm' dimensions.
lmin, lmax : int or None
Minimum and maximum degrees.
mmin, mmax : int or None
Minimum and maximum orders.
used_l, used_m : np.ndarray or None
Explicit list of degrees and orders to use.
Returns
-------
used_l: np.ndarray
Degrees to use.
used_m: np.ndarray
Orders to use.
lmin: int
Minimum degree
lmax: int
Maximum degree.
mmin:
Minimum order.
mmax:
Maximum order.
"""
lmax = int(data.l.max()) if lmax is None else lmax
mmax = int(min(lmax, data.m.max())) if mmax is None else mmax
lmin = int(data.l.min()) if lmin is None else lmin
mmin = int(data.m.min()) if mmin is None else mmin
used_l = np.arange(lmin, lmax + 1) if used_l is None else used_l
used_m = np.arange(mmin, mmax + 1) if used_m is None else used_m
return used_l, used_m, lmin, lmax, mmin, mmax
[docs]
def sh_to_grid(
data: xr.Dataset,
unit: Literal["mewh", "mmgeoid", "microGal", "bar", "mvcu", "norm"] = "mewh",
errors=False,
lmax: int | None = None,
mmax: int | None = None,
lmin: int | None = None,
mmin: int | None = None,
used_l: np.ndarray | None = None,
used_m: np.ndarray | None = None,
lonmin: float = -180,
lonmax: float = 180,
latmin: float = -90,
latmax: float = 90,
bounds: list = None,
dlon: float = 1,
dlat: float = 1,
longitude: np.ndarray | None = None,
latitude: np.ndarray | None = None,
radians_in: bool = False,
force_mass_conservation: bool = False,
ellipsoidal_earth: bool = False,
include_elastic: bool = True,
plm: xr.DataArray = None,
normalization_plm: Literal["4pi", "ortho", "schmidt"] = "4pi",
**kwargs,
) -> xr.DataArray:
"""
Transform Spherical Harmonics (SH) dataset into spatial DataArray.
With choice for constants, unit, love numbers, degree/order, spatial grid latitude and longitude, Earth hypothesis.
For details on unit transformations, see :func:`l_factor_conv` and the
associated `PDF document <../../_static/Mathematics_consideration_for_LENAPY.pdf>`_.
Parameters
----------
data : xr.Dataset
xr.Dataset that corresponds to SH data to convert into spatial representation.
unit : str, optional
'mewh', 'mmgeoid', 'microGal', 'bar', 'mvcu', or 'norm'
Unit of the spatial data used in the transformation. Default is 'mewh' for meters of Equivalent Water Height.
See utils.harmo.l_factor_conv() doc for details on the units.
errors : bool, optional
If True, propagate the errors from eclm and eslm variables to create an error grid. Default is False.
lmax : int, optional
Maximal degree of the SH coefficients to be used.
mmax : int, optional
Maximal order of the SH coefficients to be used.
lmin : int, optional
Minimal degree of the SH coefficients to be used.
mmin : int, optional
Minimal order of the SH coefficients to be used.
used_l : np.ndarray, optional
List of degree to use for the grid computation (if given, lmax and lmin are not considered).
used_m : np.ndarray, optional
List of order to use for the grid computation (if given, mmax and mmin are not considered).
lonmin : float, optional
Minimal longitude of the future grid.
lonmax : float, optional
Maximal longitude of the future grid.
latmin : float, optional
Minimal latitude of the future grid.
latmax : float, optional
Maximal latitude of the future grid.
bounds : list, optional
List of 4 elements with [lonmin, lonmax, latmin, latmax] (if given min/max information are not considered).
radians_in : bool, optional
True if the unit of the given latitude and longitude information is radians. Default is False for degree unit.
If radians_in is True and dlat or dlon are given, they are considered as radians.
dlon : float, optional
Spacing of the longitude values.
dlat : float, optional
Spacing of the latitude values.
longitude : np.ndarray, optional
List of longitude to use for the grid computation (if given, others longitude information are not considered).
latitude : np.ndarray, optional
List of latitude to use for the grid computation (if given, others latitude information are not considered).
force_mass_conservation : bool, optional
If True, force that the grid resulting from all coefficients except C0 has a null global mass. Default is False
ellipsoidal_earth : bool, optional
If True, consider the Earth as an ellipsoid following [Ditmar2018]. Default is False for a spherical Earth.
include_elastic : bool, optional
If True, the Earth behavior is elastic. Default is True
plm : xr.DataArray, optional
Precomputed plm values as a xr.DataArray variable. For example with the code :
plm = xr.DataArray(compute_plm(lmax, sin_latitude), dims=['l', 'm', 'latitude'],
coords={'l': data.l, 'm': data.m, 'latitude': latitude})
normalization_plm : str, optional
If plm need to be computed, choice of the norm corresponding to the SH dataset.
Either '4pi', 'ortho', or 'schmidt' for 4pi normalized, orthonormalized, or Schmidt semi-normalized SH
functions, respectively. Default is '4pi'.
**kwargs :
Supplementary parameters used by the function l_factor_conv to modify defaults constants used in the computation
for the unit conversion. These parameters include (see :func:`l_factor_conv` documentation for more details) :
a_earth, gm_earth, f_earth, omega_earth, rho_earth, ds_love
Returns
-------
xgrid : xr.DataArray
Spatial representation of the SH Dataset in the chosen unit.
"""
# add mask in output variable
used_l, used_m, lmin, lmax, mmin, mmax = _init_degrees(
data, lmin, lmax, mmin, mmax, used_l, used_m
)
used_l, use_czero_coef, force_mass_conservation = _handle_mass_conservation(
used_l, force_mass_conservation
)
bounds, dlon, dlat = _init_bounds_and_grid(
bounds, lonmin, lonmax, latmin, latmax, dlon, dlat, radians_in
)
longitude, latitude = _generate_grid(
bounds, dlon, dlat, longitude, latitude, radians_in
)
cos_latitude = np.cos(np.deg2rad(latitude))
sin_latitude = np.sin(np.deg2rad(latitude))
f_earth = kwargs["f_earth"] if "f_earth" in kwargs else LNPY_F_EARTH_GRS80
geocentric_colat = xr.DataArray(
np.arctan2(cos_latitude, (1 - f_earth) ** 2 * sin_latitude),
dims=["latitude"],
coords={"latitude": latitude},
)
# -- beginning of computation
# Computing plm for converting to spatial domain
if plm is None:
if ellipsoidal_earth:
plm = compute_plm(
lmax,
np.cos(geocentric_colat),
mmax=mmax,
normalization=normalization_plm,
)
else:
plm = compute_plm(
lmax, sin_latitude, mmax=mmax, normalization=normalization_plm
)
plm = xr.DataArray(
plm,
dims=["l", "m", "latitude"],
coords={
"l": np.arange(lmax + 1),
"m": np.arange(mmax + 1),
"latitude": latitude,
},
)
else:
# Verify plm integrity
if (
not isinstance(plm, xr.DataArray)
or "l" not in plm.coords
or "m" not in plm.coords
or "latitude" not in plm.coords
):
raise TypeError(
'Given argument "plm" has to be a DataArray with 3 coordinates [l, m, latitude]'
)
elif plm.l.max() < lmax:
raise AssertionError(
'Given argument "plm" maximal degree is too small ',
plm.l.max(),
"<",
lmax,
)
elif (plm.latitude.values != latitude).all():
raise AssertionError(
'Given argument "plm" latitude does not correspond to the wanted latitude ',
latitude,
)
# scale factor for each degree
lfactor, cst = l_factor_conv(
used_l,
unit=unit,
include_elastic=include_elastic,
ellipsoidal_earth=ellipsoidal_earth,
geocentric_colat=geocentric_colat,
attrs=data.attrs,
**kwargs,
)
# convolve unit over degree
plm_lfactor = plm.sel(l=used_l, m=used_m) * lfactor
# Calculating cos(m*phi) and sin(m*phi)
c_cos = xr.DataArray(
np.cos(used_m[:, np.newaxis] @ np.deg2rad(longitude)[np.newaxis, :]),
dims=["m", "longitude"],
coords={"m": used_m, "longitude": longitude},
)
s_sin = xr.DataArray(
np.sin(used_m[:, np.newaxis] @ np.deg2rad(longitude)[np.newaxis, :]),
dims=["m", "longitude"],
coords={"m": used_m, "longitude": longitude},
)
# summation over all spherical harmonic degrees
if not errors:
d_clm = (plm_lfactor * data.sel(l=used_l, m=used_m).clm).sum(dim="l")
d_slm = (plm_lfactor * data.sel(l=used_l, m=used_m).slm).sum(dim="l")
# Final calcul on the grid
xgrid = c_cos.dot(d_clm) + s_sin.dot(d_slm)
else:
d_clm = (plm_lfactor**2 * data.sel(l=used_l, m=used_m).clm ** 2).sum(dim="l")
d_slm = (plm_lfactor**2 * data.sel(l=used_l, m=used_m).slm ** 2).sum(dim="l")
# Final calcul of sigma on the grid
xgrid = np.sqrt((c_cos**2).dot(d_clm) + (s_sin**2).dot(d_slm))
unit = "Errors in " + unit
# force mass conservation by removing the global mass computed without C0 coefficient (generated by C2n,0 coeffs)
if force_mass_conservation and not errors:
if ellipsoidal_earth:
raise ValueError(
'Mass conservation set to True with argument "force_mass_conservation" cannot be'
" forced on ellipsoidal Earth"
)
int_fact = xgrid.lngeo.surface_cell(ellipsoidal_earth=False, a_earth=1) / (
4 * np.pi
)
# remove the mass on the whole grid
xgrid = (
xgrid
- (xgrid * int_fact).sum(dim=["latitude", "longitude"]) / int_fact.sum()
)
# restore C0 mass
if use_czero_coef:
lfactor_zero = l_factor_conv(
np.array([0]), unit=unit, attrs=data.attrs, **kwargs
)[0]
xgrid = xgrid + (lfactor_zero * data.clm.sel(l=0, m=0)).values
xgrid = xgrid.transpose("latitude", "longitude", ...)
xgrid.attrs = {"units": unit, "max_degree": int(lmax)}
if "radius" in data.attrs:
xgrid.attrs["radius"] = data.attrs["radius"]
if "earth_gravity_constant" in data.attrs:
xgrid.attrs["earth_gravity_constant"] = data.attrs["earth_gravity_constant"]
return xgrid
[docs]
def grid_to_sh(
grid: xr.DataArray,
lmax: int,
unit: Literal["mewh", "mmgeoid", "microGal", "bar", "mvcu", "norm"] = "mewh",
mmax: int | None = None,
lmin: int = 0,
mmin: int = 0,
used_l: np.ndarray | None = None,
used_m: np.ndarray | None = None,
ellipsoidal_earth: bool = False,
include_elastic: bool = True,
plm: xr.DataArray | None = None,
normalization_plm: Literal["4pi", "ortho", "schmidt"] = "4pi",
**kwargs,
) -> xr.Dataset:
"""
Transform gravity field spatial representation DataArray into Spherical Harmonics (SH) dataset.
With choice for constants, unit of the spatial DataArray, love_numbers, degree/order, Earth hypothesis.
For details on unit transformations, see :func:`l_factor_conv` and the
associated `PDF document <../../_static/Mathematics_consideration_for_LENAPY.pdf>`_.
Parameters
----------
grid : xr.DataArray
xr.Dataset that corresponds a gravity field spatial representation in a unit to convert into SH.
lmax : int
Maximal degree of the SH coefficients to be computed.
unit : str, optional
'mewh', 'mmgeoid', 'microGal', 'bar', 'mvcu', or 'norm'
Unit of the spatial data used in the transformation. Default is 'mewh' for meters of Equivalent Water Height.
See constants.l_factor_conv() doc for details on the units.
mmax : int, optional
Maximal order of the SH coefficients to be computed.
lmin : int, optional
Minimal degree of the SH coefficients to be computed. Default is 0.
mmin : int, optional
Minimal order of the SH coefficients to be computed. Default is 0.
used_l : np.ndarray, optional
List of degree to compute for the SH Dataset (if given, lmax and lmin are not considered).
used_m : np.ndarray, optional
List of order to compute for the SH Dataset (if given, mmax and mmin are not considered).
ellipsoidal_earth : bool, optional
If True, consider the Earth as an ellipsoid following [Ditmar2018]. Default is False for a spherical Earth.
include_elastic : bool, optional
If True, the Earth behavior is elastic. Default is True.
plm : xr.DataArray, optional
Precomputed plm values as a xr.DataArray variable. For example with the code :
plm = xr.DataArray(compute_plm(lmax, np.sin(np.deg2rad(latitude))), dims=['l', 'm', 'latitude'],
coords={'l': np.arange(lmax+1), 'm': np.arange(lmax+1), 'latitude': latitude})
normalization_plm : str, optional
If plm need to be computed, choice of the norm. Either '4pi', 'ortho', or 'schmidt' for
4pi normalized, orthonormalized, or Schmidt semi-normalized SH functions, respectively. Default is '4pi'.
Output SH coefficient will be normalized according to this parameter.
**kwargs :
Supplementary parameters used by the function l_factor_conv to modify defaults constants used in the computation
for the unit conversion. These parameters include (see :func:`l_factor_conv` documentation for more details) :
a_earth, gm_earth, f_earth, omega_earth, rho_earth, ds_love
Returns
-------
ds_out : xr.Dataset
SH Dataset computed from the grid with the chosen unit.
"""
# -- set degree and order default parameters
# it prioritizes used_l and used_m if given over lmin, lmax, mmin and mmax
mmax = lmax if mmax is None else mmax
used_l = np.arange(lmin, lmax + 1) if used_l is None else used_l
used_m = np.arange(mmin, mmax + 1) if used_m is None else used_m
cos_latitude = np.cos(np.deg2rad(grid.cf["latitude"].values))
sin_latitude = np.sin(np.deg2rad(grid.cf["latitude"].values))
f_earth = kwargs["f_earth"] if "f_earth" in kwargs else LNPY_F_EARTH_GRS80
geocentric_colat = xr.DataArray(
np.arctan2(cos_latitude, (1 - f_earth) ** 2 * sin_latitude),
dims=["latitude"],
coords={"latitude": grid.cf["latitude"]},
)
# create DataArray corresponding to the integration factor for each cell
# case for ellipsoidal earth where integration over ellipsoidal cell
if ellipsoidal_earth:
surface = grid.lngeo.surface_cell(ellipsoidal_earth=True, f_earth=f_earth)
int_fact = surface / surface.sum()
# case for spherical earth where integration over spherical cell
else:
int_fact = grid.lngeo.surface_cell(ellipsoidal_earth=False, a_earth=1) / (
4 * np.pi
)
# Create a readable cf_xarray DataArray
int_fact["latitude"].attrs = dict(standard_name="latitude")
int_fact["longitude"].attrs = dict(standard_name="longitude")
# scale factor for each degree
lfactor, cst = l_factor_conv(
used_l,
unit=unit,
include_elastic=include_elastic,
ellipsoidal_earth=ellipsoidal_earth,
geocentric_colat=geocentric_colat,
attrs=grid.attrs,
**kwargs,
)
# -- prepare variables for the computation of SH
# Computing plm for converting to spatial domain
if plm is None:
if ellipsoidal_earth:
plm = compute_plm(
lmax,
np.cos(geocentric_colat),
mmax=mmax,
normalization=normalization_plm,
)
else:
plm = compute_plm(
lmax, sin_latitude, mmax=mmax, normalization=normalization_plm
)
plm = xr.DataArray(
plm,
dims=["l", "m", "latitude"],
coords={
"l": np.arange(lmax + 1),
"m": np.arange(lmax + 1),
"latitude": grid.cf["latitude"],
},
)
else:
# Verify plm integrity
align = xr.align(
plm.latitude, grid.cf["latitude"]
) # return the intersection of latitudes for each input
if (
not isinstance(plm, xr.DataArray)
or "l" not in plm.coords
or "m" not in plm.coords
or "latitude" not in plm.coords
):
raise TypeError(
'Given argument "plm" has to be a DataArray with 3 coordinates [l, m, latitude]'
)
elif plm.l.max() < lmax:
raise AssertionError(
'Given argument "plm" maximal degree is too small ',
plm.l.max(),
"<",
lmax,
)
elif align[0] != plm.latitude.size or align[1].size != grid.cf["latitude"].size:
raise AssertionError(
'Given argument "plm" latitude does not correspond to the grid latitude'
)
# convolve unit over degree
plm_lfactor = plm.sel(l=used_l, m=used_m) / lfactor
# Calculating cos/sin of phi arrays, [m,phi]
c_cos = xr.DataArray(
np.cos(
used_m[:, np.newaxis]
[docs]
@ np.deg2rad(grid.cf["longitude"].values)[np.newaxis, :]
),
dims=["m", "longitude"],
coords={"m": used_m, "longitude": grid.cf["longitude"]},
)
s_sin = xr.DataArray(
np.sin(
used_m[:, np.newaxis] @ np.deg2rad(grid.longitude.values)[np.newaxis, :]
),
dims=["m", "longitude"],
coords={"m": used_m, "longitude": grid.cf["longitude"]},
)
# -- Computation of SH
# Multiplying data and integral factor with sin/cos of m*longitude. This will sum over longitude, [m,theta]
dcos = c_cos.dot(grid.cf.rename_like(int_fact) * int_fact, dim=["longitude"])
dsin = s_sin.dot(grid.cf.rename_like(int_fact) * int_fact, dim=["longitude"])
# Multiplying plm and degree scale factors with last variable to sum over latitude, [l, m, ...]
clm = plm_lfactor.dot(dcos, dim=["latitude"])
slm = plm_lfactor.dot(dsin, dim=["latitude"])
# add name for the merge into xr.Dataset
clm.name = "clm"
slm.name = "slm"
ds_out = xr.merge([clm, slm], join="exact")
cst.update({"max_degree": lmax, "norm": normalization_plm})
ds_out.attrs = cst
return ds_out
def compute_plm(
lmax: int,
z: np.ndarray,
mmax: int = None,
normalization: Literal["4pi", "ortho", "schmidt"] = "4pi",
dtype: complex | float | type[complex] | type[float] = np.float128,
) -> np.ndarray:
"""
Compute all the associated Legendre functions up to a maximum degree and
order using the recursion relation from [Holmes2002]_
(Adapted from SHTOOLS/pyshtools tools [Wieczorek2018]_)
Parameters
----------
lmax : int
Maximum degree of legrendre functions.
z : np.ndarray
Argument of the associated Legendre functions.
mmax : int or NoneType, optional
Maximum order of associated legrendre functions.
normalization : {'4pi', 'ortho', 'schmidt'}, optional
'4pi', 'ortho', or 'schmidt' for use with geodesy 4pi normalized, orthonormalized, or Schmidt semi-normalized
spherical harmonic functions, respectively. Default is '4pi'.
dtype : dtype, optional
Data type of the output array. Default is np.float128.
Returns
-------
plm : np.ndarray
Fully-normalized Legendre functions as a 3D array with "l", "m" and z dimensions.
References
----------
.. [Holmes2002] S. A. Holmes and W. E. Featherstone,
"A unified approach to the Clenshaw summation and the recursive computation of very high degree and order
normalised associated Legendre functions",
*Journal of Geodesy*, 76, 279--299, (2002).
`doi: 10.1007/s00190-002-0216-2 <https://doi.org/10.1007/s00190-002-0216-2>`_
.. [Wieczorek2018] M. A. Wieczorek and M. Meschede,
"SHTools: Tools for working with spherical harmonics",
*Geochemistry, Geophysics, Geosystems*, 19, 2574–2592, (2018).
`doi: 10.1029/2018GC007529 <https://doi.org/10.1029/2018GC007529>`_
"""
if lmax > 300:
warnings.warn(
"No validation made with lmax > 300, the "
"Legendre functions may be unstable because of IEEE754-2008 norm."
)
# removing singleton dimensions of x
# update type to provide more memory for computation (np.float32 create some problems)
z = np.atleast_1d(z).flatten().astype(dtype)
# if default mmax, set mmax to be maximal degree
mmax = lmax if mmax is None else mmax
f1, f2, norm_p10, norm_4pi = _compute_factors(lmax, normalization)
# scale factor based on Holmes2002
scalef = 1e-280
p = np.zeros(((lmax + 1) * (lmax + 2) // 2, len(z)))
# u is sine of colatitude (cosine of latitude), for z=cos(th): u=sin(th)
u = np.sqrt(1 - z**2)
# update where u==0 to minimal numerical precision different from 0 to prevent invalid divisions
u[u == 0] = np.finfo(dtype).eps
# Calculate P(l,0) (not scaled)
p[0, :] = 1 / np.sqrt(norm_4pi)
if lmax: # test for the case where lmax=0
p[1, :] = norm_p10 * z / np.sqrt(norm_4pi)
k = 1
for l in range(2, lmax + 1):
k += l
p[k, :] = f1[k] * z * p[k - l, :] - f2[k] * p[k - 2 * l + 1, :]
# Calculate P(m,m), P(m+1,m), and P(l,m)
pmm = np.sqrt(2) * scalef / np.sqrt(norm_4pi)
rescalem = 1 / scalef
kstart = 0
for m in range(1, lmax + 1):
rescalem = rescalem * u
# Calculate P(m,m)
kstart += m + 1
pmm = pmm * np.sqrt(2 * m + 1) / np.sqrt(2 * m)
if normalization in ("4pi", "ortho"):
p[kstart, :] = pmm
elif normalization == "schmidt":
p[kstart, :] = pmm / np.sqrt(2 * m + 1)
if m != lmax: # test if P(m+1,m) exist
# Calculate P(m+1,m)
k = kstart + m + 1
if normalization in ("4pi", "ortho"):
p[k, :] = z * np.sqrt(2 * m + 3) * pmm
elif normalization == "schmidt":
p[k, :] = z * pmm
else:
# set up k for rescale P(lmax,lmax)
k = kstart
# Calculate P(l,m)
for l in range(m + 2, lmax + 1):
k += l
p[k, :] = z * f1[k] * p[k - l, :] - f2[k] * p[k - 2 * l + 1, :]
p[k - 2 * l + 1, :] = p[k - 2 * l + 1, :] * rescalem
# rescale
p[k, :] = p[k, :] * rescalem
if m != lmax:
p[k - lmax, :] = p[k - lmax, :] * rescalem
# reshape Legendre polynomials to output dimensions (lower triangle array)
plm = np.zeros((lmax + 1, lmax + 1, len(z)))
ind = np.tril_indices(lmax + 1)
plm[ind] = p
# return the legendre polynomials and truncating orders to mmax
return plm[:, : mmax + 1, :]
[docs]
def mid_month_grace_estimate(
begin_time: datetime.datetime, end_time: datetime.datetime
) -> datetime.datetime:
"""
Calculate middle of the month date based on begin_time and end_time for GRACE products.
begin_time is rounded to equal to the first day of the month and
end_time is rounded to equal to the first day of the month after.
Parameters
----------
begin_time : datetime.datetime
Date of the beginning of the month.
end_time : datetime.datetime
Date of the end of the month + 1 day.
Returns
-------
mid_month : datetime.datetime
Date of the middle of the month.
"""
# to compute mid_month, need to round begin_time to the 1st of the month
# deal with GRACE month when the begin date is not between 16 of month before and 15 of the month
# it includes May 2015, Dec 2011 (JPL), Mar 2017 and Oct 2018 that cover second half of month
if (
(begin_time.day <= 15 and begin_time.strftime("%Y%j") != "2015102")
or begin_time.strftime("%Y%j") == "2011351"
or begin_time.strftime("%Y%j") == "2017076"
or begin_time.strftime("%Y%j") == "2018295"
):
tmp_begin = begin_time.replace(day=1)
else:
tmp_begin = (begin_time.replace(day=1) + datetime.timedelta(days=32)).replace(
day=1
)
# to compute mid_month, need to round end_time to the 1st of the month after
# deal with GRACE month when the end date is not between 16 of month and 15 of the month after
# it includes Janv 2004, Nov 2011 (CSR, GFZ) and May 2015 and Dec 2011 for TN14
if (
end_time.day <= 15
and end_time.strftime("%Y%j") not in ("2004014", "2011320", "2015132")
) or end_time.strftime("%Y%j") == "2012016":
tmp_end = end_time.replace(day=1)
else:
tmp_end = (end_time.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)
return tmp_begin + (tmp_end - tmp_begin) / 2
[docs]
def change_normalization(
ds: xr.Dataset,
new_normalization: Literal["4pi", "ortho", "schmidt"] = "4pi",
old_normalization: Literal["4pi", "ortho", "schmidt"] | None = None,
apply: bool = False,
) -> xr.Dataset:
"""
Spherical Harmonics (SH) dataset are associated with a Legendre polynomial normalization.
This function return the dataset updated with the new normalization asked in input (as a deep copy by default).
The current normalization can be given in parameters or can be contained in ds.attrs['norm'].
Parameters
----------
ds : xr.Dataset
xr.Dataset that corresponds to SH data associated with a current reference frame (constants whise) to update.
new_normalization : str
New normalization for the SH dataset, either '4pi', 'ortho', or 'schmidt'.
old_normalization : str | None, optional
Current normalization of the SH dataset.
Either '4pi', 'ortho', 'schmidt' or 'unnorm' for
4pi normalized, orthonormalized, Schmidt semi-normalized, or unnormalized SH functions, respectively.
Default is '4pi'. If not provided, uses `ds.attrs['norm']`.
apply : bool, optional
If True, apply the update to the current dataset without making a deep copy. Default is False.
Returns
-------
ds_out : xr.Dataset
Updated dataset with the new normalization.
Examples
--------
>>> ds_new_norm = change_normalization(ds, new_normalization='schmidt')
"""
try:
old_normalization = (
ds.attrs["norm"] if old_normalization is None else old_normalization
)
except KeyError:
raise KeyError(
"If you provide no information about the current normalization of your ds dataset using "
"'old_normalization' parameters, those information need to be contained in ds.attrs dict as "
"ds.attrs['norm']."
)
# -- conversion for each system
if new_normalization == "4pi" and old_normalization == "schmidt":
update_factor = 1 / np.sqrt(2 * ds.l + 1)
elif new_normalization == "schmidt" and old_normalization == "4pi":
update_factor = np.sqrt(2 * ds.l + 1)
elif new_normalization == "ortho" and old_normalization == "schmidt":
update_factor = np.sqrt((4 * np.pi) / (2 * ds.l + 1))
elif new_normalization == "schmidt" and old_normalization == "ortho":
update_factor = np.sqrt((2 * ds.l + 1) / (4 * np.pi))
elif new_normalization == "ortho" and old_normalization == "4pi":
update_factor = np.sqrt(4 * np.pi)
elif new_normalization == "4pi" and old_normalization == "ortho":
update_factor = 1 / np.sqrt(4 * np.pi)
elif old_normalization == "unnorm":
fact_l_minus_m = xr.apply_ufunc(sc.special.factorial, ds.l - ds.m)
fact_l_minus_m = fact_l_minus_m.where(fact_l_minus_m != 0, float("NaN"))
if new_normalization == "4pi":
update_factor = np.sqrt(
xr.apply_ufunc(sc.special.factorial, ds.l + ds.m)
/ fact_l_minus_m
/ (2 * ds.l + 1)
/ (2 - (0 == ds.m).astype(int))
)
elif new_normalization == "ortho":
update_factor = np.sqrt(
xr.apply_ufunc(sc.special.factorial, ds.l + ds.m)
* 4
* np.pi
/ fact_l_minus_m
/ (2 * ds.l + 1)
/ (2 - (0 == ds.m).astype(int))
)
elif new_normalization == "schmidt":
update_factor = np.sqrt(
xr.apply_ufunc(sc.special.factorial, ds.l + ds.m)
* 4
* np.pi
/ fact_l_minus_m
/ (2 - (0 == ds.m).astype(int))
)
update_factor = update_factor.fillna(0)
else:
update_factor = 1
# if apply = False : Copy the dataset to avoid modifying the input dataset
ds_out = ds if apply else ds.copy(deep=True)
# Update the clm and slm values
ds_out["clm"] *= update_factor
ds_out["slm"] *= update_factor
# Update the attributes in the output dataset
ds_out.attrs["norm"] = new_normalization
return ds_out
def _get_earth_parameters(
attrs: dict | None, a_earth: float | None, gm_earth: float | None
) -> tuple[float, float]:
"""
Retrieve Earth parameters from inputs or fallback constants.
Parameters
----------
attrs : dict or None
Attributes potentially containing Earth parameters.
a_earth : float or None
Semi-major axis.
gm_earth : float or None
Gravitational constant.
Returns
-------
a_earth : float
Resolved semi-major axis.
gm_earth : float
Resolved gravitational constant.
"""
if attrs is None:
attrs = {}
# return first element if not None, otherwise return value for the attrs.key if it exists otherwise return last term
resolved_a = a_earth or float(attrs.get("radius", LNPY_A_EARTH_GRS80))
resolved_gm = gm_earth or float(attrs.get("earth_gravity_constant", LNPY_GM_EARTH))
return resolved_a, resolved_gm
[docs]
def load_default_love_numbers() -> xr.Dataset:
"""
Load default Love numbers dataset.
Returns
-------
ds_love : xr.Dataset
Love numbers dataset.
"""
current_file = inspect.getframeinfo(inspect.currentframe()).filename
folderpath = pathlib.Path(current_file).absolute().parent.parent
default_love_file = folderpath / "resources" / "LoveNumbers_Gegout97.txt"
df = pd.read_csv(default_love_file, names=["kl"])
ds = xr.Dataset.from_dataframe(df).rename({"index": "l"})
return ds
def _compute_a_div_r_lat(
geocentric_colat: xr.DataArray, f_earth: float
) -> np.ndarray | xr.DataArray:
"""
Compute a/r(θ) for ellipsoidal Earth correction.
Parameters
----------
geocentric_colat : xr.DataArray
Geocentric colatitudes in radians.
f_earth : float
Earth flattening.
Returns
-------
a_div_r_lat : np.ndarray | xr.DataArray
"""
# e = sqrt(2f - f**2)
e_earth = np.sqrt(2 * f_earth - f_earth**2)
# a_div_r_lat = a / r(theta) with r(theta) = a(1-f)/sqrt(1 - e**2*sin(theta)**2)
return np.sqrt(1 - e_earth**2 * np.sin(geocentric_colat) ** 2) / (1 - f_earth)
def _compute_l_factor(
l: np.ndarray | xr.DataArray,
unit: str,
ellipsoidal_earth: bool,
geocentric_colat: xr.DataArray | None,
ds_love: xr.Dataset | None,
a_earth: float,
gm_earth: float,
f_earth: float,
omega_earth: float,
rho_earth: float,
rho_water: float,
fraction: xr.DataArray,
a_div_r_lat: np.ndarray | xr.DataArray | None,
) -> xr.DataArray:
"""
Compute the degree-dependent scale factor.
Parameters
----------
l: np.ndarray or xr.DataArray
Degrees
unit: str
Unit type for conversion
ellipsoidal_earth: bool
Whether to apply ellipsoidal correction
geocentric_colat : xr.DataArray
Geocentric colatitude
ds_love: xr.Dataset, optional
Love numbers
a_earth: float
Earth radius
gm_earth: float
Earth gravitational constant
f_earth : float
Earth flattening
omega_earth : float, optional
Earth's rotation rate
rho_earth: float
Earth density
rho_water: float
Water density
fraction: xr.DataArray
Redistribution factor
a_div_r_lat: np.ndarray | xr.DataArray | None
Ellipsoidal correction factor
Returns
-------
l_factor : xr.DataArray
Degree-dependent conversion factor.
"""
# l_factor is degree dependant
if unit == "norm":
# norm, fully normalized spherical harmonics
l_factor = xr.ones_like(l)
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat**l
elif unit == "mewh":
# mewh, meters equivalent water height [kg.m-2]
# the exact formula is l_factor*(1 - f) (see [Ditmar2018]_ after eq. 17)
# it is an approximation of the order of 0.3% to be coherent with the common formula from Wahr 1998
l_factor = rho_earth * a_earth * (2 * l + 1) / (3 * fraction * rho_water)
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l + 2)
elif unit == "mmgeoid":
# mmgeoid, millimeters geoid height
if ellipsoidal_earth:
gamma_0 = estimate_normal_gravity(
np.deg2rad(geocentric_colat.latitude),
a_earth,
gm_earth,
f_earth,
omega_earth,
)
l_factor = gm_earth / a_earth / gamma_0 * 1e3 * a_div_r_lat ** (l + 1)
else:
# Simplification of the formula for the spherical case
l_factor = xr.ones_like(l) * a_earth * 1e3
elif unit == "microGal":
# microGal, microGal gravity perturbations
l_factor = gm_earth * (l + 1) / (a_earth**2) * 1e8
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l + 2)
elif unit == "potential":
# potential, meters².seconds⁻²
l_factor = gm_earth / a_earth
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l + 1)
elif unit == "pascal":
# pascal, equivalent surface pressure
l_factor = LNPY_G_WMO * rho_earth * a_earth * (2 * l + 1) / (3 * fraction)
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l + 1)
elif unit == "mvcu":
# mvcu, meters viscoelastic crustal uplift
l_factor = a_earth * (2 * l + 1) / 2
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l + 1)
elif unit == "mecu":
# mecu, meters elastic crustal deformation (uplift)
l_factor = a_earth * ds_love.hl / fraction
if ellipsoidal_earth:
l_factor = l_factor * a_div_r_lat ** (l - 1)
elif unit == "int_radial_mag":
# internal radial magnetic field in nT
l_factor = l + 1
if ellipsoidal_earth:
pass
elif unit == "ext_radial_mag":
# external radial magnetic field in nT
l_factor = -l
if ellipsoidal_earth:
pass
else:
raise ValueError(
"Invalid 'unit' parameter value in l_factor_conv function, valid values are: "
"(norm, mewh, mmgeoid, microGal, bar, mvcu)"
)
return l_factor
[docs]
def l_factor_conv(
l: np.ndarray,
unit: Literal["mewh", "mmgeoid", "microGal", "pascal", "mvcu", "norm"] = "mewh",
include_elastic: bool = True,
ellipsoidal_earth: bool = False,
geocentric_colat: xr.DataArray | None = None,
ds_love: xr.Dataset | None = None,
a_earth: float | None = None,
gm_earth: float | None = None,
f_earth: float = LNPY_F_EARTH_GRS80,
omega_earth: float = LNPY_OMEGA_EARTH_GRS80,
rho_earth: float = LNPY_RHO_EARTH,
rho_water: float = 1000,
attrs: dict | None = None,
) -> tuple[xr.DataArray, dict]:
"""
Compute a scale factor for a transformation between spherical harmonics and grid data.
Spatial data over the grid are associated with a specific unit.
The scale factor is degree-dependent and is computed for the given list of degree l.
The scale factor can be estimated using elastic or non-elastic Earth as well as a spherical or ellipsoidal Earth.
Parameters
----------
l : np.ndarray
Degree for which the scale factor is estimated.
unit : str, optional
'mewh', 'mmgeoid', 'microGal', 'pascal', 'mvcu', or 'norm'
Unit of the spatial data used in the transformation. Default is 'mewh' for meters of Equivalent Water Height.
'mmgeoid' represents millimeters mmgeoid height, 'microGal' represents microGal gravity perturbations,
'pascal' represents equivalent surface pressure in pascal and
'mvcu' represents meters viscoelastic crustal uplift
include_elastic : bool, optional
If True, the Earth behavior is elastic.
ellipsoidal_earth : bool, optional
If True, consider the Earth as an ellipsoid following [Ditmar2018]_ and if False as a sphere.
geocentric_colat : xr.DataArray | None, optional
Geocentric colatitude for ellipsoidal Earth radius computation in radians, the dimension is geographic latitude.
ds_love : xr.Dataset | None, optional
Dataset with the l dimension corresponding to degree and with l (and possibly h and k) variables that
are Love numbers.
Default Love numbers used are from Gegout97.
a_earth : float, optional
Earth semi-major axis [m]. If not provided, uses `data.attrs['radius']` and
if it does not exist, uses LNPY_A_EARTH_GRS80.
if it does not exist, uses LNPY_A_EARTH_GRS80.
gm_earth : float, optional
Standard gravitational parameter for Earth [m³.s⁻²]. If not provided, uses
`data.attrs['earth_gravity_constant']` and if it does not exist, uses LNPY_GM_EARTH.
f_earth : float, optional
Earth flattening. Default is LNPY_F_EARTH_GRS80.
omega_earth : float, optional
Earth's rotation rate [rad.s⁻¹]. Default is LNPY_OMEGA_EARTH.
rho_earth : float, optional
Earth density [kg.m⁻³] for Equivalent Water Height formula. Default is LNPY_RHO_EARTH.
rho_water : float, optional
Water density [kg.m⁻³] for Equivalent Water Height formula. Default is 1000 Kg.m⁻³.
attrs : dict | None, optional
ds.attrs information that might help to estimate l_factor if no parameters are given.
Returns
-------
l_factor : xr.DataArray
Degree-dependent scale factor.
cst: dict
Attributes for the final dataset
References
----------
.. [Ditmar2018] P. Ditmar,
"Conversion of time-varying Stokes coefficients into mass anomalies at the
Earth’s surface considering the Earth’s oblateness",
*Journal of Geodesy*, 92, 1401--1412, (2018).
`doi: 10.1007/s00190-018-1128-0 <https://doi.org/10.1007/s00190-018-1128-0>`_
"""
a_earth, gm_earth = _get_earth_parameters(attrs, a_earth, gm_earth)
l = xr.DataArray(l, dims=["l"], coords={"l": l})
fraction = xr.ones_like(l)
# include elastic redistribution with kl Love numbers
if include_elastic:
if ds_love is None:
ds_love = load_default_love_numbers()
fraction = fraction + ds_love.kl
a_div_r_lat = None
if ellipsoidal_earth:
# test if geocentric_colat is set
if geocentric_colat is None:
raise ValueError(
"For ellipsoidal Earth, you need to set "
"the parameter 'geocentric_colat' in l_factor_conv function"
)
a_div_r_lat = _compute_a_div_r_lat(geocentric_colat, f_earth)
l_factor = _compute_l_factor(
l,
unit,
ellipsoidal_earth,
geocentric_colat,
ds_love,
a_earth,
gm_earth,
f_earth,
omega_earth,
rho_earth,
rho_water,
fraction,
a_div_r_lat,
)
cst = {"gm_earth": gm_earth, "a_earth": a_earth}
return l_factor, cst
[docs]
def assert_sh(ds: xr.Dataset) -> bool:
"""
Verify if the dataset ds has dimensions 'l' and 'm' as well as variables 'clm' and 'slm'
Raise Assertion error if not.
Parameters
----------
ds : xr.Dataset
Dataset to verify.
Returns
-------
True : bool
Returns True if the dataset ds has dimensions 'l' and 'm' as well as variables 'clm' and 'slm'.
Raises
------
AssertionError
This function raise AssertionError is ds is not a xr.Dataset corresponding to spherical harmonics
"""
if "l" not in ds.coords:
raise AssertionError(
"The degree coordinates that should be named 'l' does not exist"
)
if "m" not in ds.coords:
raise AssertionError(
"The order coordinates that should be named 'm' does not exist"
)
if "clm" not in ds.keys():
raise AssertionError("The Dataset have to contain 'clm' variable")
if "slm" not in ds.keys():
raise AssertionError("The Dataset have to contain 'slm' variable")
return True