Source code for LbAPLocal.cli

###############################################################################
# (c) Copyright 2020-2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import importlib.metadata
import os
import re
import shlex
import shutil
import subprocess
from glob import glob
from pathlib import Path
from typing import Iterable

import click
from apd.authentication import get_auth_headers
from LbAPCommon import parse_yaml, render_yaml, validate_yaml

from .utils import (
    LBAPI_HOST,
    available_productions,
    check_production,
    inside_ap_datapkg,
    logging_subprocess_run,
    production_name_type,
    production_to_versions,
    validate_environment,
)


class NaturalOrderGroup(click.Group):
    """Group for showing subcommands in the correct order"""

    def list_commands(self, ctx):
        """Return the names of the registered subcommands.

        The names are returned in the iteration order of ``self.commands``
        instead of being sorted, and as a real list (rather than a live
        ``dict_keys`` view) so that callers get the same kind of object the
        base ``click.Group.list_commands`` hands back.

        ``ctx`` is accepted for compatibility with the click API and is not used.
        """
        return list(self.commands)
@click.group(cls=NaturalOrderGroup)
@click.option(
    "--version", is_flag=True, callback=print_version, expose_value=False, is_eager=True
)
def main():
    """Command line tool for the LHCb AnalysisProductions"""
    pass


@main.command()
def login():
    """Login to the Analysis Productions API"""
    # ``requests`` is imported inside the command rather than at module import time.
    import requests

    r = requests.get(f"{LBAPI_HOST}/user", **get_auth_headers())
    r.raise_for_status()
    click.secho(f"Logged in successfully as {r.json()['username']}", fg="green")


@main.command()
@click.argument("working_group", type=str, default=None, nargs=1, required=False)
@click.argument(
    "production_name", type=production_name_type, default=None, nargs=1, required=False
)
def versions(working_group: str, production_name: production_name_type) -> None:
    """List the available tags of the Analysis Productions repository by running 'lb-ap versions' \n
    List the available tags for a specific production by running 'lb-ap versions <WORKING_GROUP> <YOUR_PRODUCTION>'
    """
    if not production_name:
        # Listing versions without a production is not supported yet, whether or
        # not a working group was given.
        #
        # TODO the sketch below is to facilitate mapping each version to a
        # production but requires some changes in the api website at a later date:
        #
        #   from apd import AnalysisData
        #   from apd.ap_info import fetch_wg_info
        #   from LbAPCommon.config import known_working_groups
        #
        #   versions_to_productions = {}
        #   wgs = [working_group] if working_group else known_working_groups
        #   for wg in wgs:
        #       productions = fetch_wg_info(wg)
        #       for production in productions:
        #           dataset = AnalysisData(wg, production)
        #           for version in dataset.available_tags["versions"]:
        #               versions_to_productions[version] = [wg, production]
        #   click.echo(f"The available versions are:")
        #   for version, wg_production in versions_to_productions.items():
        #       click.echo(f"{version}: {wg_production[0]} {wg_production[1]}")
        raise click.ClickException(
            "version querying currently only works with working group,"
            " and production name specified! lb-ap versions WORKING_GROUP PRODUCTION_NAME"
        )

    available_versions = production_to_versions(working_group, production_name)
    click.echo(f"The available versions for {production_name} are: ")
    for version in available_versions:
        click.echo(f" {version}")


# Remote URL of the AnalysisProductions repository for each ``--clone-type``.
_CLONE_URLS = {
    "ssh": "ssh://git@gitlab.cern.ch:7999/lhcb-datapkg/AnalysisProductions.git",
    "https": "https://gitlab.cern.ch/lhcb-datapkg/AnalysisProductions.git",
    "krb5": "https://:@gitlab.cern.ch:8443/lhcb-datapkg/AnalysisProductions.git",
}


