#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from __future__ import annotations
import numpy as np
from scipy.linalg import block_diag, norm
from isofit.core.common import svd_inv_sqrt
from isofit.surface.surface import Surface
[docs]
class MultiComponentSurface(Surface):
"""A model of the surface based on a collection of multivariate
Gaussians, with one or more equiprobable components and full
covariance matrices.
To evaluate the probability of a new spectrum, we calculate the
Mahalanobis distance to each component cluster, and use that as our
Multivariate Gaussian surface model.
"""
def __init__(self, full_config: Config):
"""."""
super().__init__(full_config)
config = full_config.forward_model.surface
# Models are stored as dictionaries in .mat format
# TODO: enforce surface_file existence in the case of multicomponent_surface
[docs]
self.components = list(zip(self.model_dict["means"], self.model_dict["covs"]))
[docs]
self.n_comp = len(self.components)
[docs]
self.wl = self.model_dict["wl"][0]
[docs]
self.n_wl = len(self.wl)
# Set up normalization method
[docs]
self.normalize = self.model_dict["normalize"]
if self.normalize == "Euclidean":
self.norm = lambda r: norm(r)
elif self.normalize == "RMS":
self.norm = lambda r: np.sqrt(np.mean(pow(r, 2)))
elif self.normalize == "None":
self.norm = lambda r: 1.0
else:
raise ValueError("Unrecognized Normalization: %s\n" % self.normalize)
[docs]
self.selection_metric = config.selection_metric
[docs]
self.select_on_init = config.select_on_init
# Reference values are used for normalizing the reflectances.
# in the VSWIR regime, reflectances are normalized so that the model
# is agnostic to absolute magnitude.
[docs]
self.refwl = np.squeeze(self.model_dict["refwl"])
[docs]
self.idx_ref = [np.argmin(abs(self.wl - w)) for w in np.squeeze(self.refwl)]
self.idx_ref = np.array(self.idx_ref)
# Variables retrieved: each channel maps to a reflectance model parameter
rmin, rmax = -0.05, 2.0
[docs]
self.statevec_names = ["RFL_%04i" % int(w) for w in self.wl]
[docs]
self.idx_surface = np.arange(len(self.statevec_names))
# Change this if you don't want to analytical solve for all the full statevector elements.
[docs]
self.analytical_iv_idx = np.arange(len(self.statevec_names))
[docs]
self.bounds = [[rmin, rmax] for w in self.wl]
[docs]
self.scale = [1.0 for w in self.wl]
[docs]
self.init = [0.15 * (rmax - rmin) + rmin for v in self.wl]
[docs]
self.idx_lamb = np.arange(self.n_wl)
[docs]
self.n_state = len(self.statevec_names)
# Surface specific attributes. Can override in inheriting classes
[docs]
self.full_glint = False
# Cache some important computations
self.Covs, self.Cinvs, self.mus = [], [], []
self.Sa_inv_normalized, self.Sa_inv_sqrt_normalized = [], []
nprefix = self.idx_lamb[0]
nsuffix = len(self.statevec_names) - self.idx_lamb[-1] - 1
for i in range(self.n_comp):
Cov = self.components[i][1]
self.Covs.append(np.array([Cov[j, self.idx_ref] for j in self.idx_ref]))
self.Cinvs.append(svd_inv_sqrt(self.Covs[-1])[0])
self.mus.append(self.components[i][0][self.idx_ref])
# Caching the normalized Sa inv and Sa inv sqrt
Cov_full = block_diag(
np.zeros((nprefix, nprefix)),
self.components[i][1],
np.zeros((nsuffix, nsuffix)),
)
Cov_normalized = Cov_full / np.mean(np.diag(Cov_full))
Cinv_normalized, Cinv_sqrt_normalized = svd_inv_sqrt(Cov_normalized)
self.Sa_inv_normalized.append(Cinv_normalized)
self.Sa_inv_sqrt_normalized.append(Cinv_sqrt_normalized)
[docs]
def component(self, x, geom):
"""We pick a surface model component using the Mahalanobis distance.
This always uses the Lambertian (non-specular) version of the
surface reflectance. If the forward model initialize via heuristic
(i.e. algebraic inversion), the component is only calculated once
based on that first solution. That state is preserved in the
geometry object.
"""
if self.n_comp <= 1:
return 0
elif hasattr(geom, "surf_cmp_init"):
return geom.surf_cmp_init
elif self.select_on_init and hasattr(geom, "x_surf_init"):
x_surface = geom.x_surf_init
else:
x_surface = x
# Get the (possibly normalized) reflectance
lamb = self.calc_lamb(x_surface, geom)
lamb_ref = lamb[self.idx_ref]
lamb_ref = lamb_ref / self.norm(lamb_ref)
# Mahalanobis or Euclidean distances
mds = []
for ci in range(self.n_comp):
ref_mu = self.mus[ci]
ref_Cinv = self.Cinvs[ci]
if self.selection_metric == "Mahalanobis":
md = (lamb_ref - ref_mu).T.dot(ref_Cinv).dot(lamb_ref - ref_mu)
else:
md = sum(pow(lamb_ref - ref_mu, 2))
mds.append(md)
closest = np.argmin(mds)
if (
self.select_on_init
and hasattr(geom, "x_surf_init")
and (not hasattr(geom, "surf_cmp_init"))
):
geom.surf_cmp_init = closest
return closest
[docs]
def xa(self, x_surface, geom):
"""Mean of prior distribution, calculated at state x. We find
the covariance in a normalized space (normalizing by z) and then un-
normalize the result for the calling function. This always uses the
Lambertian (non-specular) version of the surface reflectance."""
lamb = self.calc_lamb(x_surface, geom)
lamb_ref = lamb[self.idx_ref]
mu = np.zeros(self.n_state)
ci = self.component(x_surface, geom)
lamb_mu = self.components[ci][0]
lamb_mu = lamb_mu * self.norm(lamb_ref)
mu[self.idx_lamb] = lamb_mu
return mu
[docs]
def Sa(self, x_surface, geom):
"""Covariance of prior distribution, calculated at state x. We find
the covariance in a normalized space (normalizing by z) and then un-
normalize the result for the calling function."""
lamb = self.calc_lamb(x_surface, geom)
lamb_ref = lamb[self.idx_ref]
ci = self.component(x_surface, geom)
Cov = self.components[ci][1]
Sa_unnormalized = Cov * (self.norm(lamb_ref) ** 2)
# select the Sa inverse from the list of components
Sa_inv_normalized = self.Sa_inv_normalized[ci]
Sa_inv_sqrt_normalized = self.Sa_inv_sqrt_normalized[ci]
# If there are no other state vector elements, we're done.
if len(self.statevec_names) == len(self.idx_lamb):
return Sa_unnormalized, Sa_inv_normalized, Sa_inv_sqrt_normalized
# Embed into a larger state vector covariance matrix
nprefix = self.idx_lamb[0]
nsuffix = len(self.statevec_names) - self.idx_lamb[-1] - 1
Cov_prefix = np.zeros((nprefix, nprefix))
Cov_suffix = np.zeros((nsuffix, nsuffix))
Sa_unnormalized = block_diag(Cov_prefix, Sa_unnormalized, Cov_suffix)
return Sa_unnormalized, Sa_inv_normalized, Sa_inv_sqrt_normalized
[docs]
def fit_params(self, rfl_meas, geom, *args):
"""Given a reflectance estimate, fit a state vector."""
x_surface = np.zeros(len(self.statevec_names))
if len(rfl_meas) != len(self.idx_lamb):
raise ValueError("Mismatched reflectances")
for i, r in zip(self.idx_lamb, rfl_meas):
x_surface[i] = max(
self.bounds[i][0] + 0.001, min(self.bounds[i][1] - 0.001, r)
)
return x_surface
[docs]
def calc_rfl(self, x_surface, geom):
"""Non-Lambertian reflectance.
Inputs:
x_surface : np.ndarray
Surface portion of the statevector element
geom : Geometry
Isofit geometry object
Outputs:
rho_dir_dir : np.ndarray
Reflectance quantity for downward direct photon paths
rho_dif_dir : np.ndarray
Reflectance quantity for downward diffuse photon paths
NOTE:
We do not handle direct and diffuse photon path reflectance
quantities differently for the multicomponent surface model.
This is why we return the same quantity for both outputs.
"""
rho_dir_dir = rho_dif_dir = self.calc_lamb(x_surface, geom)
return rho_dir_dir, rho_dif_dir
[docs]
def calc_lamb(self, x_surface, geom):
"""Lambertian reflectance."""
return x_surface[self.idx_lamb]
[docs]
def drfl_dsurface(self, x_surface, geom):
"""Partial derivative of reflectance with respect to state vector,
calculated at x_surface."""
return self.dlamb_dsurface(x_surface, geom)
[docs]
def dlamb_dsurface(self, x_surface, geom):
"""Partial derivative of Lambertian reflectance with respect to
state vector, calculated at x_surface."""
dlamb = np.eye(self.n_wl, dtype=float)
nprefix = self.idx_lamb[0]
nsuffix = self.n_state - self.idx_lamb[-1] - 1
prefix = np.zeros((self.n_wl, nprefix))
suffix = np.zeros((self.n_wl, nsuffix))
return np.concatenate((prefix, dlamb, suffix), axis=1)
[docs]
def drdn_drfl(self, L_tot, s_alb, rho_dif_dir):
"""Partial derivative of radiance with respect to
surface reflectance"""
return L_tot / ((1.0 - s_alb * rho_dif_dir) ** 2)
[docs]
def calc_Ls(self, x_surface, geom):
"""Emission of surface, as a radiance."""
return np.zeros(self.n_wl, dtype=float)
[docs]
def dLs_dsurface(self, x_surface, geom):
"""Partial derivative of surface emission with respect to state vector,
calculated at x_surface."""
dLs = np.zeros((self.n_wl, self.n_wl), dtype=float)
nprefix = self.idx_lamb[0]
nsuffix = len(self.statevec_names) - self.idx_lamb[-1] - 1
prefix = np.zeros((self.n_wl, nprefix))
suffix = np.zeros((self.n_wl, nsuffix))
return np.concatenate((prefix, dLs, suffix), axis=1)
[docs]
def drdn_dLs(self, t_total_up):
"""Partial derivative of radiance with respect to
surface emission"""
return t_total_up
[docs]
def drdn_dsurface(
self,
rho_dif_dir,
drfl_dsurface,
dLs_dsurface,
s_alb,
t_total_up,
L_tot,
L_down_dir,
):
"""Derivative of radiance with respect to
full surface vector"""
# Construct the output matrix:
# Dimensions should be (len(RT.wl), len(x_surface))
# which is correctly handled by the instrument resampling
drdn_dsurface = np.zeros(drfl_dsurface.shape)
drdn_drfl = self.drdn_drfl(L_tot, s_alb, rho_dif_dir)
drdn_dsurface[:, : self.n_wl] = np.multiply(
drdn_drfl[:, np.newaxis], drfl_dsurface[:, : self.n_wl]
)
# Get the derivative w.r.t. surface emission
drdn_dLs = np.multiply(self.drdn_dLs(t_total_up)[:, np.newaxis], dLs_dsurface)
return np.add(drdn_dsurface, drdn_dLs)
[docs]
def analytical_model(
self,
background,
L_down_dir,
L_down_dif,
L_tot,
geom,
L_dir_dir=None,
L_dir_dif=None,
L_dif_dir=None,
L_dif_dif=None,
):
"""
Linearization of the surface reflectance terms to use in the
AOE inner loop (see Susiluoto, 2025). We set the quadratic
spherical albedo term to a constant background, which
simplifies the linearization
background = s * rho_bg
"""
# If you ignore multi-scattering
theta = L_tot + (L_tot * background / (1 - background))
# theta = L_tot
H = np.eye(self.n_wl, self.n_wl)
H = theta[:, np.newaxis] * H
return H
[docs]
def summarize(self, x_surface, geom):
"""Summary of state vector."""
if len(x_surface) < 1:
return ""
return "Component: %i" % self.component(x_surface, geom)