Source code for isofit.utils.extractions
#! /usr/bin/env python3
#
# Copyright 2019 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from __future__ import annotations
import logging
import numpy as np
from spectral.io import envi
from isofit import ray
from isofit.core.common import envi_header
from isofit.core.fileio import write_bil_chunk
@ray.remote(num_cpus=1)
[docs]
def extract_chunk(
lstart: int,
lend: int,
in_file: str,
labels: np.array,
flag: float,
reducer: FunctionType,
logfile=None,
loglevel="INFO",
):
"""
Extract a small chunk of the image
Args:
lstart: line to start extraction at
lend: line to end extraction at
in_file: file to read image from
labels: labels to use for data read
flag: nodata value of image
logfile: logging file name
loglevel: logging level
Returns:
out_index: array of output indices (based on labels)
out_data: array of output data
"""
logging.basicConfig(
format="%(levelname)s:%(asctime)s ||| %(message)s",
level=loglevel,
filename=logfile,
datefmt="%Y-%m-%d,%H:%M:%S",
)
logging.info(f"{lstart}: starting")
in_img = envi.open(envi_header(in_file))
img_mm = in_img.open_memmap(interleave="bip", writable=False)
# Which labels will we extract? ignore zero index
active = labels[lstart:lend, :]
active = active[active >= 1]
active = np.unique(active)
logging.debug(f"{lstart}: found {len(active)} unique labels")
if len(active) == 0:
return None, None
# Handle labels extending outside our chunk by expanding margins
cs = lend - lstart
boundary_min = max(lstart - cs, 0)
boundary_max = min(lend + cs, labels.shape[0])
active_area = np.zeros((boundary_max - boundary_min, labels.shape[1]))
for i in active:
active_area[labels[boundary_min:boundary_max, :] == i] = True
active_locs = np.where(active_area)
lstart_adjust = min(active_locs[0]) + boundary_min
lend_adjust = max(active_locs[0]) + boundary_min + 1
cstart_adjust = min(active_locs[1])
cend_adjust = max(active_locs[1]) + 1
logging.debug(
f"{lstart} area subset: {lstart_adjust}, {lend_adjust} :::: {cstart_adjust},"
f" {cend_adjust}"
)
chunk_lbl = np.array(labels[lstart_adjust:lend_adjust, cstart_adjust:cend_adjust])
chunk_inp = np.array(
img_mm[lstart_adjust:lend_adjust, cstart_adjust:cend_adjust, :]
)
out_data = np.zeros((len(active), img_mm.shape[-1])) + flag
logging.debug(f"{lstart}: running extraction from local array")
for _lab, lab in enumerate(active):
out_data[_lab, :] = 0
locs = np.where(chunk_lbl == lab)
out_data[_lab, :] = reducer(chunk_inp[locs[0], locs[1], :])
unique_labels = np.unique(labels)
unique_labels = unique_labels[unique_labels >= 1]
if unique_labels[0] != 0:
unique_labels = np.hstack([np.zeros(1), unique_labels])
match_idx = np.searchsorted(unique_labels, active)
out_data[np.logical_not(np.isfinite(out_data))] = flag
logging.debug(f"{lstart}: complete")
return match_idx, out_data
[docs]
def extractions(
inputfile,
labels,
output,
chunksize,
flag,
reducer: FunctionType,
n_cores: int = 1,
ray_address: str = None,
ray_redis_password: str = None,
ray_temp_dir: str = None,
ray_ip_head=None,
logfile: str = None,
loglevel: str = "INFO",
):
"""..."""
in_file = inputfile
lbl_file = labels
out_file = output
nchunk = chunksize
dtm = {"4": np.float32, "5": np.float64}
# Open input data, get dimensions
in_img = envi.open(envi_header(in_file), in_file)
meta = in_img.metadata
nl, nb, ns = [int(meta[n]) for n in ("lines", "bands", "samples")]
del in_img
lbl_img = envi.open(envi_header(lbl_file), lbl_file)
labels = lbl_img.read_band(0)
del lbl_img
un_labels = np.unique(labels).tolist()
if 0 not in un_labels:
un_labels.insert(0, 0)
nout = len(un_labels)
# Start up a ray instance for parallel work
rayargs = {
"ignore_reinit_error": True,
"local_mode": n_cores == 1,
"address": ray_address,
"include_dashboard": False,
"_temp_dir": ray_temp_dir,
"_redis_password": ray_redis_password,
}
# We can only set the num_cpus if running on a single-node
if ray_ip_head is None and ray_redis_password is None:
rayargs["num_cpus"] = n_cores
ray.init(**rayargs)
labelid = ray.put(labels)
jobs = []
for lstart in np.arange(0, nl, nchunk):
lend = min(lstart + nchunk, nl)
jobs.append(
extract_chunk.remote(
lstart,
lend,
in_file,
labelid,
flag,
reducer,
logfile=logfile,
loglevel=loglevel,
)
)
# Collect results
rreturn = [ray.get(jid) for jid in jobs]
## Iterate through image "chunks," segmenting as we go
out = np.zeros((nout, nb, 1))
for idx, ret in rreturn:
if ret is not None:
out[idx, :, 0] = ret
del rreturn
meta["lines"] = str(nout)
meta["bands"] = str(nb)
meta["samples"] = "1"
meta["interleave"] = "bil"
out_img = envi.create_image(
envi_header(out_file), metadata=meta, ext="", force=True
)
del out_img
if dtm[meta["data type"]] == np.float32:
type = "float32"
else:
type = "float64"
write_bil_chunk(out, out_file, 0, out.shape, dtype=type)