## Brian Blaylock
## September 16, 2020
"""
=============
Retrieve Data
=============
Download and read data from the R-series Geostationary Operational
Environmental Satellites (GOES-R).

Data is downloaded from the NOAA GOES archive on Amazon Web Services and
can be returned as a list of files or read into an xarray.Dataset. If a
file has not been downloaded to a local directory, it is read from AWS
directly into memory.
https://registry.opendata.aws/noaa-goes/
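
Examples
--------
A minimal sketch of typical use (requires network access to the public
NOAA GOES bucket; assumes this module is importable as ``goes2go.data``):

>>> from goes2go.data import goes_latest
>>> ds = goes_latest(satellite="goes16", product="ABI", domain="C",
...                  return_as="xarray", download=False)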
"""
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import s3fs
import xarray as xr
# NOTE: These config dict values are retrieved from __init__ and read
# from the file ${HOME}/.config/goes2go/config.toml
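#
# A config.toml might look roughly like this (a sketch only; the section and
# key names are inferred from the config lookups below, and the values shown
# are illustrative, not the package defaults):
#
#   [timerange]
#   satellite = "noaa-goes16"
#   product = "ABI-L2-MCMIPC"
#   domain = "C"
#   return_as = "xarray"
#   download = true
#   overwrite = false
#   save_dir = "~/data"
#   s3_refresh = true
#   verbose = true
#
# The [latest] and [nearesttime] sections follow the same pattern.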
from . import config
# Connect to AWS public buckets
fs = s3fs.S3FileSystem(anon=True)
# Define parameter options and aliases
# ------------------------------------
_satellite = {
"noaa-goes16": [16, "16", "G16", "EAST", "GOES16"],
"noaa-goes17": [17, "17", "G17", "WEST", "GOES17"],
"noaa-goes18": [18, "18", "G18", "WEST", "GOES18"],
}
_domain = {
"C": ["CONUS"],
"F": ["FULL", "FULLDISK", "FULL DISK"],
"M": ["MESOSCALE", "M1", "M2"],
}
_product = {
# Assume goes17 and goes18 have same products as goes16
i.split("/")[-1]: []
for i in fs.ls("noaa-goes16")
}
_product.pop("index.html", None)
_product["GLM-L2-LCFA"] = ["GLM"]
_product["ABI-L2-MCMIPC"] = ["ABIC"]
_product["ABI-L2-MCMIPF"] = ["ABIF"]
_product["ABI-L2-MCMIPM"] = ["ABIM"]
def _check_param_inputs(**params):
"""Check the input parameters for correct name or alias.
Specifically, check the input for product, domain, and satellite are
in the list of accepted values. If not, then look if it has an alias.
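
    For example, a minimal sketch (assuming the bucket listing above
    succeeded and includes the ``ABI-L2-MCMIPC`` product):

    >>> _check_param_inputs(satellite="G16", product="ABI", domain="C")
    ('noaa-goes16', 'ABI-L2-MCMIPC', 'C')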
"""
# Kinda messy, but gets the job done.
params.setdefault("verbose", True)
satellite = params["satellite"]
domain = params["domain"]
product = params["product"]
verbose = params["verbose"]
## Determine the Satellite
if satellite not in _satellite:
satellite = str(satellite).upper()
for key, aliases in _satellite.items():
if satellite in aliases:
satellite = key
assert (
satellite in _satellite
), f"satellite must be one of {list(_satellite.keys())} or an alias {list(_satellite.values())}"
## Determine the Domain (only needed for ABI product)
if product.upper().startswith("ABI"):
if product[-1] in _domain:
# If the product has the domain, this takes priority
domain = product[-1]
elif isinstance(domain, str):
domain = domain.upper()
if domain in ["M1", "M2"]:
product = product + "M"
else:
for key, aliases in _domain.items():
if domain in aliases:
domain = key
product = product + domain
assert (
(domain in _domain) or (domain in ["M1", "M2"])
), f"domain must be one of {list(_domain.keys())} or an alias {list(_domain.values())}"
else:
domain = None
## Determine the Product
if product not in _product:
for key, aliases in _product.items():
if product.upper() in aliases:
product = key
assert (
product in _product
), f"product must be one of {list(_product .keys())} or an alias {list(_product .values())}"
return satellite, product, domain
def _goes_file_df(satellite, product, start, end, bands=None, refresh=True):
"""Get list of requested GOES files as pandas.DataFrame.
Parameters
----------
satellite : str
product : str
start : datetime
end : datetime
    bands : None, int, or list
Specify the ABI channels to retrieve.
refresh : bool
Refresh the s3fs.S3FileSystem object when files are listed.
Default True will refresh and not use a cached list.
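
    Returns
    -------
    pandas.DataFrame
        One row per file found in the bucket, with columns ``file``,
        ``product_mode``, ``satellite``, ``start``, ``end``, and ``creation``
        (plus ``product``, ``mode_bands``, ``mode``, and ``band`` for ABI
        products). The input arguments are stored in ``df.attrs``.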
"""
params = locals()
start = pd.to_datetime(start)
end = pd.to_datetime(end)
DATES = pd.date_range(f"{start:%Y-%m-%d %H:00}", f"{end:%Y-%m-%d %H:00}", freq="1H")
# List all files for each date
# ----------------------------
files = []
for DATE in DATES:
files += fs.ls(f"{satellite}/{product}/{DATE:%Y/%j/%H/}", refresh=refresh)
# Build a table of the files
# --------------------------
df = pd.DataFrame(files, columns=["file"])
df[["product_mode", "satellite", "start", "end", "creation"]] = (
df["file"].str.rsplit("_", expand=True, n=5).loc[:, 1:]
)
# Todo: this could use some clean up !
if product.startswith("ABI"):
product_mode = df.product_mode.str.rsplit("-", n=1, expand=True)
df["product"] = product_mode[0]
df["mode_bands"] = product_mode[1]
mode_bands = df.mode_bands.str.split("C", expand=True)
df["mode"] = mode_bands[0].str[1:].astype(int)
try:
df["band"] = mode_bands[1].astype(int)
        except Exception:
            # No channel number in the filename for this product
            df["band"] = None
# Filter files by band number
if bands is not None:
if not hasattr(bands, "__len__"):
bands = [bands]
df = df.loc[df.band.isin(bands)]
# Filter files by requested time range
# ------------------------------------
# Convert filename datetime string to datetime object
df["start"] = pd.to_datetime(df.start, format="s%Y%j%H%M%S%f")
df["end"] = pd.to_datetime(df.end, format="e%Y%j%H%M%S%f")
df["creation"] = pd.to_datetime(df.creation, format="c%Y%j%H%M%S%f.nc")
# Filter by files within the requested time range
df = df.loc[df.start >= start].loc[df.end <= end].reset_index(drop=True)
for i in params:
df.attrs[i] = params[i]
return df
def _download(df, save_dir, overwrite, max_threads=10, verbose=False):
"""Download the files from a DataFrame listing with multithreading."""
def do_download(src):
dst = Path(save_dir) / src
if not dst.parent.is_dir():
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.is_file() and not overwrite:
if verbose:
print(f" 👮🏻♂️ File already exists. Do not overwrite: {dst}")
else:
# Downloading file from AWS
fs.get(src, str(dst))
################
# Multithreading
tasks = len(df)
threads = min(tasks, max_threads)
with ThreadPoolExecutor(threads) as exe:
futures = [exe.submit(do_download, src) for src in df.file]
        # Wait for all tasks to finish; do_download returns nothing, but
        # calling result() surfaces any exceptions raised in the threads.
        _ = [future.result() for future in as_completed(futures)]
    print(
        f"📦 Finished downloading [{len(df)}] files to "
        f"[{Path(save_dir) / Path(df.file[0]).parents[3]}]."
    )
def _as_xarray_MP(src, save_dir, i=None, n=None, verbose=True):
"""Open a file as a xarray.Dataset -- a multiprocessing helper."""
# File destination
local_copy = Path(save_dir) / src
if local_copy.is_file():
if verbose:
print(
f"\r📖💽 Reading ({i:,}/{n:,}) file from LOCAL COPY [{local_copy}].",
end=" ",
)
with open(local_copy, "rb") as f:
ds = xr.load_dataset(f)
else:
if verbose:
print(
f"\r📖☁ Reading ({i:,}/{n:,}) file from AWS to MEMORY [{src}].", end=" "
)
with fs.open(src, "rb") as f:
ds = xr.load_dataset(f)
# Turn some attributes to coordinates so they will be preserved
# when we concat multiple GOES DataSets together.
attr2coord = [
"dataset_name",
"date_created",
"time_coverage_start",
"time_coverage_end",
]
    for attr in attr2coord:
        if attr in ds.attrs:
            ds.coords[attr] = ds.attrs.pop(attr)
ds["filename"] = src
return ds
def _as_xarray(df, **params):
"""Download files in the list to the desired path.
Use multiprocessing to speed up the download process.
Parameters
----------
df : pandas.DataFrame
A list of files in the GOES s3 bucket.
This DataFrame must have a column of "files"
params : dict
Parameters from `goes_*` function.
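
    Returns
    -------
    xarray.Dataset or list of xarray.Dataset
        A single Dataset for ABI products (concatenated along the ``t``
        dimension) and for single-file requests; a list of Datasets for
        multi-file requests of other products.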
"""
params.setdefault("max_cpus", None)
params.setdefault("verbose", True)
save_dir = params["save_dir"]
max_cpus = params["max_cpus"]
verbose = params["verbose"]
n = len(df.file)
    if n == 0:
        print("🛸 No data....🌌")
        return None
elif n == 1:
# If we only have one file, we don't need multiprocessing
ds = _as_xarray_MP(df.iloc[0].file, save_dir, 1, 1, verbose)
else:
# Use Multiprocessing to read multiple files.
if max_cpus is None:
max_cpus = multiprocessing.cpu_count()
cpus = np.minimum(multiprocessing.cpu_count(), max_cpus)
cpus = np.minimum(cpus, n)
inputs = [(src, save_dir, i, n) for i, src in enumerate(df.file, start=1)]
with multiprocessing.Pool(cpus) as p:
results = p.starmap(_as_xarray_MP, inputs)
p.close()
p.join()
# Need some work to concat the datasets
if df.attrs["product"].startswith("ABI"):
print("concatenate Datasets", end="")
ds = xr.concat(results, dim="t")
else:
ds = results
if verbose:
print(f"\r{'':1000}\r📚 Finished reading [{n}] files into xarray.Dataset.")
ds.attrs["path"] = df.file.to_list()
return ds
###############################################################################
###############################################################################
def goes_timerange(
start=None,
end=None,
recent=None,
*,
satellite=config["timerange"].get("satellite"),
product=config["timerange"].get("product"),
domain=config["timerange"].get("domain"),
return_as=config["timerange"].get("return_as"),
download=config["timerange"].get("download"),
overwrite=config["timerange"].get("overwrite"),
save_dir=config["timerange"].get("save_dir"),
max_cpus=config["timerange"].get("max_cpus"),
bands=None,
s3_refresh=config["timerange"].get("s3_refresh"),
verbose=config["timerange"].get("verbose", True),
):
"""
Get GOES data for a time range.
Parameters
----------
start, end : datetime
Required if recent is None.
recent : timedelta or pandas-parsable timedelta str
Required if start and end are None. If timedelta(hours=1), will
get the most recent files for the past hour.
satellite : {'goes16', 'goes17', 'goes18'}
Specify which GOES satellite.
        The following aliases may also be used:
- ``'goes16'``: 16, 'G16', or 'EAST'
- ``'goes17'``: 17, 'G17', or 'WEST'
- ``'goes18'``: 18, 'G18', or 'WEST'
product : {'ABI', 'GLM', other GOES product}
Specify the product name.
- 'ABI' is an alias for ABI-L2-MCMIP Multichannel Cloud and Moisture Imagery
- 'GLM' is an alias for GLM-L2-LCFA Geostationary Lightning Mapper
Others may include ``'ABI-L1b-Rad'``, ``'ABI-L2-DMW'``, etc.
For more available products, look at this `README
<https://docs.opendata.aws/noaa-goes16/cics-readme.html>`_
domain : {'C', 'F', 'M'}
ABI scan region indicator. Only required for ABI products if the
given product does not end with C, F, or M.
- C: Contiguous United States (alias 'CONUS')
- F: Full Disk (alias 'FULL')
- M: Mesoscale (alias 'MESOSCALE')
return_as : {'xarray', 'filelist'}
Return the data as an xarray.Dataset or as a list of files
download : bool
- True: Download the data to disk to the location set by :guilabel:`save_dir`
- False: Just load the data into memory.
save_dir : pathlib.Path or str
Path to save the data.
overwrite : bool
- True: Download the file even if it exists.
        - False: Do not download the file if it already exists.
    max_cpus : int
        Maximum number of CPUs to use when reading files into xarray.
bands : None, int, or list
ONLY FOR L1b-Rad products; specify the bands you want
s3_refresh : bool
Refresh the s3fs.S3FileSystem object when files are listed.
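
    Examples
    --------
    A minimal sketch (requires network access to the public NOAA GOES
    bucket; the exact files returned depend on what is in the archive):

    >>> df = goes_timerange("2021-01-01 00:00", "2021-01-01 01:00",
    ...                     satellite="goes16", product="GLM",
    ...                     return_as="filelist", download=False)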
"""
# If `start`, or `end` is a string, parse with Pandas
if isinstance(start, str):
start = pd.to_datetime(start)
if isinstance(end, str):
end = pd.to_datetime(end)
# If `recent` is a string (like recent='1H'), parse with Pandas
if isinstance(recent, str):
recent = pd.to_timedelta(recent)
params = locals()
satellite, product, domain = _check_param_inputs(**params)
params["satellite"] = satellite
params["product"] = product
params["domain"] = domain
check1 = start is not None and end is not None
check2 = recent is not None
assert check1 or check2, "🤔 `start` and `end` *or* `recent` is required"
if check1:
assert hasattr(start, "second") and hasattr(
end, "second"
), "`start` and `end` must be a datetime object"
elif check2:
assert hasattr(recent, "seconds"), "`recent` must be a timedelta object"
# Parameter Setup
# ---------------
# Create a range of directories to check. The GOES S3 bucket is
# organized by hour of day.
if recent is not None:
start = datetime.utcnow() - recent
end = datetime.utcnow()
df = _goes_file_df(satellite, product, start, end, bands=bands, refresh=s3_refresh)
if download:
_download(df, save_dir=save_dir, overwrite=overwrite, verbose=verbose)
if return_as == "filelist":
df.attrs["filePath"] = save_dir
return df
elif return_as == "xarray":
return _as_xarray(df, **params)
def goes_latest(
*,
satellite=config["latest"].get("satellite"),
product=config["latest"].get("product"),
domain=config["latest"].get("domain"),
return_as=config["latest"].get("return_as"),
download=config["latest"].get("download"),
overwrite=config["latest"].get("overwrite"),
save_dir=config["latest"].get("save_dir"),
bands=None,
s3_refresh=config["latest"].get("s3_refresh"),
verbose=config["latest"].get("verbose", True),
):
"""
Get the latest available GOES data.
Parameters
----------
satellite : {'goes16', 'goes17', 'goes18'}
Specify which GOES satellite.
        The following aliases may also be used:
- ``'goes16'``: 16, 'G16', or 'EAST'
- ``'goes17'``: 17, 'G17', or 'WEST'
- ``'goes18'``: 18, 'G18', or 'WEST'
product : {'ABI', 'GLM', other GOES product}
Specify the product name.
- 'ABI' is an alias for ABI-L2-MCMIP Multichannel Cloud and Moisture Imagery
- 'GLM' is an alias for GLM-L2-LCFA Geostationary Lightning Mapper
Others may include ``'ABI-L1b-Rad'``, ``'ABI-L2-DMW'``, etc.
For more available products, look at this `README
<https://docs.opendata.aws/noaa-goes16/cics-readme.html>`_
domain : {'C', 'F', 'M'}
ABI scan region indicator. Only required for ABI products if the
given product does not end with C, F, or M.
- C: Contiguous United States (alias 'CONUS')
- F: Full Disk (alias 'FULL')
- M: Mesoscale (alias 'MESOSCALE')
return_as : {'xarray', 'filelist'}
Return the data as an xarray.Dataset or as a list of files
download : bool
- True: Download the data to disk to the location set by :guilabel:`save_dir`
- False: Just load the data into memory.
save_dir : pathlib.Path or str
Path to save the data.
overwrite : bool
- True: Download the file even if it exists.
        - False: Do not download the file if it already exists.
bands : None, int, or list
ONLY FOR L1b-Rad products; specify the bands you want
s3_refresh : bool
Refresh the s3fs.S3FileSystem object when files are listed.
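
    Examples
    --------
    A minimal sketch (requires network access):

    >>> ds = goes_latest(satellite="goes16", product="ABI", domain="C",
    ...                  return_as="xarray", download=False)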
"""
params = locals()
satellite, product, domain = _check_param_inputs(**params)
params["satellite"] = satellite
params["product"] = product
params["domain"] = domain
# Parameter Setup
# ---------------
# Create a range of directories to check. The GOES S3 bucket is
# organized by hour of day. Look in the current hour and last hour.
start = datetime.utcnow() - timedelta(hours=1)
end = datetime.utcnow()
df = _goes_file_df(satellite, product, start, end, bands=bands, refresh=s3_refresh)
# Filter for specific mesoscale domain
if domain is not None and domain.upper() in ["M1", "M2"]:
df = df[df["file"].str.contains(f"{domain.upper()}-M")]
# Get the most recent file (latest start date)
df = df.loc[df.start == df.start.max()].reset_index(drop=True)
if download:
_download(df, save_dir=save_dir, overwrite=overwrite, verbose=verbose)
if return_as == "filelist":
df.attrs["filePath"] = save_dir
return df
elif return_as == "xarray":
return _as_xarray(df, **params)
def goes_nearesttime(
attime,
within=pd.to_timedelta(config["nearesttime"].get("within", "1H")),
*,
satellite=config["nearesttime"].get("satellite"),
product=config["nearesttime"].get("product"),
domain=config["nearesttime"].get("domain"),
return_as=config["nearesttime"].get("return_as"),
download=config["nearesttime"].get("download"),
overwrite=config["nearesttime"].get("overwrite"),
save_dir=config["nearesttime"].get("save_dir"),
bands=None,
s3_refresh=config["nearesttime"].get("s3_refresh"),
verbose=config["nearesttime"].get("verbose", True),
):
"""
Get the GOES data nearest a specified time.
Parameters
----------
attime : datetime
Time to find the nearest observation for.
May also use a pandas-interpretable datetime string.
within : timedelta or pandas-parsable timedelta str
        Maximum time difference allowed between ``attime`` and the nearest
        available observation.
satellite : {'goes16', 'goes17', 'goes18'}
Specify which GOES satellite.
        The following aliases may also be used:
- ``'goes16'``: 16, 'G16', or 'EAST'
- ``'goes17'``: 17, 'G17', or 'WEST'
- ``'goes18'``: 18, 'G18', or 'WEST'
product : {'ABI', 'GLM', other GOES product}
Specify the product name.
- 'ABI' is an alias for ABI-L2-MCMIP Multichannel Cloud and Moisture Imagery
- 'GLM' is an alias for GLM-L2-LCFA Geostationary Lightning Mapper
Others may include ``'ABI-L1b-Rad'``, ``'ABI-L2-DMW'``, etc.
For more available products, look at this `README
<https://docs.opendata.aws/noaa-goes16/cics-readme.html>`_
domain : {'C', 'F', 'M'}
ABI scan region indicator. Only required for ABI products if the
given product does not end with C, F, or M.
- C: Contiguous United States (alias 'CONUS')
- F: Full Disk (alias 'FULL')
- M: Mesoscale (alias 'MESOSCALE')
return_as : {'xarray', 'filelist'}
Return the data as an xarray.Dataset or as a list of files
download : bool
- True: Download the data to disk to the location set by :guilabel:`save_dir`
- False: Just load the data into memory.
save_dir : pathlib.Path or str
Path to save the data.
overwrite : bool
- True: Download the file even if it exists.
- False: Do not download the file if it already exists
bands : None, int, or list
ONLY FOR L1b-Rad products; specify the bands you want
s3_refresh : bool
Refresh the s3fs.S3FileSystem object when files are listed.
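
    Examples
    --------
    A minimal sketch (requires network access):

    >>> ds = goes_nearesttime("2021-01-01 03:00",
    ...                       satellite="goes16", product="ABI", domain="C",
    ...                       return_as="xarray", download=False)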
"""
if isinstance(attime, str):
attime = pd.to_datetime(attime)
if isinstance(within, str):
within = pd.to_timedelta(within)
params = locals()
satellite, product, _ = _check_param_inputs(**params)
params["satellite"] = satellite
params["product"] = product
# Parameter Setup
# ---------------
# Create a range of directories to check. The GOES S3 bucket is
# organized by hour of day.
start = attime - within
end = attime + within
df = _goes_file_df(satellite, product, start, end, bands=bands, refresh=s3_refresh)
# Filter for specific mesoscale domain
    if domain is not None and domain.upper() in ["M1", "M2"]:
df = df[df["file"].str.contains(f"{domain.upper()}-M")]
# Get row that matches the nearest time
df = df.sort_values("start")
df = df.set_index(df.start)
unique_times_index = df.index.unique()
nearest_time_index = unique_times_index.get_indexer([attime], method="nearest")
nearest_time = unique_times_index[nearest_time_index]
df = df.loc[nearest_time]
df = df.reset_index(drop=True)
n = len(df.file)
if n == 0:
print("🛸 No data....🌌")
return None
if download:
_download(df, save_dir=save_dir, overwrite=overwrite, verbose=verbose)
if return_as == "filelist":
df.attrs["filePath"] = save_dir
return df
elif return_as == "xarray":
return _as_xarray(df, **params)