Source code for isofit.core.fileio

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from __future__ import annotations

import logging
import os
from collections import OrderedDict
from typing import List

import numpy as np
import scipy.io
import xarray as xr
from spectral.io import envi

import isofit
from isofit.core import units
from isofit.core.common import (
    envi_header,
    eps,
    load_spectrum,
    load_wavelen,
    resample_spectrum,
)
from isofit.core.geometry import Geometry
from isofit.core.multistate import match_statevector
from isofit.data import env
from isofit.inversion.inverse_simple import invert_algebraic

### Variables ###
[docs] Logger = logging.getLogger(__file__)
# Constants related to file I/O
[docs] typemap = { np.uint8: 1, np.int16: 2, np.int32: 3, np.float32: 4, np.float64: 5, np.complex64: 6, np.complex128: 9, np.uint16: 12, np.uint32: 13, np.int64: 14, np.uint64: 15, }
[docs] max_frames_size = 100
### Classes ###
[docs] class SpectrumFile: """A buffered file object that contains configuration information about formatting, etc.""" def __init__( self, fname, write=False, n_rows=None, n_cols=None, n_bands=None, interleave=None, dtype=np.float32, wavelengths=None, fwhm=None, band_names=None, bad_bands="[]", zrange="{0.0, 1.0}", flag=-9999.0, ztitles="{Wavelength (nm), Magnitude}", map_info="{}", engine_name=None, isofit_version=None, ): """."""
[docs] self.frames = OrderedDict()
[docs] self.write = write
[docs] self.fname = os.path.abspath(fname)
[docs] self.wl = wavelengths
[docs] self.band_names = band_names
[docs] self.fwhm = fwhm
[docs] self.flag = flag
[docs] self.n_rows = n_rows
[docs] self.n_cols = n_cols
[docs] self.n_bands = n_bands
if self.fname.endswith(".txt"): # The .txt suffix implies a space-separated ASCII text file of # one or more data columns. This is cheap to load and store, so # we do not defer read/write operations. logging.debug("Inferred ASCII file format for %s" % self.fname) self.format = "ASCII" if not self.write: self.data, self.wl = load_spectrum(self.fname) self.n_rows, self.n_cols, self.map_info = 1, 1, "{}" if self.wl is not None: self.n_bands = len(self.wl) else: self.n_bands = None self.meta = {} elif self.fname.endswith(".mat"): # The .mat suffix implies a matlab-style file, i.e. a dictionary # of 2D arrays and other matlab-like objects. This is typically # only used for specific output products associated with single # spectrum retrievals; there is no read option. logging.debug("Inferred MATLAB file format for %s" % self.fname) self.format = "MATLAB" if not self.write: logging.error("Unsupported MATLAB file in input block") raise IOError("MATLAB format in input block not supported") elif self.fname.endswith(".nc"): logging.debug(f"Inferred NETCDF file format for {self.fname}") self.format = "NETCDF" if not self.write: self.dataset = xr.open_dataset(self.fname) self.data = self.dataset.radiance self.meta = self.data.attrs self.n_rows = self.data.downtrack.size self.n_cols = self.data.crosstrack.size self.n_bands = self.data.bands.size else: # EMIT-specific metadata meta = { "ncei_template_version": "NCEI_NetCDF_Swath_Template_v2.0", "summary": "The Earth Surface Mineral Dust Source Investigation (EMIT) is an Earth Ventures-Instrument (EVI-4) Mission that maps the surface mineralogy of arid dust source regions via imaging spectroscopy in the visible and short-wave infrared (VSWIR). Installed on the International Space Station (ISS), the EMIT instrument is a Dyson imaging spectrometer that uses contiguous spectroscopic measurements from 410 to 2450 nm to resolve absoprtion features of iron oxides, clays, sulfates, carbonates, and other dust-forming minerals. During its one-year mission, EMIT will observe the sunlit Earth's dust source regions that occur within +/-52° latitude and produce maps of the source regions that can be used to improve forecasts of the role of mineral dust in the radiative forcing (warming or cooling) of the atmosphere.\\n\\nThis file contains L2A estimated surface reflectances and geolocation data. Reflectance estimates are created using an Optimal Estimation technique - see ATBD for details. Reflectance values are reported as fractions (relative to 1).", "keywords": "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing", "Conventions": "CF-1.63", "sensor": "EMIT (Earth Surface Mineral Dust Source Investigation)", "instrument": "EMIT", "platform": "ISS", "institution": "NASA Jet Propulsion Laboratory/California Institute of Technology", "license": "", "naming_authority": "", "date_created": str(dtt.now()), "keywords_vocabulary": "NASA Global Change Master Directory (GCMD) Science Keywords", "stdname_vocabulary": "NetCDF Climate and Forecast (CF) Metadata Convention", "creator_name": "ISOFIT", "creator_url": "", "project": "Earth Surface Mineral Dust Source Investigation", "project_url": "https://emit.jpl.nasa.gov/", "publisher_name": "", "publisher_url": "", "publisher_email": "", "identifier_product_doi_authority": "https://doi.org", "flight_line": "", "time_coverage_start": "", "time_coverage_end": "", "software_build_version": "", "software_delivery_version": "", "product_version": "", "history": "ISOFIT generated", "crosstrack_orientation": "", "easternmost_longitude": None, "northernmost_latitude": None, "westernmost_longitude": None, "southernmost_latitude": None, "spatialResolution": None, "spatial_ref": "", "geotransform": [], "day_night_flag": "", "title": "EMIT L2A Estimated Surface Reflectance", } self.n_rows = n_rows self.n_cols = n_cols self.n_bands = n_bands self.dataset = xr.Dataset( { "reflectance": ( ("downtrack", "crosstrack", "bands"), np.zeros((n_rows, n_cols, n_bands)), ) } ) self.dataset.attrs = meta self.data = self.dataset.reflectance # Initialize the file self.dataset.to_netcdf(self.fname) else: # Otherwise we assume it is an ENVI-format file, which is # basically just a binary data cube with a detached human- # readable ASCII header describing dimensions, interleave, and # metadata. We buffer this data in self.frames, reading and # writing individual rows of the cube on-demand. logging.debug("Inferred ENVI file format for %s" % self.fname) self.format = "ENVI" if not self.write: # If we are an input file, the header must preexist. if not os.path.exists(envi_header(self.fname)): logging.error("Could not find %s" % (envi_header(self.fname))) raise IOError("Could not find %s" % (envi_header(self.fname))) # open file and copy metadata self.file = envi.open(envi_header(self.fname), fname) self.meta = self.file.metadata.copy() self.n_rows = int(self.meta["lines"]) self.n_cols = int(self.meta["samples"]) self.n_bands = int(self.meta["bands"]) if "data ignore value" in self.meta: self.flag = float(self.meta["data ignore value"]) else: self.flag = -9999.0 else: # If we are an output file, we may have to build the header # from scratch. Hopefully the caller has supplied the # necessary metadata details. meta = { "description": ( f"L2A per-pixel surface retrieval (engine={engine_name}, isofit_version={isofit_version})" ), "lines": n_rows, "samples": n_cols, "bands": n_bands, "byte order": 0, "header offset": 0, "map info": map_info, "file_type": "ENVI Standard", "sensor type": "unknown", "interleave": interleave, "data type": typemap[dtype], "wavelength units": "Nanometers", "z plot range": zrange, "z plot titles": ztitles, "fwhm": fwhm, "bbl": bad_bands, "band names": band_names, "wavelength": self.wl, "data ignore value": self.flag, } for k, v in meta.items(): if v is None: logging.error("Must specify %s" % (k)) raise IOError("Must specify %s" % (k)) if os.path.isfile(envi_header(fname)) is False: self.file = envi.create_image( envi_header(fname), meta, ext="", force=True ) else: self.file = envi.open(envi_header(fname)) self.open_map_with_retries()
[docs] def open_map_with_retries(self): """Try to open a memory map, handling Beowulf I/O issues.""" self.memmap = None for attempt in range(10): self.memmap = self.file.open_memmap(interleave="bip", writable=self.write) if self.memmap is not None: return raise IOError("could not open memmap for " + self.fname)
[docs] def get_frame(self, row): """The frame is a 2D array, essentially a list of spectra. The self.frames list acts as a hash table to avoid storing the entire cube in memory. So we read them or create them on an as-needed basis. When the buffer flushes via a call to flush_buffers, they will be deleted.""" if row not in self.frames: if not self.write: d = self.memmap[row, :, :] self.frames[row] = d.copy() else: self.frames[row] = np.nan * np.zeros((self.n_cols, self.n_bands)) return self.frames[row]
[docs] def write_spectrum(self, row, col, x): """We write a spectrum. If a binary format file, we simply change the data cached in self.frames and defer file I/O until flush_buffers is called.""" if self.format == "ASCII": # Multicolumn output for ASCII products np.savetxt(self.fname, x, fmt="%10.6f") elif self.format == "MATLAB": # Dictionary output for MATLAB products scipy.io.savemat(self.fname, x) elif self.format == "NETCDF": self.data[row, col][:] = x else: # Omit wavelength column for spectral products frame = self.get_frame(row) if x.ndim == 2: x = x[:, -1] frame[col, :] = x
[docs] def read_spectrum(self, row, col): """Get a spectrum from the frame list or ASCII file. Note that if we are an ASCII file, we have already read the single spectrum and return it as-is (ignoring the row/column indices).""" if self.format == "ASCII": return self.data elif self.format == "NETCDF": return self.data.sel(downtrack=row, crosstrack=col).data else: frame = self.get_frame(row) return frame[col]
[docs] def flush_buffers(self): """Write to file, and refresh the memory map object.""" if self.format == "ENVI": if self.write: for row, frame in self.frames.items(): valid = np.logical_not(np.isnan(frame[:, 0])) self.memmap[row, valid, :] = frame[valid, :] self.frames = OrderedDict() del self.file self.file = envi.open(envi_header(self.fname), self.fname) self.open_map_with_retries() elif self.format == "NETCDF": self.dataset.to_netcdf(self.fname)
[docs] class InputData: def __init__(self):
[docs] self.meas = None
[docs] self.geom = None
[docs] self.reference_reflectance = None
[docs] def clear(self): self.__init__()
[docs] class IO: """...""" def __init__(self, config: Config, forward: ForwardModel, full_statevec: list = []): """Initialization specifies retrieval subwindows for calculating measurement cost distributions."""
[docs] self.config = config
[docs] self.radiance_correction = None
[docs] self.meas_wl = forward.instrument.wl_init
[docs] self.meas_fwhm = forward.instrument.fwhm_init
[docs] self.writes = 0
[docs] self.reads = 0
[docs] self.n_rows = 1
[docs] self.n_cols = 1
[docs] self.bbl = "{" + ",".join([str(1) for n in range(len(self.meas_wl))]) + "}"
[docs] self.engine_name = ( config.forward_model.radiative_transfer.radiative_transfer_engines[ 0 ].engine_name )
# Use the pre-defined full statevec if len(full_statevec): self.full_statevec = full_statevec else: self.full_statevec = forward.statevec
[docs] self.n_sv = len(self.full_statevec)
[docs] self.n_chan = len(self.meas_wl)
[docs] self.flush_rate = config.implementation.io_buffer_size
[docs] self.simulation_mode = config.implementation.mode == "simulation"
[docs] self.total_time = 0
[docs] self.current_input_data = InputData()
# Names of either the wavelength or statevector outputs wl_names = [("Channel %i" % i) for i in range(self.n_chan)] sv_names = self.full_statevec.copy() self.input_datasets, self.output_datasets, self.map_info = {}, {}, "{}" # Load input files and record relevant metadata for element, element_name in zip(*self.config.input.get_elements()): self.input_datasets[element_name] = SpectrumFile(element) if (self.input_datasets[element_name].n_rows > self.n_rows) or ( self.input_datasets[element_name].n_cols > self.n_cols ): self.n_rows = self.input_datasets[element_name].n_rows self.n_cols = self.input_datasets[element_name].n_cols for inherit in ["map info", "bbl"]: if inherit in self.input_datasets[element_name].meta: setattr( self, inherit.replace(" ", "_"), self.input_datasets[element_name].meta[inherit], ) for element, element_header, element_name in zip( *self.config.output.get_output_files() ): band_names, ztitle, zrange = element_header if band_names == "statevector": band_names = sv_names elif band_names == "wavelength": band_names = wl_names elif band_names == "atm_coeffs": band_names = wl_names * 5 else: band_names = "{}" n_bands = len(band_names) self.output_datasets[element_name] = SpectrumFile( element, write=True, n_rows=self.n_rows, n_cols=self.n_cols, n_bands=n_bands, interleave="bip", dtype=np.float32, wavelengths=self.meas_wl, fwhm=self.meas_fwhm, band_names=band_names, bad_bands=self.bbl, map_info=self.map_info, zrange=zrange, ztitles=ztitle, engine_name=self.engine_name, isofit_version=config.implementation.isofit_version, ) # Do we apply a radiance correction? if self.config.input.radiometry_correction_file is not None: filename = self.config.input.radiometry_correction_file self.radiance_correction, wl = load_spectrum(filename) # Load the earth sun distance data
[docs] self.esd = self.load_esd()
[docs] def get_components_at_index(self, row: int, col: int) -> InputData: """ Load data from input files at the specified (row, col) index. Args: row: row to retrieve data from col: column to retrieve data from Returns: InputData: object containing all current data reads """ # Prepare out input data object by blanking it out self.current_input_data.clear() # Determine the appropriate row, column index. and initialize the # data dictionary with empty entries. data = dict([(i, None) for i in self.config.input.get_all_element_names()]) logging.debug(f"Row {row} Column {col}") # Read data from any of the input files that are defined. for source in self.input_datasets: data[source] = self.input_datasets[source].read_spectrum(row, col) self.reads += 1 if self.reads >= self.flush_rate: self.flush_buffers() if self.simulation_mode: # If solving the inverse problem, the measurment is the surface reflectance meas = data["reflectance_file"].copy() else: # If solving the inverse problem, the measurment is the radiance # We apply the calibration correciton here for simplicity. meas = data["measured_radiance_file"] if meas is not None: meas = meas.copy() if data["radiometry_correction_file"] is not None: meas *= data["radiometry_correction_file"] self.current_input_data.meas = meas if self.current_input_data.meas is None or np.all( self.current_input_data.meas < -49 ): return None ## Check for any bad data flags for source in self.input_datasets: if np.allclose(data[source], self.input_datasets[source].flag): return None # Check if Sky view is used, else it is equal to 1.0. if "skyview_factor_file" not in data or data["skyview_factor_file"] is None: data["skyview_factor_file"] = 1.0 # We build the geometry object for this spectrum. For files not # specified in the input configuration block, the associated entries # will be 'None'. The Geometry object will use reasonable defaults. geom = Geometry( obs=data["obs_file"], loc=data["loc_file"], esd=self.esd, bg_rfl=data["background_reflectance_file"], svf=data["skyview_factor_file"], ) self.current_input_data.geom = geom self.current_input_data.reference_reflectance = data[ "reference_reflectance_file" ] return self.current_input_data
[docs] def flush_buffers(self): """Write all buffered output data to disk, and erase read buffers.""" for file_dictionary in [self.input_datasets, self.output_datasets]: for name, fi in file_dictionary.items(): fi.flush_buffers() self.reads = 0 self.writes = 0
[docs] def write_datasets( self, row: int, col: int, output: dict, states: List, flush_immediately=False ): """ Write all valid datasets to disk (possibly buffered). Args: row: row to write to col: column to write to output: dictionary with keys corresponding to config.input file references states: results states from inversion. In the MCMC case, these are interpreted as samples from the posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged solution). flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings """ for product in self.output_datasets: logging.debug("IO: Writing " + product) self.output_datasets[product].write_spectrum(row, col, output[product]) # Special case! samples file is matlab format. if self.config.output.mcmc_samples_file is not None: logging.debug("IO: Writing mcmc_samples_file") mdict = {"samples": states} scipy.io.savemat(self.config.output.mcmc_samples_file, mdict) self.writes += 1 if self.writes >= self.flush_rate or flush_immediately: self.flush_buffers()
[docs] def build_output( self, states: List, input_data: InputData, fm: ForwardModel, iv: Inversion, fill_value=-9999.0, ): """ Build the output to be written to disk as a dictionary Args: states: results states from inversion. In the MCMC case, these are interpreted as samples from the posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged solution). input_data: an InputData object fm: the forward model used to solve the inversion iv: the inversion object """ if len(states) == 0: # Write a bad data flag atm_bad = np.zeros(len(fm.instrument.n_chan) * 5) + fill_value state_bad = np.zeros(len(fm.statevec)) + fill_value data_bad = np.zeros(fm.instrument.n_chan) + fill_value to_write = { "estimated_state_file": state_bad, "estimated_reflectance_file": data_bad, "estimated_emission_file": data_bad, "modeled_radiance_file": data_bad, "apparent_reflectance_file": data_bad, "path_radiance_file": data_bad, "simulated_measurement_file": data_bad, "algebraic_inverse_file": data_bad, "atmospheric_coefficients_file": atm_bad, "radiometry_correction_file": data_bad, "spectral_calibration_file": data_bad, "posterior_uncertainty_file": state_bad, } else: to_write = {} meas = input_data.meas geom = input_data.geom reference_reflectance = input_data.reference_reflectance if self.config.implementation.mode == "inversion_mcmc": state_est = states.mean(axis=0) else: state_est = states[-1, :] # Make important check that fm.statevec !> full_statevec. if len(fm.statevec) > len(self.full_statevec): logging.error( "Length of the output statevector is shorter than the " "forward model currently being written. Potential issue " "with initialization of IO class." ) raise IOError("len(fm.statevec) > len(self.full_statevec)") ############ Start with all of the 'independent' calculations if "estimated_state_file" in self.output_datasets: # state_est transformed to reflect io.full_statevec to_write["estimated_state_file"] = match_statevector( state_est, self.full_statevec, fm.statevec ) if "path_radiance_file" in self.output_datasets: # Note: for glint models, this will return atm + glint path_est = fm.calc_meas( state_est, geom, rfl=np.zeros(self.meas_wl.shape) ) to_write["path_radiance_file"] = np.column_stack( (fm.instrument.wl_init, path_est) ) if "spectral_calibration_file" in self.output_datasets: # Spectral calibration wl, fwhm = fm.calibration(state_est) cal = np.column_stack( [ np.arange(0, len(wl)), units.nm_to_micron(wl), units.nm_to_micron(fwhm), ] ) to_write["spectral_calibration_file"] = cal if "posterior_uncertainty_file" in self.output_datasets: S_hat, K, G = iv.calc_posterior(state_est, geom, meas) to_write["posterior_uncertainty_file"] = match_statevector( np.sqrt(np.diag(S_hat)), self.full_statevec, fm.statevec ) ############ Now proceed to the calcs where they may be some overlap if any( item in ["estimated_emission_file", "apparent_reflectance_file"] for item in self.output_datasets ): Ls_est = fm.calc_Ls(state_est, geom) if any( item in [ "estimated_reflectance_file", "apparent_reflectance_file", "modeled_radiance_file", "simulated_measurement_file", ] for item in self.output_datasets ): lamb_est = fm.calc_lamb(state_est, geom) if any( item in ["modeled_radiance_file", "simulated_measurement_file"] for item in self.output_datasets ): meas_est = fm.calc_meas(state_est, geom) if "modeled_radiance_file" in self.output_datasets: to_write["modeled_radiance_file"] = np.column_stack( (fm.instrument.wl_init, meas_est) ) if "simulated_measurement_file" in self.output_datasets: meas_sim = fm.instrument.simulate_measurement(meas_est, geom) to_write["simulated_measurement_file"] = np.column_stack( (self.meas_wl, meas_sim) ) if "estimated_emission_file" in self.output_datasets: to_write["estimated_emission_file"] = np.column_stack( (self.meas_wl, Ls_est) ) if "estimated_reflectance_file" in self.output_datasets: to_write["estimated_reflectance_file"] = np.column_stack( (self.meas_wl, lamb_est) ) if "apparent_reflectance_file" in self.output_datasets: # Upward emission & glint and apparent reflectance apparent_rfl_est = lamb_est + Ls_est to_write["apparent_reflectance_file"] = np.column_stack( (self.meas_wl, apparent_rfl_est) ) x_surface, x_RT, x_instrument = fm.unpack(state_est) if any( item in ["algebraic_inverse_file", "atmospheric_coefficients_file"] for item in self.output_datasets ): rfl_alg_opt, coeffs = invert_algebraic( fm.surface, fm.RT, fm.instrument, x_surface, x_RT, x_instrument, meas, geom, ) if "algebraic_inverse_file" in self.output_datasets: to_write["algebraic_inverse_file"] = np.column_stack( (self.meas_wl, rfl_alg_opt) ) if "atmospheric_coefficients_file" in self.output_datasets: rhoatm, sphalb, L_tot, transup, L_Up = coeffs verified_geom = geom.verify(fm.RT.coszen) coszen, cos_i = verified_geom["coszen"], verified_geom["cos_i"] solar_irr = fm.RT.rt_engines[0].solar_irr atm_vars = [rhoatm, sphalb, L_tot, solar_irr] atm = np.column_stack( atm_vars + [np.ones((len(self.meas_wl), 1)) * coszen] ) atm = atm.T.reshape((len(self.meas_wl) * 5,)) to_write["atmospheric_coefficients_file"] = atm if "radiometry_correction_file" in self.output_datasets: factors = np.ones(len(self.meas_wl)) if "reference_reflectance_file" in self.input_datasets: meas_est = fm.calc_meas(state_est, geom, rfl=reference_reflectance) factors = meas_est / meas else: logging.warning("No reflectance reference") to_write["radiometry_correction_file"] = factors return to_write
[docs] def write_spectrum( self, row: int, col: int, states: List, fm: ForwardModel, iv: Inversion, flush_immediately=False, input_data: InputData = None, ): """ Convenience function to build and write output in one step Args: row: data row to write col: data column to write states: results states from inversion. In the MCMC case, these are interpreted as samples from the posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged solution). meas: measurement radiance geom: geometry object of the observation fm: the forward model used to solve the inversion iv: the inversion object flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings input_data: optionally overwride self.current_input_data """ if input_data is None: to_write = self.build_output(states, self.current_input_data, fm, iv) else: to_write = self.build_output(states, input_data, fm, iv) self.write_datasets( row, col, to_write, states, flush_immediately=flush_immediately )
@staticmethod
[docs] def initialize_output_files(config, n_rows, n_cols, full_statevector): wl_init, fwhm_init = load_wavelen( config.forward_model.instrument.wavelength_file ) wl_names = [("Channel %i" % i) for i in range(len(wl_init))] bbl = "{" + ",".join([str(1) for n in range(len(wl_init))]) + "}" engine_name = ( config.forward_model.radiative_transfer.radiative_transfer_engines[ 0 ].engine_name ) for element, element_header, element_name in zip( *config.output.get_output_files() ): band_names, ztitle, zrange = element_header if band_names == "statevector": band_names = full_statevector elif band_names == "wavelength": band_names = wl_names elif band_names == "atm_coeffs": band_names = wl_names * 5 else: band_names = "{}" n_bands = len(band_names) _ = SpectrumFile( element, write=True, n_rows=n_rows, n_cols=n_cols, n_bands=n_bands, interleave="bip", dtype=np.float32, wavelengths=wl_init, fwhm=fwhm_init, band_names=band_names, bad_bands=bbl, map_info="{}", zrange=zrange, ztitles=ztitle, engine_name=engine_name, isofit_version=config.implementation.isofit_version, )
@staticmethod
[docs] def load_esd(file=None): """ Loads an earth_sun_distance file. Defaults to the [env.data]/earth_sun_distance.txt if not provided Parameters ---------- file : str, default=None ESD file to load Returns ------- np.array Loaded ESD. If the file fails to load, creates a default """ if file is None: file = env.path("data", "earth_sun_distance.txt") try: esd = np.loadtxt(file) logging.debug(f"Loaded ESD from file: {file}") except FileNotFoundError: logging.warning( "Earth-sun-distance file not found on system. " "Proceeding without might cause some inaccuracies down the line." ) esd = np.ones((366, 2)) esd[:, 0] = np.arange(1, 367, 1) return esd
[docs] def write_bil_chunk( dat: np.array, outfile: str, line: int, shape: tuple, dtype: str = "float32" ) -> None: """ Write a chunk of data to a binary, BIL formatted data cube. Args: dat: data to write outfile: output file to write to line: line of the output file to write to shape: shape of the output file dtype: output data type Returns: None """ outfile = open(outfile, "rb+") outfile.seek(line * shape[1] * shape[2] * np.dtype(dtype).itemsize) outfile.write(dat.astype(dtype).tobytes()) outfile.close()
[docs] def initialize_output(output_metadata, outpath, out_shape, **kwargs): """ Initialize output file by updating metadata and creating object. Args: output_metadata: dict - Dictionary with envi header information outpath: str - path to output file out_shape: tuple - dimensions of initialized file keys_to_del: list - keys to remove from output_metadata kwargs - key-argument pairs to add to output_metadata """ for key, value in kwargs.items(): output_metadata[key] = value out_file = envi.create_image( envi_header(outpath), ext="", metadata=output_metadata, force=True ) out_mm = out_file.open_memmap(interleave="source", writable=True) out_mm[:, :] = np.zeros(out_shape, dtype=np.float32) del out_file return outpath