Source code for apd.command

###############################################################################
# (c) Copyright 2021-2023 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
#
# The command line tools use the click and click-log packages for easier development
#
import json
import logging
import os
import sys
import tempfile

import click  # type: ignore[import]
import click_log  # type: ignore[import]
import requests

from .analysis_data import (
    APD_DATA_CACHE_DIR,
    APD_METADATA_CACHE_DIR,
    ApdReturnType,
    get_analysis_data,
)
from .ap_info import cache_ap_info
from .authentication import get_auth_headers, logout
from .data_cache import DataCache
from .rich_console import console, error_console

logger = logging.getLogger("apd")
click_log.basic_config(logger)


common_help = """
Variables:

APD_METADATA_CACHE_DIR: Specify the location of the metadata cache;
the cached information is reused instead of being reloaded every time.

APD_METADATA_LIFETIME: Delay after which the cache is considered
invalid and reloaded.

APD_DATA_CACHE_DIR: Specify the location where a copy
of the files will be kept.
"""


def common_docstr(sep="\n"):
    """Append the common help to the decorated function's docstring"""

    def _decorator(func):
        func.__doc__ = sep.join([func.__doc__, common_help])
        return func

    return _decorator
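
# The help text of a decorated command is therefore its own docstring
# followed by the "Variables:" section above. A minimal sketch with a
# hypothetical function:
#
#   @common_docstr()
#   def cmd_example():
#       """Do something."""
#
#   assert cmd_example.__doc__ == "\n".join(["Do something.", common_help])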
def exception_handler(exception_type, exception, _):
    # All your trace are belong to us!
    # Report uncaught exceptions as a concise "Type: message" line
    # instead of the default traceback.
    error_console.print(f"{exception_type.__name__}: {exception}")
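
# Install the handler as the global excepthook, so any exception escaping
# a CLI entry point is printed on the error console in the format above
# rather than as a full traceback.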
sys.excepthook = exception_handler


def _process_common_tags(eventtype, datatype, polarity, config, name, version):
    """Util to simplify the parsing of common tags"""
    filter_tags = {}
    if name is not None:
        filter_tags["name"] = name
    if version is not None:
        filter_tags["version"] = version
    if eventtype != ():
        filter_tags["eventtype"] = eventtype
    if datatype != ():
        filter_tags["datatype"] = datatype
    if polarity != ():
        filter_tags["polarity"] = polarity
    if config != ():
        filter_tags["config"] = config
    return filter_tags


@click.command()
def cmd_login():
    """Login to the Analysis Productions endpoint"""
    if "LBAP_CI_JOB_JWT" in os.environ and "LBAP_TOKENS_FILE" not in os.environ:
        _, token_file = tempfile.mkstemp(prefix="apd-", suffix=".json")
        os.environ["LBAP_TOKENS_FILE"] = token_file
        print(f"export LBAP_TOKENS_FILE={token_file}")
    try:
        r = requests.get(
            "https://lbap.app.cern.ch/user/",
            **get_auth_headers(),
            timeout=10,
        )
        r.raise_for_status()
        console.print(f"Login successful as {r.json()['username']}")
    except Exception:  # pylint: disable=broad-except
        # Ensure GitLab CI jobs exit if something goes wrong
        if "LBAP_CI_JOB_JWT" in os.environ:
            print("exit 42")
        raise


@click.command()
def cmd_logout():
    """Logout from the Analysis Productions endpoint"""
    logout()


@click.command()
@click.argument("cache_dir")
@click.argument("working_group")
@click.argument("analysis")
@click.option("--date", default=None, help="analysis date in ISO 8601 format")
@click_log.simple_verbosity_option(logger)
@common_docstr()
def cmd_cache_ap_info(cache_dir, working_group, analysis, date):
    """Cache the metadata for the analysis production specified."""
    logger.debug(
        "Caching %s/%s to %s for time %s",
        working_group,
        analysis,
        cache_dir,
        date,
    )
    cache_ap_info(cache_dir, working_group, analysis, ap_date=date)
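
# All the listing/caching commands below funnel their filter options through
# _process_common_tags. A sketch of the mapping (the values are purely
# illustrative):
#
#   _process_common_tags(
#       eventtype=(),
#       datatype=("2018",),
#       polarity=("magdown",),
#       config=(),
#       name=None,
#       version="v1",
#   )
#   # -> {"version": "v1", "datatype": ("2018",), "polarity": ("magdown",)}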
@click.command()
@click.argument("working_group")
@click.argument("analysis")
@click.option(
    "--cache_dir",
    default=os.environ.get(APD_METADATA_CACHE_DIR, None),
    help="Specify location of the cache for the analysis metadata",
)
@click.option("--tag", default=None, help="Tag to filter datasets", multiple=True)
@click.option(
    "--value",
    default=None,
    help="Tag value used if the name is specified",
    multiple=True,
)
@click.option(
    "--eventtype", default=None, help="eventtype to filter the datasets", multiple=True
)
@click.option(
    "--datatype", default=None, help="datatype to filter the datasets", multiple=True
)
@click.option(
    "--polarity", default=None, help="polarity to filter the datasets", multiple=True
)
@click.option(
    "--config", default=None, help="Config to use (e.g. lhcb or mc)", multiple=True
)
@click.option("--name", default=None, help="dataset name")
@click.option("--version", default=None, help="dataset version")
@click.option("--date", default=None, help="analysis date in ISO 8601 format")
@click_log.simple_verbosity_option(logger)
@common_docstr()
def cmd_list_pfns(
    working_group,
    analysis,
    cache_dir,
    tag,
    value,
    eventtype,
    datatype,
    polarity,
    config,
    name,
    version,
    date,
):
    """List the PFNs for the analysis, matching the tags specified.

    This command checks that the arguments are not ambiguous."""
    # Loading the data and filtering/displaying
    datasets = get_analysis_data(
        working_group, analysis, metadata_cache=cache_dir, ap_date=date
    )
    filter_tags = _process_common_tags(
        eventtype, datatype, polarity, config, name, version
    )
    filter_tags |= dict(zip(tag, value))
    for f in datasets(**filter_tags):
        click.echo(f)


@click.command()
@click.argument("working_group")
@click.argument("analysis")
@click.option(
    "--cache_dir",
    default=os.environ.get(APD_METADATA_CACHE_DIR, None),
    help="Specify location of the cache for the analysis metadata",
)
@click.option("--tag", default=None, help="Tag to filter datasets", multiple=True)
@click.option(
    "--value",
    default=None,
    help="Tag value used if the name is specified",
    multiple=True,
)
@click.option(
    "--eventtype", default=None, help="eventtype to filter the datasets", multiple=True
)
@click.option(
    "--datatype", default=None, help="datatype to filter the datasets", multiple=True
)
@click.option(
    "--polarity", default=None, help="polarity to filter the datasets", multiple=True
)
@click.option(
    "--config", default=None, help="Config to use (e.g. lhcb or mc)", multiple=True
)
@click.option("--name", default=None, help="dataset name")
@click.option("--version", default=None, help="dataset version")
@click.option("--date", default=None, help="analysis date in ISO 8601 format")
@click_log.simple_verbosity_option(logger)
@common_docstr()
def cmd_list_lfns(
    working_group,
    analysis,
    cache_dir,
    tag,
    value,
    eventtype,
    datatype,
    polarity,
    config,
    name,
    version,
    date,
):
    """List the LFNs for the analysis, matching the tags specified.

    This command checks that the arguments are not ambiguous."""
    # Loading the data and filtering/displaying
    datasets = get_analysis_data(
        working_group, analysis, metadata_cache=cache_dir, ap_date=date
    )
    filter_tags = _process_common_tags(
        eventtype, datatype, polarity, config, name, version
    )
    filter_tags |= dict(zip(tag, value))
    for f in datasets(**filter_tags, return_type=ApdReturnType.LFN):
        click.echo(f)
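
# Note on --tag/--value: the two repeatable options are paired positionally
# via dict(zip(tag, value)). For instance (hypothetical invocation, assuming
# a console script named after the function):
#
#   apd-list-pfns <working_group> <analysis> --tag polarity --value magdown
#
# adds {"polarity": "magdown"} to the filters built above.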
@click.command()
@click.argument("working_group")
@click.argument("analysis")
@click.option(
    "--cache_dir",
    default=os.environ.get(APD_METADATA_CACHE_DIR, None),
    help="Specify location of the cache for the analysis metadata",
)
@click.option("--tag", default=None, help="Tag to filter datasets", multiple=True)
@click.option(
    "--value",
    default=None,
    help="Tag value used if the name is specified",
    multiple=True,
)
@click.option(
    "--eventtype", default=None, help="eventtype to filter the datasets", multiple=True
)
@click.option(
    "--datatype", default=None, help="datatype to filter the datasets", multiple=True
)
@click.option(
    "--polarity", default=None, help="polarity to filter the datasets", multiple=True
)
@click.option(
    "--config", default=None, help="Config to use (e.g. lhcb or mc)", multiple=True
)
@click.option("--name", default=None, help="dataset name")
@click.option("--version", default=None, help="dataset version")
@click.option("--date", default=None, help="analysis date in ISO 8601 format")
@click_log.simple_verbosity_option(logger)
@common_docstr()
def cmd_list_samples(
    working_group,
    analysis,
    cache_dir,
    tag,
    value,
    eventtype,
    datatype,
    polarity,
    config,
    name,
    version,
    date,
):
    """List the samples for the analysis, matching the tags specified.

    This command does not check whether the data set is unambiguous."""
    # Loading the data and filtering/displaying
    datasets = get_analysis_data(
        working_group, analysis, metadata_cache=cache_dir, ap_date=date
    )
    filter_tags = _process_common_tags(
        eventtype, datatype, polarity, config, name, version
    )
    filter_tags |= dict(zip(tag, value))
    matching = datasets(
        check_data=False, return_type=ApdReturnType.SAMPLE, **filter_tags
    )
    click.echo(matching)


@click.command()
@click.argument("working_group")
@click.argument("analysis")
@click.option(
    "--cache_dir",
    default=os.environ.get(APD_METADATA_CACHE_DIR, None),
    help="Specify location of the cache for the analysis metadata",
)
@click.option(
    "--output",
    default=None,
    help="Specify the output file (CSV report, or JSON when --groupby is used)",
)
@click.option(
    "--groupby",
    default=None,
    help="Column list (comma separated) by which the dataset should be grouped (or 'all')",
)
@click_log.simple_verbosity_option(logger)
def cmd_dump_info(working_group, analysis, cache_dir, output, groupby):
    """Dump the known information about a specific analysis"""
    # Loading the data first
    datasets = get_analysis_data(working_group, analysis, metadata_cache=cache_dir)

    # Checking whether we need to group the data...
    if groupby:
        groupby_tags = [t.strip().lower() for t in groupby.split(",")]
        # Special case where we use all the tags available except name and version
        if "all" in groupby_tags:
            groupby_tags = None
        groups = datasets.all_samples().groupby(groupby_tags)
        if output:
            with open(output, "w") as f:
                json.dump({str(k): v for k, v in groups.items()}, f)
        else:
            for k, v in groups.items():
                print(",".join(k))
                for line in v:
                    print(" " * 8 + str(line))
    else:
        # Gets the report as CSV in this case, not JSON
        report = datasets.all_samples().report()
        report_str = "\n".join([",".join([str(e) for e in line]) for line in report])
        if output:
            with open(output, "w") as f:
                f.write(report_str)
        else:
            print(report_str)


@click.command()
@click.argument("working_group")
@click.argument("analysis")
@click.option(
    "--cache_dir",
    default=os.environ.get(APD_METADATA_CACHE_DIR, None),
    help="Specify location of the cached analysis metadata",
)
@click.option(
    "--tag",
    default=None,
    help="Tag for which the values should be listed",
    multiple=True,
)
@click.option("--date", default=None, help="analysis date in ISO 8601 format")
@click_log.simple_verbosity_option(logger)
@common_docstr()
def cmd_summary(
    working_group,
    analysis,
    cache_dir,
    tag,
    date,
):
    """Print a summary of the information available about the specified analysis."""
    # Loading the dataset and displaying its summary
    datasets = get_analysis_data(
        working_group, analysis, metadata_cache=cache_dir, ap_date=date
    )
    console.print(datasets.summary(tag))
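
# Sketch of the grouped dump (the command name and tags are assumptions for
# illustration):
#
#   apd-dump-info <working_group> <analysis> --groupby=datatype,polarity
#
# prints one comma-separated (datatype, polarity) key per group followed by
# its samples, or dumps the groups as JSON when --output is given.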
"--value", default=None, help="Tag value used if the name is specified", multiple=True, ) @click.option( "--eventtype", default=None, help="eventtype to filter the datasets", multiple=True ) @click.option( "--datatype", default=None, help="datatype to filter the datasets", multiple=True ) @click.option( "--polarity", default=None, help="polarity to filter the datasets", multiple=True ) @click.option( "--config", default=None, help="Config to use (e.g. lhcb or mc)", multiple=True ) @click.option("--name", default=None, help="dataset name") @click.option("--version", default=None, help="dataset version") @click.option("--date", default=None, help="analysis date in ISO 8601 format") @click_log.simple_verbosity_option(logger) @common_docstr() def cmd_cache_ap_files( working_group, analysis, cache_dir, data_cache_dir, dry_run, tag, value, eventtype, datatype, polarity, config, name, version, date, ): """Cache the files for the analysis locally, matching the tags specified. This command checks that the arguments are not ambiguous.""" # pylint: disable-msg=too-many-locals if not data_cache_dir: raise ValueError("Please specify the location of the data cache") data_cache = DataCache(data_cache_dir) # Loading the data and filtering/displaying datasets = get_analysis_data( working_group, analysis, metadata_cache=cache_dir, ap_date=date ) filter_tags = filter_tags = _process_common_tags( eventtype, datatype, polarity, config, name, version ) filter_tags |= dict(zip(tag, value)) for f in datasets(check_data=False, use_local_cache=False, **filter_tags): local = data_cache.remote_to_local(f) if dry_run: print(str(f)) if local.exists(): click.echo(f"Already local for {f}: {str(local)}") click.echo(f"Would copy {f} to {data_cache.remote_to_local(f)}") else: if local.exists(): click.echo(f"Local copy for {f} {str(local)} already present.") else: click.echo(f"Copying {f} to {data_cache.remote_to_local(f)}") data_cache.copy_locally(f)