"""
Utility functions for the downloader modules
"""
import io
import json
import os
import shutil
import tarfile
import urllib.request
from email.message import EmailMessage
from pathlib import Path
from zipfile import ZipFile
import click
from isofit.data import env
[docs]
def download_file(url, dstname=None, overwrite=True):
"""
Stream downloads a file
Parameters
----------
url : str
URL to download
dstname : str
Destination file name
overwrite : bool, default=True
Overwrite the destination file if it already exists
Returns
-------
outfile : str
Output downloaded filepath
"""
response = urllib.request.urlopen(url)
total = 0
if length := response.info()["Content-Length"]:
total = int(length)
# Using Python's 'email' module for this is certainly odd, but due to an
# upcoming deprecation, this is actually the officially recommended way
# to do this: https://docs.python.org/3/library/cgi.html#cgi.parse_header
msg = EmailMessage()
msg["Content-Disposition"] = response.headers["Content-Disposition"]
outfile = Path(dstname or msg["Content-Disposition"].params["filename"])
if outfile.exists() and not overwrite:
raise FileExistsError(outfile)
with click.progressbar(length=total, label="Downloading file") as bar:
with open(outfile, "wb") as file:
while chunk := response.read(io.DEFAULT_BUFFER_SIZE):
file.write(chunk)
bar.update(io.DEFAULT_BUFFER_SIZE)
return outfile
[docs]
def unzip(file, path=None, rename=None, overwrite=False, cleanup=True):
"""
Unzips a zipfile
Parameters
----------
path : str, default=None
Path to extract the zipfile to. Defaults to the directory the zip is found in
rename : str, default=None
Renames the extracted data to this
overwrite : bool, default=False
Overwrites the path destination with the zip contents if enabled
cleanup : bool, default=True
Removes the zip file after completion
Returns
-------
dst : str
The extracted output path
"""
path = Path(path or os.path.dirname(file))
with ZipFile(file) as z:
name = z.namelist()[0]
# Verify the output target doesn't exist
dst = path / (rename or name)
if dst.exists() and not overwrite:
raise FileExistsError(dst)
z.extractall(path)
src = Path(path) / name
if rename:
if dst.exists():
shutil.copytree(src, dst, dirs_exist_ok=True)
shutil.rmtree(src)
else:
shutil.move(src, dst)
if cleanup:
os.remove(file)
return dst
[docs]
def untar(file, output):
"""
Untars a .tar file. Removes the tar file after extracting
Parameters
----------
file : str
.tar file to extract
output : str
Path to output to
Returns
-------
output : str
The extracted output path
"""
with tarfile.open(file) as tar:
tar.extractall(path=output)
os.remove(file)
return output
[docs]
def prepare_output(output, default, isdir=False, overwrite=False):
"""
Prepares the output path by ensuring the parents exist and itself doesn't presently exist.
Parameters
----------
output : str | None
Path to download to
default : str
Default path defined by the ini file
isdir : bool, default=False
This is supposed to be a directory
overwrite : bool, default=False
Ignore if the output already exists
"""
if not output:
output = default
output = Path(output)
print(f"Output as: {output}")
if not overwrite and output.exists():
print(
f"Path already exists, please remove it or set the overwrite flag if you would like to redownload"
)
return
try:
if isdir:
output.mkdir(parents=True, exist_ok=True)
else:
output.parent.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Failed to create output directory: {e}")
return
return output
@click.group("download", invoke_without_command=True, no_args_is_help=True)
[docs]
def cli():
"""\
Download extra ISOFIT files that do not come with the default installation
"""
pass
@cli.command(name="paths")
[docs]
def preview_paths():
"""\
Preview download path locations. Paths can be changed from the default by using the overrides on the `isofit` command. See more via `isofit --help`
\b
Example:
Change the default `data` and `examples` paths
$ isofit --data /path/to/data --examples /different/path/examples download paths
Download paths will default to:
- data = /path/to/data
- examples = /different/path/examples
\b
These will be saved and may be reviewed:
$ isofit download paths
Download paths will default to:
- data = /path/to/data
- examples = /different/path/examples
"""
print("Download paths will default to:")
for key, path in env.items("dirs"):
print(f"- {key} = {path}")
[docs]
def isUpToDateGithub(owner, repo, name, path=None, debug=print, error=print, **_):
"""
Checks the installed version against the latest release on Github
Parameters
----------
owner : str
Github repository owner
repo : str
Repository name
name : str
Name of the downloader module to retrieve the path from the env ini if path is
not provided
path : str, default=None
Path to update. If None, defaults to the ini path
debug : function, default=print
Print function to use for debug messages, eg. logging.debug
error : function, default=print
Print function to use for error messages, eg. logging.error
**_ : dict
Ignores unused params that may be used by other validate functions. This is to
maintain compatibility with other functions
Returns
-------
bool
True if the path is up to date, False otherwise
Notes
-----
The Github workflows watch for the string "[x]" to determine if the cache needs to
update the data of this module. If your module does not include this string, the
workflows will never detect updates.
"""
if path is None:
path = env[name]
debug(f"Checking for updates for examples on path: {path}")
file = Path(path) / "version.txt"
if not file.exists():
error(
"[x] Failed to find a version.txt file under the given path. Version is unknown"
)
return False
metadata = release_metadata(owner, repo, "latest")
with file.open("r") as f:
current = f.read()
if current != (latest := metadata["tag_name"]):
error(f"[x] Latest is {latest}, currently installed is {current}")
return False
debug(f"[OK] Path is up to date, current version is: {current}")
return True
[docs]
def pullFromRepo(owner, repo, tag, output, version=True, overwrite=False):
"""
Pulls a release zipfile from a Github repository
Parameters
----------
owner : str
Github repository owner
repo : str
Repository name
tag : str
Tag of the release to pull
output : pathlib.Path
Output path to write to
version : bool, default=True
Write the tag's name in a version.txt file
overwrite : bool, default=False
Ignore if the output already exists
Returns
-------
avail : pathlib.Path
Available path
"""
metadata = release_metadata("isofit", "6S", tag)
print(f"Pulling release {metadata['tag_name']}")
zipfile = download_file(metadata["zipball_url"], output.parent / f"{repo}.zip")
avail = unzip(zipfile, path=output.parent, rename=output.name, overwrite=overwrite)
if version:
with open(output / "version.txt", "w") as file:
file.write(metadata["tag_name"])
return avail