@main.command()
@click.argument("working_group", type=str, nargs=1)
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("version", type=str, nargs=1)
@click.argument("branch_name", type=str, nargs=1)
@click.option(
    "--clone-type", type=click.Choice(["ssh", "https", "krb5"]), default="ssh"
)
def clone(
    working_group: str,
    production_name: production_name_type,
    version: str,
    branch_name: str,
    clone_type: str = "ssh",
) -> None:
    """Clone the AnalysisProductions repository and do lb-ap checkout with the specified version, production and branch name."""
    if version not in production_to_versions(working_group, production_name):
        raise click.ClickException(
            f"Version {version} does not correspond to the {production_name} production!."
        )

    click.secho(
        f"Cloning AnalysisProductions, changing {production_name} to that,"
        f" of {version} and checking out the {branch_name} branch.",
        fg="yellow",
    )

    try:
        clone_url = _CLONE_URLS[clone_type]
    except KeyError:
        # Only reachable when the function is called with a value that the
        # ``--clone-type`` choice list does not offer.
        raise NotImplementedError(clone_type) from None

    _run_git("clone", clone_url)

    if not os.path.isdir("./AnalysisProductions"):
        raise click.ClickException(
            f"Cloning the repository failed! Check you have the correct permissions,"
            f" to clone via {clone_type} or choose a different clone type [ssh, https, krb5]"
        )

    # The checkout operates on the current working directory, so move into the
    # fresh clone first.
    os.chdir("./AnalysisProductions")
    _checkout(working_group, production_name, version, branch_name, allow_deletes=True)


@main.command()
@click.argument("working_group", type=str, nargs=1)
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("version", type=str, nargs=1)
@click.argument("branch_name", type=str, nargs=1)
@click.option("--allow-deletes/--no-allow-deletes", default=False)
def checkout(
    working_group: str,
    production_name: production_name_type,
    version: str,
    branch_name: str,
    allow_deletes: bool,
) -> None:
    """Clean out the current copy of the specified production and clone the production from the specified version \n
    List the available tags for a specific production by running 'lb-ap versions <YOUR_PRODUCTION>'
    """
    _checkout(
        working_group,
        production_name,
        version,
        branch_name,
        allow_deletes=allow_deletes,
    )


def _checkout(
    working_group: str,
    production_name: production_name_type,
    version: str,
    branch_name: str,
    *,
    allow_deletes: bool,
):
    """Restore ``production_name`` as it was at ``version`` onto a new branch.

    Steps, all performed in the current working directory:

    1. check that ``version`` is one of the versions known for the production;
    2. refuse to continue if ``git status --porcelain`` reports anything;
    3. fetch tags and create ``branch_name`` from ``origin/master``;
    4. remove any existing directory entry whose name matches the production
       case-insensitively (only when ``allow_deletes`` is true);
    5. check out the matching directory from ``version`` and commit it.

    Raises:
        click.ClickException: unknown version, dirty repository, a failing git
            command, or no directory in ``version`` matching the production.
        FileExistsError: the production already exists and ``allow_deletes``
            is false.
    """
    inside_ap_datapkg()
    repo_dir = Path.cwd()

    if version not in production_to_versions(working_group, production_name):
        raise click.ClickException(
            f"Version {version} does not correspond to the {working_group} {production_name} production!"
        )

    status = _run_git("status", "--porcelain")
    if status.stdout.strip():
        # stdout is bytes; decode it so the user sees the file list rather than
        # a ``b'...'`` repr.
        dirty = status.stdout.decode(errors="backslashreplace")
        raise click.ClickException(f"Current git repository is not clean: {dirty}")

    _run_git("fetch", "origin", "--tags")
    _run_git("checkout", "-b", branch_name, "origin/master")

    wanted = production_name.lower()
    for child in repo_dir.iterdir():
        if child.name.lower() != wanted:
            continue
        if not allow_deletes:
            raise FileExistsError(
                f"{child.name} already exists in {repo_dir}. If you're ABSOLUTELY "
                "sure you want to delete it, run this command with --allow-deletes."
            )
        shutil.rmtree(child)

    # Find how the production directory is actually spelled in ``version``; the
    # name given on the command line is only matched case-insensitively.
    tree = _run_git("ls-tree", "-d", version, printing=False)
    for line in tree.stdout.strip().splitlines():
        _, git_prod_name = line.decode().split("\t")
        if git_prod_name.lower() == wanted:
            break
    else:
        raise click.ClickException(
            f"Failed to find match for {production_name} in {version}"
        )

    click.secho(f"Found match for {production_name} -> {git_prod_name} in {version}!")
    click.secho(f"Checking out the {version} version of {production_name}", fg="yellow")
    _run_git("checkout", version, "--", git_prod_name)
    _run_git("commit", "-m", f"Restore {git_prod_name} from {version}")


