#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
from __future__ import annotations
import os
from typing import OrderedDict
import numpy as np
from scipy.interpolate import interp1d
from scipy.optimize import least_squares, minimize
from scipy.optimize import minimize_scalar as min1d
from isofit.core import units
from isofit.core.common import (
emissive_radiance,
eps,
)
from isofit.data import env
[docs]
def heuristic_atmosphere(
fm: ForwardModel,
x_surface: np.array,
x_RT: np.array,
x_instrument: np.array,
meas: np.array,
geom: Geometry,
wl_lo: int = 865,
wl_center: int = 945,
wl_hi: int = 1040,
):
"""From a given radiance, estimate atmospheric state with band ratios.
Used to initialize gradient descent inversions.
Args:
fm: isofit forward model
x_surface: surface portion of the state vector
x_RT: radiative transfer portion of the state vector
x_instrument: instrument portion of the state vector
meas: a one-D numpy vector of radiance in uW/nm/sr/cm2
geom: geometry object corresponding to given measurement
wl_lo: Low wavelength to use for the continuum removal H2O fit
wl_center: Center wavelength to use for the continuum removal H2O fit
wl_hi: High wavelength to use for the continuum removal H2O fit
Returns:
x_new: updated estimate of x_RT
"""
# Identify the latest instrument wavelength calibration (possibly
# state-dependent) and identify channel numbers for the band ratio.
wl, fwhm = fm.instrument.calibration(x_instrument)
blo = np.argmin(abs(wl - wl_lo))
bcenter = np.argmin(abs(wl - wl_center))
bhi = np.argmin(abs(wl - wl_hi))
offset = 5 # nm
if not (any(fm.RT.wl > (wl_lo - offset)) and any(fm.RT.wl < (wl_hi + offset))):
return x_RT
x_new = x_RT.copy()
# Figure out which RT object we are using
# TODO: this is currently very specific to vswir-tir 2-mode, eventually generalize
my_RT = None
for rte in fm.RT.rt_engines:
if rte.treat_as_emissive is False:
my_RT = rte
break
if not my_RT:
raise ValueError("No suitable RT object for initialization")
# Band ratio retrieval of H2O. Depending on the radiative transfer
# model we are using, this state parameter could go by several names.
for h2oname in ["H2OSTR", "h2o"]:
if h2oname not in fm.RT.statevec_names:
continue
# ignore unused names
if h2oname not in my_RT.lut_names:
continue
# find the index in the lookup table associated with water vapor
ind_sv = fm.RT.statevec_names.index(h2oname)
h2os, areas = [], []
# We iterate through every possible grid point in the lookup table,
# calculating the band ratio that we would see if this were the
# atmospheric H2O content. It assumes that defaults for all other
# atmospheric parameters (such as aerosol, if it is there).
for h2o in my_RT.lut_grid[h2oname]:
# Get Atmospheric terms at high spectral resolution
x_RT_2 = x_RT.copy()
x_RT_2[ind_sv] = h2o
# pass in all zeros, as this is ONLY used for Ls, which we will
# assume is not present
r, coeffs = invert_algebraic(
fm.surface,
fm.RT,
fm.instrument,
x_surface,
x_RT_2,
x_instrument,
meas,
geom,
)
r = fm.surface.fit_params(r, geom)[fm.idx_surf_rfl]
# Simple linear interpolation
vals = np.interp(wl[blo:bhi], [wl_lo, wl_hi], [r[blo], r[bhi]])
# Minimize the absolute distance between the continuum removed
D = np.sum(vals - r[blo:bhi])
areas.append(D)
h2os.append(h2o)
# Finally, interpolate to determine the actual water vapor level that
# would optimize the continuum-relative correction
p = interp1d(h2os, areas)
bounds = (h2os[0] + 0.001, h2os[-1] - 0.001)
best = min1d(lambda h: abs(p(h)), bounds=bounds, method="bounded")
x_new[ind_sv] = best.x
return x_new
[docs]
def invert_algebraic(
surface: Surface,
RT: RadiativeTransfer,
instrument: Instrument,
x_surface: np.array,
x_RT: np.array,
x_instrument: np.array,
meas: np.array,
geom: Geometry,
):
"""Inverts radiance algebraically using Lambertian assumptions to get a
reflectance. If the appropriate transmittance terms are available,
surface slope will enter for the initial guess. Otherwise, the surface
will be treated as if flat.
Args:
surface: surface model
RT: radiative transfer model to use
instrument: instrument model
x_surface: surface portion of the state vector
x_RT: radiative transfer portion of the state vector
x_instrument: instrument portion of the state vector
meas: a one-D numpy vector of radiance in uW/nm/sr/cm2
geom: geometry object corresponding to given measurement
Return:
rfl_est: estimate of the surface reflectance based on the given surface model and specified atmospheric state
coeffs: atmospheric parameters used for the inversion, returned for convenience
"""
# Figure out which RT object we are using
# TODO: this is currently very specific to vswir-tir 2-mode, eventually generalize
my_RT = None
for rte in RT.rt_engines:
if rte.treat_as_emissive is False:
my_RT = rte
break
if not my_RT:
raise ValueError("No suitable RT object for initialization")
# Get all radiance terms
(
rhi,
L_tot,
L_down_dir,
L_down_dif,
L_dir_dir,
L_dif_dir,
L_dir_dif,
L_dif_dif,
) = RT.calc_RT_quantities(x_RT, geom)
L_atm = RT.get_L_atm(x_RT, geom)
sphalb = rhi["sphalb"]
Ls = surface.calc_Ls(x_surface, geom)
# TODO - make this a function, and use it here and in radiatve transfer
# transmit thermal emission through the atmosphere
transup = rhi["transm_up_dir"] + rhi["transm_up_dif"]
if np.max(transup) > 1.1:
raise ValueError(
"Transmittance up is greater than 1.0, which is not physically possible. Most likely, this is an issue with LUT input convention."
)
# Get the wavelengths too - these may also be adjusted
wl, fwhm = instrument.calibration(x_instrument)
# Interpolate L_up linearly if needed.
Ls = interp1d(surface.wl, Ls, fill_value="extrapolate")(RT.wl)
# Now convert to what's scene at the instrument
L_up = Ls * transup
# Resample the components we need to use
L_atm = instrument.sample(x_instrument, RT.wl, L_atm)
L_tot = instrument.sample(x_instrument, RT.wl, L_tot)
sphalb = instrument.sample(x_instrument, RT.wl, sphalb)
L_up = instrument.sample(x_instrument, RT.wl, L_up)
# Now everything should be in hand to do the calculation
rdn_solrfl = meas - L_up
rfl = 1.0 / (L_tot / (rdn_solrfl - L_atm) + sphalb)
# explicity handle known nan cases - this doesn't handle nans
# that might appear from the RT directly, as we don't want to
# cover them up
rfl[rdn_solrfl - L_atm == 0] = 0.0
rfl[L_tot == 0] = 0.0
# While values can go above 1, they shouldn't got that high above.
# generally it means instability
rfl[rfl > 1.6] = 1.6
# interpolate the output
rfl_est = interp1d(wl, rfl, fill_value="extrapolate")(surface.wl)
# Some downstream code will benefit from our precalculated
# atmospheric optical parameters
coeffs = L_atm, sphalb, L_tot, transup, L_up
return rfl_est, coeffs
[docs]
def invert_analytical(
fm: ForwardModel,
winidx: np.array,
meas: np.array,
geom: Geometry,
x0: np.array,
sub_state,
num_iter: int = 1,
hash_table: OrderedDict = None,
hash_size: int = None,
diag_uncert: bool = True,
outside_ret_const: float = -0.01,
fill_value: float = -9999.0,
):
"""Perform an analytical estimate of the conditional MAP estimate for
a fixed atmosphere. Based on the "Inner loop" from Susiluoto, 2022.
Args:
fm: isofit forward model
winidx: indices of surface components of state vector (to be solved)
meas: a one-D numpy vector of radiance in uW/nm/sr/cm2
geom: geometry object corresponding to given measurement
x0: the initialization state including surface from the superpixel
and the atm from the smoothed atmosphere.
num_iter: number of interations to run through
hash_table: a hash table to use locally
hash_size: max size of given hash table
diag_uncert: flag indicating whether to diagonalize the uncertainty
Returns:
x: MAP estimate of the mean
S: diagonal conditional posterior covariance estimate
"""
from scipy.linalg.blas import dsymv
from scipy.linalg.lapack import dpotrf, dpotri
# Note, this will fail if x_instrument is populated
if len(fm.idx_instrument) > 0:
raise AttributeError(
"Invert analytical not currently set to "
"handle instrument state variable indexing"
)
x = x0.copy()
x_surface, x_RT, x_instrument = fm.unpack(x)
# Get all the RT quantities
(r, L_tot, L_down_dir, L_down_dif, L_dir_dir, L_dif_dir, L_dir_dif, L_dif_dif) = (
fm.RT.calc_RT_quantities(x_RT, geom)
)
# Path radiance and spherical albedo
L_atm = fm.RT.get_L_atm(x_RT, geom)
s = r["sphalb"]
# Get all the surface quantities for the super pixel
sub_surface, sub_RT, sub_instrument = fm.unpack(sub_state)
# Surface reflectance at the wl resolution of fm.RT
rho_dir_dir, rho_dif_dir = fm.calc_rfl(sub_surface, geom)
rho_dir_dir = fm.upsample(fm.surface.wl, rho_dir_dir)
rho_dif_dir = fm.upsample(fm.surface.wl, rho_dif_dir)
# Background conditions equal to the superpixel reflectance
bg = s * rho_dif_dir
# Special case: 1-component model
if type(L_tot) != np.ndarray or len(L_tot) == 1:
L_tot = L_down_dir + L_down_dif
# Get the inversion indices; Include glint indices if applicable
full_idx = np.concatenate((winidx, fm.idx_surf_nonrfl), axis=0)
outside_ret_windows = np.ones(len(fm.idx_surface), dtype=bool)
outside_ret_windows[full_idx] = False
outside_ret_windows = np.where(outside_ret_windows)[0]
iv_idx = fm.surface.analytical_iv_idx
# The H matrix does not change as a function of x-vector
H = fm.surface.analytical_model(
bg,
L_down_dir,
L_down_dif,
L_tot,
geom,
L_dir_dir=L_dir_dir,
L_dir_dif=L_dir_dif,
L_dif_dir=L_dif_dir,
L_dif_dif=L_dif_dif,
)
# Sample just the wavelengths and states of interest
L = H[winidx, :][:, iv_idx]
# Use cached scaling factor from inital normalized inverse (outside of loop).
Sa, Sa_inv, Sa_inv_sqrt = fm.Sa(x, geom)
Sa_inv = Sa_inv[fm.idx_surface, :][:, fm.idx_surface]
Sa_inv_sqrt = Sa_inv_sqrt[fm.idx_surface, :][:, fm.idx_surface]
trajectory = np.zeros((num_iter + 1, len(x)))
trajectory[0, :] = x
for n in range(num_iter):
# Measurement uncertainty
Seps = fm.Seps(x, meas, geom)[winidx, :][:, winidx]
# Prior mean
xa_full = fm.xa(x, geom)
xa_surface = xa_full[fm.idx_surface]
# Save the product of the prior covariance and mean
prprod = Sa_inv @ xa_surface
x_surface, x_RT, x_instrument = fm.unpack(x)
C = dpotrf(Seps, 1)[0]
P = dpotri(C, 1)[0]
P_tilde = ((L.T @ P) @ L).T
P_rcond = Sa_inv[iv_idx, :][:, iv_idx] + P_tilde
LI_rcond = dpotrf(P_rcond)[0]
C_rcond = dpotri(LI_rcond)[0]
xk = dsymv(
1,
C_rcond,
(L.T @ dsymv(1, P, meas[winidx] - L_atm[winidx]) + prprod[iv_idx]),
)
# Save trajectory step:
x_surface[iv_idx] = xk
if outside_ret_const is None:
x_surface[outside_ret_windows] = xa_surface[outside_ret_windows]
else:
x_surface[outside_ret_windows] = outside_ret_const
x[fm.idx_surface] = x_surface
trajectory[n + 1, :] = x
if diag_uncert:
if len(C_rcond):
full_unc = np.ones(len(x))
full_unc[iv_idx] = np.sqrt(np.diag(C_rcond))
else:
full_unc = np.ones(len(x))
full_unc[iv_idx] = [-9999 for i in x[iv_idx]]
return trajectory, full_unc
else:
return trajectory, C_rcond
[docs]
def invert_simple(forward: ForwardModel, meas: np.array, geom: Geometry):
"""Find an initial guess at the state vector. This currently uses
traditional (non-iterative, heuristic) atmospheric correction.
Args:
forward: isofit forward model
meas: a one-D numpy vector of radiance in uW/nm/sr/cm2
geom: geometry object corresponding to given measurement
Returns:
x: estimate of the full statevector based on initial conditions, geometry, and a heuristic guess
"""
surface = forward.surface
RT = forward.RT
instrument = forward.instrument
vswir_present = False
if any(forward.surface.wl < 2600):
vswir_present = True
tir_present = False
if any(forward.surface.wl > 2600):
tir_present = True
# First step is to get the atmosphere. We start from the initial state
# and estimate atmospheric terms using traditional heuristics.
x = forward.init.copy()
x_surface, x_RT, x_instrument = forward.unpack(x)
if vswir_present:
x[forward.idx_RT] = heuristic_atmosphere(
forward, x_surface, x_RT, x_instrument, meas, geom
)
# Now, with atmosphere fixed, we can invert the radiance algebraically
# via Lambertian approximations to get reflectance
x_surface, x_RT, x_instrument = forward.unpack(x)
rfl_est, coeffs = invert_algebraic(
surface, RT, instrument, x_surface, x_RT, x_instrument, meas, geom
)
# Condition thermal part on the VSWIR portion. Only works for
# Multicomponent surfaces. Finds the cluster nearest the VSWIR heuristic
# inversion and uses it for the TIR suface initialization.
if tir_present:
tir_idx = np.where(forward.surface.wl > 3000)[0]
if vswir_present:
x_surface_temp = x_surface.copy()
x_surface_temp[: len(rfl_est)] = rfl_est
mu = forward.surface.xa(x_surface_temp, geom)
rfl_est[tir_idx] = mu[tir_idx]
else:
rfl_est = 0.03 * np.ones(len(forward.surface.wl))
# Now we have an estimated reflectance. Fit the surface parameters.
x_surface[forward.idx_surface] = forward.surface.fit_params(rfl_est, geom)
# Find temperature of emissive surfaces
if tir_present:
# Estimate the total radiance at sensor, leaving out surface emission
# Radiate transfer calculations could take place at high spectral resolution
# so we upsample the surface reflectance
rfl_hi = forward.upsample(forward.surface.wl, rfl_est)
_, sphalb, _, transup, _ = coeffs
L_atm = RT.get_L_atm(x_RT, geom)
L_down_transmitted, _, _ = RT.get_L_down_transmitted(x_RT, geom)
L_total_without_surface_emission = L_atm + L_down_transmitted * rfl_hi / (
1.0 - sphalb * rfl_hi
)
# These tend to have high transmission factors; the emissivity of most
# materials is nearly 1 for these bands, so they are good for
# initializing the surface temperature.
clearest_wavelengths = [10125.0, 10390.00, 10690.00]
# This is fragile if other instruments have different wavelength
# spacing or range
clearest_indices = [
np.argmin(np.absolute(RT.wl - w)) for w in clearest_wavelengths
]
# Error function for nonlinear temperature fit
def err(z):
T = z
emissivity = forward.surface.emissivity_for_surface_T_init
Ls_est, d = emissive_radiance(
emissivity, T, forward.surface.wl[clearest_indices]
)
resid = (
transup[clearest_indices] * Ls_est
+ L_total_without_surface_emission[clearest_indices]
- meas[clearest_indices]
)
return sum(resid**2)
# Fit temperature, set bounds, and set the initial values
idx_T = forward.surface.surf_temp_ind
Tinit = np.array([forward.surface.init[idx_T]])
Tbest = minimize(err, Tinit).x
T = max(
forward.surface.bounds[idx_T][0] + eps,
min(Tbest, forward.surface.bounds[idx_T][1] - eps),
)
x_surface[idx_T] = Tbest
forward.surface.init[idx_T] = T
# Update the full state vector
x[forward.idx_surface] = x_surface
# If available, get initial guess of surface elevation from location file.
if geom.surface_elevation_km and "surface_elevation_km" in RT.statevec_names:
ind_sv = forward.idx_RT[RT.statevec_names.index("surface_elevation_km")]
if geom.surface_elevation_km < 0.0:
x[ind_sv] = 0.0
else:
x[ind_sv] = geom.surface_elevation_km
# We record these initial values in the geometry object - the only
# "stateful" part of the retrieval
geom.x_surf_init = x[forward.idx_surface]
geom.x_RT_init = x[forward.idx_RT]
return x
[docs]
def invert_liquid_water(
rfl_meas: np.array,
wl: np.array,
l_shoulder: float = 850,
r_shoulder: float = 1100,
lw_init: tuple = (0.02, 0.3, 0.0002),
lw_bounds: tuple = ([0, 0.5], [0, 1.0], [-0.0004, 0.0004]),
ewt_detection_limit: float = 0.5,
return_abs_co: bool = False,
):
"""Given a reflectance estimate, fit a state vector including liquid water path length
based on a simple Beer-Lambert surface model.
Args:
rfl_meas: surface reflectance spectrum
wl: instrument wavelengths, must be same size as rfl_meas and in units of nm
l_shoulder: wavelength of left absorption feature shoulder
r_shoulder: wavelength of right absorption feature shoulder
lw_init: initial guess for liquid water path length, intercept, and slope
lw_bounds: lower and upper bounds for liquid water path length, intercept, and slope
ewt_detection_limit: upper detection limit for ewt
return_abs_co: if True, returns absorption coefficients of liquid water
Returns:
solution: estimated liquid water path length, intercept, and slope based on a given surface reflectance
"""
# make sure that wavelengths are provided in nm
if wl[0] < 100:
wl = units.micron_to_nm(wl)
# params needed for liquid water fitting
lw_feature_left = np.argmin(abs(l_shoulder - wl))
lw_feature_right = np.argmin(abs(r_shoulder - wl))
wl_sel = wl[lw_feature_left : lw_feature_right + 1]
# adjust upper detection limit for ewt if specified
if ewt_detection_limit != 0.5:
lw_bounds[0][1] = ewt_detection_limit
# load imaginary part of liquid water refractive index and calculate wavelength dependent absorption coefficient
# options are: 'k_22c', 'k_minus8c', 'k_minus25c', 'k_minus7c', 'k_25c_H', 'k_20c', 'k_25c_S
path_k = env.path("data", "iop", "k_liquid_water_ice.csv")
k_wi = np.genfromtxt(path_k, delimiter=",", names=True, encoding="utf-8-sig")
k_wi_idx = ~np.isnan(k_wi["wl_20c"]) & ~np.isnan(k_wi["k_20c"])
kw = np.interp(x=wl_sel, xp=k_wi["wl_20c"][k_wi_idx], fp=k_wi["k_20c"][k_wi_idx])
abs_co_w = 4 * np.pi * kw / wl_sel
rfl_meas_sel = rfl_meas[lw_feature_left : lw_feature_right + 1]
x_opt = least_squares(
fun=beer_lambert_model,
x0=lw_init,
jac="2-point",
method="trf",
bounds=(
np.array([lw_bounds[ii][0] for ii in range(3)]),
np.array([lw_bounds[ii][1] for ii in range(3)]),
),
max_nfev=15,
args=(rfl_meas_sel, wl_sel, abs_co_w),
)
solution = x_opt.x
if return_abs_co:
return solution, abs_co_w
else:
return solution
[docs]
def beer_lambert_model(x, y, wl, alpha_lw):
"""Function, which computes the vector of residuals between measured and modeled surface reflectance optimizing
for path length of surface liquid water based on the Beer-Lambert attenuation law.
Args:
x: state vector (liquid water path length, intercept, slope)
y: measurement (surface reflectance spectrum)
wl: instrument wavelengths
alpha_lw: wavelength dependent absorption coefficients of liquid water
Returns:
resid: residual between modeled and measured surface reflectance
"""
attenuation = np.exp(-x[0] * 1e7 * alpha_lw)
rho = (x[1] + x[2] * wl) * attenuation
resid = rho - y
return resid