#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from __future__ import annotations
import logging
import os
from collections import OrderedDict
from typing import List
import numpy as np
import scipy.io
import xarray as xr
from spectral.io import envi
import isofit
from isofit.core import units
from isofit.core.common import (
envi_header,
eps,
load_spectrum,
load_wavelen,
resample_spectrum,
)
from isofit.core.geometry import Geometry
from isofit.core.multistate import match_statevector
from isofit.data import env
from isofit.inversion.inverse_simple import invert_algebraic
### Variables ###
[docs]
Logger = logging.getLogger(__file__)
# Constants related to file I/O
[docs]
typemap = {
np.uint8: 1,
np.int16: 2,
np.int32: 3,
np.float32: 4,
np.float64: 5,
np.complex64: 6,
np.complex128: 9,
np.uint16: 12,
np.uint32: 13,
np.int64: 14,
np.uint64: 15,
}
### Classes ###
[docs]
class SpectrumFile:
"""A buffered file object that contains configuration information about formatting, etc."""
def __init__(
self,
fname,
write=False,
n_rows=None,
n_cols=None,
n_bands=None,
interleave=None,
dtype=np.float32,
wavelengths=None,
fwhm=None,
band_names=None,
bad_bands="[]",
zrange="{0.0, 1.0}",
flag=-9999.0,
ztitles="{Wavelength (nm), Magnitude}",
map_info="{}",
engine_name=None,
isofit_version=None,
):
"""."""
[docs]
self.frames = OrderedDict()
[docs]
self.fname = os.path.abspath(fname)
[docs]
self.band_names = band_names
if self.fname.endswith(".txt"):
# The .txt suffix implies a space-separated ASCII text file of
# one or more data columns. This is cheap to load and store, so
# we do not defer read/write operations.
logging.debug("Inferred ASCII file format for %s" % self.fname)
self.format = "ASCII"
if not self.write:
self.data, self.wl = load_spectrum(self.fname)
self.n_rows, self.n_cols, self.map_info = 1, 1, "{}"
if self.wl is not None:
self.n_bands = len(self.wl)
else:
self.n_bands = None
self.meta = {}
elif self.fname.endswith(".mat"):
# The .mat suffix implies a matlab-style file, i.e. a dictionary
# of 2D arrays and other matlab-like objects. This is typically
# only used for specific output products associated with single
# spectrum retrievals; there is no read option.
logging.debug("Inferred MATLAB file format for %s" % self.fname)
self.format = "MATLAB"
if not self.write:
logging.error("Unsupported MATLAB file in input block")
raise IOError("MATLAB format in input block not supported")
elif self.fname.endswith(".nc"):
logging.debug(f"Inferred NETCDF file format for {self.fname}")
self.format = "NETCDF"
if not self.write:
self.dataset = xr.open_dataset(self.fname)
self.data = self.dataset.radiance
self.meta = self.data.attrs
self.n_rows = self.data.downtrack.size
self.n_cols = self.data.crosstrack.size
self.n_bands = self.data.bands.size
else:
# EMIT-specific metadata
meta = {
"ncei_template_version": "NCEI_NetCDF_Swath_Template_v2.0",
"summary": "The Earth Surface Mineral Dust Source Investigation (EMIT) is an Earth Ventures-Instrument (EVI-4) Mission that maps the surface mineralogy of arid dust source regions via imaging spectroscopy in the visible and short-wave infrared (VSWIR). Installed on the International Space Station (ISS), the EMIT instrument is a Dyson imaging spectrometer that uses contiguous spectroscopic measurements from 410 to 2450 nm to resolve absoprtion features of iron oxides, clays, sulfates, carbonates, and other dust-forming minerals. During its one-year mission, EMIT will observe the sunlit Earth's dust source regions that occur within +/-52° latitude and produce maps of the source regions that can be used to improve forecasts of the role of mineral dust in the radiative forcing (warming or cooling) of the atmosphere.\\n\\nThis file contains L2A estimated surface reflectances and geolocation data. Reflectance estimates are created using an Optimal Estimation technique - see ATBD for details. Reflectance values are reported as fractions (relative to 1).",
"keywords": "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing",
"Conventions": "CF-1.63",
"sensor": "EMIT (Earth Surface Mineral Dust Source Investigation)",
"instrument": "EMIT",
"platform": "ISS",
"institution": "NASA Jet Propulsion Laboratory/California Institute of Technology",
"license": "",
"naming_authority": "",
"date_created": str(dtt.now()),
"keywords_vocabulary": "NASA Global Change Master Directory (GCMD) Science Keywords",
"stdname_vocabulary": "NetCDF Climate and Forecast (CF) Metadata Convention",
"creator_name": "ISOFIT",
"creator_url": "",
"project": "Earth Surface Mineral Dust Source Investigation",
"project_url": "https://emit.jpl.nasa.gov/",
"publisher_name": "",
"publisher_url": "",
"publisher_email": "",
"identifier_product_doi_authority": "https://doi.org",
"flight_line": "",
"time_coverage_start": "",
"time_coverage_end": "",
"software_build_version": "",
"software_delivery_version": "",
"product_version": "",
"history": "ISOFIT generated",
"crosstrack_orientation": "",
"easternmost_longitude": None,
"northernmost_latitude": None,
"westernmost_longitude": None,
"southernmost_latitude": None,
"spatialResolution": None,
"spatial_ref": "",
"geotransform": [],
"day_night_flag": "",
"title": "EMIT L2A Estimated Surface Reflectance",
}
self.n_rows = n_rows
self.n_cols = n_cols
self.n_bands = n_bands
self.dataset = xr.Dataset(
{
"reflectance": (
("downtrack", "crosstrack", "bands"),
np.zeros((n_rows, n_cols, n_bands)),
)
}
)
self.dataset.attrs = meta
self.data = self.dataset.reflectance
# Initialize the file
self.dataset.to_netcdf(self.fname)
else:
# Otherwise we assume it is an ENVI-format file, which is
# basically just a binary data cube with a detached human-
# readable ASCII header describing dimensions, interleave, and
# metadata. We buffer this data in self.frames, reading and
# writing individual rows of the cube on-demand.
logging.debug("Inferred ENVI file format for %s" % self.fname)
self.format = "ENVI"
if not self.write:
# If we are an input file, the header must preexist.
if not os.path.exists(envi_header(self.fname)):
logging.error("Could not find %s" % (envi_header(self.fname)))
raise IOError("Could not find %s" % (envi_header(self.fname)))
# open file and copy metadata
self.file = envi.open(envi_header(self.fname), fname)
self.meta = self.file.metadata.copy()
self.n_rows = int(self.meta["lines"])
self.n_cols = int(self.meta["samples"])
self.n_bands = int(self.meta["bands"])
if "data ignore value" in self.meta:
self.flag = float(self.meta["data ignore value"])
else:
self.flag = -9999.0
else:
# If we are an output file, we may have to build the header
# from scratch. Hopefully the caller has supplied the
# necessary metadata details.
meta = {
"description": (
f"L2A per-pixel surface retrieval (engine={engine_name}, isofit_version={isofit_version})"
),
"lines": n_rows,
"samples": n_cols,
"bands": n_bands,
"byte order": 0,
"header offset": 0,
"map info": map_info,
"file_type": "ENVI Standard",
"sensor type": "unknown",
"interleave": interleave,
"data type": typemap[dtype],
"wavelength units": "Nanometers",
"z plot range": zrange,
"z plot titles": ztitles,
"fwhm": fwhm,
"bbl": bad_bands,
"band names": band_names,
"wavelength": self.wl,
"data ignore value": self.flag,
}
for k, v in meta.items():
if v is None:
logging.error("Must specify %s" % (k))
raise IOError("Must specify %s" % (k))
if os.path.isfile(envi_header(fname)) is False:
self.file = envi.create_image(
envi_header(fname), meta, ext="", force=True
)
else:
self.file = envi.open(envi_header(fname))
self.open_map_with_retries()
[docs]
def open_map_with_retries(self):
"""Try to open a memory map, handling Beowulf I/O issues."""
self.memmap = None
for attempt in range(10):
self.memmap = self.file.open_memmap(interleave="bip", writable=self.write)
if self.memmap is not None:
return
raise IOError("could not open memmap for " + self.fname)
[docs]
def get_frame(self, row):
"""The frame is a 2D array, essentially a list of spectra. The
self.frames list acts as a hash table to avoid storing the
entire cube in memory. So we read them or create them on an
as-needed basis. When the buffer flushes via a call to
flush_buffers, they will be deleted."""
if row not in self.frames:
if not self.write:
d = self.memmap[row, :, :]
self.frames[row] = d.copy()
else:
self.frames[row] = np.nan * np.zeros((self.n_cols, self.n_bands))
return self.frames[row]
[docs]
def write_spectrum(self, row, col, x):
"""We write a spectrum. If a binary format file, we simply change
the data cached in self.frames and defer file I/O until
flush_buffers is called."""
if self.format == "ASCII":
# Multicolumn output for ASCII products
np.savetxt(self.fname, x, fmt="%10.6f")
elif self.format == "MATLAB":
# Dictionary output for MATLAB products
scipy.io.savemat(self.fname, x)
elif self.format == "NETCDF":
self.data[row, col][:] = x
else:
# Omit wavelength column for spectral products
frame = self.get_frame(row)
if x.ndim == 2:
x = x[:, -1]
frame[col, :] = x
[docs]
def read_spectrum(self, row, col):
"""Get a spectrum from the frame list or ASCII file. Note that if
we are an ASCII file, we have already read the single spectrum and
return it as-is (ignoring the row/column indices)."""
if self.format == "ASCII":
return self.data
elif self.format == "NETCDF":
return self.data.sel(downtrack=row, crosstrack=col).data
else:
frame = self.get_frame(row)
return frame[col]
[docs]
def flush_buffers(self):
"""Write to file, and refresh the memory map object."""
if self.format == "ENVI":
if self.write:
for row, frame in self.frames.items():
valid = np.logical_not(np.isnan(frame[:, 0]))
self.memmap[row, valid, :] = frame[valid, :]
self.frames = OrderedDict()
del self.file
self.file = envi.open(envi_header(self.fname), self.fname)
self.open_map_with_retries()
elif self.format == "NETCDF":
self.dataset.to_netcdf(self.fname)
[docs]
class IO:
"""..."""
def __init__(self, config: Config, forward: ForwardModel, full_statevec: list = []):
"""Initialization specifies retrieval subwindows for calculating
measurement cost distributions."""
[docs]
self.radiance_correction = None
[docs]
self.meas_wl = forward.instrument.wl_init
[docs]
self.meas_fwhm = forward.instrument.fwhm_init
[docs]
self.bbl = "{" + ",".join([str(1) for n in range(len(self.meas_wl))]) + "}"
[docs]
self.engine_name = (
config.forward_model.radiative_transfer.radiative_transfer_engines[
0
].engine_name
)
# Use the pre-defined full statevec
if len(full_statevec):
self.full_statevec = full_statevec
else:
self.full_statevec = forward.statevec
[docs]
self.n_sv = len(self.full_statevec)
[docs]
self.n_chan = len(self.meas_wl)
[docs]
self.flush_rate = config.implementation.io_buffer_size
[docs]
self.simulation_mode = config.implementation.mode == "simulation"
# Names of either the wavelength or statevector outputs
wl_names = [("Channel %i" % i) for i in range(self.n_chan)]
sv_names = self.full_statevec.copy()
self.input_datasets, self.output_datasets, self.map_info = {}, {}, "{}"
# Load input files and record relevant metadata
for element, element_name in zip(*self.config.input.get_elements()):
self.input_datasets[element_name] = SpectrumFile(element)
if (self.input_datasets[element_name].n_rows > self.n_rows) or (
self.input_datasets[element_name].n_cols > self.n_cols
):
self.n_rows = self.input_datasets[element_name].n_rows
self.n_cols = self.input_datasets[element_name].n_cols
for inherit in ["map info", "bbl"]:
if inherit in self.input_datasets[element_name].meta:
setattr(
self,
inherit.replace(" ", "_"),
self.input_datasets[element_name].meta[inherit],
)
for element, element_header, element_name in zip(
*self.config.output.get_output_files()
):
band_names, ztitle, zrange = element_header
if band_names == "statevector":
band_names = sv_names
elif band_names == "wavelength":
band_names = wl_names
elif band_names == "atm_coeffs":
band_names = wl_names * 5
else:
band_names = "{}"
n_bands = len(band_names)
self.output_datasets[element_name] = SpectrumFile(
element,
write=True,
n_rows=self.n_rows,
n_cols=self.n_cols,
n_bands=n_bands,
interleave="bip",
dtype=np.float32,
wavelengths=self.meas_wl,
fwhm=self.meas_fwhm,
band_names=band_names,
bad_bands=self.bbl,
map_info=self.map_info,
zrange=zrange,
ztitles=ztitle,
engine_name=self.engine_name,
isofit_version=config.implementation.isofit_version,
)
# Do we apply a radiance correction?
if self.config.input.radiometry_correction_file is not None:
filename = self.config.input.radiometry_correction_file
self.radiance_correction, wl = load_spectrum(filename)
# Load the earth sun distance data
[docs]
self.esd = self.load_esd()
[docs]
def get_components_at_index(self, row: int, col: int) -> InputData:
"""
Load data from input files at the specified (row, col) index.
Args:
row: row to retrieve data from
col: column to retrieve data from
Returns:
InputData: object containing all current data reads
"""
# Prepare out input data object by blanking it out
self.current_input_data.clear()
# Determine the appropriate row, column index. and initialize the
# data dictionary with empty entries.
data = dict([(i, None) for i in self.config.input.get_all_element_names()])
logging.debug(f"Row {row} Column {col}")
# Read data from any of the input files that are defined.
for source in self.input_datasets:
data[source] = self.input_datasets[source].read_spectrum(row, col)
self.reads += 1
if self.reads >= self.flush_rate:
self.flush_buffers()
if self.simulation_mode:
# If solving the inverse problem, the measurment is the surface reflectance
meas = data["reflectance_file"].copy()
else:
# If solving the inverse problem, the measurment is the radiance
# We apply the calibration correciton here for simplicity.
meas = data["measured_radiance_file"]
if meas is not None:
meas = meas.copy()
if data["radiometry_correction_file"] is not None:
meas *= data["radiometry_correction_file"]
self.current_input_data.meas = meas
if self.current_input_data.meas is None or np.all(
self.current_input_data.meas < -49
):
return None
## Check for any bad data flags
for source in self.input_datasets:
if np.allclose(data[source], self.input_datasets[source].flag):
return None
# Check if Sky view is used, else it is equal to 1.0.
if "skyview_factor_file" not in data or data["skyview_factor_file"] is None:
data["skyview_factor_file"] = 1.0
# We build the geometry object for this spectrum. For files not
# specified in the input configuration block, the associated entries
# will be 'None'. The Geometry object will use reasonable defaults.
geom = Geometry(
obs=data["obs_file"],
loc=data["loc_file"],
esd=self.esd,
bg_rfl=data["background_reflectance_file"],
svf=data["skyview_factor_file"],
)
self.current_input_data.geom = geom
self.current_input_data.reference_reflectance = data[
"reference_reflectance_file"
]
return self.current_input_data
[docs]
def flush_buffers(self):
"""Write all buffered output data to disk, and erase read buffers."""
for file_dictionary in [self.input_datasets, self.output_datasets]:
for name, fi in file_dictionary.items():
fi.flush_buffers()
self.reads = 0
self.writes = 0
[docs]
def write_datasets(
self, row: int, col: int, output: dict, states: List, flush_immediately=False
):
"""
Write all valid datasets to disk (possibly buffered).
Args:
row: row to write to
col: column to write to
output: dictionary with keys corresponding to config.input file references
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings
"""
for product in self.output_datasets:
logging.debug("IO: Writing " + product)
self.output_datasets[product].write_spectrum(row, col, output[product])
# Special case! samples file is matlab format.
if self.config.output.mcmc_samples_file is not None:
logging.debug("IO: Writing mcmc_samples_file")
mdict = {"samples": states}
scipy.io.savemat(self.config.output.mcmc_samples_file, mdict)
self.writes += 1
if self.writes >= self.flush_rate or flush_immediately:
self.flush_buffers()
[docs]
def build_output(
self,
states: List,
input_data: InputData,
fm: ForwardModel,
iv: Inversion,
fill_value=-9999.0,
):
"""
Build the output to be written to disk as a dictionary
Args:
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
input_data: an InputData object
fm: the forward model used to solve the inversion
iv: the inversion object
"""
if len(states) == 0:
# Write a bad data flag
atm_bad = np.zeros(len(fm.instrument.n_chan) * 5) + fill_value
state_bad = np.zeros(len(fm.statevec)) + fill_value
data_bad = np.zeros(fm.instrument.n_chan) + fill_value
to_write = {
"estimated_state_file": state_bad,
"estimated_reflectance_file": data_bad,
"estimated_emission_file": data_bad,
"modeled_radiance_file": data_bad,
"apparent_reflectance_file": data_bad,
"path_radiance_file": data_bad,
"simulated_measurement_file": data_bad,
"algebraic_inverse_file": data_bad,
"atmospheric_coefficients_file": atm_bad,
"radiometry_correction_file": data_bad,
"spectral_calibration_file": data_bad,
"posterior_uncertainty_file": state_bad,
}
else:
to_write = {}
meas = input_data.meas
geom = input_data.geom
reference_reflectance = input_data.reference_reflectance
if self.config.implementation.mode == "inversion_mcmc":
state_est = states.mean(axis=0)
else:
state_est = states[-1, :]
# Make important check that fm.statevec !> full_statevec.
if len(fm.statevec) > len(self.full_statevec):
logging.error(
"Length of the output statevector is shorter than the "
"forward model currently being written. Potential issue "
"with initialization of IO class."
)
raise IOError("len(fm.statevec) > len(self.full_statevec)")
############ Start with all of the 'independent' calculations
if "estimated_state_file" in self.output_datasets:
# state_est transformed to reflect io.full_statevec
to_write["estimated_state_file"] = match_statevector(
state_est, self.full_statevec, fm.statevec
)
if "path_radiance_file" in self.output_datasets:
# Note: for glint models, this will return atm + glint
path_est = fm.calc_meas(
state_est, geom, rfl=np.zeros(self.meas_wl.shape)
)
to_write["path_radiance_file"] = np.column_stack(
(fm.instrument.wl_init, path_est)
)
if "spectral_calibration_file" in self.output_datasets:
# Spectral calibration
wl, fwhm = fm.calibration(state_est)
cal = np.column_stack(
[
np.arange(0, len(wl)),
units.nm_to_micron(wl),
units.nm_to_micron(fwhm),
]
)
to_write["spectral_calibration_file"] = cal
if "posterior_uncertainty_file" in self.output_datasets:
S_hat, K, G = iv.calc_posterior(state_est, geom, meas)
to_write["posterior_uncertainty_file"] = match_statevector(
np.sqrt(np.diag(S_hat)), self.full_statevec, fm.statevec
)
############ Now proceed to the calcs where they may be some overlap
if any(
item in ["estimated_emission_file", "apparent_reflectance_file"]
for item in self.output_datasets
):
Ls_est = fm.calc_Ls(state_est, geom)
if any(
item
in [
"estimated_reflectance_file",
"apparent_reflectance_file",
"modeled_radiance_file",
"simulated_measurement_file",
]
for item in self.output_datasets
):
lamb_est = fm.calc_lamb(state_est, geom)
if any(
item in ["modeled_radiance_file", "simulated_measurement_file"]
for item in self.output_datasets
):
meas_est = fm.calc_meas(state_est, geom)
if "modeled_radiance_file" in self.output_datasets:
to_write["modeled_radiance_file"] = np.column_stack(
(fm.instrument.wl_init, meas_est)
)
if "simulated_measurement_file" in self.output_datasets:
meas_sim = fm.instrument.simulate_measurement(meas_est, geom)
to_write["simulated_measurement_file"] = np.column_stack(
(self.meas_wl, meas_sim)
)
if "estimated_emission_file" in self.output_datasets:
to_write["estimated_emission_file"] = np.column_stack(
(self.meas_wl, Ls_est)
)
if "estimated_reflectance_file" in self.output_datasets:
to_write["estimated_reflectance_file"] = np.column_stack(
(self.meas_wl, lamb_est)
)
if "apparent_reflectance_file" in self.output_datasets:
# Upward emission & glint and apparent reflectance
apparent_rfl_est = lamb_est + Ls_est
to_write["apparent_reflectance_file"] = np.column_stack(
(self.meas_wl, apparent_rfl_est)
)
x_surface, x_RT, x_instrument = fm.unpack(state_est)
if any(
item in ["algebraic_inverse_file", "atmospheric_coefficients_file"]
for item in self.output_datasets
):
rfl_alg_opt, coeffs = invert_algebraic(
fm.surface,
fm.RT,
fm.instrument,
x_surface,
x_RT,
x_instrument,
meas,
geom,
)
if "algebraic_inverse_file" in self.output_datasets:
to_write["algebraic_inverse_file"] = np.column_stack(
(self.meas_wl, rfl_alg_opt)
)
if "atmospheric_coefficients_file" in self.output_datasets:
rhoatm, sphalb, L_tot, transup, L_Up = coeffs
verified_geom = geom.verify(fm.RT.coszen)
coszen, cos_i = verified_geom["coszen"], verified_geom["cos_i"]
solar_irr = fm.RT.rt_engines[0].solar_irr
atm_vars = [rhoatm, sphalb, L_tot, solar_irr]
atm = np.column_stack(
atm_vars + [np.ones((len(self.meas_wl), 1)) * coszen]
)
atm = atm.T.reshape((len(self.meas_wl) * 5,))
to_write["atmospheric_coefficients_file"] = atm
if "radiometry_correction_file" in self.output_datasets:
factors = np.ones(len(self.meas_wl))
if "reference_reflectance_file" in self.input_datasets:
meas_est = fm.calc_meas(state_est, geom, rfl=reference_reflectance)
factors = meas_est / meas
else:
logging.warning("No reflectance reference")
to_write["radiometry_correction_file"] = factors
return to_write
[docs]
def write_spectrum(
self,
row: int,
col: int,
states: List,
fm: ForwardModel,
iv: Inversion,
flush_immediately=False,
input_data: InputData = None,
):
"""
Convenience function to build and write output in one step
Args:
row: data row to write
col: data column to write
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
meas: measurement radiance
geom: geometry object of the observation
fm: the forward model used to solve the inversion
iv: the inversion object
flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings
input_data: optionally overwride self.current_input_data
"""
if input_data is None:
to_write = self.build_output(states, self.current_input_data, fm, iv)
else:
to_write = self.build_output(states, input_data, fm, iv)
self.write_datasets(
row, col, to_write, states, flush_immediately=flush_immediately
)
@staticmethod
[docs]
def initialize_output_files(config, n_rows, n_cols, full_statevector):
wl_init, fwhm_init = load_wavelen(
config.forward_model.instrument.wavelength_file
)
wl_names = [("Channel %i" % i) for i in range(len(wl_init))]
bbl = "{" + ",".join([str(1) for n in range(len(wl_init))]) + "}"
engine_name = (
config.forward_model.radiative_transfer.radiative_transfer_engines[
0
].engine_name
)
for element, element_header, element_name in zip(
*config.output.get_output_files()
):
band_names, ztitle, zrange = element_header
if band_names == "statevector":
band_names = full_statevector
elif band_names == "wavelength":
band_names = wl_names
elif band_names == "atm_coeffs":
band_names = wl_names * 5
else:
band_names = "{}"
n_bands = len(band_names)
_ = SpectrumFile(
element,
write=True,
n_rows=n_rows,
n_cols=n_cols,
n_bands=n_bands,
interleave="bip",
dtype=np.float32,
wavelengths=wl_init,
fwhm=fwhm_init,
band_names=band_names,
bad_bands=bbl,
map_info="{}",
zrange=zrange,
ztitles=ztitle,
engine_name=engine_name,
isofit_version=config.implementation.isofit_version,
)
@staticmethod
[docs]
def load_esd(file=None):
"""
Loads an earth_sun_distance file. Defaults to the
[env.data]/earth_sun_distance.txt if not provided
Parameters
----------
file : str, default=None
ESD file to load
Returns
-------
np.array
Loaded ESD. If the file fails to load, creates a default
"""
if file is None:
file = env.path("data", "earth_sun_distance.txt")
try:
esd = np.loadtxt(file)
logging.debug(f"Loaded ESD from file: {file}")
except FileNotFoundError:
logging.warning(
"Earth-sun-distance file not found on system. "
"Proceeding without might cause some inaccuracies down the line."
)
esd = np.ones((366, 2))
esd[:, 0] = np.arange(1, 367, 1)
return esd
[docs]
def write_bil_chunk(
dat: np.array, outfile: str, line: int, shape: tuple, dtype: str = "float32"
) -> None:
"""
Write a chunk of data to a binary, BIL formatted data cube.
Args:
dat: data to write
outfile: output file to write to
line: line of the output file to write to
shape: shape of the output file
dtype: output data type
Returns:
None
"""
outfile = open(outfile, "rb+")
outfile.seek(line * shape[1] * shape[2] * np.dtype(dtype).itemsize)
outfile.write(dat.astype(dtype).tobytes())
outfile.close()
[docs]
def initialize_output(output_metadata, outpath, out_shape, **kwargs):
"""
Initialize output file by updating metadata and creating object.
Args:
output_metadata: dict - Dictionary with envi header information
outpath: str - path to output file
out_shape: tuple - dimensions of initialized file
keys_to_del: list - keys to remove from output_metadata
kwargs - key-argument pairs to add to output_metadata
"""
for key, value in kwargs.items():
output_metadata[key] = value
out_file = envi.create_image(
envi_header(outpath), ext="", metadata=output_metadata, force=True
)
out_mm = out_file.open_memmap(interleave="source", writable=True)
out_mm[:, :] = np.zeros(out_shape, dtype=np.float32)
del out_file
return outpath