def _run_git(*args: str, printing: bool = True) -> subprocess.CompletedProcess:
    """Run a git command and return the completed process.

    Args:
        *args: Arguments placed after ``git`` on the command line.
        printing: Forwarded unchanged to ``logging_subprocess_run``
            (presumably controls whether the output is echoed - see that helper).

    Raises:
        click.ClickException: if git exits with a non-zero return code.
    """
    cmd = ("git", *args)
    click.secho(f"Running {shlex.join(cmd)}", bold=True)
    proc = logging_subprocess_run(cmd, printing=printing)
    if proc.returncode != 0:
        raise click.ClickException(f"Failed to run git command: {proc.args}")
    return proc


def _read_info_yaml(production_name) -> str:
    """Return the unrendered text of ``<production_name>/info.yaml``."""
    with open(os.path.join(production_name, "info.yaml"), "rt") as fp:
        return fp.read()


@main.command()
@click.argument("production_name", type=production_name_type, default="", nargs=1)
def list(production_name):
    """List the available production folders by running 'lb-ap list' \n
    List the available productions for a specific production by running 'lb-ap list <YOUR_PRODUCTION>'
    """
    inside_ap_datapkg()

    if not production_name:
        click.echo("The available productions are: ")
        for folder in available_productions():
            click.echo(f"* {folder}")
        return

    # Check if production exists
    check_production(production_name)
    click.echo(f"The available jobs for {production_name} are: ")

    # Get rendered yaml and find all the production names
    job_data = parse_yaml(render_yaml(_read_info_yaml(production_name)))
    for job_name in job_data:
        click.echo(f"* {job_name}")


@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
def render(production_name):
    """Render the info.yaml for a given production"""
    inside_ap_datapkg()

    # Check if production exists
    check_production(production_name)

    # Get rendered yaml and print
    click.secho(f"Rendering info.yaml for {production_name}", fg="green")
    rendered = render_yaml(_read_info_yaml(production_name))
    click.echo(rendered)

    try:
        job_data = parse_yaml(rendered)
        warnings = validate_yaml(job_data, ".", production_name)
    except Exception as e:
        # The details are deliberately left to ``lb-ap validate``; here the
        # user is only told that something is wrong.
        click.secho("Rendered YAML has errors!", fg="red")
        click.secho(f'See "lb-ap validate {production_name}" for details', fg="red")
        raise click.ClickException("Failed to parse and validate YAML") from e
    else:
        for warning in warnings or []:
            click.secho(f"WARNING: {warning}", fg="yellow")
        click.secho("YAML parsed and validated successfully", fg="green")


@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
def validate(production_name):
    """Validate the configuration for a given production"""
    inside_ap_datapkg()

    # Check if production exists
    check_production(production_name)

    # Get rendered yaml and print
    click.secho(f"Rendering info.yaml for {production_name}", fg="green")
    rendered = render_yaml(_read_info_yaml(production_name))

    # Parsing and validation are reported separately so that the user can tell
    # which of the two stages failed.
    try:
        job_data = parse_yaml(rendered)
    except Exception as e:
        click.secho("Error parsing YAML!", fg="red")
        raise click.ClickException(str(e)) from e
    else:
        click.secho("YAML parsed successfully", fg="green")

    try:
        warnings = validate_yaml(job_data, ".", production_name)
    except Exception as e:
        click.secho("Error validating YAML!", fg="red")
        raise click.ClickException(str(e)) from e
    else:
        for warning in warnings or []:
            click.secho(f"WARNING: {warning}", fg="yellow")
        click.secho("YAML validated successfully", fg="green")


def _locate_test_output(out_dir, output_file):
    """Return the path of the test job's output file inside ``out_dir``.

    The expected path is ``out_dir / "00012345_00006789_1.<output_file>"``. It is
    compared case-insensitively against every entry of ``out_dir`` and the first
    entry that starts with it is returned as a string.

    Raises:
        click.ClickException: if no entry of ``out_dir`` matches.
    """
    expected = out_dir / f"00012345_00006789_1.{output_file}"
    # Escape the path: it is a literal file name, not a pattern, and characters
    # such as "." or "+" must not be given their regex meaning.
    pattern = re.compile(re.escape(str(expected)), re.IGNORECASE)
    for file_path in glob(str(out_dir / "*")):
        if pattern.match(file_path):
            return file_path
    raise click.ClickException(
        "ERROR: The expected output file does not exist!\n" f"Expected: {expected}"
    )


