#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Authors: Philip G. Brodrick, philip.brodrick@jpl.nasa.gov
# Niklas Bohn, urs.n.bohn@jpl.nasa.gov
# Jay E. Fahlen, jay.e.fahlen@jpl.nasa.gov
#
from __future__ import annotations
import logging
import numpy as np
from isofit.core import units
from isofit.core.common import eps, svd_inv_sqrt
from isofit.radiative_transfer.engines import Engines
[docs]
Logger = logging.getLogger(__file__)
[docs]
RTE = ["modtran", "sRTMnet", "KernelFlowsGP"]
[docs]
def confPriority(key, configs):
"""
Selects a key from a config if the value for that key is not None
Prioritizes returning the first value found in the configs list
TODO: ISOFIT configs are annoying and will create keys to NoneTypes
Should use mlky to handle key discovery at runtime instead of like this
"""
value = None
for config in configs:
if hasattr(config, key):
value = getattr(config, key)
if value is not None:
break
return value
[docs]
class RadiativeTransfer:
"""This class controls the radiative transfer component of the forward
model. An ordered dictionary is maintained of individual RTMs (MODTRAN,
for example). We loop over the dictionary concatenating the radiation
and derivatives from each RTM and interval to form the complete result.
In general, some of the state vector components will be shared between
RTMs and bands. For example, H20STR is shared between both VISNIR and
TIR. This class maintains the master list of statevectors.
"""
# Keys to retrieve from 3 sections to use the preferred
# Prioritizes retrieving from radiative_transfer_engines first, then instrument, then radiative_transfer
[docs]
_keys = [
"interpolator_style",
"overwrite_interpolator",
"lut_grid",
"lut_path",
"wavelength_file",
]
def __init__(self, full_config: Config):
config = full_config.forward_model.radiative_transfer
confIT = full_config.forward_model.instrument
[docs]
self.lut_grid = config.lut_grid
[docs]
self.statevec_names = config.statevector.get_element_names()
for idx in range(len(config.radiative_transfer_engines)):
confRT = config.radiative_transfer_engines[idx]
if confRT.engine_name not in Engines:
raise AttributeError(
f"Invalid radiative transfer engine choice. Got: {confRT.engine_name}; Must be one of: {RTE}"
)
# Generate the params for this RTE
params = {
key: confPriority(key, [confRT, confIT, config]) for key in self._keys
}
params["engine_config"] = confRT
# Select the right RTE and initialize it
rte = Engines[confRT.engine_name](**params)
self.rt_engines.append(rte)
# Make sure the length of the config statevectores match the engine's assumed statevectors
if (expected := len(config.statevector.get_element_names())) != (
got := len(rte.indices.x_RT)
):
error = f"Mismatch between the number of elements for the config statevector and LUT.indices.x_RT: {expected=}, {got=}"
Logger.error(error)
raise AttributeError(error)
# The rest of the code relies on sorted order of the individual RT engines which cannot
# be guaranteed by the dict JSON or YAML input
self.rt_engines.sort(key=lambda x: x.wl[0])
# Retrieved variables. We establish scaling, bounds, and
# initial guesses for each state vector element. The state
# vector elements are all free parameters in the RT lookup table,
# and they all have associated dimensions in the LUT grid.
self.bounds, self.scale, self.init = [], [], []
self.prior_mean, self.prior_sigma = [], []
for sv, sv_name in zip(*config.statevector.get_elements()):
self.bounds.append(sv.bounds)
self.scale.append(sv.scale)
self.init.append(sv.init)
self.prior_sigma.append(sv.prior_sigma)
self.prior_mean.append(sv.prior_mean)
[docs]
self.bounds = np.array(self.bounds)
[docs]
self.scale = np.array(self.scale)
[docs]
self.init = np.array(self.init)
[docs]
self.prior_mean = np.array(self.prior_mean)
[docs]
self.prior_sigma = np.array(self.prior_sigma)
[docs]
self.Sa_cached = np.diagflat(np.power(self.prior_sigma, 2))
[docs]
self.Sa_normalized = self.Sa_cached / np.mean(np.diag(self.Sa_cached))
self.Sa_inv_normalized, self.Sa_inv_sqrt_normalized = svd_inv_sqrt(
self.Sa_normalized
)
[docs]
self.wl = np.concatenate([RT.wl for RT in self.rt_engines])
[docs]
self.bvec = config.unknowns.get_element_names()
[docs]
self.bval = np.array([x for x in config.unknowns.get_elements()[0]])
[docs]
self.solar_irr = np.concatenate([RT.solar_irr for RT in self.rt_engines])
[docs]
def xa(self):
"""Pull the priors from each of the individual RTs."""
return self.prior_mean
[docs]
def Sa(self):
"""Pull the priors from each of the individual RTs."""
return self.Sa_cached
[docs]
def Sb(self):
"""Uncertainty due to unmodeled variables."""
return np.diagflat(np.power(self.bval, 2))
[docs]
def get_shared_rtm_quantities(self, x_RT, geom):
"""Return only the set of RTM quantities (transup, sphalb, etc.) that are contained
in all RT engines.
"""
ret = []
for RT in self.rt_engines:
ret.append(RT.get(x_RT, geom))
return self.pack_arrays(ret)
@property
[docs]
def coszen(self):
"""
Backwards compatibility until Geometry takes over this param
Return some child RTE coszen
"""
for child in self.rt_engines:
if "coszen" in child.lut:
return child.lut.coszen.data
[docs]
def calc_rdn(
self,
x_RT,
rho_dir_dir,
rho_dif_dir,
Ls,
L_tot,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
r,
geom,
):
"""
Physics-based forward model to calculate at-sensor radiance.
Includes topography, background reflectance, and glint.
"""
# Adjacency effects
# ToDo: we need to think about if we want to obtain the background reflectance from the Geometry object
# or from the surface model, i.e., the same way as we do with the target pixel reflectance
rho_dir_dif = (
geom.bg_rfl if isinstance(geom.bg_rfl, np.ndarray) else rho_dir_dir
)
rho_dif_dif = (
geom.bg_rfl if isinstance(geom.bg_rfl, np.ndarray) else rho_dif_dir
)
# Atmospheric path radiance
L_atm = self.get_L_atm(x_RT, geom)
# Atmospheric spherical albedo
s_alb = r["sphalb"]
atm_surface_scattering = s_alb * rho_dif_dif
# Special case: 1-component model
if not isinstance(L_dir_dir, np.ndarray) or len(L_dir_dir) == 1:
# we assume rho_dir_dir = rho_dif_dir = rho_dir_dif = rho_dif_dif
rho_dif_dif = rho_dir_dir
# eliminate spherical albedo and one reflectance term from numerator if using 1-component model
atm_surface_scattering = 1
# Thermal transmittance
L_up = Ls * (r["transm_up_dir"] + r["transm_up_dif"])
# Our radiance model follows the physics as presented in Guanter (2006), Vermote et al. (1997), and
# Tanre et al. (1983). This particular formulation facilitates the consideration of topographic effects,
# glint, or BRDF modeling in general. The contribution of the target to the signal at the top of the atmosphere
# is decomposed as the sum of four terms:
# 1. photons directly transmitted from the sun to the target and directly reflected back to the sensor
# rho_dir_dir => directional-directional surface reflectance of the target
# 2. photons scattered by the atmosphere then reflected by the target and directly transmitted to the sensor
# rho_dif_dir => surface diffuse-directional reflectance
# 3. photons directly transmitted to the target but scattered by the atmosphere on their way to the sensor
# rho_dir_dif => surface directional-diffuse reflectance
# 4. photons having at least two interactions with the atmosphere and one with the target
# rho_dif_dif => surface diffuse-diffuse reflectance
# These terms are also called coupling terms, as they are responsible for the coupling between atmospheric
# radiative transfer and the surface reflectance properties.
# The coupling terms are multiplied by four different combinations of direct and diffuse radiance terms:
# 1. L_dir_dir => downward direct * upward direct
# 2. L_dif_dir => downward diffuse * upward direct
# 3. L_dir_dif => downward direct * upward diffuse
# 4. L_dif_dif => downward diffuse * upward diffuse
# When separated radiance terms and/or a BRDF model of the surface are not available,
# the Lambertian assumption is made for the target reflectance:
# rho_dir_dir = rho_dif_dir = rho_dir_dif = rho_dif_dif
# In this case, our radiance model reduces to:
# L_atm + (L_tot * rho_dir_dir) / (1 - S * rho_dir_dir) + L_up,
# with L_tot being the total radiance (downward * upward, direct + diffuse).
# TOA radiance model
ret = (
L_atm
+ L_dir_dir * rho_dir_dir
+ L_dif_dir * rho_dif_dir
+ L_dir_dif * rho_dir_dif
+ L_dif_dif * rho_dif_dif
+ (L_tot * atm_surface_scattering * rho_dif_dif) / (1 - s_alb * rho_dif_dif)
+ L_up
)
return ret
[docs]
def get_L_atm(self, x_RT: np.array, geom: Geometry) -> np.array:
"""Get the interpolated modeled atmospheric path radiance.
Args:
x_RT: radiative-transfer portion of the statevector
geom: local geometry conditions for lookup
Returns:
interpolated modeled atmospheric path radiance
"""
L_atms = []
verified_geom = geom.verify(self.coszen)
coszen, cos_i = verified_geom["coszen"], verified_geom["cos_i"]
for RT in self.rt_engines:
if RT.treat_as_emissive:
r = RT.get(x_RT, geom)
rdn = r["thermal_upwelling"]
L_atms.append(rdn)
else:
r = RT.get(x_RT, geom)
if RT.rt_mode == "rdn":
L_atm = r["rhoatm"]
else:
rho_atm = r["rhoatm"]
L_atm = units.transm_to_rdn(rho_atm, coszen, self.solar_irr)
L_atms.append(L_atm)
return np.hstack(L_atms)
[docs]
def get_L_down_transmitted(self, x_RT: np.array, geom: Geometry) -> np.array:
"""Get the interpolated direct and diffuse downward radiance on the sun-to-surface path.
Thermal_downwelling already includes the transmission factor.
Also assume there is no multiple scattering for TIR.
Args:
x_RT: radiative-transfer portion of the statevector
geom: local geometry conditions for lookup
Returns:
interpolated total, direct, and diffuse downward atmospheric radiance
"""
L_downs = []
L_downs_dir = []
L_downs_dif = []
# Check coszen against cos_i
verified_geom = geom.verify(self.coszen)
coszen, cos_i, skyview_factor = (
verified_geom["coszen"],
verified_geom["cos_i"],
verified_geom["skyview_factor"],
)
for RT in self.rt_engines:
if RT.treat_as_emissive:
r = RT.get(x_RT, geom)
rdn = r["thermal_downwelling"]
L_downs.append(rdn)
else:
r = RT.get(x_RT, geom)
if RT.rt_mode == "rdn":
L_down_dir = r["transm_down_dir"]
L_down_dif = r["transm_down_dif"]
else:
# Transform downward transmittance to radiance
L_down_dir = units.transm_to_rdn(
r["transm_down_dir"], coszen, self.solar_irr
)
L_down_dif = units.transm_to_rdn(
r["transm_down_dif"], coszen, self.solar_irr
)
# Apply sky view factor to downward diffuse for 1C case.
L_down_dif *= skyview_factor
L_down = L_down_dir + L_down_dif
L_downs.append(L_down)
L_downs_dir.append(L_down_dir)
L_downs_dif.append(L_down_dif)
return np.hstack(L_downs), np.hstack(L_downs_dir), np.hstack(L_downs_dif)
[docs]
def get_L_coupled(self, r: dict, geom: Geometry):
"""Get the interpolated radiance terms on the sun-to-surface-to-sensor path.
These follow the physics as presented in Guanter (2006), Vermote et al. (1997), and Tanre et al. (1983).
Args:
r: interpolated radiative transfer quantities from the LUT
coszen: top-of-atmosphere solar zenith angle
cos_i: local solar zenith angle at the surface
Returns:
interpolated radiances along all optical paths:
L_dir_dir => downward direct * upward direct
L_dif_dir => downward diffuse * upward direct
L_dir_dif => downward direct * upward diffuse
L_dif_dif => downward diffuse * upward diffuse
"""
# Check coszen against cos_i
verified_geom = geom.verify(self.coszen)
coszen, cos_i, skyview_factor = (
verified_geom["coszen"],
verified_geom["cos_i"],
verified_geom["skyview_factor"],
)
# radiances along all optical paths
L_coupled = []
if any(
[
not isinstance(r[key], np.ndarray) or len(r[key]) == 1
for key in self.rt_engines[0].coupling_terms
]
):
# In case of the 1-component model, we cannot populate the coupling terms
L_coupled = [
0,
0,
0,
0,
]
else:
for key in self.rt_engines[0].coupling_terms:
L_coupled.append(
units.transm_to_rdn(r[key], coszen=coszen, solar_irr=self.solar_irr)
if self.rt_engines[0].rt_mode == "transm"
else r[key]
)
# Assigning coupled terms, unscaling and rescaling downward direct radiance by local solar zenith angle.
# Downward diffuse components are scaled by viewable sky fraction (i.e., "ungula" of viewable sky in solid geometry terms).
L_dir_dir = L_coupled[0] / coszen * cos_i
L_dif_dir = L_coupled[1] * skyview_factor
L_dir_dif = L_coupled[2] / coszen * cos_i
L_dif_dif = L_coupled[3] * skyview_factor
return L_dir_dir, L_dif_dir, L_dir_dif, L_dif_dif
[docs]
def calc_RT_quantities(self, x_RT: np.ndarray, geom: Geometry):
"""Retrieves the RT quantities including the LUT sample (r),
and the radiances (L). This function handles the hand-off between
the 1c and 4c model.
In the 1c case, L_dir_dir, L_dif_dir, L_dir_dif, L_dif_dif = 0,
and L_tot, L_down_dir, and L_down_dif are populated within the
if statement.
In the 4c case, we always use returns from get_L_coupled
"""
# Propogate LUT
r = self.get_shared_rtm_quantities(x_RT, geom)
# Default: get directional radiances
L_dir_dir, L_dif_dir, L_dir_dif, L_dif_dif = self.get_L_coupled(r, geom)
L_tot = L_dir_dir + L_dif_dir + L_dir_dif + L_dif_dif
# Handle case for 1c vs 4c model
if not isinstance(L_tot, np.ndarray) or len(L_tot) == 1:
# 1c model w/in if clause
L_tot, L_down_dir, L_down_dif = self.get_L_down_transmitted(x_RT, geom)
else:
# 4c model w/in else clause
L_down_dir = L_dir_dir + L_dif_dir
L_down_dif = L_dif_dir + L_dif_dir
return (
r,
L_tot,
L_down_dir,
L_down_dif,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
)
[docs]
def drdn_dRT(self, x_RT, geom, rho_dir_dir, rho_dif_dir, Ls, rdn):
"""Derivative of estimated radiance w.r.t. RT statevector elements.
We use a numerical approach to approximate dRT with a constant surface
reflectance. This is a reasonable approx. for the multicomponent surface.
When using the glint model however, this does not take into account
the dependence of the surface reflectance on the atmosphere.
"""
# perturb each element of the RT state vector (finite difference)
K_RT = []
x_RTs_perturb = x_RT + np.eye(len(x_RT)) * eps
for x_RT_perturb in list(x_RTs_perturb):
(
r,
L_tot,
L_down_dir,
L_down_dif,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
) = self.calc_RT_quantities(x_RT_perturb, geom)
# Surface state is held constant?
rdne = self.calc_rdn(
x_RT_perturb,
rho_dir_dir,
rho_dif_dir,
Ls,
L_tot,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
r,
geom,
)
K_RT.append((rdne - rdn) / eps)
K_RT = np.array(K_RT).T
return K_RT
[docs]
def drdn_dRTb(self, x_RT, geom, rho_dir_dir, rho_dif_dir, Ls, rdn):
"""Derivative of estimated rdn w.r.t. H2O_ABSCO
Currently, the K_b matrix only covers forward model derivatives
due to H2O_ABSCO unknowns, so that subsequent errors might occur
when water vapor is not part of the statevector
(which is very unlikely though).
"""
if len(self.bvec) == 0:
Kb_RT = np.zeros((0, len(self.wl.shape)))
# ToDo: might require modification in case more unknowns are added
# The following statement captures the case that H2O is not part
# of the statevector.
# but might need to be modified as soon as we add more unknowns
elif len(self.bvec) > 0 and "H2OSTR" not in self.statevec_names:
Kb_RT = np.zeros((1, len(self.wl)))
else:
# unknown parameters modeled as random variables per
# Rodgers et al (2000) K_b matrix. We calculate these derivatives
# by finite differences
Kb_RT = []
perturb = 1.0 + eps
for unknown in self.bvec:
if unknown == "H2O_ABSCO" and "H2OSTR" in self.statevec_names:
i = self.statevec_names.index("H2OSTR")
x_RT_perturb = x_RT.copy()
x_RT_perturb[i] = x_RT[i] * perturb
(
r,
L_tot,
L_down_dir,
L_down_dif,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
) = self.calc_RT_quantities(x_RT_perturb, geom)
rdne = self.calc_rdn(
x_RT_perturb,
rho_dir_dir,
rho_dif_dir,
Ls,
L_tot,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
r,
geom,
)
Kb_RT.append((rdne - rdn) / eps)
Kb_RT = np.array(Kb_RT).T
return Kb_RT
[docs]
def summarize(self, x_RT, geom):
ret = []
for RT in self.rt_engines:
ret.append(RT.summarize(x_RT, geom))
ret = "\n".join(ret)
return ret
[docs]
def pack_arrays(self, rtm_quantities_from_RT_engines):
"""Take the list of dict outputs from each RT engine and
stack their internal arrays in the same order. Keep only
those quantities that are common to all RT engines.
"""
# Get the intersection of the sets of keys from each of the rtm_quantities_from_RT_engines
shared_rtm_keys = set(rtm_quantities_from_RT_engines[0].keys())
if len(rtm_quantities_from_RT_engines) > 1:
for rtm_quantities_from_one_RT_engine in rtm_quantities_from_RT_engines[1:]:
shared_rtm_keys.intersection_update(
rtm_quantities_from_one_RT_engine.keys()
)
# Concatenate the different band ranges
rtm_quantities_concatenated_over_RT_bands = {}
for key in shared_rtm_keys:
temp = [x[key] for x in rtm_quantities_from_RT_engines]
rtm_quantities_concatenated_over_RT_bands[key] = np.hstack(temp)
return rtm_quantities_concatenated_over_RT_bands
[docs]
def ext550_to_vis(ext550):
"""VIS is defined as a function of the surface aerosol extinction coefficient
at 550 nm in km-1, EXT550, by the formula VIS[km] = ln(50) / (EXT550 + 0.01159),
where 0.01159 is the surface Rayleigh scattering coefficient at 550 nm in km-1
(see MODTRAN6 manual, p. 50).
"""
return np.log(50.0) / (ext550 + 0.01159)