#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import json
import os
from collections import OrderedDict
from difflib import SequenceMatcher
from os.path import expandvars
from typing import List
import numpy as np
# sc Adding in xarray for non-gauss SRF file io
import xarray as xr
import xxhash
from scipy.interpolate import RegularGridInterpolator
from isofit.core import units
# small value used in finite difference derivatives
# Global variable makes it non-shared mem in ray
[docs]
class VectorInterpolator:
"""Linear look up table interpolator. Support linear interpolation through radial space by expanding the look
up tables with sin and cos dimensions.
Args:
grid_input: list of lists of floats, indicating the gridpoint elements in each grid dimension
data_input: n dimensional array of radiative transfer engine outputs (each dimension size corresponds to the
given grid_input list length, with the last dimensions equal to the number of sensor channels)
version: version to use: 'rg' for scipy RegularGridInterpolator, 'mlg' for multilinear grid interpolator
"""
def __init__(
self,
grid_input: List[List[float]],
data_input: np.array,
version="mlg",
):
# Determine if this a singular unique value, if so just return that directly
val = data_input[(0,) * data_input.ndim]
if np.isnan(val) and np.isnan(data_input).all() or np.all(data_input == val):
self.method = -1
self.value = val
return
[docs]
self.single_point_data = None
# Lists and arrays are mutable, so copy first
grid = grid_input.copy()
data = data_input.copy()
# Check if we are using a single grid point. If so, store the grid input.
if np.prod(list(map(len, grid))) == 1:
self.single_point_data = data
[docs]
self.n = data.shape[-1]
# RegularGrid
if version == "rg":
grid_aug = grid + [np.arange(data.shape[-1])]
self.itp = RegularGridInterpolator(
grid_aug, data, bounds_error=False, fill_value=None
)
self.method = 1
# Multilinear Grid
elif version == "mlg":
self.method = 2
# None to disable, 0 for unlimited, negatives == 1
self.cache_size = 1
self.gridtuples = [np.array(t) for t in grid]
self.gridarrays = data
self.binwidth = [
t[1:] - t[:-1] for t in self.gridtuples
] # binwidth arrays for each dimension
self.maxbaseinds = np.array([len(t) - 1 for t in self.gridtuples])
else:
raise AttributeError(f"Unknown interpolator version: {version!r}")
[docs]
def _interpolate(self, points):
"""
Supports style 'rg'
"""
# If we only have one point, we can't do any interpolation, so just
# return the original data.
if self.single_point_data is not None:
return self.single_point_data
x = np.zeros((self.n, len(points) + 1))
x[:, :-1] = points
# This last dimension is always an integer so no
# interpolation is performed. This is done only
# for performance reasons.
x[:, -1] = np.arange(self.n)
res = self.itp(x)
return res
[docs]
def _lookup(self, i, point):
"""
Calculates the slicing for the cube in _multilinear_grid
"""
j = np.searchsorted(self.gridtuples[i][:-1], point) - 1
# Bounds functions
lower = lambda: max(min(self.maxbaseinds[i], j), 0)
upper = lambda: max(min(self.maxbaseinds[i] + 2, j + 2), 2)
if point >= self.gridtuples[i][-1]:
return None, upper() - 1
elif point <= self.gridtuples[i][0]:
return None, lower()
else:
delta = (point - self.gridtuples[i][j]) / self.binwidth[i][j]
return delta, slice(lower(), upper())
[docs]
def _multilinear_grid(self, points):
"""
Cached version of Jouni's implementation
Args:
points: The point being interpolated. If at the limit, the extremal value in
the grid is returned.
Returns:
cube: np.ndarray
"""
deltas = [None] * points.size
idxs = [None] * points.size
for i, point in enumerate(points):
if self.cache_size is not None:
cache = Cache.setdefault(i, {})
stats = Cache["stats"].setdefault(i, {"hit": 0, "miss": 0})
if point in cache:
data = cache[point]
stats["hit"] += 1
else:
# Simple FIFO
if self.cache_size and len(cache) >= self.cache_size:
cache.pop(list(cache)[0])
data = self._lookup(i, point)
cache[point] = data
stats["miss"] += 1
else:
data = self._lookup(i, point)
deltas[i], idxs[i] = data
cube = np.copy(self.gridarrays[tuple(idxs)], order="A")
# Only linear interpolate sliced dimensions
for i, idx in enumerate(idxs):
if isinstance(idx, slice):
cube[0] *= 1 - deltas[i]
cube[1] *= deltas[i]
cube[0] += cube[1]
cube = cube[0]
return cube
[docs]
def __call__(self, *args, **kwargs):
"""
Passes args to the appropriate interpolation method defined by the version at
object init.
"""
if self.method == -1:
return self.value
elif self.method == 1:
return self._interpolate(*args, **kwargs)
elif self.method == 2:
return self._multilinear_grid(*args, **kwargs)
[docs]
def load_wavelen(wavelength_file: str):
"""Load a wavelength file, and convert to nanometers if needed.
Args:
wavelength_file: file to read wavelengths from
Returns:
(np.array, np.array): wavelengths, full-width-half-max
"""
q = np.loadtxt(wavelength_file)
if q.shape[1] > 2:
q = q[:, 1:3]
if q[0, 0] < 100:
q = units.micron_to_nm(q)
wl, fwhm = q.T
return wl, fwhm
[docs]
def emissive_radiance(
emissivity: np.array, T: np.array, wl: np.array
) -> (np.array, np.array):
"""Calcluate the radiance of a surface due to emission.
Args:
emissivity: surface emissivity.
T: surface temperature [K]
wl: emmissivity wavelengths [nm]
Returns:
np.array: surface upwelling radiance in uW $cm^{-2} sr^{-1} nm^{-nm}$
np.array: partial derivative of radiance with respect to temperature uW $cm^{-2} sr^{-1} nm^{-1} k^{-1}$
"""
c_1 = 1.88365e32 / np.pi
c_2 = 14387690
J_per_eV = 1.60218e-19
wl_um = wl / 1000.0
ph_per_sec_cm2_sr_nm = c_1 / (wl**4) / (np.exp(c_2 / wl / T) - 1.0) * emissivity
# photon energy in eV
eV_per_sec_cm2_sr_nm = 1.2398 * ph_per_sec_cm2_sr_nm / wl_um
W_per_cm2_sr_nm = J_per_eV * eV_per_sec_cm2_sr_nm
uW_per_cm2_sr_nm = units.W_to_uW(W_per_cm2_sr_nm)
dRdn_dT = units.W_to_uW(
c_1
/ (wl**4)
* (-pow(np.exp(c_2 / wl / T) - 1.0, -2.0))
* np.exp(c_2 / wl / T)
* (-pow(T, -2) * c_2 / wl)
* emissivity
/ wl_um
* 1.2398
* J_per_eV
)
return uW_per_cm2_sr_nm, dRdn_dT
[docs]
def svd_inv(C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None):
"""Matrix inversion, based on decomposition. Built to be stable, and positive.
Args:
C: matrix to invert
hashtable: if used, the hashtable to store/retrieve results in/from
max_hash_size: maximum size of hashtable
Return:
np.array: inverse of C
"""
return svd_inv_sqrt(C, hashtable, max_hash_size)[0]
[docs]
def svd_inv_sqrt(
C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None
) -> (np.array, np.array):
"""Matrix inversion, based on decomposition. Built to be stable, and positive.
Args:
C: matrix to invert
hashtable: if used, the hashtable to store/retrieve results in/from
max_hash_size: maximum size of hashtable
Return:
(np.array, np.array): inverse of C and square root of the inverse of C
"""
# If we have a hash table, look for the precalculated solution
h = None
if hashtable is not None:
# If arrays are in Fortran ordering, they are not hashable.
if not C.flags["C_CONTIGUOUS"]:
C = C.copy(order="C")
h = xxhash.xxh64_digest(C)
if h in hashtable:
return hashtable[h]
# Default to using numpy eigh (which uses LAPACK evd driver by default).
try:
D, P = np.linalg.eigh(C)
except:
D, P = None, None
# Sanity check for edge cases that we encounter with negative eigen values, and we offset by inv_eps.
inv_eps_checks = [1e-6, 1e-5, 1e-4]
inv_eps = None
if D is None or np.any(D < 0) or np.any(np.isnan(D)):
for inv_eps in inv_eps_checks:
try:
D, P = np.linalg.eigh(C + np.eye(C.shape[0]) * inv_eps)
if not (np.any(D < 0) or np.any(np.isnan(D))):
break
except:
continue
else:
raise ValueError(
"Matrix inversion contains negative values, "
+ "even after adding {} to the diagonal.".format(inv_eps)
)
Ds = np.diag(1 / np.sqrt(D))
L = P @ Ds
Cinv_sqrt = L @ P.T
Cinv = L @ L.T
# If there is a hash table, cache our solution. Bound the total cache
# size by removing any extra items in FIFO order.
if (hashtable is not None) and (max_hash_size is not None):
hashtable[h] = (Cinv, Cinv_sqrt)
while len(hashtable) > max_hash_size:
hashtable.popitem(last=False)
return Cinv, Cinv_sqrt
[docs]
def expand_path(directory: str, subpath: str) -> str:
"""Expand a path variable to an absolute path, if it is not one already.
Args:
directory: absolute location
subpath: path to expand
Returns:
str: expanded path
"""
if subpath.startswith("/"):
return subpath
return os.path.join(directory, subpath)
[docs]
def recursive_replace(obj, key, val) -> None:
"""Find and replace a vector in a nested (mutable) structure.
Args:
obj: object to replace within
key: key to replace
val: value to replace with
"""
if isinstance(obj, dict):
if key in obj:
obj[key] = val
for item in obj.values():
recursive_replace(item, key, val)
elif any(isinstance(obj, t) for t in (list, tuple)):
for item in obj:
recursive_replace(item, key, val)
[docs]
def get_absorption(wl: np.array, absfile: str) -> (np.array, np.array):
"""Calculate water and ice absorption coefficients using indices of
refraction, and interpolate them to new wavelengths (user specifies nm).
Args:
wl: wavelengths to interpolate to
absfile: file containing indices of refraction. Wavelength unts are in nm.
Returns:
np.array: interpolated, wavelength-specific water absorption coefficients
np.array: interpolated, wavelength-specific ice absorption coefficients
"""
# read the indices of refraction
q = np.loadtxt(absfile, delimiter=",")
wl_orig_nm = q[:, 0]
wl_orig_cm = units.nm_to_cm(wl_orig_nm)
water_imag = q[:, 2]
ice_imag = q[:, 4]
# calculate absorption coefficients in cm^-1
water_abscf = water_imag * np.pi * 4.0 / wl_orig_cm
ice_abscf = ice_imag * np.pi * 4.0 / wl_orig_cm
# interpolate to new wavelengths (user provides nm)
water_abscf_intrp = np.interp(wl, wl_orig_nm, water_abscf)
ice_abscf_intrp = np.interp(wl, wl_orig_nm, ice_abscf)
return water_abscf_intrp, ice_abscf_intrp
[docs]
def recursive_reencode(j, shell_replace: bool = True):
"""Recursively re-encode a mutable object (ascii->str).
Args:
j: object to reencode
shell_replace: boolean helper for recursive calls
Returns:
Object: expanded, reencoded object
"""
if isinstance(j, dict):
for key, value in j.items():
j[key] = recursive_reencode(value)
return j
elif isinstance(j, list):
for i, k in enumerate(j):
j[i] = recursive_reencode(k)
return j
elif isinstance(j, tuple):
return tuple([recursive_reencode(k) for k in j])
else:
if shell_replace and isinstance(j, str):
try:
j = expandvars(j)
except IndexError:
pass
return j
[docs]
def json_load_ascii(filename: str, shell_replace: bool = True) -> dict:
"""Load a hierarchical structure, convert all unicode to ASCII and
expand environment variables.
Args:
filename: json file to load from
shell_replace: boolean
Returns:
dict: encoded dictionary
"""
with open(filename, "r") as fin:
j = json.load(fin)
return recursive_reencode(j, shell_replace)
[docs]
def expand_all_paths(to_expand: dict, absdir: str):
"""Expand any dictionary entry containing the string 'file' into
an absolute path, if needed.
Args:
to_expand: dictionary to expand
absdir: path to expand with (absolute directory)
Returns:
dict: dictionary with expanded paths
"""
def recursive_expand(j):
if isinstance(j, dict):
for key, value in j.items():
if (
isinstance(key, str)
and ("file" in key or "directory" in key or "path" in key)
and isinstance(value, str)
):
j[key] = expand_path(absdir, value)
else:
j[key] = recursive_expand(value)
return j
elif isinstance(j, list):
for i, k in enumerate(j):
j[i] = recursive_expand(k)
return j
elif isinstance(j, tuple):
return tuple([recursive_reencode(k) for k in j])
return j
return recursive_expand(to_expand)
[docs]
def calculate_resample_matrix(
wl: np.array, wl2: np.array, fwhm2: np.array, srf_file: str = None
) -> np.array:
"""Calculate the resampling matrix for a given set of wavelengths and FWHM.
Once calculated, resmpling is just the dot product of this matrix with the
vector to be resampled.
Args:
wl: sample starting wavelengths
wl2: wavelengths to resample to
fwhm2: full-width-half-max at resample resolution
srf_file: SRF for the sensor if not assuming Gaussian
Returns:
np.array: transformation matrix (H)
"""
if srf_file is None:
H = np.array(
[
spectral_response_function(wl, wi, fwhmi / 2.355)
for wi, fwhmi in zip(wl2, fwhm2)
]
)
H[np.isnan(H)] = 0
else:
# Loading in user-supplied srf file and assigning variables
srf_data = xr.open_dataset(srf_file)
sensor_rsr, rsr_wls = srf_data.RSR.data, srf_data.wavelength.data
# Grabbing indices of rsr wavelengths which match sensor wavelengths (wl2)
idx = [np.argwhere(abs(rsr_wls - wl2[i]) < 0.1)[0] for i in range(len(wl2))]
idx = np.asarray(idx).flatten()
# Getting RSR at sensor spectral resolution
rsr_channel_res = sensor_rsr[:, idx]
# Normalize H to unit length
H = rsr_channel_res / np.sum(rsr_channel_res, axis=1)[:, np.newaxis]
H[np.isnan(H)] = 0
return H
[docs]
def resample_spectrum(
x: np.array,
wl: np.array,
wl2: np.array,
fwhm2: np.array,
fill: bool = False,
srf_file: str = None,
H: np.array = None,
) -> np.array:
"""Resample a spectrum to a new wavelength / FWHM.
Assumes Gaussian SRFs.
Args:
x: radiance vector
wl: sample starting wavelengths
wl2: wavelengths to resample to
fwhm2: full-width-half-max at resample resolution
fill: boolean indicating whether to fill in extrapolated regions
### sc Adding for non-Gaussian SRF ###
srf_file: SRF for the sensor if not assuming Gaussian
H: pre-computed transformation matrix, to enable caching
Returns:
np.array: interpolated radiance vector
"""
# sc including if else to add non-Gaussian SRF
# Probably a better way than this with file paths, etc.
if H is None:
H = calculate_resample_matrix(wl, wl2, fwhm2, srf_file)
dims = len(x.shape)
if fill:
if dims > 1:
raise Exception("resample_spectrum(fill=True) only works with vectors")
x = x.reshape(-1, 1)
xnew = np.dot(H, x).ravel()
good = np.isfinite(xnew)
for i, xi in enumerate(xnew):
if not good[i]:
nearest_good_ind = np.argmin(abs(wl2[good] - wl2[i]))
xnew[i] = xnew[nearest_good_ind]
return xnew
else:
# Matrix
if dims > 1:
return np.dot(H, x.T).T
# Vector
else:
x = x.reshape(-1, 1)
return np.dot(H, x).ravel()
[docs]
def load_spectrum(spectrum_file: str) -> (np.array, np.array):
"""Load a single spectrum from a text file with initial columns giving
wavelength and magnitude, respectively.
Args:
spectrum_file: file to load spectrum from
Returns:
np.array: spectrum values
np.array: wavelengths, if available in the file
"""
spectrum = np.loadtxt(spectrum_file)
if spectrum.ndim > 1:
spectrum = spectrum[:, :2]
wavelengths, spectrum = spectrum.T
if wavelengths[0] < 100:
wavelengths = units.micron_to_nm(wavelengths)
return spectrum, wavelengths
else:
return spectrum, None
[docs]
def spectral_response_function(response_range: np.array, mu: float, sigma: float):
"""Calculate the spectral response function.
Args:
response_range: signal range to calculate over
mu: mean signal value
sigma: signal variation
Returns:
np.array: spectral response function
"""
u = (response_range - mu) / abs(sigma)
y = (1.0 / (np.sqrt(2.0 * np.pi) * abs(sigma))) * np.exp(-u * u / 2.0)
srf = y / y.sum()
return srf
[docs]
def combos(inds: List[List[float]]) -> np.array:
"""Return all combinations of indices in a list of index sublists.
For example, the call::
combos([[1, 2], [3, 4, 5]])
...[[1, 3], [2, 3], [1, 4], [2, 4], [1, 5], [2, 5]]
This is used for interpolation in the high-dimensional LUT.
Args:
inds: list of lists of values to expand
Returns:
np.array: meshgrid array of combinations
"""
n = len(inds)
cases = np.prod([len(i) for i in inds])
gridded_combinations = np.array(np.meshgrid(*inds)).reshape((n, cases)).T
return gridded_combinations
[docs]
def conditional_gaussian(
mu: np.array, C: np.array, window: np.array, remain: np.array, x: np.array
) -> (np.array, np.array):
"""Define the conditional Gaussian distribution for convenience.
len(window)+len(remain)=len(x)
Args:
mu: mean values
C: matrix for conditioning
window: contains all indices not in remain
remain: contains indices of the observed part x1
x: values to condition with
Returns:
(np.array, np.array): conditional mean, conditional covariance
"""
w = np.array(window)[:, np.newaxis]
r = np.array(remain)[:, np.newaxis]
C11 = C[r, r.T]
C12 = C[r, w.T]
C21 = C[w, r.T]
C22 = C[w, w.T]
Cinv = svd_inv(C11)
conditional_mean = mu[window] + C21 @ Cinv @ (x - mu[remain])
conditional_cov = C22 - C21 @ Cinv @ C12
return conditional_mean, conditional_cov
[docs]
def ray_start(num_cores, num_cpus=2, memory_b=-1):
import subprocess
base_args = [
"ray",
"start",
"--head",
"--num-cpus",
str(int(num_cores / num_cpus)),
"--include-dashboard",
"0",
]
if memory_b != -1:
base_args.append("--memory")
base_args.append(str(int(memory_b / num_cpus)))
head_args = base_args.copy()
head_args.append("--head")
result = subprocess.run(head_args, capture_output=True)
stdout = str(result.stdout, encoding="utf-8")
if num_cpus > 1:
key = "--address="
start_loc = stdout.find(key) + len(key) + 1
end_loc = stdout.find("'", start_loc)
address = stdout[start_loc:end_loc]
base_args.append("--address")
base_args.append(address)
result = subprocess.run(base_args, capture_output=True)
from datetime import datetime as dtt
[docs]
class Track:
"""
Tracks and reports the percentage complete for some arbitrary sized iterable.
Borrowed from mlky
"""
def __init__(self, total, step=5, print=print, reverse=False, message="complete"):
"""
Parameters
----------
total: int, iterable
Total items in iterable. If iterable, will call len() on it
step: float, default=0.05
Percentage step size to use for reporting, eg. 0.05 is every 5%
print: func, default=print
Print function to use, eg. logging.info
reverse: bool, default=False
Reverse the count such that 0 is 100%
message: str, default="complete"
Message to be included in the output
"""
if hasattr(total, "__iter__"):
total = len(total)
[docs]
def __call__(self, count):
"""
Parameters
----------
count: int, iterable
The current count of items finished. If iterable, will call len() on it
Returns
-------
bool
True if a percentage step was just crossed, False otherwise
"""
if hasattr(count, "__iter__"):
count = len(count)
current = count / self.total
if self.reverse:
current = 1 - current
current *= 100
if current >= self.percent:
elap = dtt.now() - self.start
rate = elap / self.total
esti = 100 / self.percent * elap - elap
self.print(
f"{current:6.2f}% {self.message} (elapsed: {elap}, rate: {rate}, eta: {esti})"
)
self.percent += self.step
return True
return False
[docs]
def compare(a, b, threshold=0.8, not_same=True):
"""
Compares strings in `a` to strings in `b`
Parameters
----------
a : str | list[str]
A string or list of strings to compare with `b`
a : str | list[str]
A string or list of strings to compare with `a`
threshold : float, default=0.8
Ratio threshold to meet to be considered matching. Must be between 0 and 1.
not_same : bool, default=True
Only include matches in which the two strings are not the same
Returns
-------
matching : dict
For each string in `a`, return a list of strings in `b` that meet the matching
threshold
"""
if isinstance(a, str):
a = [a]
if isinstance(b, str):
b = [b]
matches = {}
for x in a:
for y in b:
if not_same and x == y:
continue
if SequenceMatcher(None, x, y).ratio() >= threshold:
matches.setdefault(x, []).append(y)
return matches