Source code for isofit.core.common

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import json
import os
from collections import OrderedDict
from difflib import SequenceMatcher
from os.path import expandvars
from typing import List

import numpy as np

# sc Adding in xarray for non-gauss SRF file io
import xarray as xr
import xxhash
from scipy.interpolate import RegularGridInterpolator

from isofit.core import units

# small value used in finite difference derivatives
[docs] eps = 1e-5
# Global variable makes it non-shared mem in ray
[docs] Cache = {"stats": {}}
[docs] class VectorInterpolator: """Linear look up table interpolator. Support linear interpolation through radial space by expanding the look up tables with sin and cos dimensions. Args: grid_input: list of lists of floats, indicating the gridpoint elements in each grid dimension data_input: n dimensional array of radiative transfer engine outputs (each dimension size corresponds to the given grid_input list length, with the last dimensions equal to the number of sensor channels) version: version to use: 'rg' for scipy RegularGridInterpolator, 'mlg' for multilinear grid interpolator """ def __init__( self, grid_input: List[List[float]], data_input: np.array, version="mlg", ): # Determine if this a singular unique value, if so just return that directly val = data_input[(0,) * data_input.ndim] if np.isnan(val) and np.isnan(data_input).all() or np.all(data_input == val): self.method = -1 self.value = val return
[docs] self.single_point_data = None
# Lists and arrays are mutable, so copy first grid = grid_input.copy() data = data_input.copy() # Check if we are using a single grid point. If so, store the grid input. if np.prod(list(map(len, grid))) == 1: self.single_point_data = data
[docs] self.n = data.shape[-1]
# RegularGrid if version == "rg": grid_aug = grid + [np.arange(data.shape[-1])] self.itp = RegularGridInterpolator( grid_aug, data, bounds_error=False, fill_value=None ) self.method = 1 # Multilinear Grid elif version == "mlg": self.method = 2 # None to disable, 0 for unlimited, negatives == 1 self.cache_size = 1 self.gridtuples = [np.array(t) for t in grid] self.gridarrays = data self.binwidth = [ t[1:] - t[:-1] for t in self.gridtuples ] # binwidth arrays for each dimension self.maxbaseinds = np.array([len(t) - 1 for t in self.gridtuples]) else: raise AttributeError(f"Unknown interpolator version: {version!r}")
[docs] def _interpolate(self, points): """ Supports style 'rg' """ # If we only have one point, we can't do any interpolation, so just # return the original data. if self.single_point_data is not None: return self.single_point_data x = np.zeros((self.n, len(points) + 1)) x[:, :-1] = points # This last dimension is always an integer so no # interpolation is performed. This is done only # for performance reasons. x[:, -1] = np.arange(self.n) res = self.itp(x) return res
[docs] def _lookup(self, i, point): """ Calculates the slicing for the cube in _multilinear_grid """ j = np.searchsorted(self.gridtuples[i][:-1], point) - 1 # Bounds functions lower = lambda: max(min(self.maxbaseinds[i], j), 0) upper = lambda: max(min(self.maxbaseinds[i] + 2, j + 2), 2) if point >= self.gridtuples[i][-1]: return None, upper() - 1 elif point <= self.gridtuples[i][0]: return None, lower() else: delta = (point - self.gridtuples[i][j]) / self.binwidth[i][j] return delta, slice(lower(), upper())
[docs] def _multilinear_grid(self, points): """ Cached version of Jouni's implementation Args: points: The point being interpolated. If at the limit, the extremal value in the grid is returned. Returns: cube: np.ndarray """ deltas = [None] * points.size idxs = [None] * points.size for i, point in enumerate(points): if self.cache_size is not None: cache = Cache.setdefault(i, {}) stats = Cache["stats"].setdefault(i, {"hit": 0, "miss": 0}) if point in cache: data = cache[point] stats["hit"] += 1 else: # Simple FIFO if self.cache_size and len(cache) >= self.cache_size: cache.pop(list(cache)[0]) data = self._lookup(i, point) cache[point] = data stats["miss"] += 1 else: data = self._lookup(i, point) deltas[i], idxs[i] = data cube = np.copy(self.gridarrays[tuple(idxs)], order="A") # Only linear interpolate sliced dimensions for i, idx in enumerate(idxs): if isinstance(idx, slice): cube[0] *= 1 - deltas[i] cube[1] *= deltas[i] cube[0] += cube[1] cube = cube[0] return cube
[docs] def __call__(self, *args, **kwargs): """ Passes args to the appropriate interpolation method defined by the version at object init. """ if self.method == -1: return self.value elif self.method == 1: return self._interpolate(*args, **kwargs) elif self.method == 2: return self._multilinear_grid(*args, **kwargs)
[docs] def load_wavelen(wavelength_file: str): """Load a wavelength file, and convert to nanometers if needed. Args: wavelength_file: file to read wavelengths from Returns: (np.array, np.array): wavelengths, full-width-half-max """ q = np.loadtxt(wavelength_file) if q.shape[1] > 2: q = q[:, 1:3] if q[0, 0] < 100: q = units.micron_to_nm(q) wl, fwhm = q.T return wl, fwhm
[docs] def emissive_radiance( emissivity: np.array, T: np.array, wl: np.array ) -> (np.array, np.array): """Calcluate the radiance of a surface due to emission. Args: emissivity: surface emissivity. T: surface temperature [K] wl: emmissivity wavelengths [nm] Returns: np.array: surface upwelling radiance in uW $cm^{-2} sr^{-1} nm^{-nm}$ np.array: partial derivative of radiance with respect to temperature uW $cm^{-2} sr^{-1} nm^{-1} k^{-1}$ """ c_1 = 1.88365e32 / np.pi c_2 = 14387690 J_per_eV = 1.60218e-19 wl_um = wl / 1000.0 ph_per_sec_cm2_sr_nm = c_1 / (wl**4) / (np.exp(c_2 / wl / T) - 1.0) * emissivity # photon energy in eV eV_per_sec_cm2_sr_nm = 1.2398 * ph_per_sec_cm2_sr_nm / wl_um W_per_cm2_sr_nm = J_per_eV * eV_per_sec_cm2_sr_nm uW_per_cm2_sr_nm = units.W_to_uW(W_per_cm2_sr_nm) dRdn_dT = units.W_to_uW( c_1 / (wl**4) * (-pow(np.exp(c_2 / wl / T) - 1.0, -2.0)) * np.exp(c_2 / wl / T) * (-pow(T, -2) * c_2 / wl) * emissivity / wl_um * 1.2398 * J_per_eV ) return uW_per_cm2_sr_nm, dRdn_dT
[docs] def svd_inv(C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None): """Matrix inversion, based on decomposition. Built to be stable, and positive. Args: C: matrix to invert hashtable: if used, the hashtable to store/retrieve results in/from max_hash_size: maximum size of hashtable Return: np.array: inverse of C """ return svd_inv_sqrt(C, hashtable, max_hash_size)[0]
[docs] def svd_inv_sqrt( C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None ) -> (np.array, np.array): """Matrix inversion, based on decomposition. Built to be stable, and positive. Args: C: matrix to invert hashtable: if used, the hashtable to store/retrieve results in/from max_hash_size: maximum size of hashtable Return: (np.array, np.array): inverse of C and square root of the inverse of C """ # If we have a hash table, look for the precalculated solution h = None if hashtable is not None: # If arrays are in Fortran ordering, they are not hashable. if not C.flags["C_CONTIGUOUS"]: C = C.copy(order="C") h = xxhash.xxh64_digest(C) if h in hashtable: return hashtable[h] # Default to using numpy eigh (which uses LAPACK evd driver by default). try: D, P = np.linalg.eigh(C) except: D, P = None, None # Sanity check for edge cases that we encounter with negative eigen values, and we offset by inv_eps. inv_eps_checks = [1e-6, 1e-5, 1e-4] inv_eps = None if D is None or np.any(D < 0) or np.any(np.isnan(D)): for inv_eps in inv_eps_checks: try: D, P = np.linalg.eigh(C + np.eye(C.shape[0]) * inv_eps) if not (np.any(D < 0) or np.any(np.isnan(D))): break except: continue else: raise ValueError( "Matrix inversion contains negative values, " + "even after adding {} to the diagonal.".format(inv_eps) ) Ds = np.diag(1 / np.sqrt(D)) L = P @ Ds Cinv_sqrt = L @ P.T Cinv = L @ L.T # If there is a hash table, cache our solution. Bound the total cache # size by removing any extra items in FIFO order. if (hashtable is not None) and (max_hash_size is not None): hashtable[h] = (Cinv, Cinv_sqrt) while len(hashtable) > max_hash_size: hashtable.popitem(last=False) return Cinv, Cinv_sqrt
[docs] def expand_path(directory: str, subpath: str) -> str: """Expand a path variable to an absolute path, if it is not one already. Args: directory: absolute location subpath: path to expand Returns: str: expanded path """ if subpath.startswith("/"): return subpath return os.path.join(directory, subpath)
[docs] def recursive_replace(obj, key, val) -> None: """Find and replace a vector in a nested (mutable) structure. Args: obj: object to replace within key: key to replace val: value to replace with """ if isinstance(obj, dict): if key in obj: obj[key] = val for item in obj.values(): recursive_replace(item, key, val) elif any(isinstance(obj, t) for t in (list, tuple)): for item in obj: recursive_replace(item, key, val)
[docs] def get_absorption(wl: np.array, absfile: str) -> (np.array, np.array): """Calculate water and ice absorption coefficients using indices of refraction, and interpolate them to new wavelengths (user specifies nm). Args: wl: wavelengths to interpolate to absfile: file containing indices of refraction. Wavelength unts are in nm. Returns: np.array: interpolated, wavelength-specific water absorption coefficients np.array: interpolated, wavelength-specific ice absorption coefficients """ # read the indices of refraction q = np.loadtxt(absfile, delimiter=",") wl_orig_nm = q[:, 0] wl_orig_cm = units.nm_to_cm(wl_orig_nm) water_imag = q[:, 2] ice_imag = q[:, 4] # calculate absorption coefficients in cm^-1 water_abscf = water_imag * np.pi * 4.0 / wl_orig_cm ice_abscf = ice_imag * np.pi * 4.0 / wl_orig_cm # interpolate to new wavelengths (user provides nm) water_abscf_intrp = np.interp(wl, wl_orig_nm, water_abscf) ice_abscf_intrp = np.interp(wl, wl_orig_nm, ice_abscf) return water_abscf_intrp, ice_abscf_intrp
[docs] def recursive_reencode(j, shell_replace: bool = True): """Recursively re-encode a mutable object (ascii->str). Args: j: object to reencode shell_replace: boolean helper for recursive calls Returns: Object: expanded, reencoded object """ if isinstance(j, dict): for key, value in j.items(): j[key] = recursive_reencode(value) return j elif isinstance(j, list): for i, k in enumerate(j): j[i] = recursive_reencode(k) return j elif isinstance(j, tuple): return tuple([recursive_reencode(k) for k in j]) else: if shell_replace and isinstance(j, str): try: j = expandvars(j) except IndexError: pass return j
[docs] def json_load_ascii(filename: str, shell_replace: bool = True) -> dict: """Load a hierarchical structure, convert all unicode to ASCII and expand environment variables. Args: filename: json file to load from shell_replace: boolean Returns: dict: encoded dictionary """ with open(filename, "r") as fin: j = json.load(fin) return recursive_reencode(j, shell_replace)
[docs] def expand_all_paths(to_expand: dict, absdir: str): """Expand any dictionary entry containing the string 'file' into an absolute path, if needed. Args: to_expand: dictionary to expand absdir: path to expand with (absolute directory) Returns: dict: dictionary with expanded paths """ def recursive_expand(j): if isinstance(j, dict): for key, value in j.items(): if ( isinstance(key, str) and ("file" in key or "directory" in key or "path" in key) and isinstance(value, str) ): j[key] = expand_path(absdir, value) else: j[key] = recursive_expand(value) return j elif isinstance(j, list): for i, k in enumerate(j): j[i] = recursive_expand(k) return j elif isinstance(j, tuple): return tuple([recursive_reencode(k) for k in j]) return j return recursive_expand(to_expand)
[docs] def find_header(imgfile: str) -> str: """Safely return the header associated with an image file. Args: imgfile: file name of base image Returns: str: header filename if one exists """ if os.path.exists(imgfile + ".hdr"): return imgfile + ".hdr" ind = imgfile.rfind(".raw") if ind >= 0: return imgfile[0:ind] + ".hdr" ind = imgfile.rfind(".img") if ind >= 0: return imgfile[0:ind] + ".hdr" raise IOError("No header found for file {0}".format(imgfile))
[docs] def calculate_resample_matrix( wl: np.array, wl2: np.array, fwhm2: np.array, srf_file: str = None ) -> np.array: """Calculate the resampling matrix for a given set of wavelengths and FWHM. Once calculated, resmpling is just the dot product of this matrix with the vector to be resampled. Args: wl: sample starting wavelengths wl2: wavelengths to resample to fwhm2: full-width-half-max at resample resolution srf_file: SRF for the sensor if not assuming Gaussian Returns: np.array: transformation matrix (H) """ if srf_file is None: H = np.array( [ spectral_response_function(wl, wi, fwhmi / 2.355) for wi, fwhmi in zip(wl2, fwhm2) ] ) H[np.isnan(H)] = 0 else: # Loading in user-supplied srf file and assigning variables srf_data = xr.open_dataset(srf_file) sensor_rsr, rsr_wls = srf_data.RSR.data, srf_data.wavelength.data # Grabbing indices of rsr wavelengths which match sensor wavelengths (wl2) idx = [np.argwhere(abs(rsr_wls - wl2[i]) < 0.1)[0] for i in range(len(wl2))] idx = np.asarray(idx).flatten() # Getting RSR at sensor spectral resolution rsr_channel_res = sensor_rsr[:, idx] # Normalize H to unit length H = rsr_channel_res / np.sum(rsr_channel_res, axis=1)[:, np.newaxis] H[np.isnan(H)] = 0 return H
[docs] def resample_spectrum( x: np.array, wl: np.array, wl2: np.array, fwhm2: np.array, fill: bool = False, srf_file: str = None, H: np.array = None, ) -> np.array: """Resample a spectrum to a new wavelength / FWHM. Assumes Gaussian SRFs. Args: x: radiance vector wl: sample starting wavelengths wl2: wavelengths to resample to fwhm2: full-width-half-max at resample resolution fill: boolean indicating whether to fill in extrapolated regions ### sc Adding for non-Gaussian SRF ### srf_file: SRF for the sensor if not assuming Gaussian H: pre-computed transformation matrix, to enable caching Returns: np.array: interpolated radiance vector """ # sc including if else to add non-Gaussian SRF # Probably a better way than this with file paths, etc. if H is None: H = calculate_resample_matrix(wl, wl2, fwhm2, srf_file) dims = len(x.shape) if fill: if dims > 1: raise Exception("resample_spectrum(fill=True) only works with vectors") x = x.reshape(-1, 1) xnew = np.dot(H, x).ravel() good = np.isfinite(xnew) for i, xi in enumerate(xnew): if not good[i]: nearest_good_ind = np.argmin(abs(wl2[good] - wl2[i])) xnew[i] = xnew[nearest_good_ind] return xnew else: # Matrix if dims > 1: return np.dot(H, x.T).T # Vector else: x = x.reshape(-1, 1) return np.dot(H, x).ravel()
[docs] def load_spectrum(spectrum_file: str) -> (np.array, np.array): """Load a single spectrum from a text file with initial columns giving wavelength and magnitude, respectively. Args: spectrum_file: file to load spectrum from Returns: np.array: spectrum values np.array: wavelengths, if available in the file """ spectrum = np.loadtxt(spectrum_file) if spectrum.ndim > 1: spectrum = spectrum[:, :2] wavelengths, spectrum = spectrum.T if wavelengths[0] < 100: wavelengths = units.micron_to_nm(wavelengths) return spectrum, wavelengths else: return spectrum, None
[docs] def spectral_response_function(response_range: np.array, mu: float, sigma: float): """Calculate the spectral response function. Args: response_range: signal range to calculate over mu: mean signal value sigma: signal variation Returns: np.array: spectral response function """ u = (response_range - mu) / abs(sigma) y = (1.0 / (np.sqrt(2.0 * np.pi) * abs(sigma))) * np.exp(-u * u / 2.0) srf = y / y.sum() return srf
[docs] def combos(inds: List[List[float]]) -> np.array: """Return all combinations of indices in a list of index sublists. For example, the call:: combos([[1, 2], [3, 4, 5]]) ...[[1, 3], [2, 3], [1, 4], [2, 4], [1, 5], [2, 5]] This is used for interpolation in the high-dimensional LUT. Args: inds: list of lists of values to expand Returns: np.array: meshgrid array of combinations """ n = len(inds) cases = np.prod([len(i) for i in inds]) gridded_combinations = np.array(np.meshgrid(*inds)).reshape((n, cases)).T return gridded_combinations
[docs] def conditional_gaussian( mu: np.array, C: np.array, window: np.array, remain: np.array, x: np.array ) -> (np.array, np.array): """Define the conditional Gaussian distribution for convenience. len(window)+len(remain)=len(x) Args: mu: mean values C: matrix for conditioning window: contains all indices not in remain remain: contains indices of the observed part x1 x: values to condition with Returns: (np.array, np.array): conditional mean, conditional covariance """ w = np.array(window)[:, np.newaxis] r = np.array(remain)[:, np.newaxis] C11 = C[r, r.T] C12 = C[r, w.T] C21 = C[w, r.T] C22 = C[w, w.T] Cinv = svd_inv(C11) conditional_mean = mu[window] + C21 @ Cinv @ (x - mu[remain]) conditional_cov = C22 - C21 @ Cinv @ C12 return conditional_mean, conditional_cov
[docs] def envi_header(inputpath): """ Convert a envi binary/header path to a header, handling extensions Args: inputpath: path to envi binary file Returns: str: the header file associated with the input reference. """ if ( os.path.splitext(inputpath)[-1] == ".img" or os.path.splitext(inputpath)[-1] == ".dat" or os.path.splitext(inputpath)[-1] == ".raw" ): # headers could be at either filename.img.hdr or filename.hdr. Check both, return the one that exists if it # does, if not return the latter (new file creation presumed). hdrfile = os.path.splitext(inputpath)[0] + ".hdr" if os.path.isfile(hdrfile): return hdrfile elif os.path.isfile(inputpath + ".hdr"): return inputpath + ".hdr" return hdrfile elif os.path.splitext(inputpath)[-1] == ".hdr": return inputpath else: return inputpath + ".hdr"
[docs] def ray_start(num_cores, num_cpus=2, memory_b=-1): import subprocess base_args = [ "ray", "start", "--head", "--num-cpus", str(int(num_cores / num_cpus)), "--include-dashboard", "0", ] if memory_b != -1: base_args.append("--memory") base_args.append(str(int(memory_b / num_cpus))) head_args = base_args.copy() head_args.append("--head") result = subprocess.run(head_args, capture_output=True) stdout = str(result.stdout, encoding="utf-8") if num_cpus > 1: key = "--address=" start_loc = stdout.find(key) + len(key) + 1 end_loc = stdout.find("'", start_loc) address = stdout[start_loc:end_loc] base_args.append("--address") base_args.append(address) result = subprocess.run(base_args, capture_output=True)
from datetime import datetime as dtt
[docs] class Track: """ Tracks and reports the percentage complete for some arbitrary sized iterable. Borrowed from mlky """ def __init__(self, total, step=5, print=print, reverse=False, message="complete"): """ Parameters ---------- total: int, iterable Total items in iterable. If iterable, will call len() on it step: float, default=0.05 Percentage step size to use for reporting, eg. 0.05 is every 5% print: func, default=print Print function to use, eg. logging.info reverse: bool, default=False Reverse the count such that 0 is 100% message: str, default="complete" Message to be included in the output """ if hasattr(total, "__iter__"): total = len(total)
[docs] self.step = step
[docs] self.total = total
[docs] self.print = print
[docs] self.start = dtt.now()
[docs] self.percent = step
[docs] self.reverse = reverse
[docs] self.message = message
[docs] def __call__(self, count): """ Parameters ---------- count: int, iterable The current count of items finished. If iterable, will call len() on it Returns ------- bool True if a percentage step was just crossed, False otherwise """ if hasattr(count, "__iter__"): count = len(count) current = count / self.total if self.reverse: current = 1 - current current *= 100 if current >= self.percent: elap = dtt.now() - self.start rate = elap / self.total esti = 100 / self.percent * elap - elap self.print( f"{current:6.2f}% {self.message} (elapsed: {elap}, rate: {rate}, eta: {esti})" ) self.percent += self.step return True return False
[docs] def compare(a, b, threshold=0.8, not_same=True): """ Compares strings in `a` to strings in `b` Parameters ---------- a : str | list[str] A string or list of strings to compare with `b` a : str | list[str] A string or list of strings to compare with `a` threshold : float, default=0.8 Ratio threshold to meet to be considered matching. Must be between 0 and 1. not_same : bool, default=True Only include matches in which the two strings are not the same Returns ------- matching : dict For each string in `a`, return a list of strings in `b` that meet the matching threshold """ if isinstance(a, str): a = [a] if isinstance(b, str): b = [b] matches = {} for x in a: for y in b: if not_same and x == y: continue if SequenceMatcher(None, x, y).ratio() >= threshold: matches.setdefault(x, []).append(y) return matches