Source code for isofit.utils.wavelength_cal

#! /usr/bin/env python3
#
# Authors: Philip G. Brodrick
#

import json
import logging
import os
import subprocess
from os.path import exists, join
from shutil import copyfile

import click
import numpy as np
import ray
from spectral.io import envi

import isofit.utils.template_construction as tmpl
from isofit.configs import configs
from isofit.core import instrument, isofit
from isofit.core.common import envi_header
from isofit.utils.apply_oe import (
    INVERSION_WINDOWS,
    RTM_CLEANUP_LIST,
    SUPPORTED_SENSORS,
    UNCORRELATED_RADIOMETRIC_UNCERTAINTY,
)


[docs] def add_wavelength_elements(config_path, state_type="shift", spline_indices=[]): config = json.load(open(config_path, "r")) # Set RTM WL range hi_res_wavlengths = os.path.join( f"{os.path.dirname(config_path)}", "..", "data", "wavelengths_highres.txt" ) x = np.arange(360, 2511, 0.025) / 1000.0 w = np.ones(len(x)) * 0.025 / 1000.0 n = np.ones(len(x)) D = np.c_[n, x, w] np.savetxt(hi_res_wavlengths, D, fmt="%8.6f") config["forward_model"]["radiative_transfer"]["radiative_transfer_engines"][ "vswir" ]["wavelength_file"] = hi_res_wavlengths # Flag case where spline_indices is empty and using spline if not spline_indices and ( (state_type == "spline") or (state_type == "spline-only") ): errstr = "Spline state given, but no indices provided" raise ValueError(errstr) config["forward_model"]["instrument"]["calibration_fixed"] = False if state_type == "shift": config["forward_model"]["instrument"]["statevector"] = { "GROW_FWHM": { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, }, "WL_SHIFT": { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, }, } elif state_type == "shift-only": config["forward_model"]["instrument"]["statevector"] = { "WL_SHIFT": { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, } } elif state_type == "spline": config["forward_model"]["instrument"]["statevector"] = { "GROW_FWHM": { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, } } for spline_index in spline_indices: config["forward_model"]["instrument"]["statevector"][ f"WLSPL_{spline_index}" ] = { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, } elif state_type == "spline-only": for spline_index in spline_indices: config["forward_model"]["instrument"]["statevector"][ f"WLSPL_{spline_index}" ] = { "bounds": [-5, 5], "init": 0, "prior_mean": 0, "prior_sigma": 100.0, "scale": 1, } logging.info(f"Writing config update with WL cal parameters to {config_path}") with open(config_path, "w") as fout: fout.write(json.dumps(config, cls=tmpl.SerialEncoder, indent=4, sort_keys=True))
[docs] def average_columns( input_radiance_file: str, input_loc_file: str, input_obs_file: str, output_radiance_file: str, output_loc_file: str, output_obs_file: str, start_column: int = None, end_column: int = None, column_interval: int = 1, ): """ Averages the columns of input radiance, location, and observation files and saves the results to output files. Args: input_radiance_file (str): Path to the input radiance file. input_loc_file (str): Path to the input location file. input_obs_file (str): Path to the input observation file. output_radiance_file (str): Path to the output radiance file. output_loc_file (str): Path to the output location file. output_obs_file (str): Path to the output observation file. start_column (int, optional): Starting column index for averaging. Defaults to None, which means start from the first column. end_column (int, optional): Ending column index for averaging. Defaults to None, which means average until the last column. column_interval (int, optional): Step size over columns; skips, doesn't average. Useful to increase runtime speeds. Returns: rows (int): The number of rows in the averaged output files. """ rows = 0 for infile, outfile in zip( [input_radiance_file, input_loc_file, input_obs_file], [output_radiance_file, output_loc_file, output_obs_file], ): in_ds = envi.open(envi_header(infile), infile) mm = in_ds.open_memmap(interleave="bip").copy() # Stay robust to nodata values - non-finite, header # specified nodata, and -9999 (in case unspecified) nodata = np.isfinite(mm) == False nodata[mm == -9999] = True if "data ignore value" in in_ds.metadata: nodata[mm == in_ds.metadata["data ignore value"]] = True mm[nodata] = np.nan rows = mm.shape[0] avg = np.nanmean(mm, axis=0) # Subsample if specified if start_column is None: start_column = 0 if end_column is None: end_column = avg.shape[0] avg = avg[start_column:end_column:column_interval, :] meta = in_ds.metadata.copy() meta["lines"] = 1 meta["samples"] = avg.shape[0] out_ds = envi.create_image( envi_header(outfile), shape=(1, avg.shape[0], avg.shape[1]), ext="", dtype=in_ds.dtype, interleave="bil", force=True, metadata=meta, ) out_ds.open_memmap(interleave="bip", writable=True)[:] = avg del in_ds, out_ds return rows
[docs] def wavelength_cal( input_radiance, input_loc, input_obs, working_directory, sensor, surface_path, copy_input_files=False, modtran_path=None, wavelength_path=None, surface_category="multicomponent_surface", rdn_factors_path=None, atmosphere_type="ATM_MIDLAT_SUMMER", channelized_uncertainty_path=None, model_discrepancy_path=None, lut_config_file=None, logging_level="INFO", log_file=None, n_cores=1, ray_temp_dir="/tmp/ray", emulator_base=None, prebuilt_lut=None, inversion_windows=None, wl_state_type="shift", spline_indices=[], force_with_geo=False, start_column=None, end_column=None, column_interval=1, ): """\ Runs a wavelength calibration on an input scene. Observation (obs) and location (loc) files are used to determine appropriate geometry lookup tables and provide a heuristic means of determining atmospheric water ranges. \b Parameters ---------- input_radiance : str Radiance data cube. Expected to be ENVI format input_loc : str Location data cube of shape (Lon, Lat, Elevation). Expected to be ENVI format input_obs : str Observation data cube of shape: (path length, to-sensor azimuth, to-sensor zenith, to-sun azimuth, to-sun zenith, phase, slope, aspect, cosine i, UTC time) Expected to be ENVI format working_directory : str Directory to stage multiple outputs, will contain subdirectories sensor : str The sensor used for acquisition, will be used to set noise and datetime settings surface_path : str Path to surface model or json dict of surface model configuration copy_input_files : bool, default=False Flag to choose to copy input_radiance, input_loc, and input_obs locally into the working_directory modtran_path : str, default=None Location of MODTRAN utility. Alternately set with `MODTRAN_DIR` environment variable wavelength_path : str, default=None Location to get wavelength information from, if not specified the radiance header will be used surface_category : str, default="multicomponent_surface" The type of ISOFIT surface priors to use. Default is multicomponent_surface rdn_factors_path : str, default=None Specify a radiometric correction factor, if desired atmosphere_type : str, default="ATM_MIDLAT_SUMMER" Atmospheric profile to be used for MODTRAN simulations. Unused for other radiative transfer models. channelized_uncertainty_path : str, default=None Path to a channelized uncertainty file model_discrepancy_path : str, default=None Modifies S_eps in the OE formalism as the Gamma additive term, as: S_eps = Sy + Kb.dot(self.Sb).dot(Kb.T) + Gamma lut_config_file : str, default=None Path to a look up table configuration file, which will override defaults choices logging_level : str, default="INFO" Logging level with which to run ISOFIT log_file : str, default=None File path to write ISOFIT logs to n_cores : int, default=1 Number of cores to run ISOFIT with. Substantial parallelism is available, and full runs will be very slow in serial. Suggested to max this out on the available system ray_temp_dir : str, default="/tmp/ray" Location of temporary directory for ray parallelization engine emulator_base : str, default=None Location of emulator base path. Point this at the model folder (or h5 file) of sRTMnet to use the emulator instead of MODTRAN. An additional file with the same basename and the extention _aux.npz must accompany e.g. /path/to/emulator.h5 /path/to/emulator_aux.npz prebuilt_lut : str, default=None Use this pre-constructed look up table for all retrievals. Must be an ISOFIT-compatible RTE NetCDF inversion_windows : list[float], default=None Override the default inversion windows. Will supercede any sensor specific defaults that are in place. Must be in 2-item tuples force_with_geo : bool, default=False If True, will allow the wavelength_cal to run on georeferenced data. Not recommended, unless you *really* know what you are doing, or know your metadata is odd. start_column : int, default=None Starting column index for averaging. Defaults to None, which means start from the first column. end_column : int, default=None Ending column index for averaging. Defaults to None, which means average until the last column. column_interval : int, default=1 Step size over columns; skips, doesn't average. Useful to increase runtime speeds. """ radiance_meta = envi.open(envi_header(input_radiance)).metadata if ( force_with_geo is False and "map info" in radiance_meta and radiance_meta["map info"] is not None ): to_raise = """Radiance metadata has non-blank map info string, implying that it is a georeferenced file. wavelength_cal will perform columnwise averages that will break this. Please adjust or use the '--force_with_geo' flag.""" logging.error(to_raise) raise ValueError(to_raise) ##################### Front Matter ######################### # Determine if we run in multipart-transmittance (4c) mode if emulator_base is not None: if emulator_base.endswith(".jld2"): multipart_transmittance = False else: if emulator_base.endswith(".npz"): emulator_aux_file = emulator_base else: emulator_aux_file = os.path.abspath( os.path.splitext(emulator_base)[0] + "_aux.npz" ) aux = np.load(emulator_aux_file) if ( "transm_down_dir" and "transm_down_dif" and "transm_up_dir" and "transm_up_dif" in aux["rt_quantities"] ): multipart_transmittance = True else: multipart_transmittance = False else: # This is the MODTRAN case. Do we want to enable the 4c mode by default? multipart_transmittance = True if sensor not in SUPPORTED_SENSORS: if sensor[:3] != "NA-": errstr = ( f'Argument error: sensor must be one of {SUPPORTED_SENSORS} or "NA-*"' ) raise ValueError(errstr) if os.path.isdir(working_directory) is False: os.mkdir(working_directory) logging.basicConfig( format="%(levelname)s:%(asctime)s || %(filename)s:%(funcName)s() | %(message)s", level=logging_level, filename=log_file, datefmt="%Y-%m-%d,%H:%M:%S", ) ################## Staging Setup ########################## logging.info("Checking input data files...") rdn_dataset = envi.open(envi_header(input_radiance)) rdn_size = (rdn_dataset.shape[0], rdn_dataset.shape[1]) del rdn_dataset for infile_name, infile in zip( ["input_radiance", "input_loc", "input_obs"], [input_radiance, input_loc, input_obs], ): if os.path.isfile(infile) is False: err_str = ( f"Input argument {infile_name} give as: {infile}. File not found on" " system." ) raise ValueError("argument " + err_str) if infile_name != "input_radiance": input_dataset = envi.open(envi_header(infile), infile) input_size = (input_dataset.shape[0], input_dataset.shape[1]) if not (input_size[0] == rdn_size[0] and input_size[1] == rdn_size[1]): err_str = ( f"Input file: {infile_name} size is {input_size}, which does not" f" match input_radiance size: {rdn_size}" ) raise ValueError(err_str) logging.info("...Data file checks complete") lut_params = tmpl.LUTConfig(lut_config_file, emulator_base, False) logging.info("Setting up files and directories....") skyview_factor = None surface_class_file = None paths = tmpl.Pathnames( input_radiance=input_radiance, input_loc=input_loc, input_obs=input_obs, surface_class_file=surface_class_file, sensor=sensor, surface_path=surface_path, working_directory=working_directory, copy_input_files=copy_input_files, modtran_path=modtran_path, rdn_factors_path=rdn_factors_path, model_discrepancy_path=model_discrepancy_path, aerosol_climatology_path=None, channelized_uncertainty_path=channelized_uncertainty_path, ray_temp_dir=ray_temp_dir, subs=False, skyview_factor=skyview_factor, ) paths.make_directories() paths.stage_files() logging.info("...file/directory setup complete") # Based on the sensor type, get appropriate year/month/day info from initial condition. # We'll adjust for line length and UTC day overrun later global INVERSION_WINDOWS dt, sensor_inversion_window = tmpl.sensor_name_to_dt(sensor, paths.fid) if sensor_inversion_window is not None: INVERSION_WINDOWS = sensor_inversion_window if inversion_windows: assert all( [len(window) == 2 for window in inversion_windows] ), "Inversion windows must be in pairs" INVERSION_WINDOWS = inversion_windows logging.info(f"Using inversion windows: {INVERSION_WINDOWS}") # Collapse data row-wise logging.info("Collapsing data row-wise...") n_rows = average_columns( paths.input_radiance_file, paths.input_loc_file, paths.input_obs_file, paths.rdn_subs_path, paths.loc_subs_path, paths.obs_subs_path, start_column, end_column, column_interval, ) dayofyear = dt.timetuple().tm_yday ( h_m_s, day_increment, mean_path_km, mean_to_sensor_azimuth, mean_to_sensor_zenith, mean_to_sun_azimuth, mean_to_sun_zenith, mean_relative_azimuth, valid, to_sensor_zenith_lut_grid, to_sun_zenith_lut_grid, relative_azimuth_lut_grid, ) = tmpl.get_metadata_from_obs(paths.obs_subs_path, lut_params) # overwrite the time in case original obs has an error in that band if h_m_s[0] != dt.hour and h_m_s[0] >= 24: h_m_s[0] = dt.hour logging.info( "UTC hour did not match start time minute. Adjusting to that value." ) if h_m_s[1] != dt.minute and h_m_s[1] >= 60: h_m_s[1] = dt.minute logging.info( "UTC minute did not match start time minute. Adjusting to that value." ) if day_increment: dayofyear += 1 gmtime = float(h_m_s[0] + h_m_s[1] / 60.0) # get radiance file, wavelengths, fwhm wl, fwhm = tmpl.get_wavelengths( paths.radiance_working_path, wavelength_path, ) tmpl.write_wavelength_file(paths.wavelength_path, wl, fwhm) # check and rebuild surface model if needed use_multisurface = False paths.surface_paths = tmpl.check_surface_model( surface_path=surface_path, output_model_path=paths.surface_template_path, wl=wl, surface_wavelength_path=paths.wavelength_path, surface_category=surface_category, multisurface=use_multisurface, ) # re-stage surface model if needed paths.surface_working_paths = {} for key, value in paths.surface_paths.items(): name, ext = os.path.splitext(paths.surface_template_path) if use_multisurface: surface_working_path = f"{name}_{key}{ext}" else: surface_working_path = f"{name}{ext}" if value != surface_working_path and surface_path.endswith(".mat"): copyfile(value, surface_working_path) paths.surface_working_paths[key] = surface_working_path ( mean_latitude, mean_longitude, mean_elevation_km, elevation_lut_grid, ) = tmpl.get_metadata_from_loc( paths.loc_working_path, lut_params, pressure_elevation=False ) if emulator_base is not None: if elevation_lut_grid is not None and np.any(elevation_lut_grid < 0): to_rem = elevation_lut_grid[elevation_lut_grid < 0].copy() elevation_lut_grid[elevation_lut_grid < 0] = 0 elevation_lut_grid = np.unique(elevation_lut_grid) if len(elevation_lut_grid) == 1: elevation_lut_grid = None mean_elevation_km = elevation_lut_grid[ 0 ] # should be 0, but just in case logging.info( "Scene contains target lut grid elements < 0 km, and uses 6s (via" " sRTMnet). 6s does not support targets below sea level in km units. " f" Setting grid points {to_rem} to 0." ) if mean_elevation_km < 0: mean_elevation_km = 0 logging.info( f"Scene contains a mean target elevation < 0. 6s does not support" f" targets below sea level in km units. Setting mean elevation to 0." ) mean_altitude_km = ( mean_elevation_km + np.cos(np.deg2rad(mean_to_sensor_zenith)) * mean_path_km ) logging.info("Observation means:") logging.info(f"Path (km): {mean_path_km}") logging.info(f"To-sensor azimuth (deg): {mean_to_sensor_azimuth}") logging.info(f"To-sensor zenith (deg): {mean_to_sensor_zenith}") logging.info(f"To-sun azimuth (deg): {mean_to_sun_azimuth}") logging.info(f"To-sun zenith (deg): {mean_to_sun_zenith}") logging.info(f"Relative to-sun azimuth (deg): {mean_relative_azimuth}") logging.info(f"Altitude (km): {mean_altitude_km}") if emulator_base is not None and mean_altitude_km > 99: if not emulator_base.endswith(".jld2"): logging.info( "Adjusting altitude to 99 km for integration with 6S, because emulator is" " chosen." ) mean_altitude_km = 99 # We will use the model discrepancy with covariance OR uncorrelated # Calibration error, but not both. if model_discrepancy_path is not None: uncorrelated_radiometric_uncertainty = 0 else: uncorrelated_radiometric_uncertainty = UNCORRELATED_RADIOMETRIC_UNCERTAINTY h2o_lut_grid = lut_params.get_grid( lut_params.h2o_range[0], lut_params.h2o_range[1], lut_params.h2o_spacing, lut_params.h2o_spacing_min, ) logging.info("Full (non-aerosol) LUTs:") logging.info(f"Elevation: {elevation_lut_grid}") logging.info(f"To-sensor zenith: {to_sensor_zenith_lut_grid}") logging.info(f"To-sun zenith: {to_sun_zenith_lut_grid}") logging.info(f"Relative to-sun azimuth: {relative_azimuth_lut_grid}") logging.info(f"H2O Vapor: {h2o_lut_grid}") if ( not exists(paths.state_subs_path) or not exists(paths.uncert_subs_path) or not exists(paths.rfl_subs_path) ): tmpl.write_modtran_template( atmosphere_type=atmosphere_type, fid=paths.fid, altitude_km=mean_altitude_km, dayofyear=dayofyear, to_sensor_azimuth=mean_to_sensor_azimuth, to_sensor_zenith=mean_to_sensor_zenith, to_sun_zenith=mean_to_sun_zenith, relative_azimuth=mean_relative_azimuth, gmtime=gmtime, elevation_km=mean_elevation_km, output_file=paths.modtran_template_path, ) tmpl.build_main_config( paths=paths, lut_params=lut_params, h2o_lut_grid=h2o_lut_grid, elevation_lut_grid=( elevation_lut_grid if elevation_lut_grid is not None else [mean_elevation_km] ), to_sensor_zenith_lut_grid=( to_sensor_zenith_lut_grid if to_sensor_zenith_lut_grid is not None else [mean_to_sensor_zenith] ), to_sun_zenith_lut_grid=( to_sun_zenith_lut_grid if to_sun_zenith_lut_grid is not None else [mean_to_sun_zenith] ), relative_azimuth_lut_grid=( relative_azimuth_lut_grid if relative_azimuth_lut_grid is not None else [mean_relative_azimuth] ), mean_latitude=mean_latitude, mean_longitude=mean_longitude, dt=dt, use_superpixels=True, n_cores=n_cores, surface_category=surface_category, emulator_base=emulator_base, uncorrelated_radiometric_uncertainty=uncorrelated_radiometric_uncertainty, multiple_restarts=False, segmentation_size=n_rows, pressure_elevation=False, prebuilt_lut_path=prebuilt_lut, inversion_windows=INVERSION_WINDOWS, multipart_transmittance=multipart_transmittance, ) add_wavelength_elements( paths.isofit_full_config_path, wl_state_type, spline_indices ) # Run retrieval logging.info("Running ISOFIT with full LUT") retrieval_full = isofit.Isofit( paths.isofit_full_config_path, level="INFO", logfile=log_file ) retrieval_full.run() del retrieval_full # clean up unneeded storage if emulator_base is None: for to_rm in RTM_CLEANUP_LIST: cmd = "rm " + join(paths.full_lut_directory, to_rm) logging.info(cmd) subprocess.call(cmd, shell=True)
[docs] def get_wavelength_adjustment( config_file: str, output_file: str = None, filter_edges: int = 0 ): """ Get the wavelength adjustment based on a previous isofit wavelength_cal run. Args: config_file (str): The configuration file for the wavelength_cal run. output_file (str): The file to write the wavelength adjustment to. filter_edges (int): number of edge columns to trim from average Returns: None """ logging.info("Getting wavelength adjustment...") # Load the instrument object config = configs.load_config(config_file) instrument = instrument.Instrument(config) # Load the statevector - we could do this by loading the forward # model and the indicies, but in case the LUT is large, we're # shortcutting here state_ds = envi.open(envi_header(config["output"]["estimated_state_file"])) state_possible_names = ["GRWO_FWHM", "FWHMSPL", "WL_SPACE", "WL_SHIFT", "WLSPL"] band_names = list(state_ds.metadata["band names"]) instrument_idx = np.array( [ np.any([bn.startswith(name) for name in state_possible_names]) for bn in band_names ] ) instrument_state = state_ds.open_memmap(interleave="bip", writable=False)[ :, instrument_idx ] # average # Filter first and last row to avoid instability if filter_edges > 0: mean_state = np.mean( instrument_state[filter_edges : -1 * filter_edges, :], axis=0 ) else: mean_state = np.mean(instrument_state, axis=0) # optionally filter # run calibration() wl, fwhm = instrument.calibration(mean_state) # write file tmpl.write_wavelength_file(output_file, wl, fwhm)
@click.command( name="get_wavelength_adjustment", help=get_wavelength_adjustment.__doc__, no_args_is_help=True, ) @click.argument("config_file") @click.argument("output_file") @click.argument("--filter_edges") @click.option( "--debug-args", help="Prints the arguments list without executing the command", is_flag=True, )
[docs] def cli_get_wavelength_adjustment(debug_args, profile, **kwargs): if debug_args: print("Arguments to be passed:") for key, value in kwitems(): print(f" {key} = {value!r}") else: get_wavelength_adjustment(**kwargs) print("Done")
# Input arguments @click.command(name="wavelength_cal", help=wavelength_cal.__doc__, no_args_is_help=True) @click.argument("input_radiance") @click.argument("input_loc") @click.argument("input_obs") @click.argument("working_directory") @click.argument("sensor") @click.option("--surface_path", "-sp", required=True, type=str) @click.option("--copy_input_files", is_flag=True, default=False) @click.option("--modtran_path") @click.option("--wavelength_path") @click.option("--surface_category", default="multicomponent_surface") @click.option("--rdn_factors_path") @click.option("--atmosphere_type", default="ATM_MIDLAT_SUMMER") @click.option("--channelized_uncertainty_path") @click.option("--model_discrepancy_path") @click.option("--lut_config_file") @click.option("--logging_level", default="INFO") @click.option("--log_file") @click.option("--n_cores", type=int, default=1) @click.option("--ray_temp_dir", default="/tmp/ray") @click.option("--emulator_base") @click.option("--prebuilt_lut", type=str) @click.option("--inversion_windows", type=float, nargs=2, multiple=True) @click.option( "--wl_state_type", type=click.Choice( ["shift", "shift-only", "spline", "spline-only"], case_sensitive=True ), default="shift", ) @click.option("--spline_indices", "-si", type=int, multiple=True, default=None) @click.option("--force_with_geo", is_flag=True, default=False) @click.option("--start_column", type=int, default=None) @click.option("--end_column", type=int, default=None) @click.option("--column_interval", type=int, default=1) @click.option( "--debug-args", help="Prints the arguments list without executing the command", is_flag=True, ) @click.option("--profile")
[docs] def cli_wavelength_cal(debug_args, profile, **kwargs): if debug_args: print("Arguments to be passed:") for key, value in kwitems(): print(f" {key} = {value!r}") else: if profile: import cProfile import pstats profiler = cProfile.Profile() profiler.enable() wavelength_cal(**kwargs) if profile: profiler.disable() stats = pstats.Stats(profiler) stats.dump_stats(profile) print("Done")