#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from __future__ import annotations
import logging
import numpy as np
from scipy.interpolate import interp1d, splev, splrep
from scipy.io import loadmat
from scipy.signal import convolve
from isofit.core import units
from isofit.core.common import (
emissive_radiance,
eps,
load_wavelen,
resample_spectrum,
spectral_response_function,
svd_inv_sqrt,
)
### Variables ###
# Max. wavelength difference (nm) that does not trigger expensive resampling
### Classes ###
[docs]
class Instrument:
def __init__(self, full_config: Config):
"""A model of the spectrometer instrument, including spectral
response and noise covariance matrices. Noise is typically calculated
from a parametric model, fit for the specific instrument. It is a
function of the radiance level."""
config = full_config.forward_model.instrument
# If needed, skip first index column and/or convert to nanometers
self.wl_init, self.fwhm_init = load_wavelen(config.wavelength_file)
[docs]
self.n_chan = len(self.wl_init)
[docs]
self.fast_resample = config.fast_resample
[docs]
self.bounds = config.statevector.get_all_bounds()
[docs]
self.scale = config.statevector.get_all_scales()
[docs]
self.init = config.statevector.get_all_inits()
[docs]
self.prior_mean = np.array(config.statevector.get_all_prior_means())
[docs]
self.prior_sigma = np.array(config.statevector.get_all_prior_sigmas())
[docs]
self.Sa_cached = np.diagflat(np.power(self.prior_sigma, 2))
[docs]
self.Sa_normalized = self.Sa_cached / np.mean(np.diag(self.Sa_cached))
self.Sa_inv_normalized, self.Sa_inv_sqrt_normalized = svd_inv_sqrt(
self.Sa_normalized
)
[docs]
self.statevec_names = config.statevector.get_element_names()
[docs]
self.n_state = len(self.statevec_names)
[docs]
self.integrations = config.integrations
[docs]
self.dn_uncertainty_embedding = None
if (
config.unknowns is not None
and config.unknowns.dn_uncertainty_file is not None
):
dn_uncertainty_mat = loadmat(config.unknowns.dn_uncertainty_file)
# Check validity of linearity file for dn-based noise
keys = [
"input_dn",
"dn_ratio",
"rcc",
"rcc_wl",
]
bad = [
1 if np.any(~np.isfinite(dn_uncertainty_mat[key])) else 0
for key in keys
]
if np.sum(bad):
er = f"""
Invalid value found in dn_uncertainty_mat keys: {[keys[i] for i in bad if i]}.
Check file at: {config.unknowns.dn_uncertainty_file}
"""
logging.error(er)
raise ValueError(er)
input_dn = dn_uncertainty_mat["input_dn"].squeeze()
dn_ratio = dn_uncertainty_mat["dn_ratio"].squeeze()
rcc_in = dn_uncertainty_mat["rcc"].squeeze()
rcc_wl = dn_uncertainty_mat["rcc_wl"].squeeze()
rcc_interp = interp1d(rcc_wl, rcc_in, fill_value="extrapolate")
self.dn_uncertainty_rcc = rcc_interp(self.wl_init)
self.dn_uncertainty_interp = interp1d(
input_dn, dn_ratio, fill_value="extrapolate"
)
self.dn_uncertainty_inflation = dn_uncertainty_mat.get(
"inflation", [1.0]
).squeeze()
self.dn_uncertainty_embedding = dn_uncertainty_mat.get(
"embedding_location", "Sy"
)
if config.SNR is not None:
self.model_type = "SNR"
self.snr = config.SNR
elif config.parametric_noise_file is not None:
self.model_type = "parametric"
self.noise_file = config.parametric_noise_file
coeffs = np.loadtxt(self.noise_file, delimiter=" ", comments="#")
p_a, p_b, p_c = [
interp1d(coeffs[:, 0], coeffs[:, col], fill_value="extrapolate")
for col in (1, 2, 3)
]
self.noise = np.array([[p_a(w), p_b(w), p_c(w)] for w in self.wl_init])
elif config.pushbroom_noise_file is not None:
self.model_type = "pushbroom"
self.noise_file = config.pushbroom_noise_file
D = loadmat(self.noise_file)
self.ncols = D["columns"][0, 0]
if self.n_chan != np.sqrt(D["bands"][0, 0]):
logging.error("Noise model mismatches wavelength # bands")
raise ValueError("Noise model mismatches wavelength # bands")
cshape = (self.ncols, self.n_chan, self.n_chan)
self.covs = D["covariances"].reshape(cshape)
self.integrations = config.integrations
elif config.nedt_noise_file is not None:
self.model_type = "NEDT"
self.noise_file = config.nedt_noise_file
self.noise_data = np.loadtxt(self.noise_file, delimiter=",", skiprows=8)
noise_data_w_nm = units.micron_to_nm(self.noise_data[:, 0])
noise_data_NEDT = self.noise_data[:, 1]
nedt = interp1d(noise_data_w_nm, noise_data_NEDT)(self.wl_init)
T, emis = 300.0, 0.95 # From Glynn Hulley, 2/18/2020
_, drdn_dT = emissive_radiance(emis, T, self.wl_init)
self.noise_NESR = nedt * drdn_dT
else:
raise IndexError("Please define the instrument noise.")
# This should never be reached, as an error is designated in the config read
# We track several unretrieved free variables, that are specified
# in a fixed order (always start with relative radiometric
# calibration)
[docs]
self.unknowns = config.unknowns
[docs]
self.bval = np.zeros(self.n_chan)
[docs]
self.bvec = ["Cal_Relative_%04i" % int(w) for w in self.wl_init]
# self.unknowns should always exist via configs
# but may not exist for manual configs
if self.unknowns:
# Now handle spectral calibration uncertainties
if self.unknowns.wavelength_calibration_uncertainty is not None:
self.bvec += ["Cal_Spectral"]
self.cal_spectral_idx = len(self.bval)
self.bval = np.hstack(
[self.bval, self.unknowns.wavelength_calibration_uncertainty]
)
if self.unknowns.stray_srf_uncertainty is not None:
self.bvec += ["Cal_Stray_SRF"]
self.cal_stray_idx = len(self.bval) + 1
self.bval = np.hstack([self.bval, self.unknowns.stray_srf_uncertainty])
# Determine whether the calibration is fixed. If it is fixed,
# and the wavelengths of radiative transfer modeling and instrument
# are the same, then we can bypass computationally expensive sampling
# operations later.
[docs]
self.calibration_fixed = True
if (
config.statevector.GROW_FWHM is not None
or config.statevector.WL_SHIFT is not None
or config.statevector.WL_SPACE is not None
):
self.calibration_fixed = False
[docs]
def xa(self):
"""Mean of prior distribution, calculated at state x."""
return self.init.copy()
[docs]
def Sa(self):
"""Covariance of prior distribution (diagonal)."""
if self.n_state == 0:
return np.zeros((0, 0), dtype=float)
return self.Sa_cached
[docs]
def Sb(self, meas):
"""Uncertainty due to unmodeled variables."""
bval = self.bval.copy()
# First we take care of radiometric uncertainties, which add
# in quadrature. We sum their squared values. Systematic
# radiometric uncertainties account for differences in sampling
# and radiative transfer that manifest predictably as a function
# of wavelength.
if self.unknowns:
if self.unknowns.channelized_radiometric_uncertainty_file is not None:
f = self.unknowns.channelized_radiometric_uncertainty_file
u = np.loadtxt(f, comments="#")
if len(u.shape) > 0 and u.shape[1] > 1:
u = u[:, 1]
bval[: self.n_chan] = bval[: self.n_chan] + pow(u, 2)
# Uncorrelated radiometric uncertainties are consistent and
# independent in all channels.
if self.unknowns.uncorrelated_radiometric_uncertainty:
u = self.unknowns.uncorrelated_radiometric_uncertainty
bval[: self.n_chan] = bval[: self.n_chan] + pow(
np.ones(self.n_chan) * u, 2
)
# Uncertainty due to imperfect knowledge of linearity correction
if self.dn_uncertainty_embedding == "Sb":
bval[: self.n_chan] += np.power(
self.DN_additive_uncertainty(
meas,
self.dn_uncertainty_rcc,
self.dn_uncertainty_interp,
self.dn_uncertainty_inflation,
),
2,
)
# Radiometric uncertainties combine via Root Sum Square...
# Be careful to avoid square roots of zero!
small = np.ones(self.n_chan) * eps
bval[: self.n_chan] = np.maximum(bval[: self.n_chan], small)
bval[: self.n_chan] = np.sqrt(bval[: self.n_chan])
return np.diagflat(np.power(bval, 2))
[docs]
def Sy(self, meas, geom):
"""Calculate measuremment error covariance. Kelvin Man Yiu Leung and
Jayanth Jagalur Mohan (MIT) developed the noise clipping strategy.
Input: meas, the instrument measurement
Returns: Sy, the measurement error covariance due to instrument noise
"""
Sy = None
if self.model_type == "SNR":
nedl = (1.0 / self.snr) * meas
minimum_noise = np.sqrt(1e-7)
bad = nedl < minimum_noise
if np.any(bad):
logging.debug(
"SNR noise model found noise <= 0 - adjusting to slightly positive"
" to avoid /0."
)
nedl[bad] = minimum_noise
Sy = np.diagflat(np.power(nedl, 2))
elif self.model_type == "parametric":
noise_plus_meas = self.noise[:, 1] + meas
if np.any(noise_plus_meas <= 0):
noise_plus_meas[noise_plus_meas <= 0] = 1e-5
logging.debug(
"Parametric noise model found noise <= 0 - adjusting to slightly"
" positive to avoid /0."
)
nedl = np.abs(
self.noise[:, 0] * np.sqrt(noise_plus_meas) + self.noise[:, 2]
)
nedl = nedl / np.sqrt(self.integrations)
Sy = np.diagflat(np.power(nedl, 2))
elif self.model_type == "pushbroom":
C = np.squeeze(self.covs.mean(axis=0))
Sy = C / np.sqrt(self.integrations)
elif self.model_type == "NEDT":
Sy = np.diagflat(np.power(self.noise_NESR, 2))
if self.dn_uncertainty_embedding:
# Uncertainty due to imperfect knowledge of linearity correction
np.fill_diagonal(
Sy,
(
Sy.diagonal()
+ self.DN_additive_uncertainty(
meas,
self.dn_uncertainty_rcc,
self.dn_uncertainty_interp,
self.dn_uncertainty_inflation,
)
),
)
return Sy
[docs]
def dmeas_dinstrument(self, x_instrument, wl_hi, rdn_hi):
"""Jacobian of measurement with respect to the instrument
free parameter state vector. We use finite differences for now."""
dmeas_dinstrument = np.zeros((self.n_chan, self.n_state), dtype=float)
if self.n_state == 0:
return dmeas_dinstrument
meas = self.sample(x_instrument, wl_hi, rdn_hi)
for ind in range(self.n_state):
x_instrument_perturb = x_instrument.copy()
x_instrument_perturb[ind] = x_instrument_perturb[ind] + eps
meas_perturb = self.sample(x_instrument_perturb, wl_hi, rdn_hi)
dmeas_dinstrument[:, ind] = (meas_perturb - meas) / eps
return dmeas_dinstrument
[docs]
def dmeas_dinstrumentb(self, x_instrument, wl_hi, rdn_hi):
"""Jacobian of radiance with respect to the instrument parameters
that are unknown and not retrieved, i.e., the inevitable persisting
uncertainties in instrument spectral and radiometric calibration.
Input: meas, a vector of size n_chan
Returns: Kb_instrument, a matrix of size [n_measurements x nb_instrument]
"""
# Uncertainty due to radiometric calibration
meas = self.sample(x_instrument, wl_hi, rdn_hi)
dmeas_dinstrument = np.hstack(
(
np.diagflat(meas),
np.zeros((self.n_chan, len(self.bvec) - len(self.wl_init))),
)
)
# Uncertainty due to spectral calibration
if self.unknowns:
if self.unknowns.wavelength_calibration_uncertainty is not None:
dmeas_dinstrument[:, self.cal_spectral_idx] = self.sample(
x_instrument, wl_hi, np.hstack((np.diff(rdn_hi), np.array([0])))
)
# Uncertainty due to spectral stray light
if self.unknowns.stray_srf_uncertainty is not None:
ssrf = spectral_response_function(np.arange(-10, 11), 0, 4)
blur = convolve(meas, ssrf, mode="same")
dmeas_dinstrument[:, self.cal_stray_idx] = blur - meas
return dmeas_dinstrument
[docs]
def sample(self, x_instrument, wl_hi, rdn_hi):
"""Apply instrument sampling to a radiance spectrum, returning predicted measurement."""
if (
self.calibration_fixed
and (len(self.wl_init) == len(wl_hi))
and all((self.wl_init - wl_hi) < wl_tol)
):
return rdn_hi
wl, fwhm = self.calibration(x_instrument)
# If rdn_hi is a vector of length 1, return itself
if rdn_hi.ndim == 1 and len(rdn_hi) <= 1:
return rdn_hi
# If rdn_hi is a vector of length > 1, return it resampled to instrument
elif rdn_hi.ndim == 1 and len(rdn_hi) > 1:
return resample_spectrum(rdn_hi, wl_hi, wl, fwhm)
# If rdn_hi is a multidim array, do the multidim resampling
else:
resamp = []
# The "fast resample" option approximates a complete resampling
# by a convolution with a uniform FWHM.
if self.fast_resample:
for i, r in enumerate(rdn_hi):
ssrf = spectral_response_function(np.arange(-10, 11), 0, fwhm[0])
blur = convolve(r, ssrf, mode="same")
resamp.append(interp1d(wl_hi, blur)(wl))
else:
for i, r in enumerate(rdn_hi):
r2 = resample_spectrum(r, wl_hi, wl, fwhm)
resamp.append(r2)
return np.array(resamp)
[docs]
def simulate_measurement(self, meas, geom):
"""Simulate a measurement by the given sensor, for a true radiance
sampled to instrument wavelengths. This basically just means
drawing a sample from the noise distribution."""
Sy = self.Sy(meas, geom)
mu = np.zeros(meas.shape)
rdn_sim = meas + np.random.multivariate_normal(mu, Sy)
return rdn_sim
[docs]
def calibration(self, x_instrument):
"""Calculate the measured wavelengths."""
wl, fwhm = self.wl_init, self.fwhm_init
space_orig = wl - wl[0]
offset = wl[0]
if "GROW_FWHM" in self.statevec_names:
ind = self.statevec_names.index("GROW_FWHM")
fwhm = fwhm + x_instrument[ind]
elif any([v.startswith("FWHMSPL") for v in self.statevec_names]):
# cubic spline perturbation
channels, vals = [], []
for i, v in enumerate(self.statevec_names):
if v.startswith("FWHMSPL"):
chan = float(v.split("_")[1])
channels.append(chan)
vals.append(x_instrument[i])
sp = splrep(channels, vals, s=0)
xnew = np.arange(len(wl))
fwhm = fwhm + splev(xnew, sp)
if "WL_SPACE" in self.statevec_names:
ind = self.statevec_names.index("WL_SPACE")
space = x_instrument[ind]
else:
space = 1.0
if "WL_SHIFT" in self.statevec_names:
ind = self.statevec_names.index("WL_SHIFT")
shift = x_instrument[ind]
elif any([v.startswith("WLSPL") for v in self.statevec_names]):
# cubic spline perturbation
channels, vals = [], []
for i, v in enumerate(self.statevec_names):
if v.startswith("WLSPL"):
chan = int(v.split("_")[1])
channels.append(chan)
vals.append(x_instrument[i])
sp = splrep(channels, vals, s=0)
xnew = np.arange(len(wl))
shift = splev(xnew, sp)
else:
shift = 0.0
wl = offset + shift + space_orig * space
return wl, fwhm
@staticmethod
[docs]
def DN_additive_uncertainty(meas, rcc, interp, inflation):
# Into DN space with rccs
dn_est = np.maximum(meas / rcc, 0)
noise_est = interp(dn_est)
return np.abs(meas * (noise_est - 1) * inflation)
[docs]
def summarize(self, x_instrument, geom):
"""Summary of state vector."""
if len(x_instrument) < 1:
return ""
return "Instrument: " + " ".join(["%5.3f" % xi for xi in x_instrument])