Core Functions

The aqua.diagnostics.core module provides a set of classes and functions for developing new diagnostics.

class aqua.diagnostics.core.Diagnostic(model: str, exp: str, source: str, catalog: str | None = None, regrid: str | None = None, startdate: str | None = None, enddate: str | None = None, loglevel: str = 'WARNING')

Bases: object

Initialize the diagnostic class. This is a general purpose class that can be used by the diagnostic classes to retrieve data from a single model and to save the data to a netcdf file. It is not a working diagnostic class by itself.

Parameters:
  • model (str) – The model to be used.

  • exp (str) – The experiment to be used.

  • source (str) – The source to be used.

  • catalog (str | None) – The catalog to be used. If None, the catalog will be determined by the Reader.

  • regrid (str | None) – The target grid to be used for regridding. If None, no regridding will be done.

  • startdate (str | None) – The start date of the data to be retrieved. If None, all available data will be retrieved.

  • enddate (str | None) – The end date of the data to be retrieved. If None, all available data will be retrieved.

  • loglevel (str) – The log level to be used. Default is ‘WARNING’.

retrieve(var: str | None = None, reader_kwargs: dict = {}, months_required: int | None = None)

Retrieve the data from the model.

Parameters:
  • var (str | None) – The variable to be retrieved. If None, all variables will be retrieved.

  • reader_kwargs (dict) – Additional keyword arguments to be passed to the Reader.

  • months_required (int | None) – The number of months of data required. If None, no check will be performed.

self.data

The data retrieved from the model. If return_data is True, the data will be returned.

self.catalog

The catalog used to retrieve the data. If no catalog was provided, this is the catalog determined by the Reader.
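The months_required check described for retrieve can be pictured as counting the distinct calendar months present on the time axis. The following is only a minimal sketch of that idea, not the library's implementation: the helper names count_months and check_months, and the choice to raise ValueError, are assumptions made for illustration.

```python
from datetime import date


def count_months(timestamps):
    """Count the distinct (year, month) pairs covered by the timestamps."""
    return len({(t.year, t.month) for t in timestamps})


def check_months(timestamps, months_required=None):
    """Raise ValueError if fewer than months_required months are present."""
    if months_required is None:
        return  # no check requested
    available = count_months(timestamps)
    if available < months_required:
        raise ValueError(
            f"{available} months available, {months_required} required"
        )


# Hypothetical time axis: three daily samples spanning two calendar months
times = [date(2020, 1, 15), date(2020, 1, 31), date(2020, 2, 1)]
check_months(times, months_required=2)  # passes silently
```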

save_netcdf(data, diagnostic: str, diagnostic_product: str = None, outputdir: str = '.', rebuild: bool = True, create_catalog_entry: bool = False, dict_catalog_entry: dict = None, **kwargs)

Save the data to a netcdf file.

Parameters:
  • data (xarray Dataset or DataArray) – The data to be saved.

  • diagnostic (str) – The diagnostic name.

  • diagnostic_product (str) – The diagnostic product.

  • outputdir (str) – The path to save the data. Default is ‘.’.

  • rebuild (bool) – If True, the netcdf file will be rebuilt. Default is True.

  • create_catalog_entry (bool) – If True, a catalog entry will be created. Default is False.

  • dict_catalog_entry (dict, optional) – List of jinja and wildcard variables. Default is None. Keys are ‘jinjalist’ and ‘wildcardlist’.

Keyword Arguments:

**kwargs – Additional keyword arguments to be passed to the OutputSaver.save_netcdf method.

select_region(region: str = None, diagnostic: str = None, drop: bool = True)

Selects a geographic region from the dataset and updates self.data accordingly.

If a region name is provided, the method filters the data using the region’s predefined latitude and longitude bounds. The selected region name is stored in the dataset attributes.

It uses the _select_region method to perform the selection on the self.data attribute. Use the hidden _select_region method if you want to select a region on a different dataset.

Parameters:
  • region (str, optional) – Name of the region to select. If None, no filtering is applied.

  • diagnostic (str, optional) – Diagnostic category used to determine region bounds.

  • drop (bool, optional) – Whether to drop coordinates outside the selected region. Default is True.

Returns:

(region, lon_limits, lat_limits)

Return type:

tuple

class aqua.diagnostics.core.DiagnosticCLI(args, diagnostic_name, default_config, log_name=None)

Bases: object

Base class to centralize common CLI initialization operations.

Usage:

    cli = DiagnosticCLI(
        args=args,
        diagnostic_name='timeseries',
        default_config='config_timeseries_atm.yaml'
    )
    cli.prepare()
    cli.open_dask_cluster()

    # Access prepared attributes
    logger = cli.logger
    config_dict = cli.config_dict
    outputdir = cli.outputdir
    ...

    # At the end
    cli.close_dask_cluster()

Initialize the CLI handler.

Parameters:
  • args – Parsed command-line arguments

  • diagnostic_name (str) – Name of the diagnostic (e.g., ‘timeseries’, ‘seaice’)

  • default_config (str) – Default config file name

  • log_name (str, optional) – Logger name. Defaults to ‘{diagnostic_name} CLI’

close_dask_cluster()

Close the dask cluster if it was opened.

dataset_args(dataset)

Helper to extract dataset arguments for diagnostics.

open_dask_cluster()

Open dask cluster if requested via CLI arguments.

Returns:

For method chaining

Return type:

self

prepare(**overrides)

Execute common setup operations (excluding cluster management).

This method:

  1. Sets up logging
  2. Loads and merges config
  3. Extracts common options (regrid, realization, output settings)

Optional keyword arguments can be passed to override options extracted from configuration. Overrides are applied after extraction so they take precedence.

Returns:

For method chaining

Return type:

self
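The two behaviours described for prepare, overrides taking precedence over extracted options and returning self for chaining, can be sketched with a toy class. ToyCLI and its attributes are hypothetical stand-ins for illustration and do not reproduce the real DiagnosticCLI.

```python
class ToyCLI:
    """Hypothetical stand-in for a CLI handler; not the real DiagnosticCLI."""

    def __init__(self, config_dict):
        self.config_dict = config_dict
        self.regrid = None
        self.outputdir = None

    def prepare(self, **overrides):
        # Step 1: extract options from the configuration
        options = {
            "regrid": self.config_dict.get("regrid"),
            "outputdir": self.config_dict.get("outputdir", "."),
        }
        # Step 2: apply overrides last, so they take precedence
        options.update(overrides)
        for name, value in options.items():
            setattr(self, name, value)
        return self  # returning self allows method chaining


cli = ToyCLI({"regrid": "r100", "outputdir": "/tmp/out"}).prepare(regrid="r200")
```

Here the regrid value from the configuration is replaced by the override, while outputdir keeps the value extracted from the configuration.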

class aqua.diagnostics.core.OutputSaver(diagnostic: str, catalog: str | list | None = None, model: str | list | None = None, exp: str | list | None = None, realization: str | list | None = None, catalog_ref: str | list | None = None, model_ref: str | list | None = None, exp_ref: str | list | None = None, outputdir: str = '.', loglevel: str = 'WARNING')

Bases: object

Class to manage saving outputs, including NetCDF, PDF, and PNG files, with customized naming based on provided parameters and metadata.

Initialize the OutputSaver with diagnostic parameters and output directory. The catalog, model, and experiment arguments can each be either a string or a list of strings.

Parameters:
  • diagnostic (str) – Name of the diagnostic.

  • catalog (str, list, optional) – Catalog name.

  • model (str, list, optional) – Model name.

  • exp (str, list, optional) – Experiment name.

  • realization (str, list, optional) – Realization name, can be a string or an integer. ‘r’ is appended if it is an integer.

  • catalog_ref (str, list, optional) – Reference catalog name.

  • model_ref (str, list, optional) – Reference model name.

  • exp_ref (str, list, optional) – Reference experiment name.

  • outputdir (str, optional) – Output directory. Defaults to current directory.

  • loglevel (str, optional) – Logging level. Defaults to ‘WARNING’.

create_metadata(diagnostic_product: str, extra_keys: dict | None = None, metadata: dict | None = None) → dict

Create metadata dictionary for a plot or output file.

Parameters:
  • diagnostic_product (str) – Product of the diagnostic analysis.

  • extra_keys (dict, optional) – Dictionary of additional keys to include in the filename.

  • metadata (dict, optional) – Additional metadata to include in the PNG file.

generate_folder(extension: str = 'pdf')

Generate a folder for saving output files based on the specified format.

Parameters:

extension (str) – The extension of the output files (e.g., ‘pdf’, ‘png’, ‘netcdf’).

Returns:

The path to the generated folder.

Return type:

str

generate_name(diagnostic_product: str, extra_keys: dict | None = None) → str

Generate a filename based on provided parameters and additional user-defined keywords.

Parameters:
  • diagnostic_product (str, optional) – Product of the diagnostic analysis.

  • extra_keys (dict, optional) – Dictionary of additional keys to include in the filename.

Returns:

A string representing the generated filename.

Return type:

str

generate_path(extension: str, diagnostic_product: str, extra_keys: dict = None) → str

Generate a full file path for saving output files based on the provided parameters. Simplified wrapper around generate_name and generate_folder to include the output directory.

save_figure(fig: Figure, diagnostic_product: str, extra_keys: dict | None = None, metadata: dict | None = None, save_pdf: bool = False, save_png: bool = True, rebuild: bool = True, dpi: int = 300)

Save a matplotlib figure in the specified format(s).

This method handles the format selection logic and delegates to save_pdf() and/or save_png() as needed.

Parameters:
  • fig – Matplotlib figure to save.

  • diagnostic_product (str) – Name of the diagnostic product.

  • extra_keys (dict) – Dictionary of additional keys for filename generation.

  • metadata (dict) – Dictionary of metadata to embed in the file.

  • save_pdf (bool) – Whether to save as PDF.

  • save_png (bool) – Whether to save as PNG.

  • rebuild (bool) – Whether to rebuild if file exists.

  • dpi (int) – Resolution for PNG output (ignored for PDF).

save_netcdf(dataset: Dataset, diagnostic_product: str, rebuild: bool = True, extra_keys: dict | None = None, metadata: dict | None = None, create_catalog_entry: bool = False, dict_catalog_entry: dict | None = None)

Save an xarray Dataset as a NetCDF file with a generated filename.

Parameters:
  • dataset (xr.Dataset) – The xarray Dataset to save.

  • diagnostic_product (str) – Product of the diagnostic analysis.

  • rebuild (bool, optional) – Whether to rebuild the output file if it already exists. Defaults to True.

  • extra_keys (dict, optional) – Dictionary of additional keys to include in the filename.

  • metadata (dict, optional) – Additional metadata to include in the NetCDF file.

  • create_catalog_entry (bool, optional) – Whether to create a catalog entry for the NetCDF file. Defaults to False.

  • dict_catalog_entry (dict, optional) – List of jinja and wildcard variables. Default is None.

save_pdf(fig: Figure, diagnostic_product: str, rebuild: bool = True, extra_keys: dict | None = None, metadata: dict | None = None)

Save a Matplotlib figure as a PDF.

save_png(fig: Figure, diagnostic_product: str, rebuild: bool = True, extra_keys: dict | None = None, metadata: dict | None = None, dpi: int = 300)

Save a Matplotlib figure as a PNG.

static unpack_list(value: str | list | None) → str | list | None

Unpack a value that can be a string, list, or None.

Parameters:

value – The value to unpack. Can be string, list, or None.

Returns:

  • If value is a single-item list and special is None: returns the single item

  • Otherwise: returns value as-is

Return type:

str | list | None
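The unpacking rule for unpack_list can be illustrated with a small standalone function. This is a sketch of the described behaviour only; the name unpack_single is hypothetical and the real method may handle additional cases.

```python
def unpack_single(value):
    """Return the only item of a one-element list; otherwise return value unchanged."""
    if isinstance(value, list) and len(value) == 1:
        return value[0]
    return value


single = unpack_single(["IFS"])            # one-element list is unpacked
several = unpack_single(["IFS", "ICON"])   # longer lists are returned as-is
```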

aqua.diagnostics.core.close_cluster(client, cluster, private_cluster, loglevel: str = 'WARNING')

Close the dask cluster and client.

Parameters:
  • client (dask.distributed.Client) – dask client

  • cluster (dask.distributed.LocalCluster) – dask cluster

  • private_cluster (bool) – whether the cluster is private

  • loglevel (str) – logging level

aqua.diagnostics.core.get_diagnostic_configpath(diagnostic: str, folder='diagnostics', loglevel='WARNING') → str

Get the path to the diagnostic configuration directory.

Parameters:
  • diagnostic (str) – diagnostic name

  • folder (str) – folder name. Default is “diagnostics”. Can be “tools” as well.

  • loglevel (str) – logging level. Default is ‘WARNING’.

Returns:

path to the diagnostic configuration directory

Return type:

str

aqua.diagnostics.core.load_diagnostic_config(diagnostic: str, config: str = None, default_config: str = None, folder='diagnostics', loglevel: str = 'WARNING')

Load the diagnostic configuration file and return the configuration dictionary.

Parameters:
  • diagnostic (str) – diagnostic name

  • config (str) – configuration file to use in place of the default configuration file.

  • folder (str) – folder name. Default is “diagnostics”. Can be “tools” or “templates” as well.

  • loglevel (str) – logging level. Default is ‘WARNING’.

Returns:

configuration dictionary

Return type:

dict

aqua.diagnostics.core.merge_config_args(config: dict, args: Namespace, loglevel: str = 'WARNING') → dict

Merge the configuration dictionary with the arguments of the CLI.

Parameters:
  • config (dict) – configuration dictionary

  • args (argparse.Namespace) – arguments of the CLI

  • loglevel (str) – logging level. Default is ‘WARNING’.

Returns:

merged configuration dictionary

Return type:

dict
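One common way to merge a configuration dictionary with CLI arguments is to let every argument that was actually set replace the corresponding configuration value. The sketch below assumes that precedence rule and a flat dictionary; neither is confirmed by this reference, and merge_cli_into_config is a hypothetical name, not the library function.

```python
from argparse import Namespace


def merge_cli_into_config(config, args):
    """Return a copy of config where CLI values that are not None replace config values."""
    merged = dict(config)  # leave the input dictionary untouched
    for key, value in vars(args).items():
        if value is not None:  # assumption: None means "not given on the CLI"
            merged[key] = value
    return merged


config = {"outputdir": ".", "regrid": "r100"}
args = Namespace(outputdir="/tmp/out", regrid=None)
merged = merge_cli_into_config(config, args)
```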

aqua.diagnostics.core.open_cluster(nworkers, cluster, loglevel: str = 'WARNING')

Open a dask cluster if nworkers is provided, otherwise connect to an existing cluster.

Parameters:
  • nworkers (int) – number of workers

  • cluster (str) – cluster address

  • loglevel (str) – logging level

Returns:

  • client (dask.distributed.Client) – dask client

  • cluster (dask.distributed.LocalCluster) – dask cluster

  • private_cluster (bool) – whether the cluster is private

Return type:

tuple

aqua.diagnostics.core.round_enddate(enddate)

Round the end date to the end of the month.

Parameters:

enddate (str or pandas.Timestamp) – end date for the data retrieval

Returns:

end date rounded to the end of the month

Return type:

pandas.Timestamp
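The rounding itself can be illustrated with the standard library alone. This sketch uses datetime.date instead of pandas.Timestamp and a hypothetical helper name, so it shows the calendar logic rather than the function's actual implementation or return type.

```python
import calendar
from datetime import date


def last_day_of_month(day):
    """Return the last calendar day of the month containing `day`."""
    last = calendar.monthrange(day.year, day.month)[1]  # number of days in the month
    return day.replace(day=last)


end = last_day_of_month(date.fromisoformat("2020-02-10"))  # leap-year February
```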

aqua.diagnostics.core.round_startdate(startdate)

Round the start date to the beginning of the month.

Parameters:

startdate (str or pandas.Timestamp) – start date for the data retrieval

Returns:

start date rounded to the beginning of the month

Return type:

pandas.Timestamp
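Rounding to the beginning of the month amounts to resetting the day to 1. The sketch below shows this with datetime.date rather than pandas.Timestamp; first_day_of_month is a hypothetical name used only for illustration.

```python
from datetime import date


def first_day_of_month(day):
    """Return the first calendar day of the month containing `day`."""
    return day.replace(day=1)


start = first_day_of_month(date.fromisoformat("2020-02-10"))
```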

aqua.diagnostics.core.start_end_dates(startdate=None, enddate=None, start_std=None, end_std=None)

Evaluate the start and end dates for the reference data retrieval when both are provided, to minimize the number of Reader calls. Dates should be of the form ‘YYYY-MM-DD’ or ‘YYYYMMDD’. The function translates them to the form ‘YYYY-MM-DD’ and then uses pandas Timestamp to evaluate the minimum and maximum dates.

Parameters:
  • startdate (str) – start date for the data retrieval

  • enddate (str) – end date for the data retrieval

  • start_std (str) – start date for the standard deviation data retrieval

  • end_std (str) – end date for the standard deviation data retrieval

Returns:

start and end dates for the data retrieval

Return type:

tuple (str, str)
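The idea of taking the earliest start and the latest end, so that a single retrieval covers both periods, can be sketched as follows. The example uses the standard library instead of pandas, covers only the case where all four dates are given, and uses hypothetical helper names.

```python
from datetime import datetime


def parse_date(text):
    """Parse 'YYYY-MM-DD' or 'YYYYMMDD' into a date."""
    return datetime.strptime(text.replace("-", ""), "%Y%m%d").date()


def widest_range(startdate, enddate, start_std, end_std):
    """Return the earliest start and latest end as 'YYYY-MM-DD' strings."""
    start = min(parse_date(startdate), parse_date(start_std))
    end = max(parse_date(enddate), parse_date(end_std))
    return start.isoformat(), end.isoformat()


dates = widest_range("19900101", "2000-12-31", "1980-01-01", "20101231")
```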

aqua.diagnostics.core.template_parse_arguments(parser: ArgumentParser)

Add the default arguments to the parser.

Parameters:

parser – argparse.ArgumentParser

Returns:

argparse.ArgumentParser
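The pattern of a helper that receives a parser, adds shared options, and returns it can be sketched with argparse. The flag names below are illustrative assumptions, not the actual options added by template_parse_arguments, and add_default_arguments is a hypothetical name.

```python
import argparse


def add_default_arguments(parser):
    """Add illustrative shared options to the parser and return it."""
    # Hypothetical flags, chosen for illustration only
    parser.add_argument("--config", type=str, default=None, help="configuration file")
    parser.add_argument("--loglevel", type=str, default="WARNING", help="logging level")
    parser.add_argument("--nworkers", type=int, default=None, help="number of dask workers")
    return parser


parser = add_default_arguments(argparse.ArgumentParser(description="Example diagnostic CLI"))
parser.add_argument("--threshold", type=float, default=0.5)  # diagnostic-specific option
args = parser.parse_args(["--nworkers", "4"])
```

Returning the parser lets each diagnostic add its own options on top of the shared ones before parsing.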