@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.option("--validate/--no-validate", default=True)
@click.option(
    "-i",
    "--dependent-input",
    default=None,
    nargs=1,
    help="Run the test on a specific input file by passing either an LFN or a path to a local file",
)
@click.option(
    "-n",
    "--n-events",
    default=None,
    nargs=1,
    type=int,
    help="Run the test on specified number of events",
)
def test(production_name, job_name, dependent_input, n_events, validate):
    """Execute a job locally"""
    inside_ap_datapkg()

    # Check if production exists
    check_production(production_name)

    # validate test environment
    if validate:
        validate_environment()

    from .log_parsing import show_log_advice
    from .testing import prepare_test

    out_dir, prodrun_config_path = prepare_test(
        production_name, job_name, dependent_input, n_events
    )

    cmd = ["lb-prod-run", prodrun_config_path, "--verbose"]
    click.secho(f"Starting application with: {shlex.join(cmd)}", fg="green")
    result = logging_subprocess_run(cmd, cwd=out_dir)

    # Keep the raw output next to the job's other files.
    (out_dir / "stdout.log").write_bytes(result.stdout)
    (out_dir / "stderr.log").write_bytes(result.stderr)

    click.secho("Summary of log messages:", fg="green")
    show_log_advice(
        b"\n".join([result.stdout, result.stderr]).decode(errors="backslashreplace")
    )

    if result.returncode != 0:
        raise click.ClickException("Execution failed, see above for details")

    # Obtain the right output file name
    # TODO do we really need to redo parsing and validating when prepare test already does it?
    prod_data = parse_yaml(render_yaml(_read_info_yaml(production_name)))
    yaml_warnings = validate_yaml(prod_data, ".", production_name)
    try:
        job_data = prod_data[job_name]
    except KeyError:
        raise click.ClickException(
            f"Job {job_name} is not found for production {production_name}!"
        ) from None
    output_file = job_data["output"][0]

    # Validating Gaudi options
    test_ntuple_path = _locate_test_output(out_dir, output_file)

    errors, warnings = [], []
    # TODO: This hasn't been updated to work with lb-prod-run
    click.secho("Options parsing is currently unavailable", fg="yellow")
    # if ".root" in test_ntuple_path.lower():
    #     errors, warnings = do_options_parsing(
    #         # TODO
    #         env_cmd,
    #         out_dir,
    #         join(out_dir, "output.pkl"),
    #         test_ntuple_path,
    #         job_name,
    #         prod_data,
    #     )

    warnings.extend(yaml_warnings or [])

    for warning in warnings or []:
        click.secho(f"WARNING: {warning}", fg="yellow")

    if errors:
        raise click.ClickException(
            "Found the following errors when parsing the options:"
            + "\n * "
            + "\n * ".join(errors)
        )

    if not any([warnings, errors]):
        click.secho(
            "No problems found while validating the options and output file!",
            fg="green",
        )

    # The functionality of job-dependent job testing relies on out_dir being written
    # in the line below as the final word so please be careful if changing the below line
    click.secho(f"Success! Output can be found in {out_dir}", fg="green")


@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.option("--validate/--no-validate", default=True)
@click.option(
    "-i",
    "--dependent-input",
    default=None,
    nargs=1,
    help="Run the test on a specific input file by passing either an LFN or a path to a local file",
)
def debug(production_name, job_name, dependent_input, validate):
    """Start an interactive session inside the job's environment"""
    inside_ap_datapkg()
    check_production(production_name)

    if validate:
        validate_environment()

    from .testing import enter_debugging, prepare_test

    enter_debugging(*prepare_test(production_name, job_name, dependent_input))


@main.command()
@click.argument("pipeline_id", type=int, nargs=1)
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.argument("test_id", type=str, nargs=1, default="latest")
@click.option("--validate/--no-validate", default=True)
def reproduce(pipeline_id, production_name, job_name, test_id, validate):
    """Reproduce an existing online test locally"""
    if validate:
        validate_environment()

    from .testing import enter_debugging, prepare_reproduce

    enter_debugging(*prepare_reproduce(pipeline_id, production_name, job_name, test_id))


@main.command()
@click.argument("log_fn", type=click.Path(exists=True))
@click.option(
    "--suppress",
    type=int,
    default=5,
    show_default=True,
    help="Minimum number of instances required to show ERROR and WARNING messages",
)
def parse_log(log_fn, suppress):
    """Read a Gaudi log file and extract information"""
    from .log_parsing import show_log_advice

    # Undecodable bytes are escaped instead of raising, matching how the
    # ``test`` command decodes job output before passing it to show_log_advice.
    with open(log_fn, "rt", errors="backslashreplace") as fp:
        log_text = fp.read()

    click.echo(f"Summary of log messages in: {log_fn}")
    show_log_advice(log_text, suppress)