common module¶
The common module contains common functions and classes used by the other modules.
convert_coords(coords, from_epsg, to_epsg)¶
Convert a list of coordinates from one EPSG coordinate reference system to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
coords | List[Tuple[float, float]] | List of tuples containing coordinates in the format (latitude, longitude). | required |
from_epsg | str | Source EPSG code (e.g., "epsg:4326"). | required |
to_epsg | str | Target EPSG code (e.g., "epsg:32615"). | required |
Returns:
Type | Description |
---|---|
List[Tuple[float, float]] | List of tuples containing converted coordinates in the format (x, y). |
Source code in hypercoast/common.py
def convert_coords(
coords: List[Tuple[float, float]], from_epsg: str, to_epsg: str
) -> List[Tuple[float, float]]:
"""
Convert a list of coordinates from one EPSG to another.
Args:
coords: List of tuples containing coordinates in the format (latitude, longitude).
from_epsg: Source EPSG code (e.g., "epsg:4326").
to_epsg: Target EPSG code (e.g., "epsg:32615").
Returns:
List of tuples containing converted coordinates in the format (x, y).
"""
import pyproj
# Define the coordinate transformation
transformer = pyproj.Transformer.from_crs(from_epsg, to_epsg, always_xy=True)
# Convert each coordinate
converted_coords = [transformer.transform(lon, lat) for lat, lon in coords]
return converted_coords
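Example usage (a minimal sketch; the sample coordinate is hypothetical):
>>> convert_coords([(29.76, -95.37)], "epsg:4326", "epsg:32615")  # returns [(x, y)] in meters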
download_acolite(outdir='.', platform=None)¶
Downloads the Acolite release for the current (or specified) OS platform and extracts it to the specified directory. For more information, see the Acolite manual at https://github.com/acolite/acolite/releases.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outdir | str | The output directory where the Acolite archive will be downloaded and extracted. | '.' |
platform | Optional[str] | The platform for which to download Acolite. If None, the current system platform is used. Valid values are 'linux', 'darwin', and 'windows'. | None |
Returns:
Type | Description |
---|---|
str | The path to the extracted Acolite directory. |
Exceptions:
Type | Description |
---|---|
Exception | If the platform is unsupported or the download fails. |
Source code in hypercoast/common.py
def download_acolite(outdir: str = ".", platform: Optional[str] = None) -> str:
"""
Downloads the Acolite release based on the OS platform and extracts it to the specified directory.
For more information, see the Acolite manual https://github.com/acolite/acolite/releases.
Args:
outdir (str): The output directory where the Acolite archive will be downloaded and extracted.
platform (Optional[str]): The platform for which to download acolite. If None, the current system platform is used.
Valid values are 'linux', 'darwin', and 'windows'.
Returns:
str: The path to the extracted Acolite directory.
Raises:
Exception: If the platform is unsupported or the download fails.
"""
import platform as pf
import requests
import tarfile
from tqdm import tqdm
base_url = "https://github.com/acolite/acolite/releases/download/20231023.0/"
if platform is None:
platform = pf.system().lower()
else:
platform = platform.lower()
if platform == "linux":
download_url = base_url + "acolite_py_linux_20231023.0.tar.gz"
root_dir = "acolite_py_linux"
elif platform == "darwin":
download_url = base_url + "acolite_py_mac_20231023.0.tar.gz"
root_dir = "acolite_py_mac"
elif platform == "windows":
download_url = base_url + "acolite_py_win_20231023.0.tar.gz"
root_dir = "acolite_py_win"
else:
raise Exception(f"Unsupported OS platform: {platform}")
if not os.path.exists(outdir):
os.makedirs(outdir)
extracted_path = os.path.join(outdir, root_dir)
file_name = os.path.join(outdir, download_url.split("/")[-1])
if os.path.exists(file_name):
print(f"{file_name} already exists. Skip downloading.")
return extracted_path
response = requests.get(download_url, stream=True)
total_size = int(response.headers.get("content-length", 0))
block_size = 8192
if response.status_code == 200:
with open(file_name, "wb") as file, tqdm(
desc=file_name,
total=total_size,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
file.write(chunk)
bar.update(len(chunk))
print(f"Downloaded {file_name}")
else:
raise Exception(f"Failed to download file from {download_url}")
# Unzip the file
with tarfile.open(file_name, "r:gz") as tar:
tar.extractall(path=outdir)
print(f"Extracted to {extracted_path}")
return extracted_path
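Example usage (a sketch; the output directory is hypothetical):
>>> acolite_dir = download_acolite(outdir="acolite")  # auto-detects the OS platform
>>> print(acolite_dir)  # e.g., acolite/acolite_py_linux on a Linux machine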
download_ecostress(granules, out_dir=None, threads=8)¶
Downloads NASA ECOSTRESS granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
granules | List[dict] | The granules to download. | required |
out_dir | str | The output directory where the granules will be downloaded. Defaults to None (current directory). | None |
threads | int | The number of threads to use for downloading. Defaults to 8. | 8 |
Source code in hypercoast/common.py
def download_ecostress(
granules: List[dict],
out_dir: Optional[str] = None,
threads: int = 8,
) -> None:
"""Downloads NASA ECOSTRESS granules.
Args:
granules (List[dict]): The granules to download.
out_dir (str, optional): The output directory where the granules will be
downloaded. Defaults to None (current directory).
threads (int, optional): The number of threads to use for downloading.
Defaults to 8.
"""
download_nasa_data(granules=granules, out_dir=out_dir, threads=threads)
download_emit(granules, out_dir=None, threads=8)¶
Downloads NASA EMIT granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
granules | List[dict] | The granules to download. | required |
out_dir | str | The output directory where the granules will be downloaded. Defaults to None (current directory). | None |
threads | int | The number of threads to use for downloading. Defaults to 8. | 8 |
Source code in hypercoast/common.py
def download_emit(
granules: List[dict],
out_dir: Optional[str] = None,
threads: int = 8,
) -> None:
"""Downloads NASA EMIT granules.
Args:
granules (List[dict]): The granules to download.
out_dir (str, optional): The output directory where the granules will be
downloaded. Defaults to None (current directory).
threads (int, optional): The number of threads to use for downloading.
Defaults to 8.
"""
download_nasa_data(granules=granules, out_dir=out_dir, threads=threads)
download_file(url=None, output=None, quiet=True, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)¶
Download a file from a URL, including Google Drive shared URLs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The URL to download from. Google Drive URLs are also supported. Defaults to None. | None |
output | str | Output filename. Default is the basename of the URL. | None |
quiet | bool | Suppress terminal output. Defaults to True. | True |
proxy | str | Proxy. Defaults to None. | None |
speed | float | Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. | None |
use_cookies | bool | Flag to use cookies. Defaults to True. | True |
verify | bool or str | Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Defaults to True. | True |
id | str | Google Drive's file ID. Defaults to None. | None |
fuzzy | bool | Fuzzy extraction of the Google Drive file ID. Defaults to False. | False |
resume | bool | Resume the download from an existing tmp file if possible. Defaults to False. | False |
unzip | bool | Unzip the file after downloading. Defaults to True. | True |
overwrite | bool | Overwrite the file if it already exists. Defaults to False. | False |
subfolder | bool | Create a subfolder with the same name as the file. Defaults to False. | False |
Returns:
Type | Description |
---|---|
str | The output file path. |
Source code in hypercoast/common.py
def download_file(
url: Optional[str] = None,
output: Optional[str] = None,
quiet: Optional[bool] = True,
proxy: Optional[str] = None,
speed: Optional[float] = None,
use_cookies: Optional[bool] = True,
verify: Optional[bool] = True,
id: Optional[str] = None,
fuzzy: Optional[bool] = False,
resume: Optional[bool] = False,
unzip: Optional[bool] = True,
overwrite: Optional[bool] = False,
subfolder: Optional[bool] = False,
) -> str:
"""Download a file from URL, including Google Drive shared URL.
Args:
url (str, optional): Google Drive URL is also supported. Defaults to None.
output (str, optional): Output filename. Default is basename of URL.
quiet (bool, optional): Suppress terminal output. Default is True.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string,
in which case it must be a path to a CA bundle to use. Defaults to True.
id (str, optional): Google Drive's file ID. Defaults to None.
fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
unzip (bool, optional): Unzip the file. Defaults to True.
overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
Returns:
str: The output file path.
"""
import zipfile
import tarfile
import gdown
if output is None:
if isinstance(url, str) and url.startswith("http"):
output = os.path.basename(url)
out_dir = os.path.abspath(os.path.dirname(output))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if isinstance(url, str):
if os.path.exists(os.path.abspath(output)) and (not overwrite):
print(
f"{output} already exists. Skip downloading. Set overwrite=True to overwrite."
)
return os.path.abspath(output)
else:
url = github_raw_url(url)
if "https://drive.google.com/file/d/" in url:
fuzzy = True
output = gdown.download(
url, output, quiet, proxy, speed, use_cookies, verify, id, fuzzy, resume
)
if unzip:
if output.endswith(".zip"):
with zipfile.ZipFile(output, "r") as zip_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
zip_ref.extractall(output)
else:
zip_ref.extractall(os.path.dirname(output))
elif output.endswith(".tar.gz") or output.endswith(".tar"):
if output.endswith(".tar.gz"):
mode = "r:gz"
else:
mode = "r"
with tarfile.open(output, mode) as tar_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
tar_ref.extractall(output)
else:
tar_ref.extractall(os.path.dirname(output))
return os.path.abspath(output)
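Example usage (a sketch; the URL is hypothetical):
>>> path = download_file("https://example.com/data.zip")  # downloads data.zip and extracts it
>>> print(path)  # absolute path to the downloaded file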
download_nasa_data(granules, out_dir=None, provider=None, threads=8)¶
Downloads NASA Earthdata granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
granules | List[dict] | The granules to download. | required |
out_dir | str | The output directory where the granules will be downloaded. Defaults to None (current directory). | None |
provider | str | The provider of the granules. | None |
threads | int | The number of threads to use for downloading. Defaults to 8. | 8 |
Source code in hypercoast/common.py
def download_nasa_data(
granules: List[dict],
out_dir: Optional[str] = None,
provider: Optional[str] = None,
threads: int = 8,
) -> None:
"""Downloads NASA Earthdata granules.
Args:
granules (List[dict]): The granules to download.
out_dir (str, optional): The output directory where the granules will be downloaded. Defaults to None (current directory).
provider (str, optional): The provider of the granules.
threads (int, optional): The number of threads to use for downloading. Defaults to 8.
"""
leafmap.nasa_data_download(
granules=granules, out_dir=out_dir, provider=provider, threads=threads
)
download_pace(granules, out_dir=None, threads=8)¶
Downloads NASA PACE granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
granules | List[dict] | The granules to download. | required |
out_dir | str | The output directory where the granules will be downloaded. Defaults to None (current directory). | None |
threads | int | The number of threads to use for downloading. Defaults to 8. | 8 |
Source code in hypercoast/common.py
def download_pace(
granules: List[dict],
out_dir: Optional[str] = None,
threads: int = 8,
) -> None:
"""Downloads NASA PACE granules.
Args:
granules (List[dict]): The granules to download.
out_dir (str, optional): The output directory where the granules will be
downloaded. Defaults to None (current directory).
threads (int, optional): The number of threads to use for downloading.
Defaults to 8.
"""
download_nasa_data(granules=granules, out_dir=out_dir, threads=threads)
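Example workflow combining login, search, and download (a sketch; the bounding box and dates are hypothetical, a NASA Earthdata account is required, and the temporal argument is shown as a (start, end) pair following the earthaccess convention):
>>> nasa_earth_login()
>>> granules = search_pace(bbox=[-95.5, 29.0, -94.5, 30.0], temporal=("2024-05-01", "2024-05-31"), count=1)
>>> download_pace(granules, out_dir="data")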
extract_date_from_filename(filename)¶
Extracts a date from a filename, assuming the date is in 'YYYYMMDD' format.
This function searches the filename for a sequence of 8 digits that represents a date in 'YYYYMMDD' format. If such a sequence is found, it converts the sequence into a pandas Timestamp object. If no such sequence is found, the function returns None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | The filename from which to extract the date. | required |
Returns:
Type | Description |
---|---|
Optional[pd.Timestamp] | A pandas Timestamp object representing the date found in the filename, or None if no date in 'YYYYMMDD' format is found. |
Examples:
>>> extract_date_from_filename("example_20230101.txt")
Timestamp('2023-01-01 00:00:00')
>>> extract_date_from_filename("no_date_in_this_filename.txt")
None
Source code in hypercoast/common.py
def extract_date_from_filename(filename: str):
"""
Extracts a date from a filename assuming the date is in 'YYYYMMDD' format.
This function searches the filename for a sequence of 8 digits that represent a date in
'YYYYMMDD' format. If such a sequence is found, it converts the sequence into a pandas
Timestamp object. If no such sequence is found, the function returns None.
Args:
filename (str): The filename from which to extract the date.
Returns:
Optional[pd.Timestamp]: A pandas Timestamp object representing the date found in the filename,
or None if no date in 'YYYYMMDD' format is found.
Examples:
>>> extract_date_from_filename("example_20230101.txt")
Timestamp('2023-01-01 00:00:00')
>>> extract_date_from_filename("no_date_in_this_filename.txt")
None
"""
import re
import pandas as pd
# Assuming the date format in filename is 'YYYYMMDD'
date_match = re.search(r"\d{8}", filename)
if date_match:
return pd.to_datetime(date_match.group(), format="%Y%m%d")
else:
return None
extract_spectral(ds, lat, lon, name='data')¶
Extracts a spectral signature from a given xarray Dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds | xarray.Dataset | The dataset containing the spectral data. | required |
lat | float | The latitude of the point to extract. | required |
lon | float | The longitude of the point to extract. | required |
name | str | The name of the variable to extract. Defaults to "data". | 'data' |
Returns:
Type | Description |
---|---|
xarray.DataArray | The extracted data. |
Source code in hypercoast/common.py
def extract_spectral(
ds: xr.Dataset, lat: float, lon: float, name: str = "data"
) -> xr.DataArray:
"""
Extracts spectral signature from a given xarray Dataset.
Args:
ds (xarray.Dataset): The dataset containing the spectral data.
lat (float): The latitude of the point to extract.
lon (float): The longitude of the point to extract.
name (str, optional): The name of the variable to extract. Defaults to "data".
Returns:
xarray.DataArray: The extracted data.
"""
crs = ds.rio.crs
x, y = convert_coords([[lat, lon]], "epsg:4326", crs)[0]
values = ds.sel(x=x, y=y, method="nearest")[name].values
da = xr.DataArray(values, dims=["band"], coords={"band": ds.coords["band"]})
return da
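Example usage (a sketch, assuming ds is a projected dataset with a "reflectance" variable and a "band" coordinate; the names and coordinates are hypothetical):
>>> da = extract_spectral(ds, lat=29.76, lon=-95.37, name="reflectance")
>>> da.plot()  # spectral signature at the nearest pixel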
github_raw_url(url)¶
Get the raw URL for a GitHub file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The GitHub URL. | required |
Returns:
Type | Description |
---|---|
str | The raw URL. |
Source code in hypercoast/common.py
def github_raw_url(url: str) -> str:
"""Get the raw URL for a GitHub file.
Args:
url (str): The GitHub URL.
Returns:
str: The raw URL.
"""
if isinstance(url, str) and url.startswith("https://github.com/") and "blob" in url:
url = url.replace("github.com", "raw.githubusercontent.com").replace(
"blob/", ""
)
return url
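Example usage:
>>> github_raw_url("https://github.com/opengeos/HyperCoast/blob/main/README.md")
'https://raw.githubusercontent.com/opengeos/HyperCoast/main/README.md'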
image_cube(dataset, variable='reflectance', cmap='jet', clim=(0, 0.5), title='Reflectance', rgb_bands=None, rgb_wavelengths=None, rgb_gamma=1.0, rgb_cmap=None, rgb_clim=None, rgb_args={}, widget=None, plotter_args={}, show_axes=True, grid_origin=(0, 0, 0), grid_spacing=(1, 1, 1), **kwargs)¶
Creates an image cube from a dataset and plots it using PyVista.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset | Union[str, xr.Dataset] | The dataset to plot. Can be a path to a NetCDF file or an xarray Dataset. | required |
variable | str | The variable to plot. Defaults to "reflectance". | 'reflectance' |
cmap | str | The colormap to use. Defaults to "jet". | 'jet' |
clim | Tuple[float, float] | The color limits. Defaults to (0, 0.5). | (0, 0.5) |
title | str | The title for the scalar bar. Defaults to "Reflectance". | 'Reflectance' |
rgb_bands | Optional[List[int]] | The bands to use for the RGB image. Defaults to None. | None |
rgb_wavelengths | Optional[List[float]] | The wavelengths to use for the RGB image. Defaults to None. | None |
rgb_gamma | float | The gamma correction for the RGB image. Defaults to 1.0. | 1.0 |
rgb_cmap | Optional[str] | The colormap to use for the RGB image. Defaults to None. | None |
rgb_clim | Optional[Tuple[float, float]] | The color limits for the RGB image. Defaults to None. | None |
rgb_args | Dict[str, Any] | Additional arguments for the `add_mesh` method for the RGB image. Defaults to {}. | {} |
widget | Optional[str] | The widget to use for the image cube. Can be one of the following: "box", "plane", "slice", "orthogonal", and "threshold". Defaults to None. | None |
plotter_args | Dict[str, Any] | Additional arguments for the `pv.Plotter` constructor. Defaults to {}. | {} |
show_axes | bool | Whether to show the axes. Defaults to True. | True |
grid_origin | Tuple[float, float, float] | The origin of the grid. Defaults to (0, 0, 0). | (0, 0, 0) |
grid_spacing | Tuple[float, float, float] | The spacing of the grid. Defaults to (1, 1, 1). | (1, 1, 1) |
**kwargs | Dict[str, Any] | Additional arguments for the `add_mesh` method. Defaults to {}. | {} |
Returns:
Type | Description |
---|---|
pv.Plotter | The PyVista Plotter with the image cube added. |
Source code in hypercoast/common.py
def image_cube(
dataset,
variable: str = "reflectance",
cmap: str = "jet",
clim: Tuple[float, float] = (0, 0.5),
title: str = "Reflectance",
rgb_bands: Optional[List[int]] = None,
rgb_wavelengths: Optional[List[float]] = None,
rgb_gamma: float = 1.0,
rgb_cmap: Optional[str] = None,
rgb_clim: Optional[Tuple[float, float]] = None,
rgb_args: Dict[str, Any] = {},
widget=None,
plotter_args: Dict[str, Any] = {},
show_axes: bool = True,
grid_origin=(0, 0, 0),
grid_spacing=(1, 1, 1),
**kwargs: Any,
):
"""
Creates an image cube from a dataset and plots it using PyVista.
Args:
dataset (Union[str, xr.Dataset]): The dataset to plot. Can be a path to
a NetCDF file or an xarray Dataset.
variable (str, optional): The variable to plot. Defaults to "reflectance".
cmap (str, optional): The colormap to use. Defaults to "jet".
clim (Tuple[float, float], optional): The color limits. Defaults to (0, 0.5).
title (str, optional): The title for the scalar bar. Defaults to "Reflectance".
rgb_bands (Optional[List[int]], optional): The bands to use for the RGB
image. Defaults to None.
rgb_wavelengths (Optional[List[float]], optional): The wavelengths to
use for the RGB image. Defaults to None.
rgb_gamma (float, optional): The gamma correction for the RGB image.
Defaults to 1.
rgb_cmap (Optional[str], optional): The colormap to use for the RGB image.
Defaults to None.
rgb_clim (Optional[Tuple[float, float]], optional): The color limits for
the RGB image. Defaults to None.
rgb_args (Dict[str, Any], optional): Additional arguments for the
`add_mesh` method for the RGB image. Defaults to {}.
widget (Optional[str], optional): The widget to use for the image cube.
Can be one of the following: "box", "plane", "slice", "orthogonal",
and "threshold". Defaults to None.
plotter_args (Dict[str, Any], optional): Additional arguments for the
`pv.Plotter` constructor. Defaults to {}.
show_axes (bool, optional): Whether to show the axes. Defaults to True.
grid_origin (Tuple[float, float, float], optional): The origin of the grid.
Defaults to (0, 0, 0).
grid_spacing (Tuple[float, float, float], optional): The spacing of the grid.
**kwargs (Dict[str, Any], optional): Additional arguments for the
`add_mesh` method. Defaults to {}.
Returns:
pv.Plotter: The PyVista Plotter with the image cube added.
"""
import pyvista as pv
import xarray as xr
allowed_widgets = ["box", "plane", "slice", "orthogonal", "threshold"]
if widget is not None:
if widget not in allowed_widgets:
raise ValueError(f"widget must be one of the following: {allowed_widgets}")
if isinstance(dataset, str):
dataset = xr.open_dataset(dataset)
da = dataset[variable] # xarray DataArray
values = da.to_numpy()
# Create the spatial reference for the image cube
grid = pv.ImageData()
# Set the grid dimensions: shape because we want to inject our values on the POINT data
grid.dimensions = values.shape
# Edit the spatial reference
grid.origin = grid_origin # The bottom left corner of the data set
grid.spacing = grid_spacing # These are the cell sizes along each axis
# Add the data values to the cell data
grid.point_data["values"] = values.flatten(order="F") # Flatten the array
# Plot the image cube with the RGB image overlay
p = pv.Plotter(**plotter_args)
if "scalar_bar_args" not in kwargs:
kwargs["scalar_bar_args"] = {"title": title}
else:
kwargs["scalar_bar_args"]["title"] = title
if "show_edges" not in kwargs:
kwargs["show_edges"] = False
if widget == "box":
p.add_mesh_clip_box(grid, cmap=cmap, clim=clim, **kwargs)
elif widget == "plane":
if "normal" not in kwargs:
kwargs["normal"] = (0, 0, 1)
if "invert" not in kwargs:
kwargs["invert"] = True
if "normal_rotation" not in kwargs:
kwargs["normal_rotation"] = False
p.add_mesh_clip_plane(grid, cmap=cmap, clim=clim, **kwargs)
elif widget == "slice":
if "normal" not in kwargs:
kwargs["normal"] = (0, 0, 1)
if "normal_rotation" not in kwargs:
kwargs["normal_rotation"] = False
p.add_mesh_slice(grid, cmap=cmap, clim=clim, **kwargs)
elif widget == "orthogonal":
p.add_mesh_slice_orthogonal(grid, cmap=cmap, clim=clim, **kwargs)
elif widget == "threshold":
p.add_mesh_threshold(grid, cmap=cmap, clim=clim, **kwargs)
else:
p.add_mesh(grid, cmap=cmap, clim=clim, **kwargs)
if rgb_bands is not None or rgb_wavelengths is not None:
if rgb_bands is not None:
rgb_image = dataset.isel(wavelength=rgb_bands)[
variable
].to_numpy()
elif rgb_wavelengths is not None:
rgb_image = dataset.sel(wavelength=rgb_wavelengths, method="nearest")[
variable
].to_numpy()
x_dim, y_dim = rgb_image.shape[0], rgb_image.shape[1]
z_dim = 1
im = pv.ImageData(dimensions=(x_dim, y_dim, z_dim))
# Add scalar data, you may also need to flatten this
im.point_data["rgb_image"] = (
rgb_image.reshape(-1, rgb_image.shape[2], order="F") * rgb_gamma
)
grid_z_max = grid.bounds[5]
im.origin = (0, 0, grid_z_max)
if rgb_image.shape[2] < 3:
if rgb_cmap is None:
rgb_cmap = cmap
if rgb_clim is None:
rgb_clim = clim
if "cmap" not in rgb_args:
rgb_args["cmap"] = rgb_cmap
if "clim" not in rgb_args:
rgb_args["clim"] = rgb_clim
else:
if "rgb" not in rgb_args:
rgb_args["rgb"] = True
if "show_scalar_bar" not in rgb_args:
rgb_args["show_scalar_bar"] = False
if "show_edges" not in rgb_args:
rgb_args["show_edges"] = False
p.add_mesh(im, **rgb_args)
if show_axes:
p.show_axes()
return p
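Example usage (a sketch, assuming a dataset with a "reflectance" variable and a "wavelength" dimension; the file path is hypothetical):
>>> p = image_cube("emit_data.nc", variable="reflectance", clim=(0, 0.5), rgb_wavelengths=[650, 550, 450], widget="plane")
>>> p.show()  # opens an interactive PyVista scene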
nasa_earth_login(strategy='all', persist=True, **kwargs)¶
Logs in to NASA Earthdata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strategy | str | The login strategy. Defaults to "all". | 'all' |
persist | bool | Whether to persist the login. Defaults to True. | True |
Source code in hypercoast/common.py
def nasa_earth_login(strategy: str = "all", persist: bool = True, **kwargs) -> None:
"""Logs in to NASA Earthdata.
Args:
strategy (str, optional): The login strategy. Defaults to "all".
persist (bool, optional): Whether to persist the login. Defaults to True.
"""
leafmap.nasa_data_login(strategy=strategy, persist=persist, **kwargs)
netcdf_groups(filepath)¶
Get the list of groups in a NetCDF file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str | The path to the NetCDF file. | required |
Returns:
Type | Description |
---|---|
list | A list of group names in the NetCDF file. |
Examples:
>>> netcdf_groups('path/to/netcdf/file')
['group1', 'group2', 'group3']
Source code in hypercoast/common.py
def netcdf_groups(filepath: str) -> List[str]:
"""
Get the list of groups in a NetCDF file.
Args:
filepath (str): The path to the NetCDF file.
Returns:
list: A list of group names in the NetCDF file.
Example:
>>> netcdf_groups('path/to/netcdf/file')
['group1', 'group2', 'group3']
"""
import h5netcdf
with h5netcdf.File(filepath) as file:
groups = list(file)
return groups
open_dataset(filename, engine=None, chunks=None, **kwargs)¶
Opens and returns an xarray Dataset from a file.
This function is a wrapper around `xarray.open_dataset` that allows for additional customization through keyword arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path to the file to open. | required |
engine | Optional[str] | Name of the engine to use for reading the file. If None, xarray's default engine is used. Examples include 'netcdf4', 'h5netcdf', 'zarr', etc. | None |
chunks | Optional[Dict[str, int]] | Dictionary specifying how to chunk the dataset along each dimension. For example, `{'time': 1}` would load the dataset in single-time-step chunks. If None, the dataset is not chunked. | None |
**kwargs | Any | Additional keyword arguments passed to `xarray.open_dataset`. | {} |
Returns:
Type | Description |
---|---|
xr.Dataset | The opened dataset. |
Examples:
Open a NetCDF file without chunking:
>>> dataset = open_dataset('path/to/file.nc')
Open a Zarr dataset, chunking along the 'time' dimension:
>>> dataset = open_dataset('path/to/dataset.zarr', engine='zarr', chunks={'time': 10})
Source code in hypercoast/common.py
def open_dataset(
filename: str,
engine: Optional[str] = None,
chunks: Optional[Dict[str, int]] = None,
**kwargs: Any,
) -> xr.Dataset:
"""
Opens and returns an xarray Dataset from a file.
This function is a wrapper around `xarray.open_dataset` that allows for additional
customization through keyword arguments.
Args:
filename (str): Path to the file to open.
engine (Optional[str]): Name of the engine to use for reading the file. If None, xarray's
default engine is used. Examples include 'netcdf4', 'h5netcdf', 'zarr', etc.
chunks (Optional[Dict[str, int]]): Dictionary specifying how to chunk the dataset along each dimension.
For example, `{'time': 1}` would load the dataset in single-time-step chunks. If None,
the dataset is not chunked.
**kwargs: Additional keyword arguments passed to `xarray.open_dataset`.
Returns:
xr.Dataset: The opened dataset.
Examples:
Open a NetCDF file without chunking:
>>> dataset = open_dataset('path/to/file.nc')
Open a Zarr dataset, chunking along the 'time' dimension:
>>> dataset = open_dataset('path/to/dataset.zarr', engine='zarr', chunks={'time': 10})
"""
try:
dataset = xr.open_dataset(filename, engine=engine, chunks=chunks, **kwargs)
except Exception:
dataset = xr.open_dataset(filename, engine="h5netcdf", chunks=chunks, **kwargs)
return dataset
pca(input_file, output_file, n_components=3, **kwargs)¶
Performs Principal Component Analysis (PCA) on a dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_file | str | The input file containing the data to analyze. | required |
output_file | str | The output file to save the PCA results. | required |
n_components | int | The number of principal components to compute. Defaults to 3. | 3 |
**kwargs | | Additional keyword arguments to pass to the scikit-learn PCA function. For more info, see https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html. | {} |
Returns:
Type | Description |
---|---|
None | This function does not return a value. It saves the PCA results to the output file. |
Source code in hypercoast/common.py
def pca(input_file, output_file, n_components=3, **kwargs):
"""
Performs Principal Component Analysis (PCA) on a dataset.
Args:
input_file (str): The input file containing the data to analyze.
output_file (str): The output file to save the PCA results.
n_components (int, optional): The number of principal components to compute. Defaults to 3.
**kwargs: Additional keyword arguments to pass to the scikit-learn PCA function.
For more info, see https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html.
Returns:
None: This function does not return a value. It saves the PCA results to the output file.
"""
import rasterio
from sklearn.decomposition import PCA
# Function to load the GeoTIFF image
def load_geotiff(file_path):
with rasterio.open(file_path) as src:
image = src.read()
profile = src.profile
return image, profile
# Function to perform PCA
def perform_pca(image, n_components=3, **kwargs):
# Reshape the image to [n_pixels, n_bands]; rasterio returns (bands, rows, cols)
n_bands, height, width = image.shape
image_reshaped = image.reshape(n_bands, height * width).T
# Perform PCA
pca = PCA(n_components=n_components, **kwargs)
principal_components = pca.fit_transform(image_reshaped)
# Reshape the principal components back to image dimensions
pca_image = principal_components.T.reshape(n_components, height, width)
return pca_image
# Function to save the PCA-transformed image
def save_geotiff(file_path, image, profile):
profile.update(count=image.shape[0])
with rasterio.open(file_path, "w", **profile) as dst:
dst.write(image)
image, profile = load_geotiff(input_file)
pca_image = perform_pca(image, n_components, **kwargs)
save_geotiff(output_file, pca_image, profile)
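Example usage (a sketch; the file names are hypothetical):
>>> pca("hyperspectral.tif", "pca.tif", n_components=3)  # writes a 3-band GeoTIFF of principal components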
run_acolite(acolite_dir, settings_file=None, input_file=None, out_dir=None, polygon=None, l2w_parameters=None, rgb_rhot=True, rgb_rhos=True, map_l2w=True, verbose=True, **kwargs)¶
Runs the Acolite software for atmospheric correction and water quality retrieval. For more information, see the Acolite manual at https://github.com/acolite/acolite/releases.
This function constructs and executes a command to run the Acolite software with the specified parameters. It supports running Acolite with a settings file or with individual parameters specified directly. Additional parameters can be passed as keyword arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acolite_dir | str | The directory where Acolite is installed. | required |
settings_file | Optional[str] | The path to the Acolite settings file. If provided, other parameters except verbose are ignored. Defaults to None. | None |
input_file | Optional[str] | The path to the input file for processing. Defaults to None. | None |
out_dir | Optional[str] | The directory where output files will be saved. Defaults to None. | None |
polygon | Optional[str] | The path to a polygon file for spatial subsetting. Defaults to None. | None |
l2w_parameters | Optional[str] | Parameters for L2W processing. Defaults to None. | None |
rgb_rhot | bool | Flag to generate RGB images using rhot. Defaults to True. | True |
rgb_rhos | bool | Flag to generate RGB images using rhos. Defaults to True. | True |
map_l2w | bool | Flag to map L2W products. Defaults to True. | True |
verbose | bool | If True, prints the command output; otherwise, suppresses it. Defaults to True. | True |
**kwargs | Any | Additional command line arguments to pass to Acolite, such as l2w_export_geotiff, merge_tiles, etc. | {} |
Returns:
Type | Description |
---|---|
None | This function does not return a value. It executes the Acolite software. |
Examples:
>>> run_acolite("/path/to/acolite", input_file="/path/to/inputfile", out_dir="/path/to/output")
Source code in hypercoast/common.py
def run_acolite(
acolite_dir: str,
settings_file: Optional[str] = None,
input_file: Optional[str] = None,
out_dir: Optional[str] = None,
polygon: Optional[str] = None,
l2w_parameters: Optional[str] = None,
rgb_rhot: bool = True,
rgb_rhos: bool = True,
map_l2w: bool = True,
verbose: bool = True,
**kwargs: Any,
) -> None:
"""
Runs the Acolite software for atmospheric correction and water quality retrieval.
For more information, see the Acolite manual https://github.com/acolite/acolite/releases
This function constructs and executes a command to run the Acolite software with the specified
parameters. It supports running Acolite with a settings file or with individual parameters
specified directly. Additional parameters can be passed as keyword arguments.
Args:
acolite_dir (str): The directory where Acolite is installed.
settings_file (Optional[str], optional): The path to the Acolite settings file. If provided,
other parameters except `verbose` are ignored. Defaults to None.
input_file (Optional[str], optional): The path to the input file for processing. Defaults to None.
out_dir (Optional[str], optional): The directory where output files will be saved. Defaults to None.
polygon (Optional[str], optional): The path to a polygon file for spatial subset. Defaults to None.
l2w_parameters (Optional[str], optional): Parameters for L2W processing. Defaults to None.
rgb_rhot (bool, optional): Flag to generate RGB images using rhot. Defaults to True.
rgb_rhos (bool, optional): Flag to generate RGB images using rhos. Defaults to True.
map_l2w (bool, optional): Flag to map L2W products. Defaults to True.
verbose (bool, optional): If True, prints the command output; otherwise, suppresses it. Defaults to True.
**kwargs (Any): Additional command line arguments to pass to acolite. Such as
--l2w_export_geotiff, --merge_tiles, etc.
Returns:
None: This function does not return a value. It executes the Acolite software.
Example:
>>> run_acolite("/path/to/acolite", input_file="/path/to/inputfile", out_dir="/path/to/output")
"""
import subprocess
from datetime import datetime
def get_formatted_current_time(format_str="%Y-%m-%d %H:%M:%S"):
current_time = datetime.now()
formatted_time = current_time.strftime(format_str)
return formatted_time
acolite_dir_name = os.path.split(os.path.dirname(acolite_dir))[-1]
acolite_exe = "acolite"
if acolite_dir_name.endswith("win"):
acolite_exe += ".exe"
if isinstance(input_file, list):
input_file = ",".join(input_file)
if out_dir is None:
out_dir = "."
if not os.path.exists(out_dir):
os.makedirs(out_dir)
acolite_exe_path = os.path.join(acolite_dir, "dist", "acolite", acolite_exe)
acolite_cmd = [acolite_exe_path, "--cli"]
if settings_file is not None:
acolite_cmd.extend(["--settings", settings_file])
else:
lines = []
lines.append("## ACOLITE settings")
lines.append(f"## Written at {get_formatted_current_time()}")
if input_file is not None:
lines.append(f"inputfile={input_file}")
if out_dir is not None:
lines.append(f"output={out_dir}")
if polygon is not None:
lines.append(f"polygon={polygon}")
else:
lines.append("polygon=None")
if l2w_parameters is not None:
lines.append(f"l2w_parameters={l2w_parameters}")
if rgb_rhot:
lines.append("rgb_rhot=True")
else:
lines.append("rgb_rhot=False")
if rgb_rhos:
lines.append("rgb_rhos=True")
else:
lines.append("rgb_rhos=False")
if map_l2w:
lines.append("map_l2w=True")
else:
lines.append("map_l2w=False")
for key, value in kwargs.items():
lines.append(f"{key}={value}")
lines.append(f"runid={get_formatted_current_time('%Y%m%d_%H%M%S')}")
settings_filename = f"acolite_run_{get_formatted_current_time('%Y%m%d_%H%M%S')}_settings_user.txt"
settings_file = os.path.join(out_dir, settings_filename)
with open(settings_file, "w") as f:
f.write("\n".join(lines))
acolite_cmd.extend(["--settings", settings_file])
if verbose:
subprocess.run(acolite_cmd)
else:
subprocess.run(
acolite_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
search_datasets(count=-1, **kwargs)¶
Searches for datasets using the EarthAccess API with optional filters.
This function wraps the `earthaccess.search_datasets` function, allowing for customized search queries based on a count limit and additional keyword arguments which serve as filters for the search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
count | int | The maximum number of datasets to return. A value of -1 indicates no limit. Defaults to -1. | -1 |
**kwargs | Any | Additional keyword arguments to pass as search filters to the EarthAccess API: keyword (case-insensitive, supports the wildcards ? and *), short_name (e.g., ATL08), doi (the DOI for a dataset), daac (e.g., NSIDC or PODAAC), provider (particular to each DAAC, e.g., POCLOUD, LPDAAC), temporal (a tuple of temporal bounds in the form (date_from, date_to)), and bounding_box (a tuple of spatial bounds in the form (lower_left_lon, lower_left_lat, upper_right_lon, upper_right_lat)). | {} |
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | A list of dictionaries, where each dictionary contains information about a dataset found in the search. |
Examples:
>>> results = search_datasets(count=5, keyword='temperature')
>>> print(results)
Source code in hypercoast/common.py
def search_datasets(count: int = -1, **kwargs: Any) -> List[Dict[str, Any]]:
"""
Searches for datasets using the EarthAccess API with optional filters.
This function wraps the `earthaccess.search_datasets` function, allowing for
customized search queries based on a count limit and additional keyword arguments
which serve as filters for the search.
Args:
count (int, optional): The maximum number of datasets to return. A value of -1
indicates no limit. Defaults to -1.
**kwargs (Any): Additional keyword arguments to pass as search filters to the
EarthAccess API.
keyword: case-insensitive and supports wildcards ? and *
short_name: e.g. ATL08
doi: DOI for a dataset
daac: e.g. NSIDC or PODAAC
provider: particular to each DAAC, e.g. POCLOUD, LPDAAC etc.
temporal: a tuple representing temporal bounds in the form (date_from, date_to)
bounding_box: a tuple representing spatial bounds in the form
(lower_left_lon, lower_left_lat, upper_right_lon, upper_right_lat)
Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary contains
information about a dataset found in the search.
Example:
>>> results = search_datasets(count=5, keyword='temperature')
>>> print(results)
"""
import earthaccess
return earthaccess.search_datasets(count=count, **kwargs)
search_ecostress(bbox=None, temporal=None, count=-1, short_name='ECO_L2T_LSTE', output=None, crs='EPSG:4326', return_gdf=False, **kwargs)¶
Searches for NASA ECOSTRESS granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | List[float] | The bounding box coordinates [xmin, ymin, xmax, ymax]. | None |
temporal | str | The temporal extent of the data. | None |
count | int | The number of granules to retrieve. Defaults to -1 (retrieve all). | -1 |
short_name | str | The short name of the dataset. Defaults to "ECO_L2T_LSTE". | 'ECO_L2T_LSTE' |
output | str | The output file path to save the GeoDataFrame as a file. | None |
crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
return_gdf | bool | Whether to return the GeoDataFrame in addition to the granules. Defaults to False. | False |
**kwargs | | Additional keyword arguments for the earthaccess.search_data() function. | {} |
Returns:
Type | Description |
---|---|
Union[List[dict], tuple] | The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame. |
Source code in hypercoast/common.py
def search_ecostress(
bbox: Optional[List[float]] = None,
temporal: Optional[str] = None,
count: int = -1,
short_name: Optional[str] = "ECO_L2T_LSTE",
output: Optional[str] = None,
crs: str = "EPSG:4326",
return_gdf: bool = False,
**kwargs,
) -> Union[List[dict], tuple]:
"""Searches for NASA ECOSTRESS granules.
Args:
bbox (List[float], optional): The bounding box coordinates [xmin, ymin, xmax, ymax].
temporal (str, optional): The temporal extent of the data.
count (int, optional): The number of granules to retrieve. Defaults to -1 (retrieve all).
short_name (str, optional): The short name of the dataset. Defaults to "ECO_L2T_LSTE".
output (str, optional): The output file path to save the GeoDataFrame as a file.
crs (str, optional): The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326".
return_gdf (bool, optional): Whether to return the GeoDataFrame in addition to the granules. Defaults to False.
**kwargs: Additional keyword arguments for the earthaccess.search_data() function.
Returns:
Union[List[dict], tuple]: The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame.
"""
return search_nasa_data(
count=count,
short_name=short_name,
bbox=bbox,
temporal=temporal,
output=output,
crs=crs,
return_gdf=return_gdf,
**kwargs,
)
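Example usage (a sketch; the bounding box and dates are hypothetical):
>>> granules = search_ecostress(bbox=[-95.5, 29.0, -94.5, 30.0], temporal=("2023-07-01", "2023-07-31"), count=10)
>>> download_ecostress(granules, out_dir="ecostress")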
search_emit(bbox=None, temporal=None, count=-1, short_name='EMITL2ARFL', output=None, crs='EPSG:4326', return_gdf=False, **kwargs)¶
Searches for NASA EMIT granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | List[float] | The bounding box coordinates [xmin, ymin, xmax, ymax]. | None |
temporal | str | The temporal extent of the data. | None |
count | int | The number of granules to retrieve. Defaults to -1 (retrieve all). | -1 |
short_name | str | The short name of the dataset. Defaults to "EMITL2ARFL". | 'EMITL2ARFL' |
output | str | The output file path to save the GeoDataFrame as a file. | None |
crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
return_gdf | bool | Whether to return the GeoDataFrame in addition to the granules. Defaults to False. | False |
**kwargs | | Additional keyword arguments for the earthaccess.search_data() function. | {} |
Returns:
Type | Description |
---|---|
Union[List[dict], tuple] | The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame. |
Source code in hypercoast/common.py
def search_emit(
bbox: Optional[List[float]] = None,
temporal: Optional[str] = None,
count: int = -1,
short_name: Optional[str] = "EMITL2ARFL",
output: Optional[str] = None,
crs: str = "EPSG:4326",
return_gdf: bool = False,
**kwargs,
) -> Union[List[dict], tuple]:
"""Searches for NASA EMIT granules.
Args:
bbox (List[float], optional): The bounding box coordinates [xmin, ymin, xmax, ymax].
temporal (str, optional): The temporal extent of the data.
count (int, optional): The number of granules to retrieve. Defaults to -1 (retrieve all).
short_name (str, optional): The short name of the dataset. Defaults to "EMITL2ARFL".
output (str, optional): The output file path to save the GeoDataFrame as a file.
crs (str, optional): The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326".
return_gdf (bool, optional): Whether to return the GeoDataFrame in addition to the granules. Defaults to False.
**kwargs: Additional keyword arguments for the earthaccess.search_data() function.
Returns:
Union[List[dict], tuple]: The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame.
"""
return search_nasa_data(
count=count,
short_name=short_name,
bbox=bbox,
temporal=temporal,
output=output,
crs=crs,
return_gdf=return_gdf,
**kwargs,
)
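Example usage retrieving granule footprints as a GeoDataFrame as well (a sketch; the arguments are hypothetical):
>>> granules, gdf = search_emit(bbox=[-95.5, 29.0, -94.5, 30.0], temporal=("2024-04-01", "2024-04-30"), return_gdf=True)
>>> gdf.head()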
search_nasa_data(count=-1, short_name=None, bbox=None, temporal=None, version=None, doi=None, daac=None, provider=None, output=None, crs='EPSG:4326', return_gdf=False, **kwargs)¶
Searches for NASA Earthdata granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
count | int | The number of granules to retrieve. Defaults to -1 (retrieve all). | -1 |
short_name | str | The short name of the dataset. | None |
bbox | List[float] | The bounding box coordinates [xmin, ymin, xmax, ymax]. | None |
temporal | str | The temporal extent of the data. | None |
version | str | The version of the dataset. | None |
doi | str | The Digital Object Identifier (DOI) of the dataset. | None |
daac | str | The Distributed Active Archive Center (DAAC) of the dataset. | None |
provider | str | The provider of the dataset. | None |
output | str | The output file path to save the GeoDataFrame as a file. | None |
crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
return_gdf | bool | Whether to return the GeoDataFrame in addition to the granules. Defaults to False. | False |
**kwargs | | Additional keyword arguments for the earthaccess.search_data() function. | {} |
Returns:
Type | Description |
---|---|
Union[List[dict], tuple] | The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame. |
Source code in hypercoast/common.py
def search_nasa_data(
count: int = -1,
short_name: Optional[str] = None,
bbox: Optional[List[float]] = None,
temporal: Optional[str] = None,
version: Optional[str] = None,
doi: Optional[str] = None,
daac: Optional[str] = None,
provider: Optional[str] = None,
output: Optional[str] = None,
crs: str = "EPSG:4326",
return_gdf: bool = False,
**kwargs,
) -> Union[List[dict], tuple]:
"""Searches for NASA Earthdata granules.
Args:
count (int, optional): The number of granules to retrieve. Defaults to -1 (retrieve all).
short_name (str, optional): The short name of the dataset.
bbox (List[float], optional): The bounding box coordinates [xmin, ymin, xmax, ymax].
temporal (str, optional): The temporal extent of the data.
version (str, optional): The version of the dataset.
doi (str, optional): The Digital Object Identifier (DOI) of the dataset.
daac (str, optional): The Distributed Active Archive Center (DAAC) of the dataset.
provider (str, optional): The provider of the dataset.
output (str, optional): The output file path to save the GeoDataFrame as a file.
crs (str, optional): The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326".
return_gdf (bool, optional): Whether to return the GeoDataFrame in addition to the granules. Defaults to False.
**kwargs: Additional keyword arguments for the earthaccess.search_data() function.
Returns:
Union[List[dict], tuple]: The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame.
"""
if isinstance(bbox, list):
bbox = tuple(bbox)
return leafmap.nasa_data_search(
count=count,
short_name=short_name,
bbox=bbox,
temporal=temporal,
version=version,
doi=doi,
daac=daac,
provider=provider,
output=output,
crs=crs,
return_gdf=return_gdf,
**kwargs,
)
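Example usage (a sketch; the short name, provider, and dates are illustrative assumptions):
>>> granules = search_nasa_data(short_name="ECO_L2T_LSTE", provider="LPCLOUD", temporal=("2023-07-01", "2023-07-31"), count=5)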
search_pace(bbox=None, temporal=None, count=-1, short_name='PACE_OCI_L2_AOP_NRT', output=None, crs='EPSG:4326', return_gdf=False, **kwargs)¶
Searches for NASA PACE granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | List[float] | The bounding box coordinates [xmin, ymin, xmax, ymax]. | None |
temporal | str | The temporal extent of the data. | None |
count | int | The number of granules to retrieve. Defaults to -1 (retrieve all). | -1 |
short_name | str | The short name of the dataset. Defaults to "PACE_OCI_L2_AOP_NRT". | 'PACE_OCI_L2_AOP_NRT' |
output | str | The output file path to save the GeoDataFrame as a file. | None |
crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
return_gdf | bool | Whether to return the GeoDataFrame in addition to the granules. Defaults to False. | False |
**kwargs | | Additional keyword arguments for the earthaccess.search_data() function. | {} |
Returns:
Type | Description |
---|---|
Union[List[dict], tuple] | The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame. |
Source code in hypercoast/common.py
def search_pace(
bbox: Optional[List[float]] = None,
temporal: Optional[str] = None,
count: int = -1,
short_name: Optional[str] = "PACE_OCI_L2_AOP_NRT",
output: Optional[str] = None,
crs: str = "EPSG:4326",
return_gdf: bool = False,
**kwargs,
) -> Union[List[dict], tuple]:
"""Searches for NASA PACE granules.
Args:
bbox (List[float], optional): The bounding box coordinates [xmin, ymin, xmax, ymax].
temporal (str, optional): The temporal extent of the data.
count (int, optional): The number of granules to retrieve. Defaults to -1 (retrieve all).
short_name (str, optional): The short name of the dataset. Defaults to "PACE_OCI_L2_AOP_NRT".
output (str, optional): The output file path to save the GeoDataFrame as a file.
crs (str, optional): The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326".
return_gdf (bool, optional): Whether to return the GeoDataFrame in addition to the granules. Defaults to False.
**kwargs: Additional keyword arguments for the earthaccess.search_data() function.
Returns:
Union[List[dict], tuple]: The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame.
"""
return search_nasa_data(
count=count,
short_name=short_name,
bbox=bbox,
temporal=temporal,
output=output,
crs=crs,
return_gdf=return_gdf,
**kwargs,
)
search_pace_chla(bbox=None, temporal=None, count=-1, short_name='PACE_OCI_L3M_CHL_NRT', granule_name='*.DAY.*.0p1deg.*', output=None, crs='EPSG:4326', return_gdf=False, **kwargs)¶
Searches for NASA PACE chlorophyll granules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bbox | List[float] | The bounding box coordinates [xmin, ymin, xmax, ymax]. | None |
temporal | str | The temporal extent of the data. | None |
count | int | The number of granules to retrieve. Defaults to -1 (retrieve all). | -1 |
short_name | str | The short name of the dataset. Defaults to "PACE_OCI_L3M_CHL_NRT". | 'PACE_OCI_L3M_CHL_NRT' |
granule_name | str | A granule name pattern to filter by. Defaults to "*.DAY.*.0p1deg.*". | '*.DAY.*.0p1deg.*' |
output | str | The output file path to save the GeoDataFrame as a file. | None |
crs | str | The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326". | 'EPSG:4326' |
return_gdf | bool | Whether to return the GeoDataFrame in addition to the granules. Defaults to False. | False |
**kwargs | | Additional keyword arguments for the earthaccess.search_data() function. | {} |
Returns:
Type | Description |
---|---|
Union[List[dict], tuple] | The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame. |
Source code in hypercoast/common.py
def search_pace_chla(
bbox: Optional[List[float]] = None,
temporal: Optional[str] = None,
count: int = -1,
short_name: Optional[str] = "PACE_OCI_L3M_CHL_NRT",
granule_name: Optional[str] = "*.DAY.*.0p1deg.*",
output: Optional[str] = None,
crs: str = "EPSG:4326",
return_gdf: bool = False,
**kwargs,
) -> Union[List[dict], tuple]:
"""Searches for NASA PACE Chlorophyll granules.
Args:
bbox (List[float], optional): The bounding box coordinates [xmin, ymin, xmax, ymax].
temporal (str, optional): The temporal extent of the data.
count (int, optional): The number of granules to retrieve. Defaults to -1 (retrieve all).
short_name (str, optional): The short name of the dataset. Defaults to "PACE_OCI_L3M_CHL_NRT".
granule_name (str, optional): A granule name pattern to filter by. Defaults to "*.DAY.*.0p1deg.*".
output (str, optional): The output file path to save the GeoDataFrame as a file.
crs (str, optional): The coordinate reference system (CRS) of the GeoDataFrame. Defaults to "EPSG:4326".
return_gdf (bool, optional): Whether to return the GeoDataFrame in addition to the granules. Defaults to False.
**kwargs: Additional keyword arguments for the earthaccess.search_data() function.
Returns:
Union[List[dict], tuple]: The retrieved granules. If return_gdf is True, also returns the resulting GeoDataFrame.
"""
return search_nasa_data(
count=count,
short_name=short_name,
bbox=bbox,
temporal=temporal,
granule_name=granule_name,
output=output,
crs=crs,
return_gdf=return_gdf,
**kwargs,
)
show_field_data(data, x_col='wavelength', y_col_prefix='(', x_label='Wavelengths (nm)', y_label='Reflectance', use_marker_cluster=True, min_width=400, max_width=600, min_height=200, max_height=250, layer_name='Marker Cluster', m=None, center=(20, 0), zoom=2)¶
Displays field data on a map with interactive markers and popups showing time series data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Union[str, pd.DataFrame] | Path to the CSV file or a pandas DataFrame containing the data. | required |
x_col | str | Column name to use for the x-axis of the charts. Default is "wavelength". | 'wavelength' |
y_col_prefix | str | Prefix to identify the columns that contain the location-specific data. Default is "(". | '(' |
x_label | str | Label for the x-axis of the charts. Default is "Wavelengths (nm)". | 'Wavelengths (nm)' |
y_label | str | Label for the y-axis of the charts. Default is "Reflectance". | 'Reflectance' |
use_marker_cluster | bool | Whether to use marker clustering. Default is True. | True |
min_width | int | Minimum width of the popup. Default is 400. | 400 |
max_width | int | Maximum width of the popup. Default is 600. | 600 |
min_height | int | Minimum height of the popup. Default is 200. | 200 |
max_height | int | Maximum height of the popup. Default is 250. | 250 |
layer_name | str | Name of the marker cluster layer. Default is "Marker Cluster". | 'Marker Cluster' |
m | Map | An ipyleaflet Map instance to add the markers to. Default is None. | None |
center | Tuple[float, float] | Center of the map as a tuple of (latitude, longitude). Default is (20, 0). | (20, 0) |
zoom | int | Zoom level of the map. Default is 2. | 2 |
Returns:
Type | Description |
---|---|
Map | An ipyleaflet Map with the added markers and popups. |
Source code in hypercoast/common.py
def show_field_data(
data: Union[str, "pd.DataFrame"],
x_col: str = "wavelength",
y_col_prefix: str = "(",
x_label: str = "Wavelengths (nm)",
y_label: str = "Reflectance",
use_marker_cluster: bool = True,
min_width: int = 400,
max_width: int = 600,
min_height: int = 200,
max_height: int = 250,
layer_name: str = "Marker Cluster",
m: object = None,
center: Tuple[float, float] = (20, 0),
zoom: int = 2,
):
"""
Displays field data on a map with interactive markers and popups showing time series data.
Args:
data (Union[str, pd.DataFrame]): Path to the CSV file or a pandas DataFrame containing the data.
x_col (str): Column name to use for the x-axis of the charts. Default is "wavelength".
y_col_prefix (str): Prefix to identify the columns that contain the location-specific data. Default is "(".
x_label (str): Label for the x-axis of the charts. Default is "Wavelengths (nm)".
y_label (str): Label for the y-axis of the charts. Default is "Reflectance".
use_marker_cluster (bool): Whether to use marker clustering. Default is True.
min_width (int): Minimum width of the popup. Default is 400.
max_width (int): Maximum width of the popup. Default is 600.
min_height (int): Minimum height of the popup. Default is 200.
max_height (int): Maximum height of the popup. Default is 250.
layer_name (str): Name of the marker cluster layer. Default is "Marker Cluster".
m (Map, optional): An ipyleaflet Map instance to add the markers to. Default is None.
center (Tuple[float, float]): Center of the map as a tuple of (latitude, longitude). Default is (20, 0).
zoom (int): Zoom level of the map. Default is 2.
Returns:
Map: An ipyleaflet Map with the added markers and popups.
"""
import pandas as pd
import matplotlib.pyplot as plt
from ipyleaflet import Map, Marker, Popup, MarkerCluster
from ipywidgets import Output, VBox
# Read the CSV file
if isinstance(data, str):
data = pd.read_csv(data)
elif isinstance(data, pd.DataFrame):
pass
else:
raise ValueError("data must be a path to a CSV file or a pandas DataFrame")
# Extract locations from columns
locations = [col for col in data.columns if col.startswith(y_col_prefix)]
coordinates = [tuple(map(float, loc.strip("()").split())) for loc in locations]
# Create the map
if m is None:
m = Map(center=center, zoom=zoom)
# Function to create the chart
def create_chart(data, title):
fig, ax = plt.subplots(figsize=(10, 6)) # Adjust the figure size here
ax.plot(data[x_col], data["values"])
ax.set_title(title)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
output = Output() # Adjust the output widget size here
with output:
plt.show()
return output
# Define a callback function to create and show the popup
def callback_with_popup_creation(location, values):
def f(**kwargs):
marker_center = kwargs["coordinates"]
output = create_chart(values, f"Location: {location}")
popup = Popup(
location=marker_center,
child=VBox([output]),
min_width=min_width,
max_width=max_width,
min_height=min_height,
max_height=max_height,
)
m.add_layer(popup)
return f
markers = []
# Add points to the map
for i, coord in enumerate(coordinates):
location = f"{coord}"
values = pd.DataFrame({x_col: data[x_col], "values": data[locations[i]]})
marker = Marker(location=coord, title=location, name=f"Marker {i + 1}")
marker.on_click(callback_with_popup_creation(location, values))
markers.append(marker)
if use_marker_cluster:
marker_cluster = MarkerCluster(markers=markers, name=layer_name)
m.add_layer(marker_cluster)
else:
for marker in markers:
m.add_layer(marker)
return m
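Example usage (a sketch, assuming a CSV with a "wavelength" column and per-site columns named like "(29.76 -95.37)"; the file name is hypothetical):
>>> m = show_field_data("field_spectra.csv", x_col="wavelength", y_label="Reflectance")
>>> m  # display the ipyleaflet map in a notebook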