#! /usr/bin/env python3
#
# Authors: Philip G. Brodrick and Niklas Bohn
#
from __future__ import annotations
import json
import logging
import os
import subprocess
from datetime import datetime
from os.path import abspath, dirname, exists, join, split
from shutil import copyfile
from sys import platform
from typing import List
import netCDF4 as nc
import numpy as np
from scipy.io import loadmat
from spectral.io import envi
from isofit import __version__
from isofit.core import isofit, units
from isofit.core.common import (
envi_header,
expand_path,
json_load_ascii,
resample_spectrum,
)
from isofit.core.multistate import SurfaceMapping
from isofit.data import env
from isofit.radiative_transfer.engines.modtran import ModtranRT
from isofit.utils.surface_model import surface_model
[docs]
class Pathnames:
"""Class to determine and hold the large number of relative and absolute paths that are needed for isofit and
MODTRAN configuration files.
"""
def __init__(
self,
input_radiance,
input_loc,
input_obs,
surface_class_file,
surface_path,
working_directory,
ray_temp_dir,
sensor="NA-*",
copy_input_files=False,
modtran_path=None,
rdn_factors_path=None,
model_discrepancy_path=None,
aerosol_climatology_path=None,
channelized_uncertainty_path=None,
interpolate_inplace=False,
skyview_factor=None,
subs: bool = False,
classify_multisurface: bool = False,
):
# Determine FID based on sensor name
if sensor == "ang":
self.fid = split(input_radiance)[-1][:18]
elif sensor == "av3":
self.fid = split(input_radiance)[-1][:18]
elif sensor == "av5":
self.fid = split(input_radiance)[-1][:18]
elif sensor == "avcl":
self.fid = split(input_radiance)[-1][:16]
elif sensor == "emit":
self.fid = split(input_radiance)[-1][:19]
elif sensor == "enmap":
self.fid = split(input_radiance)[-1].split("_")[5]
elif sensor == "hyp":
self.fid = split(input_radiance)[-1][:22]
elif sensor == "neon":
self.fid = split(input_radiance)[-1][:21]
elif sensor == "prism":
self.fid = split(input_radiance)[-1][:18]
elif sensor == "prisma":
self.fid = input_radiance.split("/")[-1].split("_")[1]
elif sensor == "gao":
self.fid = split(input_radiance)[-1][:23]
elif sensor == "oci":
self.fid = split(input_radiance)[-1][:24]
elif sensor == "tanager":
self.fid = split(input_radiance)[-1][:23]
elif sensor[:3] == "NA-":
self.fid = os.path.splitext(os.path.basename(input_radiance))[0]
logging.info("Flightline ID: %s" % self.fid)
# Names from inputs
[docs]
self.aerosol_climatology = aerosol_climatology_path
[docs]
self.working_directory = abspath(working_directory)
[docs]
self.full_lut_directory = abspath(join(self.working_directory, "lut_full/"))
[docs]
self.surface_path = surface_path
# set up some sub-directories
[docs]
self.lut_h2o_directory = abspath(join(self.working_directory, "lut_h2o/"))
[docs]
self.config_directory = abspath(join(self.working_directory, "config/"))
[docs]
self.data_directory = abspath(join(self.working_directory, "data/"))
[docs]
self.output_directory = abspath(join(self.working_directory, "output/"))
# define all output names
rdn_fname = self.fid + "_rdn"
[docs]
self.rfl_working_path = abspath(
join(self.output_directory, rdn_fname.replace("_rdn", "_rfl"))
)
[docs]
self.uncert_working_path = abspath(
join(self.output_directory, rdn_fname.replace("_rdn", "_uncert"))
)
[docs]
self.lbl_working_path = abspath(
join(self.output_directory, rdn_fname.replace("_rdn", "_lbl"))
)
[docs]
self.state_working_path = abspath(
join(self.output_directory, rdn_fname.replace("_rdn", "_state"))
)
[docs]
self.surface_template_path = abspath(join(self.data_directory, "surface.mat"))
[docs]
self.surface_working_paths = {}
if copy_input_files is True:
self.radiance_working_path = abspath(
join(self.input_data_directory, rdn_fname)
)
self.obs_working_path = abspath(
join(self.input_data_directory, self.fid + "_obs")
)
self.loc_working_path = abspath(
join(self.input_data_directory, self.fid + "_loc")
)
if classify_multisurface and not surface_class_file:
self.surface_class_working_path = abspath(
join(self.input_data_directory, self.fid + "_surface_class")
)
else:
self.surface_class_working_path = (
abspath(surface_class_file) if surface_class_file else None
)
self.surface_class_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_surface_class")
)
else:
self.radiance_working_path = abspath(self.input_radiance_file)
self.obs_working_path = abspath(self.input_obs_file)
self.loc_working_path = abspath(self.input_loc_file)
if classify_multisurface and not surface_class_file:
self.surface_class_working_path = abspath(
join(self.input_data_directory, self.fid + "_surface_class")
)
else:
self.surface_class_working_path = (
abspath(surface_class_file) if surface_class_file else None
)
self.surface_class_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_surface_class")
)
if interpolate_inplace:
self.radiance_interp_path = self.radiance_working_path
else:
self.radiance_interp_path = abspath(
join(self.input_data_directory, rdn_fname + "_interp")
)
if channelized_uncertainty_path:
self.input_channelized_uncertainty_path = channelized_uncertainty_path
else:
self.input_channelized_uncertainty_path = os.getenv(
"ISOFIT_CHANNELIZED_UNCERTAINTY"
)
[docs]
self.channelized_uncertainty_working_path = abspath(
join(self.data_directory, "channelized_uncertainty.txt")
)
if model_discrepancy_path:
self.input_model_discrepancy_path = model_discrepancy_path
else:
self.input_model_discrepancy_path = None
[docs]
self.model_discrepancy_working_path = abspath(
join(self.data_directory, "model_discrepancy.mat")
)
if skyview_factor:
self.svf_working_path = abspath(skyview_factor)
else:
self.svf_working_path = None
[docs]
self.svf_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_svf")
)
[docs]
self.rdn_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_rdn")
)
[docs]
self.obs_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_obs")
)
[docs]
self.loc_subs_path = abspath(
join(self.input_data_directory, self.fid + "_subs_loc")
)
[docs]
self.rfl_subs_path = abspath(
# join(self.output_directory, self.fid + f"{subs_str}" + "_rfl")
join(self.output_directory, self.fid + "_subs_rfl")
)
[docs]
self.atm_coeff_path = abspath(
join(self.output_directory, self.fid + "_subs_atm")
)
[docs]
self.state_subs_path = abspath(
join(self.output_directory, self.fid + "_subs_state")
)
[docs]
self.uncert_subs_path = abspath(
join(self.output_directory, self.fid + "_subs_uncert")
)
[docs]
self.h2o_subs_path = abspath(
join(self.output_directory, self.fid + "_subs_h2o")
)
[docs]
self.wavelength_path = abspath(join(self.data_directory, "wavelengths.txt"))
[docs]
self.modtran_template_path = abspath(
join(self.config_directory, self.fid + "_modtran_tpl.json")
)
[docs]
self.h2o_template_path = abspath(
join(self.config_directory, self.fid + "_h2o_tpl.json")
)
[docs]
self.isofit_full_config_path = abspath(
join(self.config_directory, self.fid + "_isofit.json")
)
[docs]
self.h2o_config_path = abspath(
join(self.config_directory, self.fid + "_h2o.json")
)
if modtran_path:
self.modtran_path = modtran_path
else:
self.modtran_path = os.getenv("MODTRAN_DIR", env.modtran)
[docs]
self.sixs_path = os.getenv("SIXS_DIR", env.sixs)
if sensor == "avcl":
self.noise_path = str(env.path("data", "avirisc_noise.txt"))
elif sensor == "oci":
self.noise_path = str(env.path("data", "oci", "oci_noise.txt"))
elif sensor == "emit":
self.noise_path = str(env.path("data", "emit_noise.txt"))
if self.input_channelized_uncertainty_path is None:
self.input_channelized_uncertainty_path = str(
env.path("data", "emit_osf_uncertainty.txt")
)
if self.input_model_discrepancy_path is None:
self.input_model_discrepancy_path = str(
env.path("data", "emit_model_discrepancy.mat")
)
elif sensor == "tanager":
self.noise_path = str(env.path("data", "tanager1_noise_20241016.txt"))
else:
self.noise_path = None
logging.info("no noise path found, proceeding without")
# quit()
[docs]
self.earth_sun_distance_path = str(env.path("data", "earth_sun_distance.txt"))
irr_path = [
"examples",
"20151026_SantaMonica",
"data",
"prism_optimized_irr.dat",
]
if sensor == "oci":
irr_path = ["data", "oci", "tsis_f0_0p1.txt"]
[docs]
self.irradiance_file = str(env.path(*irr_path))
[docs]
self.aerosol_tpl_path = str(env.path("data", "aerosol_template.json"))
[docs]
self.rdn_factors_path = None
if rdn_factors_path is not None:
self.rdn_factors_path = abspath(rdn_factors_path)
[docs]
self.ray_temp_dir = ray_temp_dir
[docs]
def make_directories(self):
"""Build required subdirectories inside working_directory"""
for dpath in [
self.working_directory,
self.lut_h2o_directory,
self.full_lut_directory,
self.config_directory,
self.data_directory,
self.input_data_directory,
self.output_directory,
]:
if not exists(dpath):
os.mkdir(dpath)
[docs]
def stage_files(self):
"""Stage data files by copying into working directory"""
files_to_stage = [
(self.input_radiance_file, self.radiance_working_path, True),
(self.input_obs_file, self.obs_working_path, True),
(self.input_loc_file, self.loc_working_path, True),
(
self.input_channelized_uncertainty_path,
self.channelized_uncertainty_working_path,
False,
),
(
self.input_model_discrepancy_path,
self.model_discrepancy_working_path,
False,
),
]
for src, dst, hasheader in files_to_stage:
if src is None:
continue
if not exists(dst):
logging.info("Staging %s to %s" % (src, dst))
copyfile(src, dst)
if hasheader:
copyfile(envi_header(src), envi_header(dst))
class SerialEncoder(json.JSONEncoder):
"""Encoder for json to help ensure json objects can be passed to the workflow manager."""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
else:
return super(SerialEncoder, self).default(obj)
[docs]
class LUTConfig:
"""A look up table class, containing default grid options. All properties may be overridden with the optional
input configuration file path
Args:
lut_config_file: configuration file to override default values
emulator: emulator used - will modify required points appropriately
no_min_lut_spacing: span all LUT dimensions with at least 2 points
"""
def __init__(
self,
lut_config_file: str = None,
emulator: str = None,
no_min_lut_spacing: bool = False,
atmosphere_type="ATM_MIDLAT_SUMMER",
):
if lut_config_file is not None:
with open(lut_config_file, "r") as f:
lut_config = json.load(f)
# For each element, set the look up table spacing (lut_spacing) as the
# anticipated spacing value, or 0 to use a single point (not LUT).
# Set the 'lut_spacing_min' as the minimum distance allowed - if separation
# does not meet this threshold based on the available data, on a single
# point will be used.
# Units of kilometers
[docs]
self.elevation_spacing = 0.25
[docs]
self.elevation_spacing_min = 0.2
# Units of g / m2
[docs]
self.h2o_spacing = 0.25
[docs]
self.h2o_spacing_min = 0.03
# Special parameter to specify the minimum allowable water vapor value in g / m2
# Set defaults, will override based on settings
# Units of g / m2
[docs]
self.h2o_range = [0.2, 5]
# Units of degrees
[docs]
self.to_sensor_zenith_spacing = 10
[docs]
self.to_sensor_zenith_spacing_min = 2
# Units of degrees
[docs]
self.to_sun_zenith_spacing = 10
[docs]
self.to_sun_zenith_spacing_min = 2
# Units of degrees
[docs]
self.relative_azimuth_spacing = 60
[docs]
self.relative_azimuth_spacing_min = 25
# Units of AOD
[docs]
self.aerosol_0_spacing = 0
[docs]
self.aerosol_0_spacing_min = 0
# Units of AOD
[docs]
self.aerosol_1_spacing = 0
[docs]
self.aerosol_1_spacing_min = 0
# Units of AOD
[docs]
self.aerosol_2_spacing = 0.1
[docs]
self.aerosol_2_spacing_min = 0
# Units of AOD
[docs]
self.aerosol_0_range = [
ModtranRT.modtran_aot_lowerbound_polynomials()[atmosphere_type](0),
1,
]
[docs]
self.aerosol_1_range = [
ModtranRT.modtran_aot_lowerbound_polynomials()[atmosphere_type](0),
1,
]
[docs]
self.aerosol_2_range = [
ModtranRT.modtran_aot_lowerbound_polynomials()[atmosphere_type](0),
1,
]
[docs]
self.aot_550_range = [
ModtranRT.modtran_aot_lowerbound_polynomials()[atmosphere_type](0),
1,
]
[docs]
self.aot_550_spacing = 0
[docs]
self.aot_550_spacing_min = 0
# CO2 ppm
[docs]
self.co2_range = [380, 440]
[docs]
self.co2_spacing_min = 60
[docs]
self.no_min_lut_spacing = no_min_lut_spacing
# overwrite anything that comes in from the config file
if lut_config_file is not None:
for key in lut_config:
if key in self.__dict__:
setattr(self, key, lut_config[key])
if emulator is not None and os.path.splitext(emulator)[1] != ".jld2":
self.aot_550_range = self.aerosol_2_range
self.aot_550_spacing = self.aerosol_2_spacing
self.aot_550_spacing_min = self.aerosol_2_spacing_min
self.aerosol_2_spacing = 0
[docs]
def get_grid_with_data(
self, data_input: np.array, spacing: float, min_spacing: float
):
min_val = np.min(data_input)
max_val = np.max(data_input)
return self.get_grid(min_val, max_val, spacing, min_spacing)
[docs]
def get_grid(
self, minval: float, maxval: float, spacing: float, min_spacing: float
):
if spacing == 0:
logging.debug("Grid spacing set at 0, using no grid.")
return None
num_gridpoints = int(np.ceil((maxval - minval) / spacing)) + 1
# if we want to ensure there is no minimum spacing, override the spacing
# value to set the number of grid points to at least 2
if (
self.no_min_lut_spacing
and num_gridpoints == 1
and np.isclose(maxval, minval) is False
):
num_gridpoints = 2
grid = np.linspace(minval, maxval, num_gridpoints)
if min_spacing > 0.0001:
grid = np.round(grid, 4)
if len(grid) == 1:
logging.debug(
f"Grid spacing is 0, which is less than {min_spacing}. No grid used"
)
return None
elif (
# Need this first conditional to rule out rounding errors
len(grid) == 2
and np.abs(grid[1] - grid[0]) < min_spacing
and self.no_min_lut_spacing is False
):
logging.debug(
f"Grid spacing is {grid[1]-grid[0]}, which is less than {min_spacing}. "
" No grid used"
)
return None
else:
return grid
[docs]
class SerialEncoder(json.JSONEncoder):
"""Encoder for json to help ensure json objects can be passed to the workflow manager."""
[docs]
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
else:
return super(SerialEncoder, self).default(obj)
[docs]
def check_surface_model(
surface_path: str,
output_model_path: str = None,
wl: np.array = [],
surface_wavelength_path: str = "",
surface_category: str = "multicomponent_surface",
multisurface: bool = False,
) -> str:
"""
Checks and rebuilds surface model if needed.
TODO - Could be extended to allow for both dir and file surface_path
inputs. The dir inputs could accomdate complex surfaces where
different surface priors might be useful. This extension
would be relatively easy. Wrap this in a check for surface_path
vs. surface_path_dir. Each file in the dir can undergo the
same check coded here.
Args:
surface_path: path to surface model or config dict
wl: instrument center wavelengths
surface_wavelength_path: path to wavelength file
"""
if os.path.isfile(surface_path):
if surface_path.endswith(".mat"):
# check wavelength grid of surface model if provided
model_dict = loadmat(surface_path)
wl_surface = model_dict["wl"][0]
if not len(wl):
logging.info("No wl array given. Not checking channel number matchup")
elif len(wl_surface) != len(wl):
raise ValueError(
"Number of channels provided in surface model file does not match"
" wavelengths in radiance cube. Please rebuild your surface model."
)
if not np.all(np.isclose(wl_surface, wl, atol=0.01)):
logging.warning(
"Center wavelengths provided in surface model file do not match"
" wavelengths in radiance cube. Please consider rebuilding your"
" surface model for optimal performance."
)
if multisurface:
# TODO Change the surface model structure for multisurface runs
# Instead of passing multiple surface.mat files throughout.
# Carry the surface type keys and dynamically select the matching type within surface.component
raise ValueError(
"Apply OE in multistate-mode can currently only be run from a .json surface file. Please use the path to the .json file as the surface_ath. Must include the 'surface_type' keys."
)
return {surface_category: surface_path}
elif surface_path.endswith(".json"):
logging.info(
"No surface model provided. Build new one using given config file."
)
if not surface_wavelength_path:
raise ValueError(
"Building surface model requires input surface wavelength path"
)
if not output_model_path:
logging.info(
"No output path provided via check_surface, "
"using output file within surface config."
)
configdir, _ = os.path.split(os.path.abspath(surface_path))
config = json_load_ascii(surface_path, shell_replace=True)
output_model_path = expand_path(configdir, config["output_model_file"])
surface_model(
config_path=surface_path,
wavelength_path=surface_wavelength_path,
output_path=output_model_path,
multisurface=multisurface,
)
# Handle a multisurface run with split
if multisurface:
config = json_load_ascii(surface_path, shell_replace=True)
surface_categories = []
for source in config["sources"]:
surface_category = source.get("surface_category")
if not surface_category:
raise ValueError(
"Multisurface ISOFIT surface configs require "
"surface category to be specific in config. "
"No 'surface_category' key found. Check config"
)
surface_categories.append(surface_category)
surface_paths = {}
for surface_category in np.unique(surface_categories):
name, ext = os.path.splitext(output_model_path)
surface_paths[str(surface_category)] = (
f"{name}_{str(surface_category)}{ext}"
)
else:
surface_paths = {surface_category: output_model_path}
for path in surface_paths.values():
try:
model_dict = loadmat(path)
except ValueError:
raise ValueError(
"Surface.mat file failed to load. " "Check configuration."
)
return surface_paths
else:
raise FileNotFoundError(
"Unrecognized format of surface file. Please provide either a .mat model or a .json config dict."
)
else:
raise FileNotFoundError(
"No surface file found. Please provide either a .mat model or a .json config dict."
)
[docs]
def build_presolve_config(
paths: Pathnames,
h2o_lut_grid: np.array,
n_cores: int = -1,
use_superpixels: bool = False,
surface_category="multicomponent_surface",
emulator_base: str = None,
uncorrelated_radiometric_uncertainty: float = 0.0,
dn_uncertainty_file: str = None,
segmentation_size: int = 400,
debug: bool = False,
inversion_windows=[[350.0, 1360.0], [1410, 1800.0], [1970.0, 2500.0]],
prebuilt_lut_path: str = None,
multipart_transmittance: bool = False,
) -> None:
"""Write an isofit config file for a presolve, with limited info.
Args:
paths: object containing references to all relevant file locations
h2o_lut_grid: the water vapor look up table grid isofit should use for this solve
n_cores: number of cores to use in processing
use_superpixels: flag whether or not to use superpixels for the solution
surface_category: type of surface to use
emulator_base: the basename of the emulator, if used
uncorrelated_radiometric_uncertainty: uncorrelated radiometric uncertainty parameter for isofit
dn_uncertainty_file: Path to a linearity .mat file to augment S matrix with linearity uncertainty
segmentation_size: image segmentation size if empirical line is used
debug: flag to enable debug_mode in the config.implementation
prebuilt_lut_path: lut path to use; if none, presolve config will create a new file
multipart_transmittance: flag to indicate whether a 4-component transmittance model is to be used
"""
# Determine number of spectra included in each retrieval. If we are
# operating on segments, this will average down instrument noise
if use_superpixels:
spectra_per_inversion = segmentation_size
else:
spectra_per_inversion = 1
if emulator_base is None:
engine_name = "modtran"
elif emulator_base.endswith(".jld2"):
engine_name = "KernelFlowsGP"
else:
engine_name = "sRTMnet"
if prebuilt_lut_path is None:
lut_path = join(paths.lut_h2o_directory, "lut.nc")
else:
lut_path = prebuilt_lut_path
# set up specific presolve LUT grid
lut_grid = {"H2OSTR": [float(x) for x in h2o_lut_grid]}
if emulator_base is not None and os.path.splitext(emulator_base)[1] == ".jld2":
from isofit.radiative_transfer.engines.kernel_flows import bounds_check
bounds_check(lut_grid, emulator_base, modify=True)
radiative_transfer_config = {
"radiative_transfer_engines": {
"vswir": {
"engine_name": engine_name,
"multipart_transmittance": multipart_transmittance,
"lut_path": lut_path,
"sim_path": paths.lut_h2o_directory,
"template_file": paths.h2o_template_path,
"lut_names": {"H2OSTR": get_lut_subset(h2o_lut_grid)},
"statevector_names": ["H2OSTR"],
}
},
"statevector": {
"H2OSTR": {
"bounds": [
float(np.min(lut_grid["H2OSTR"])),
float(np.max(lut_grid["H2OSTR"])),
],
"scale": 0.01,
"init": np.percentile(lut_grid["H2OSTR"], 25),
"prior_sigma": 100.0,
"prior_mean": 1.5,
}
},
"lut_grid": lut_grid,
"unknowns": {"H2O_ABSCO": 0.0},
}
if emulator_base is not None:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_file"
] = abspath(emulator_base)
if multipart_transmittance:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_aux_file"
] = emulator_base
else:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_aux_file"
] = abspath(os.path.splitext(emulator_base)[0] + "_aux.npz")
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"earth_sun_distance_file"
] = paths.earth_sun_distance_path
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"irradiance_file"
] = paths.irradiance_file
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"engine_base_dir"
] = paths.sixs_path
else:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"engine_base_dir"
] = paths.modtran_path
# make isofit configuration
isofit_config_h2o = {
"output": {"estimated_state_file": paths.h2o_subs_path},
"input": {},
"forward_model": {
"instrument": {
"wavelength_file": paths.wavelength_path,
"integrations": spectra_per_inversion,
"unknowns": {
"uncorrelated_radiometric_uncertainty": uncorrelated_radiometric_uncertainty,
"dn_uncertainty_file": dn_uncertainty_file,
},
},
"surface": make_surface_config(
paths, surface_category, use_superpixels=use_superpixels
),
"radiative_transfer": radiative_transfer_config,
},
"implementation": {
"ray_temp_dir": paths.ray_temp_dir,
"inversion": {"windows": inversion_windows},
"n_cores": n_cores,
"debug_mode": debug,
"isofit_version": __version__,
},
}
if paths.input_channelized_uncertainty_path is not None:
isofit_config_h2o["forward_model"]["instrument"]["unknowns"][
"channelized_radiometric_uncertainty_file"
] = paths.channelized_uncertainty_working_path
if paths.input_model_discrepancy_path is not None:
isofit_config_h2o["forward_model"][
"model_discrepancy_file"
] = paths.model_discrepancy_working_path
if paths.noise_path is not None:
isofit_config_h2o["forward_model"]["instrument"][
"parametric_noise_file"
] = paths.noise_path
else:
isofit_config_h2o["forward_model"]["instrument"]["SNR"] = 1000
if paths.rdn_factors_path:
isofit_config_h2o["input"][
"radiometry_correction_file"
] = paths.rdn_factors_path
if use_superpixels:
isofit_config_h2o["input"]["measured_radiance_file"] = paths.rdn_subs_path
isofit_config_h2o["input"]["loc_file"] = paths.loc_subs_path
isofit_config_h2o["input"]["obs_file"] = paths.obs_subs_path
else:
isofit_config_h2o["input"][
"measured_radiance_file"
] = paths.radiance_working_path
isofit_config_h2o["input"]["loc_file"] = paths.loc_working_path
isofit_config_h2o["input"]["obs_file"] = paths.obs_working_path
# write presolve config
with open(paths.h2o_config_path, "w") as fout:
fout.write(
json.dumps(isofit_config_h2o, cls=SerialEncoder, indent=4, sort_keys=True)
)
# Create a template version of the config
env.toTemplate(paths.h2o_config_path, working_directory=paths.working_directory)
[docs]
def build_main_config(
paths: Pathnames,
lut_params: LUTConfig,
h2o_lut_grid: np.array = None,
elevation_lut_grid: np.array = None,
to_sensor_zenith_lut_grid: np.array = None,
to_sun_zenith_lut_grid: np.array = None,
relative_azimuth_lut_grid: np.array = None,
mean_latitude: float = None,
mean_longitude: float = None,
dt: datetime = None,
use_superpixels: bool = True,
n_cores: int = -1,
surface_category="multicomponent_surface",
emulator_base: str = None,
uncorrelated_radiometric_uncertainty: float = 0.0,
dn_uncertainty_file: str = None,
multiple_restarts: bool = False,
segmentation_size=400,
pressure_elevation: bool = False,
debug: bool = False,
inversion_windows=[[350.0, 1360.0], [1410, 1800.0], [1970.0, 2500.0]],
prebuilt_lut_path: str = None,
multipart_transmittance: bool = False,
surface_mapping: dict = None,
retrieve_co2: bool = False,
) -> None:
"""Write an isofit config file for the main solve, using the specified pathnames and all given info
Args:
paths: object containing references to all relevant file locations
lut_params: configuration parameters for the lut grid
h2o_lut_grid: the water vapor look up table grid isofit should use for this solve
elevation_lut_grid: the ground elevation look up table grid isofit should use for this solve
to_sensor_zenith_lut_grid: the to-sensor zenith angle look up table grid isofit should use for this
solve
to_sun_zenith_lut_grid: the to-sun zenith angle look up table grid isofit should use for this
solve
relative_azimuth_lut_grid: the relative to-sun azimuth angle look up table grid isofit should use for
this solve
mean_latitude: the latitude isofit should use for this solve
mean_longitude: the longitude isofit should use for this solve
dt: the datetime object corresponding to this flightline to use for this solve
use_superpixels: flag whether or not to use superpixels for the solution
n_cores: the number of cores to use during processing
surface_category: type of surface to use
emulator_base: the basename of the emulator, if used
uncorrelated_radiometric_uncertainty: uncorrelated radiometric uncertainty parameter for isofit
dn_uncertainty_file: Path to a linearity .mat file to augment S matrix with linearity uncertainty
multiple_restarts: if true, use multiple restarts
segmentation_size: image segmentation size if empirical line is used
pressure_elevation: if true, retrieve pressure elevation
debug: if true, run ISOFIT in debug mode
multipart_transmittance: flag to indicate whether a 4-component transmittance model is to be used
surface_mapping: optional object to pass mapping between surface class and surface model
retrieve_co2: flag to include CO2 in lut and retrieval
"""
# Determine number of spectra included in each retrieval. If we are
# operating on segments, this will average down instrument noise
if use_superpixels:
spectra_per_inversion = segmentation_size
else:
spectra_per_inversion = 1
if prebuilt_lut_path is None:
lut_path = join(paths.full_lut_directory, "lut.nc")
else:
lut_path = abspath(prebuilt_lut_path)
if emulator_base is None:
engine_name = "modtran"
elif emulator_base.endswith(".jld2"):
engine_name = "KernelFlowsGP"
else:
engine_name = "sRTMnet"
radiative_transfer_config = {
"radiative_transfer_engines": {
"vswir": {
"engine_name": engine_name,
"multipart_transmittance": multipart_transmittance,
"sim_path": paths.full_lut_directory,
"lut_path": lut_path,
"aerosol_template_file": paths.aerosol_tpl_path,
"template_file": paths.modtran_template_path,
}
},
"statevector": {},
"lut_grid": {},
"unknowns": {"H2O_ABSCO": 0.0},
}
if emulator_base is not None:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_file"
] = abspath(emulator_base)
if multipart_transmittance:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_aux_file"
] = emulator_base
else:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"emulator_aux_file"
] = abspath(os.path.splitext(emulator_base)[0] + "_aux.npz")
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"earth_sun_distance_file"
] = paths.earth_sun_distance_path
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"irradiance_file"
] = paths.irradiance_file
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"engine_base_dir"
] = paths.sixs_path
else:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"engine_base_dir"
] = paths.modtran_path
# add aerosol elements from climatology
aerosol_state_vector, aerosol_lut_grid, aerosol_model_path = load_climatology(
paths.aerosol_climatology,
mean_latitude,
mean_longitude,
dt,
lut_params=lut_params,
)
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"aerosol_model_file"
] = aerosol_model_path
if prebuilt_lut_path is None:
if h2o_lut_grid is not None and len(h2o_lut_grid) > 1:
radiative_transfer_config["lut_grid"]["H2OSTR"] = h2o_lut_grid.tolist()
if elevation_lut_grid is not None and len(elevation_lut_grid) > 1:
radiative_transfer_config["lut_grid"][
"surface_elevation_km"
] = elevation_lut_grid.tolist()
if to_sensor_zenith_lut_grid is not None and len(to_sensor_zenith_lut_grid) > 1:
radiative_transfer_config["lut_grid"][
"observer_zenith"
] = to_sensor_zenith_lut_grid.tolist()
if to_sun_zenith_lut_grid is not None and len(to_sun_zenith_lut_grid) > 1:
radiative_transfer_config["lut_grid"][
"solar_zenith"
] = to_sun_zenith_lut_grid.tolist()
if relative_azimuth_lut_grid is not None and len(relative_azimuth_lut_grid) > 1:
radiative_transfer_config["lut_grid"][
"relative_azimuth"
] = relative_azimuth_lut_grid.tolist()
if retrieve_co2 and lut_params.co2_range is not None:
radiative_transfer_config["lut_grid"]["CO2"] = lut_params.co2_range
radiative_transfer_config["lut_grid"].update(aerosol_lut_grid)
rtc_ln = {}
for key in radiative_transfer_config["lut_grid"].keys():
rtc_ln[key] = None
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"lut_names"
] = rtc_ln
if emulator_base is not None and os.path.splitext(emulator_base)[1] == ".jld2":
from isofit.radiative_transfer.engines.kernel_flows import bounds_check
bounds_check(radiative_transfer_config["lut_grid"], emulator_base, modify=True)
# modify so we set the statevector appropriately
if "H2OSTR" in radiative_transfer_config["lut_grid"]:
h2o_lut_grid = np.array(radiative_transfer_config["lut_grid"]["H2OSTR"])
if "surface_elevation_km" in radiative_transfer_config["lut_grid"]:
elevation_lut_grid = np.array(
radiative_transfer_config["lut_grid"]["surface_elevation_km"]
)
if prebuilt_lut_path is not None:
ncds = nc.Dataset(prebuilt_lut_path, "r")
# first, check if observer zenith angle in prebuilt LUT comes in MODTRAN convention
# and convert lut grid as needed
try:
if any(np.array(ncds["observer_zenith"]) > 90.0):
to_sensor_zenith_lut_grid = np.sort(
[180 - x for x in to_sensor_zenith_lut_grid]
)
except IndexError:
logging.warning(
"Key observer_zenith not found in prebuilt LUT. Conversion to MODTRAN convention not necessary."
)
radiative_transfer_config["radiative_transfer_engines"]["vswir"]["lut_names"][
"H2OSTR"
] = get_lut_subset(h2o_lut_grid)
radiative_transfer_config["radiative_transfer_engines"]["vswir"]["lut_names"][
"surface_elevation_km"
] = get_lut_subset(elevation_lut_grid)
radiative_transfer_config["radiative_transfer_engines"]["vswir"]["lut_names"][
"observer_zenith"
] = get_lut_subset(to_sensor_zenith_lut_grid)
radiative_transfer_config["radiative_transfer_engines"]["vswir"]["lut_names"][
"solar_zenith"
] = get_lut_subset(to_sun_zenith_lut_grid)
radiative_transfer_config["radiative_transfer_engines"]["vswir"]["lut_names"][
"relative_azimuth"
] = get_lut_subset(relative_azimuth_lut_grid)
for key in aerosol_lut_grid.keys():
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"lut_names"
][key] = get_lut_subset(aerosol_lut_grid[key])
if retrieve_co2:
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"lut_names"
]["CO2"] = get_lut_subset(lut_params.co2_range)
rm_keys = []
for key, item in radiative_transfer_config["radiative_transfer_engines"][
"vswir"
]["lut_names"].items():
if key not in ncds.variables:
logging.warning(
f"Key {key} not found in prebuilt LUT, removing it from LUT. Spacing would have been: {item}"
)
rm_keys.append(key)
for key in rm_keys:
del radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"lut_names"
][key]
# Now do statevector
if h2o_lut_grid is not None:
radiative_transfer_config["statevector"]["H2OSTR"] = {
"bounds": [h2o_lut_grid[0], h2o_lut_grid[-1]],
"scale": 1,
"init": (h2o_lut_grid[1] + h2o_lut_grid[-1]) / 2.0,
"prior_sigma": 100.0,
"prior_mean": (h2o_lut_grid[1] + h2o_lut_grid[-1]) / 2.0,
}
if pressure_elevation:
radiative_transfer_config["statevector"]["surface_elevation_km"] = {
"bounds": [elevation_lut_grid[0], elevation_lut_grid[-1]],
"scale": 100,
"init": (elevation_lut_grid[0] + elevation_lut_grid[-1]) / 2.0,
"prior_sigma": 1000.0,
"prior_mean": (elevation_lut_grid[0] + elevation_lut_grid[-1]) / 2.0,
}
if retrieve_co2:
radiative_transfer_config["statevector"]["CO2"] = {
"bounds": [lut_params.co2_range[0], lut_params.co2_range[-1]],
"scale": 10,
"init": 420.0,
"prior_sigma": 100.0,
"prior_mean": 420.0,
}
radiative_transfer_config["statevector"].update(aerosol_state_vector)
# MODTRAN should know about our whole LUT grid and all of our statevectors, so copy them in
radiative_transfer_config["radiative_transfer_engines"]["vswir"][
"statevector_names"
] = list(radiative_transfer_config["statevector"].keys())
# make isofit configuration
isofit_config_modtran = {
"input": {},
"output": {},
"forward_model": {
"instrument": {
"wavelength_file": paths.wavelength_path,
"integrations": spectra_per_inversion,
"unknowns": {
"uncorrelated_radiometric_uncertainty": uncorrelated_radiometric_uncertainty,
"dn_uncertainty_file": dn_uncertainty_file,
},
},
"surface": make_surface_config(
paths,
surface_category,
pressure_elevation,
elevation_lut_grid,
surface_mapping=surface_mapping,
use_superpixels=use_superpixels,
),
"radiative_transfer": radiative_transfer_config,
},
"implementation": {
"ray_temp_dir": paths.ray_temp_dir,
"inversion": {"windows": inversion_windows},
"n_cores": n_cores,
"debug_mode": debug,
"isofit_version": __version__,
},
}
if use_superpixels:
isofit_config_modtran["input"]["measured_radiance_file"] = paths.rdn_subs_path
isofit_config_modtran["input"]["loc_file"] = paths.loc_subs_path
isofit_config_modtran["input"]["obs_file"] = paths.obs_subs_path
if paths.svf_working_path:
isofit_config_modtran["input"]["skyview_factor_file"] = paths.svf_subs_path
isofit_config_modtran["output"]["estimated_state_file"] = paths.state_subs_path
isofit_config_modtran["output"][
"posterior_uncertainty_file"
] = paths.uncert_subs_path
isofit_config_modtran["output"][
"estimated_reflectance_file"
] = paths.rfl_subs_path
else:
isofit_config_modtran["input"][
"measured_radiance_file"
] = paths.radiance_working_path
isofit_config_modtran["input"]["loc_file"] = paths.loc_working_path
isofit_config_modtran["input"]["obs_file"] = paths.obs_working_path
if paths.svf_working_path:
isofit_config_modtran["input"][
"skyview_factor_file"
] = paths.svf_working_path
isofit_config_modtran["output"][
"posterior_uncertainty_file"
] = paths.uncert_working_path
isofit_config_modtran["output"][
"estimated_reflectance_file"
] = paths.rfl_working_path
isofit_config_modtran["output"][
"estimated_state_file"
] = paths.state_working_path
if multiple_restarts:
grid = {}
if h2o_lut_grid is not None:
h2o_delta = float(h2o_lut_grid[-1]) - float(h2o_lut_grid[0])
grid["H2OSTR"] = [
round(h2o_lut_grid[0] + h2o_delta * 0.02, 4),
round(h2o_lut_grid[-1] - h2o_delta * 0.02, 4),
]
# We will initialize using different AODs for the first aerosol in the LUT
if len(aerosol_lut_grid) > 0:
key = list(aerosol_lut_grid.keys())[0]
aer_delta = aerosol_lut_grid[key][-1] - aerosol_lut_grid[key][0]
grid[key] = [
round(aerosol_lut_grid[key][0] + aer_delta * 0.02, 4),
round(aerosol_lut_grid[key][-1] - aer_delta * 0.02, 4),
]
isofit_config_modtran["implementation"]["inversion"]["integration_grid"] = grid
isofit_config_modtran["implementation"]["inversion"][
"inversion_grid_as_preseed"
] = True
if paths.input_channelized_uncertainty_path is not None:
isofit_config_modtran["forward_model"]["instrument"]["unknowns"][
"channelized_radiometric_uncertainty_file"
] = paths.channelized_uncertainty_working_path
if paths.input_model_discrepancy_path is not None:
isofit_config_modtran["forward_model"][
"model_discrepancy_file"
] = paths.model_discrepancy_working_path
if paths.noise_path is not None:
isofit_config_modtran["forward_model"]["instrument"][
"parametric_noise_file"
] = paths.noise_path
else:
isofit_config_modtran["forward_model"]["instrument"]["SNR"] = 500
if paths.rdn_factors_path:
isofit_config_modtran["input"][
"radiometry_correction_file"
] = paths.rdn_factors_path
# write main config file
with open(paths.isofit_full_config_path, "w") as fout:
fout.write(
json.dumps(
isofit_config_modtran, cls=SerialEncoder, indent=4, sort_keys=True
)
)
# Create a template version of the config
env.toTemplate(
paths.isofit_full_config_path, working_directory=paths.working_directory
)
[docs]
def get_lut_subset(vals):
"""Populate lut_names for the appropriate style of subsetting
Args:
vals: the values to use for subsetting
"""
if vals is not None and len(vals) == 1:
return {"interp": vals[0]}
elif vals is not None and len(vals) > 1:
return {"gte": vals[0], "lte": vals[-1]}
else:
return None
[docs]
def write_modtran_template(
atmosphere_type: str,
fid: str,
altitude_km: float,
dayofyear: int,
to_sensor_azimuth: float,
to_sensor_zenith: float,
to_sun_zenith: float,
relative_azimuth: float,
gmtime: float,
elevation_km: float,
output_file: str,
ihaze_type: str = "AER_RURAL",
):
"""Write a MODTRAN template file for use by isofit look up tables
Args:
atmosphere_type: label for the type of atmospheric profile to use in modtran
fid: flight line id (name)
altitude_km: altitude of the sensor in km
dayofyear: the current day of the given year
to_sensor_azimuth: azimuth view angle to the sensor, in degrees
to_sensor_zenith: sensor/observer zenith angle, in degrees
to_sun_zenith: final altitude solar zenith angle (0→180°)
relative_azimuth: final altitude relative solar azimuth (0→360°)
gmtime: greenwich mean time
elevation_km: elevation of the land surface in km
output_file: location to write the modtran template file to
ihaze_type: type of extinction and default meteorological range for the boundary-layer aerosol model
"""
# make modtran configuration
output_template = {
"MODTRAN": [
{
"MODTRANINPUT": {
"NAME": fid,
"DESCRIPTION": "",
"CASE": 0,
"RTOPTIONS": {
"MODTRN": "RT_CORRK_FAST",
"LYMOLC": False,
"T_BEST": False,
"IEMSCT": "RT_SOLAR_AND_THERMAL",
"IMULT": "RT_DISORT",
"DISALB": False,
"NSTR": 8,
"SOLCON": 0.0,
},
"ATMOSPHERE": {
"MODEL": atmosphere_type,
"M1": atmosphere_type,
"M2": atmosphere_type,
"M3": atmosphere_type,
"M4": atmosphere_type,
"M5": atmosphere_type,
"M6": atmosphere_type,
"CO2MX": 420.0,
"H2OSTR": 1.0,
"H2OUNIT": "g",
"O3STR": 0.3,
"O3UNIT": "a",
},
"AEROSOLS": {"IHAZE": ihaze_type},
"GEOMETRY": {
"ITYPE": 3,
"H1ALT": altitude_km,
"IDAY": dayofyear,
"IPARM": 12,
"PARM1": relative_azimuth,
"PARM2": to_sun_zenith,
"TRUEAZ": to_sensor_azimuth,
"OBSZEN": 180 - to_sensor_zenith, # MODTRAN convention
"GMTIME": gmtime,
},
"SURFACE": {
"SURFTYPE": "REFL_LAMBER_MODEL",
"GNDALT": elevation_km,
"NSURF": 1,
"SURFP": {"CSALB": "LAMB_CONST_0_PCT"},
},
"SPECTRAL": {
"V1": 340.0,
"V2": 2520.0,
"DV": 0.1,
"FWHM": 0.1,
"YFLAG": "R",
"XFLAG": "N",
"FLAGS": "NT A ",
"BMNAME": "p1_2013",
},
"FILEOPTIONS": {"NOPRNT": 2, "CKPRNT": True},
}
}
]
}
# write modtran_template
with open(output_file, "w") as fout:
fout.write(
json.dumps(output_template, cls=SerialEncoder, indent=4, sort_keys=True)
)
[docs]
def load_climatology(
config_path: str,
latitude: float,
longitude: float,
acquisition_datetime: datetime,
lut_params: LUTConfig,
):
"""Load climatology data, based on location and configuration
Args:
config_path: path to the base configuration directory for isofit
latitude: latitude to set for the segment (mean of acquisition suggested)
longitude: latitude to set for the segment (mean of acquisition suggested)
acquisition_datetime: datetime to use for the segment( mean of acquisition suggested)
lut_params: parameters to use to define lut grid
:Returns
tuple containing:
aerosol_state_vector - A dictionary that defines the aerosol state vectors for isofit
aerosol_lut_grid - A dictionary of the aerosol lookup table (lut) grid to be explored
aerosol_model_path - A path to the location of the aerosol model to use with MODTRAN.
"""
aerosol_model_path = str(env.path("data", "aerosol_model.txt"))
aerosol_state_vector = {}
aerosol_lut_grid = {}
aerosol_lut_ranges = [
lut_params.aerosol_0_range,
lut_params.aerosol_1_range,
lut_params.aerosol_2_range,
]
aerosol_lut_spacing = [
lut_params.aerosol_0_spacing,
lut_params.aerosol_1_spacing,
lut_params.aerosol_2_spacing,
]
aerosol_lut_spacing_mins = [
lut_params.aerosol_0_spacing_min,
lut_params.aerosol_1_spacing_min,
lut_params.aerosol_2_spacing_min,
]
for _a, alr in enumerate(aerosol_lut_ranges):
aerosol_lut = lut_params.get_grid(
alr[0], alr[1], aerosol_lut_spacing[_a], aerosol_lut_spacing_mins[_a]
)
if aerosol_lut is not None:
aerosol_state_vector["AERFRAC_{}".format(_a)] = {
"bounds": [float(alr[0]), float(alr[1])],
"scale": 1,
"init": float((alr[1] - alr[0]) / 10.0 + alr[0]),
"prior_sigma": 10.0,
"prior_mean": float((alr[1] - alr[0]) / 10.0 + alr[0]),
}
aerosol_lut_grid["AERFRAC_{}".format(_a)] = aerosol_lut.tolist()
aot_550_lut = lut_params.get_grid(
lut_params.aot_550_range[0],
lut_params.aot_550_range[1],
lut_params.aot_550_spacing,
lut_params.aot_550_spacing_min,
)
if aot_550_lut is not None:
aerosol_lut_grid["AOT550"] = aot_550_lut.tolist()
alr = [aerosol_lut_grid["AOT550"][0], aerosol_lut_grid["AOT550"][-1]]
aerosol_state_vector["AOT550"] = {
"bounds": [float(alr[0]), float(alr[1])],
"scale": 1,
"init": float((alr[1] - alr[0]) / 10.0 + alr[0]),
"prior_sigma": 10.0,
"prior_mean": float((alr[1] - alr[0]) / 10.0 + alr[0]),
}
logging.info("Loading Climatology")
# If a configuration path has been provided, use it to get relevant info
if config_path is not None:
month = acquisition_datetime.timetuple().tm_mon
year = acquisition_datetime.timetuple().tm_year
with open(config_path, "r") as fin:
for case in json.load(fin)["cases"]:
match = True
logging.info("matching", latitude, longitude, month, year)
for criterion, interval in case["criteria"].items():
logging.info(criterion, interval, "...")
if criterion == "latitude":
if latitude < interval[0] or latitude > interval[1]:
match = False
if criterion == "longitude":
if longitude < interval[0] or longitude > interval[1]:
match = False
if criterion == "month":
if month < interval[0] or month > interval[1]:
match = False
if criterion == "year":
if year < interval[0] or year > interval[1]:
match = False
if match:
aerosol_state_vector = case["aerosol_state_vector"]
aerosol_lut_grid = case["aerosol_lut_grid"]
aerosol_model_path = case["aerosol_mdl_path"]
break
logging.info(
"Climatology Loaded. Aerosol State Vector:\n{}\nAerosol LUT Grid:\n{}\nAerosol"
" model path:{}".format(
aerosol_state_vector, aerosol_lut_grid, aerosol_model_path
)
)
return aerosol_state_vector, aerosol_lut_grid, aerosol_model_path
[docs]
def calc_modtran_max_water(paths: Pathnames) -> float:
"""MODTRAN may put a ceiling on "legal" H2O concentrations. This function calculates that ceiling. The intended
use is to make sure the LUT does not contain useless gridpoints above it.
Args:
paths: object containing references to all relevant file locations
Returns:
max_water - maximum MODTRAN H2OSTR value for provided obs conditions
"""
max_water = None
# TODO: this is effectively redundant from the radiative_transfer->modtran. Either devise a way
# to port in from there, or put in utils to reduce redundancy.
xdir = {"linux": "linux", "darwin": "macos", "windows": "windows"}
name = "H2O_bound_test"
filebase = os.path.join(paths.lut_h2o_directory, name)
with open(paths.h2o_template_path, "r") as f:
bound_test_config = json.load(f)
bound_test_config["MODTRAN"][0]["MODTRANINPUT"]["NAME"] = name
bound_test_config["MODTRAN"][0]["MODTRANINPUT"]["ATMOSPHERE"]["H2OSTR"] = 50
with open(filebase + ".json", "w") as fout:
fout.write(
json.dumps(bound_test_config, cls=SerialEncoder, indent=4, sort_keys=True)
)
cmd = os.path.join(
paths.modtran_path, "bin", xdir[platform], "mod6c_cons " + filebase + ".json"
)
try:
subprocess.call(cmd, shell=True, timeout=10, cwd=paths.lut_h2o_directory)
except:
pass
with open(filebase + ".tp6", errors="ignore") as tp6file:
for count, line in enumerate(tp6file):
if "The water column is being set to the maximum" in line:
max_water = line.split(",")[1].strip()
max_water = float(max_water.split(" ")[0])
break
if max_water is None:
logging.error(
"Could not find MODTRAN H2O upper bound in file {}".format(
filebase + ".tp6"
)
)
raise KeyError("Could not find MODTRAN H2O upper bound")
return max_water
[docs]
def define_surface_types(
tsip: dict,
rdnfile: str,
obsfile: str,
out_class_path: str,
wl: np.array,
fwhm: np.array,
):
if np.all(wl < 10):
wl = units.micron_to_nm(wl)
fwhm = unts.micron_to_nm(fwhm)
irr_file = os.path.join(
os.path.dirname(isofit.__file__), "..", "..", "data", "kurucz_0.1nm.dat"
)
irr_wl, irr = np.loadtxt(irr_file, comments="#").T
irr = irr / 10 # convert to uW cm-2 sr-1 nm-1
irr_resamp = resample_spectrum(irr, irr_wl, wl, fwhm)
irr_resamp = np.array(irr_resamp, dtype=np.float32)
irr = irr_resamp
rdn_ds = envi.open(envi_header(rdnfile)).open_memmap(interleave="bip")
obs_src = envi.open(envi_header(obsfile))
obs_ds = obs_src.open_memmap(interleave="bip")
# determine glint bands having negligible water reflectance
try:
b1000 = np.argmin(abs(wl - tsip["water"]["toa_threshold_wavelengths"][0]))
b1380 = np.argmin(abs(wl - tsip["water"]["toa_threshold_wavelengths"][1]))
except KeyError:
logging.info(
"No threshold wavelengths for water classification found in config file. "
"Setting to 1000 and 1380 nm."
)
b1000 = np.argmin(abs(wl - 1000))
b1380 = np.argmin(abs(wl - 1380))
# determine cloud bands having high TOA reflectance
try:
b450 = np.argmin(abs(wl - tsip["cloud"]["toa_threshold_wavelengths"][0]))
b1250 = np.argmin(abs(wl - tsip["cloud"]["toa_threshold_wavelengths"][1]))
b1650 = np.argmin(abs(wl - tsip["cloud"]["toa_threshold_wavelengths"][2]))
except KeyError:
logging.info(
"No threshold wavelengths for cloud classification found in config file. "
"Setting to 450, 1250, and 1650 nm."
)
b450 = np.argmin(abs(wl - 450))
b1250 = np.argmin(abs(wl - 1250))
b1650 = np.argmin(abs(wl - 1650))
classes = np.zeros(rdn_ds.shape[:2])
for line in range(classes.shape[0]):
for sample in range(classes.shape[1]):
zen = np.cos(np.deg2rad(obs_ds[line, sample, 4]))
rho = (((rdn_ds[line, sample, :] * np.pi) / irr.T).T / np.cos(zen)).T
rho[rho[0] < -9990, :] = -9999.0
if rho[0] < -9999:
classes[line, sample] = -1
continue
# Cloud threshold from Sandford et al.
total = (
np.array(
rho[b450] > tsip["cloud"]["toa_threshold_values"][0], dtype=int
)
+ np.array(
rho[b1250] > tsip["cloud"]["toa_threshold_values"][1], dtype=int
)
+ np.array(
rho[b1650] > tsip["cloud"]["toa_threshold_values"][2], dtype=int
)
)
if rho[b1000] < tsip["water"]["toa_threshold_values"][0]:
classes[line, sample] = 2
if total > 2 or rho[b1380] > tsip["water"]["toa_threshold_values"][1]:
classes[line, sample] = 1
header = obs_src.metadata.copy()
header["bands"] = 1
if "band names" in header.keys():
header["band names"] = "Class"
output_ds = envi.create_image(
envi_header(out_class_path), header, ext="", force=True
)
output_mm = output_ds.open_memmap(interleave="bip", writable=True)
output_mm[:, :, 0] = classes
return classes
[docs]
def copy_file_subset(matching_indices: np.array, pathnames: List):
"""Copy over subsets of given files to new locations
Args:
matching_indices (np.array): indices to select from (y dimension) from source dataset
pathnames (List): list of tuples (input_filename, output_filename) to read/write to/from
"""
for inp, outp in pathnames:
input_ds = envi.open(envi_header(inp), inp)
header = input_ds.metadata.copy()
header["lines"] = np.sum(matching_indices)
header["samples"] = 1
output_ds = envi.create_image(envi_header(outp), header, ext="", force=True)
output_mm = output_ds.open_memmap(interleave="bip", writable=True)
input_mm = input_ds.open_memmap(interleave="bip", writable=True)
output_mm[:, 0, :] = input_mm[matching_indices[:, :, 0], ...].copy()
[docs]
def reassemble_cube(matching_indices: np.array, paths: Pathnames):
"""Copy over subsets of given files to new locations
Args:
matching_indices (np.array): indices to select from (y dimension) from source dataset
paths (Pathnames): output file array set
"""
logging.info(f"Reassemble {paths.rfl_subs_path}")
input_ds = envi.open(envi_header(paths.surface_subs_files["base"]["rfl"]))
header = input_ds.metadata.copy()
header["lines"] = len(matching_indices)
output_ds = envi.create_image(
envi_header(paths.rfl_subs_path), header, ext="", force=True
)
output_mm = output_ds.open_memmap(interleave="bip", writable=True)
for _st, surface_type in enumerate(list(paths.surface_config_paths.keys())):
if np.sum(matching_indices == _st) > 0:
input_ds = envi.open(
envi_header(paths.surface_subs_files[surface_type]["rfl"])
)
output_mm[matching_indices == _st, ...] = input_ds.open_memmap(
interleave="bip"
).copy()[:, 0, :]
# TODO: only records reflectance uncertainties, could grab additional states (consistent between classes)
logging.info(f"Reassemble {paths.uncert_subs_path}")
input_ds = envi.open(envi_header(paths.surface_subs_files["base"]["uncert"]))
rdn_ds = envi.open(envi_header(paths.surface_subs_files["base"]["rdn"]))
header = input_ds.metadata.copy()
header["lines"] = len(matching_indices)
header["bands"] = rdn_ds.metadata["bands"]
if "band names" in header.keys():
header["band names"] = [
input_ds.metadata["band names"][x] for x in range(int(header["bands"]))
]
output_ds = envi.create_image(
envi_header(paths.uncert_subs_path), header, ext="", force=True
)
output_mm = output_ds.open_memmap(interleave="bip", writable=True)
for _st, surface_type in enumerate(list(paths.surface_config_paths.keys())):
if np.sum(matching_indices == _st) > 0:
input_ds = envi.open(
envi_header(paths.surface_subs_files[surface_type]["uncert"])
)
output_mm[matching_indices == _st, ...] = input_ds.open_memmap(
interleave="bip"
)[:, :, : int(header["bands"])].copy()[:, 0, :]
[docs]
def sensor_name_to_dt(sensor: str, fid: str):
inversion_window_update = None
if sensor == "ang":
# parse flightline ID (AVIRIS-NG assumptions)
dt = datetime.strptime(fid[3:], "%Y%m%dt%H%M%S")
elif sensor == "av3":
# parse flightline ID (AVIRIS-3 assumptions)
dt = datetime.strptime(fid[3:], "%Y%m%dt%H%M%S")
inversion_window_update = [[380.0, 1350.0], [1435, 1800.0], [1970.0, 2500.0]]
elif sensor == "av5":
# parse flightline ID (AVIRIS-5 assumptions)
dt = datetime.strptime(fid[3:], "%Y%m%dt%H%M%S")
elif sensor == "avcl":
# parse flightline ID (AVIRIS-Classic assumptions)
dt = datetime.strptime("20{}t000000".format(fid[1:7]), "%Y%m%dt%H%M%S")
elif sensor == "emit":
# parse flightline ID (EMIT assumptions)
dt = datetime.strptime(fid[:19], "emit%Y%m%dt%H%M%S")
INVERSION_WINDOWS = [[380.0, 1325.0], [1435, 1770.0], [1965.0, 2500.0]]
elif sensor == "enmap":
# parse flightline ID (EnMAP assumptions)
dt = datetime.strptime(fid[:15], "%Y%m%dt%H%M%S")
elif sensor == "hyp":
# parse flightline ID (Hyperion assumptions)
dt = datetime.strptime(fid[10:17], "%Y%j")
elif sensor == "neon":
# parse flightline ID (NEON assumptions)
dt = datetime.strptime(fid, "NIS01_%Y%m%d_%H%M%S")
elif sensor == "prism":
# parse flightline ID (PRISM assumptions)
dt = datetime.strptime(fid[3:], "%Y%m%dt%H%M%S")
elif sensor == "prisma":
# parse flightline ID (PRISMA assumptions)
dt = datetime.strptime(fid, "%Y%m%d%H%M%S")
elif sensor == "gao":
# parse flightline ID (GAO/CAO assumptions)
dt = datetime.strptime(fid[3:-5], "%Y%m%dt%H%M%S")
elif sensor == "oci":
# parse flightline ID (PACE OCI assumptions)
dt = datetime.strptime(fid[9:24], "%Y%m%dT%H%M%S")
elif sensor == "tanager":
# parse flightline ID (Tanager assumptions)
dt = datetime.strptime(fid[:15], "%Y%m%d_%H%M%S")
elif sensor[:3] == "NA-":
dt = datetime.strptime(sensor[3:], "%Y%m%d")
else:
raise ValueError(
"Datetime object could not be obtained. Please check file name of input"
" data."
)
return dt, inversion_window_update
[docs]
def get_wavelengths(
envi_file: str, wavelength_path: str = None
) -> (np.array, np.array):
"""Get wavelengths and FWHM from the header of an ENVI file
Args:
envi_file: path to the ENVI file to read wavelengths from
wavelength_path: optional path to a file containing wavelengths and FWHM
Returns:
tuple containing:
wl - array of wavelengths in nm
fwhm - array of full width at half maximum in nm
"""
# get radiance file, wavelengths, fwhm
radiance_dataset = envi.open(envi_header(envi_file))
wl_ds = np.array([float(w) for w in radiance_dataset.metadata["wavelength"]])
if wavelength_path:
if os.path.isfile(wavelength_path):
chn, wl, fwhm = np.loadtxt(wavelength_path).T
if len(chn) != len(wl_ds):
raise ValueError(
"Number of channels in wavelength file do not match"
" wavelengths in radiance cube. Please adjust your wavelength file."
)
else:
pass
else:
logging.info(
"No wavelength file provided. Obtaining wavelength grid from ENVI header of radiance cube."
)
wl = wl_ds
if "fwhm" in radiance_dataset.metadata:
fwhm = np.array([float(f) for f in radiance_dataset.metadata["fwhm"]])
elif "FWHM" in radiance_dataset.metadata:
fwhm = np.array([float(f) for f in radiance_dataset.metadata["FWHM"]])
else:
fwhm = np.ones(wl.shape) * (wl[1] - wl[0])
# Close out radiance dataset to avoid potential confusion
del radiance_dataset
# Convert to microns if needed
if wl[0] > 100:
logging.info("Wavelength units of nm inferred...converting to microns")
wl = units.nm_to_micron(wl)
fwhm = units.nm_to_micron(fwhm)
return wl, fwhm
[docs]
def write_wavelength_file(filename, wl, fwhm):
"""Write a wavelength file in isofit-expected format
Units can be either nm or microns, but should be the same
Args:
filename: path to the file to write
wl: array of wavelengths
fwhm: array of full width at half maximum
Returns:
None
"""
# write wavelength file
wl_data = np.concatenate(
[np.arange(len(wl))[:, np.newaxis], wl[:, np.newaxis], fwhm[:, np.newaxis]],
axis=1,
)
np.savetxt(filename, wl_data, delimiter=" ")
[docs]
def make_surface_config(
paths: Pathnames,
surface_category="multicomponent_surface",
pressure_elevation=None,
elevation_lut_grid=[],
surface_mapping: dict = None,
use_superpixels=False,
):
"""
Constructs the surface component of the config
Args:
paths: Pathnames object with all key values passed from apply_oe
surface_category: Base surface category
Returns:
surface_config_dict: Dictionary with all surface parameters and file
locations.
"""
# Initialize config dict
surface_config_dict = {
"multi_surface_flag": False,
}
# Check to see if a classification file is being propogated
# If so, use multisurface
if paths.surface_class_working_path:
surface_config_dict["Surfaces"] = {}
if use_superpixels:
surface_config_dict["surface_class_file"] = paths.surface_class_subs_path
else:
surface_config_dict["surface_class_file"] = paths.surface_class_working_path
surface_config_dict["base_surface_class_file"] = (
paths.surface_class_working_path
)
surface_config_dict["multi_surface_flag"] = True
# Get the surface categories present.
surface_classes_present = np.unique(
envi.open(envi_header(paths.surface_class_working_path)).open_memmap(
inteleave="bip"
)
)
# Iterate through all classes present in class image
for i in surface_classes_present:
surface_category = SurfaceMapping[int(i)]
# If surface_path given, use for all surfaces
surface_path = paths.surface_working_paths[surface_category]
# Set up "Surfaces" component of surface config
surface_config_dict["Surfaces"][surface_category] = {
"surface_int": int(i),
"surface_file": surface_path,
"surface_category": surface_category,
}
# Single surface run
else:
surface_config_dict["surface_file"] = paths.surface_working_paths[
surface_category
]
surface_config_dict["surface_category"] = surface_category
return surface_config_dict