Source code for isofit.utils.surface_model

#! /usr/bin/env python3
#
#  Copyright 2019 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import os
from types import SimpleNamespace

import click
import numpy as np
import scipy
from scipy.linalg import inv
from sklearn.cluster import KMeans
from spectral.io import envi

from isofit.core import units
from isofit.core.common import envi_header, expand_path, json_load_ascii, svd_inv


[docs] def next_diag_val(C: np.ndarray, starting_index, direction): if direction == 1: for i in range(starting_index, C.shape[0]): if np.isnan(C[i, i]) == False: return C[i, i] if direction == -1: for i in range(starting_index, -1, -1): if np.isnan(C[i, i]) == False: return C[i, i] return None
[docs] def surface_model( config_path: str, wavelength_path: str = None, output_path: str = None, seed: int = 13, multisurface: bool = False, ) -> None: """The surface model tool contains everything you need to build basic multicomponent (i.e. colleciton of Gaussian) surface priors for the multicomponent surface model. Args: config_path: path to a JSON formatted surface model configuration wavelength_path: optional path to a three-column wavelength file, overriding the configuration file settings output_path: optional path to the destination .mat file, overriding the configuration file settings seed: seed used for clustering Returns: None """ # Set seed for random draws later np.random.seed(seed) # Load configuration JSON into a local dictionary configdir, _ = os.path.split(os.path.abspath(config_path)) config = json_load_ascii(config_path, shell_replace=True) if wavelength_path: wavelength_file = wavelength_path else: if "wavelength_file" not in config: raise ValueError( "Missing config parameter 'wavelength_file', set via surface_model(wavelength_path=...)" ) wavelength_file = expand_path(configdir, config["wavelength_file"]) if output_path: outfile = output_path else: if "output_model_file" not in config: raise ValueError( "Missing config parameter 'output_model_file', set via surface_model(output_path=...)" ) outfile = expand_path(configdir, config["output_model_file"]) # Determine top level parameters for q in ["sources", "normalize"]: if q not in config: raise ValueError("Missing parameter: %s" % q) normalize = config["normalize"] reference_windows = config["reference_windows"] # load wavelengths file, and change units to nm if needed if os.path.splitext(wavelength_file)[-1] == ".hdr": ds = envi.open(wavelength_file) wl = np.array([float(x) for x in ds.metadata["wavelength"]]) else: q = np.loadtxt(wavelength_file) if q.shape[1] > 2: q = q[:, 1:] wl = q[:, 0] if wl[0] < 100: wl = units.micron_to_nm(wl) nchan = len(wl) # build global reference windows refwl = [] for wi, window in enumerate(reference_windows): active_wl = np.logical_and(wl >= window[0], wl < window[1]) refwl.extend(wl[active_wl]) normind = np.array([np.argmin(abs(wl - w)) for w in refwl]) refwl = np.array(refwl, dtype=float) # create basic model template model = { "normalize": normalize, "wl": wl, "means": [], "covs": [], "attribute_means": [], "attribute_covs": [], "attributes": [], "refwl": refwl, "surface_categories": [], } # each "source" (i.e. spectral library) is treated separately for si, source_config in enumerate(config["sources"]): # Determine source parameters for q in ["input_spectrum_files", "windows", "n_components", "windows"]: if q not in source_config: raise ValueError("Source %i is missing a parameter: %s" % (si, q)) if multisurface: if "surface_category" not in source_config: raise ValueError( "Source %i is missing a parameter: surface_category" % (si) ) # Determine whether we should synthesize our own mixtures if "mixtures" in source_config: mixtures = source_config["mixtures"] elif "mixtures" in config: mixtures = config["mixtures"] else: mixtures = 0 # open input files associated with this source infiles = [ expand_path(configdir, fi) for fi in source_config["input_spectrum_files"] ] # associate attributes, if they exist. These will not be used # in the retrieval, but can be used in post-analysis if "input_attribute_files" in source_config: infiles_attributes = [ expand_path(configdir, fi) for fi in source_config["input_attribute_files"] ] if len(infiles_attributes) != len(infiles): raise IndexError("spectrum / attribute file mismatch") else: infiles_attributes = [None for fi in source_config["input_spectrum_files"]] ncomp = int(source_config["n_components"]) windows = source_config["windows"] # Surface model handling surface_category = source_config.get("surface_category", "") # load spectra spectra, attributes = [], [] for infile, attribute_file in zip(infiles, infiles_attributes): rfl = envi.open(envi_header(infile), infile) nl, nb, ns = [int(rfl.metadata[n]) for n in ("lines", "bands", "samples")] swl = np.array([float(f) for f in rfl.metadata["wavelength"]]) # Maybe convert to nanometers if swl[0] < 100: swl = units.micron_to_nm(swl) # Load library and adjust interleave, if needed rfl_mm = rfl.open_memmap(interleave="bip", writable=False) x = np.array(rfl_mm[:, :, :]) x = x.reshape(nl * ns, nb) # import spectra and resample for x1 in x: p = scipy.interpolate.interp1d( swl, x1, kind="linear", bounds_error=False, fill_value="extrapolate" ) spectra.append(p(wl)) # Load attributes if attribute_file is not None: attr = envi.open(envi_header(attribute_file), attribute_file) nla, nba, nsa = [ int(attr.metadata[n]) for n in ("lines", "bands", "samples") ] # Load library and adjust interleave, if needed attr_mm = attr.open_memmap(interleave="bip", writable=False) x = np.array(attr_mm[:, :, :]) x = x.reshape(nla * nsa, nba) model["attributes"] = attr.metadata["band names"] # import spectra and resample for x1 in x: attributes.append(x1) if len(attributes) > 0 and len(attributes) != len(spectra): raise IndexError("Mismatch in number of spectra vs. attributes") # calculate mixtures, if needed if len(attributes) > 0 and mixtures > 0: raise ValueError("Synthetic mixtures w/ attributes is not advised") n = float(len(spectra)) nmix = int(n * mixtures) for mi in range(nmix): s1, m1 = spectra[int(np.random.rand() * n)], np.random.rand() s2, m2 = spectra[int(np.random.rand() * n)], 1.0 - m1 spectra.append(m1 * s1 + m2 * s2) # Lists to arrays spectra = np.array(spectra) attributes = np.array(attributes) # Flag bad data use = np.all(np.isfinite(spectra), axis=1) spectra = spectra[use, :] if len(attributes) > 0: attributes = attributes[use, :] ## Accumulate total list of window indices # window_idx = -np.ones((nchan), dtype=int) # for wi, win in enumerate(windows): # active_wl = np.logical_and(wl >= win['interval'][0], wl < win['interval'][1]) # window_idx[active_wl] = wi # Two step model generation. First step is k-means clustering. # This is more "stable" than Expectation Maximization with an # unconstrained covariance matrix kmeans = KMeans( init="k-means++", n_clusters=ncomp, n_init=10, random_state=seed ) kmeans.fit(spectra) Z = kmeans.predict(spectra) # Build a combined dataset of attributes and spectra if len(attributes) > 0: spectra_attr = np.concatenate((spectra, attributes), axis=1) # now fit the full covariance for each component for ci in range(ncomp): print(ci, source_config["input_spectrum_files"]) m = np.mean(spectra[Z == ci, :], axis=0) C = np.cov(spectra[Z == ci, :], rowvar=False) C_base = C.copy() if len(attributes) > 0: m_attr = np.mean(spectra_attr[Z == ci, :], axis=0) C_attr = np.cov(spectra_attr[Z == ci, :], rowvar=False) # for i in range(nchan): for window in windows: window_idx = np.where( np.logical_and( wl >= window["interval"][0], wl < window["interval"][1] ) )[0] if len(window_idx) == 0: continue window_range = slice(window_idx[0], window_idx[-1] + 1) # To minimize bias, leave the channels independent # and uncorrelated if window["correlation"] == "decorrelated": c_diag = ( C[window_range, window_range] + float(window["regularizer"]) ) * np.eye(len(window_idx)) C[window_range, :] = 0 C[:, window_range] = 0 C[window_range, window_range] = c_diag for window in windows: window_idx = np.where( np.logical_and( wl >= window["interval"][0], wl < window["interval"][1] ) )[0] if len(window_idx) == 0: continue window_range = slice(window_idx[0], window_idx[-1] + 1) # Each spectral interval, or window, is constructed # using one of several rules. We can draw the covariance # directly from the data... if window["correlation"] in ["EM", "EM-gauss"]: cdiag = ( C_base[window_range, window_range] + np.eye(len(window_idx)) * float(window["regularizer"]) ).copy() if "isolated" in list(window.keys()) and window["isolated"] == 1: C[window_range, :] = 0 C[:, window_range] = 0 C[window_range, window_range] = cdiag window_idx = np.where( np.logical_and( wl >= window["interval"][0], wl < window["interval"][1] ) )[0] if len(window_idx) == 0: continue window_range = slice(window_idx[0], window_idx[-1] + 1) if window["correlation"] == "GP": for i in window_idx: # Alternatively, we can use a band diagonal form, # a Gaussian process that promotes local smoothness. width = float(window["gp_width"]) magnitude = float(window["gp_magnitude"]) kernel = scipy.stats.norm.pdf((wl - wl[i]) / width) kernel = kernel / kernel.sum() * magnitude C[i, :] = kernel C[:, i] = kernel C[i, i] = C[i, i] + float(window["regularizer"]) elif window["correlation"] in ["decorrelated", "EM"]: # already handled continue else: raise ValueError( "I do not recognize the method " + window["correlation"] ) # Now do any cross-block feathering by augmenting the precision matrix P = inv(C) for window in windows: window_idx = np.where( np.logical_and( wl >= window["interval"][0], wl < window["interval"][1] ) )[0] if len(window_idx) == 0: continue # Look for the "feather_forward" or "feather_backward" options if window["correlation"] in ["EM", "decorrelated"]: if "feather_backward" in list(window.keys()): P[window_idx[0] - 1, window_idx[0]] -= ( 1.0 / window["feather_backward"] ) P[window_idx[0], window_idx[0] - 1] -= ( 1.0 / window["feather_backward"] ) if "feather_forward" in list(window.keys()): P[window_idx[-1] + 1, window_idx[-1]] -= ( 1.0 / window["feather_forward"] ) P[window_idx[-1], window_idx[-1] + 1] -= ( 1.0 / window["feather_forward"] ) C = inv(P) # Normalize the component spectrum if desired if normalize == "Euclidean": z = np.sqrt(np.sum(pow(m[normind], 2))) elif normalize == "RMS": z = np.sqrt(np.mean(pow(m[normind], 2))) elif normalize == "None": z = 1.0 else: raise ValueError("Unrecognized normalization: %s\n" % normalize) m = m / z C = C / (z**2) try: Cinv = svd_inv(C) except: errstr = f"C from group {si, ci} is not invertible." raise AttributeError(errstr) model["means"].append(m) model["covs"].append(C) if len(attributes) > 0: model["attribute_means"].append(m_attr) model["attribute_covs"].append(C_attr) model["surface_categories"].append(surface_category) model["means"] = np.array(model["means"]) model["covs"] = np.array(model["covs"]) model["attribute_means"] = np.array(model["attribute_means"]) model["attribute_covs"] = np.array(model["attribute_covs"]) if multisurface: # Divide up model dict based on surface_type surface_categories = np.unique(model["surface_categories"]) for surface_category in surface_categories: i = np.argwhere(np.array(model["surface_categories"]) == surface_category) type_model = model.copy() type_model["means"] = np.squeeze(model["means"][i]) type_model["covs"] = np.squeeze(model["covs"][i]) # To handle surfaces with 1 component, this check returns true if type_model["means"].ndim == 1 and type_model["covs"].ndim == 2: type_model["means"] = type_model["means"][np.newaxis, :] type_model["covs"] = type_model["covs"][np.newaxis, :] type_model["surface_categories"] = [ str(stype[0]) for stype in np.array(model["surface_categories"])[i] ] if len(model["attribute_means"]): type_model["attribute_means"] = np.squeeze(model["attribute_means"][i]) type_model["attribute_covs"] = np.squeeze(model["attribute_covs"][i]) name, ext = os.path.splitext(outfile) type_outfile = f"{name}_{str(surface_category)}{ext}" scipy.io.savemat(type_outfile, type_model) else: scipy.io.savemat(outfile, model)
# Input arguments @click.command(name="surface_model") @click.argument("config_path") @click.option( "--wavelength_path", help="Input wavelengths to three-column wavelength file to use" ) @click.option("--output_path", help="Path to write the created surface model to") @click.option("--seed", default=13, type=int, help="Seed for reproducibility")
[docs] def cli(**kwargs): """Build a new surface model to a block of data""" # SimpleNamespace converts a dict into dot-notational surface_model(**kwargs) click.echo("Done")
if __name__ == "__main__": raise NotImplementedError( "surface_model.py cannot be called this way. Run as:\n isofit surface_model [CONFIG_PATH]" )