Source code for LbAPLocal.testing

###############################################################################
# (c) Copyright 2020-2024 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import importlib
import io
import json
import os
import shlex
import shutil
import subprocess
import tempfile
import zipfile
from os.path import join

import click
import requests
from apd.authentication import get_auth_headers
from LbAPCommon import parse_yaml, render_yaml, validate_yaml
from LbAPCommon.hacks import project_uses_cmt
from LbAPCommon.options_parsing import validate_options

from .utils import (
    LBAPI_HOST,
    check_production,
    check_status_code,
    create_output_dir,
    pool_xml_catalog,
    recursively_create_input,
    resolve_input_overrides,
)


def prepare_test(production_name, job_name, dependent_input=None, n_events=None):
    """Run a local test job for a specific production job"""
    # Check if production exists
    check_production(production_name)

    # Check if job actually exists in production
    with open(os.path.join(production_name, "info.yaml"), "rt") as fp:
        raw_yaml = fp.read()
    prod_data, checks_data = parse_yaml(render_yaml(raw_yaml))
    validate_yaml(prod_data, checks_data, ".", production_name)
    try:
        job_data = prod_data[job_name]
    except KeyError:
        raise click.ClickException(
            f"Job {job_name} is not found for production {production_name}!"
        )

    application_name, application_version = job_data["application"].rsplit("/", 1)
    application = {
        "name": application_name,
        "version": application_version,
        "data_pkgs": ["AnalysisProductions v999999999999"],
    }
    if "@" in application_version:
        app_version, binary_tag = application_version.split("@", 1)
        application["version"] = app_version
        application["binary_tag"] = binary_tag
    require_cmt = project_uses_cmt(application["name"], application["version"])

    prodrun_conf = {
        "spec_version": 1,
        "application": application,
        "input": {"xml_summary_file": "summaryDaVinci_00012345_00006789_1.xml"},
        "output": {"prefix": "00012345_00006789_1"},
    }
    if isinstance(job_data["options"], dict):
        prodrun_conf["options"] = job_data["options"].copy()
        if "entrypoint" not in job_data["options"]:
            prodrun_conf["options"].setdefault("format", "WGProd")
    else:
        prodrun_conf["options"] = {"format": "WGProd", "files": job_data["options"]}
    if n_events is not None:
        prodrun_conf["input"]["n_of_events"] = n_events

    extra_files = []
    override_input, output_filetype = resolve_input_overrides(prod_data, job_name)
    if "job_name" in job_data["input"] and dependent_input is None:
        dependent_input, extra_files = recursively_create_input(
            production_name,
            job_data["input"]["job_name"],
            job_data["input"].get("filetype"),
            prod_data,
            n_events,
        )

    # only create output directories if this is the job that's about to be run
    # i.e. there are no dependent jobs that need to be run first
    dynamic_dir, out_dir = create_output_dir(production_name, require_cmt)

    for path in extra_files:
        shutil.copyfile(path, f"{out_dir}/{path.name}")

    if job_data["automatically_configure"]:
        if "files" not in prodrun_conf["options"]:
            raise NotImplementedError(
                "automatically_configure is not yet supported for lbexec based productions"
            )
        params = {}
        if output_filetype:
            params["override-output-filetype"] = output_filetype
        response = requests.post(
            f"{LBAPI_HOST}/pipelines/autoconf-options/",
            **get_auth_headers(),
            json={**job_data, **({"input": override_input} if override_input else {})},
            params=params,
        )
        check_status_code(response)
        config_path = dynamic_dir / production_name / f"{job_name}_autoconf.py"
        config_path.parent.mkdir(parents=True)
        config_path.write_text(response.text)

        prodrun_conf["options"]["files"].insert(
            0, join("$ANALYSIS_PRODUCTIONS_DYNAMIC", production_name, config_path.name)
        )

    prodrun_config_path = out_dir / "prodConf_DaVinci_00012345_00006789_1.json"
    prodrun_conf["output"]["types"] = job_data["output"]

    # force to use the file chosen by the user
    if "bk_query" in job_data["input"] or "transform_ids" in job_data["input"]:
        if dependent_input is not None:
            prodrun_conf["input"]["files"] = [dependent_input]
            if "LFN:" in dependent_input:
                (out_dir / "pool_xml_catalog.xml").write_text(
                    pool_xml_catalog([dependent_input])
                )
                prodrun_conf["input"]["xml_file_catalog"] = "pool_xml_catalog.xml"
        else:
            response = requests.post(
                f"{LBAPI_HOST}/pipelines/lfns/",
                **get_auth_headers(),
                json=job_data["input"],
            )
            lfns = check_status_code(response).json()
            prodrun_conf["input"]["files"] = [f"LFN:{lfn}" for lfn in lfns]
            (out_dir / "pool_xml_catalog.xml").write_text(pool_xml_catalog(lfns))
            prodrun_conf["input"]["xml_file_catalog"] = "pool_xml_catalog.xml"
    elif "job_name" in job_data["input"]:
        prodrun_conf["input"]["files"] = [dependent_input]

    prodrun_config_path.write_text(json.dumps(prodrun_conf))

    return out_dir, prodrun_config_path.name
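
# For illustration only (not part of the original module): the prodConf file
# written by prepare_test ends up with roughly the following shape for a
# bookkeeping-query input. The application name/version, options file and
# output type below are hypothetical; the real values come from the job's
# info.yaml entry.
#
#     {
#         "spec_version": 1,
#         "application": {"name": "DaVinci", "version": "v64r7",
#                         "data_pkgs": ["AnalysisProductions v999999999999"]},
#         "options": {"format": "WGProd", "files": ["my_options.py"]},
#         "input": {"files": ["LFN:/lhcb/..."],
#                   "xml_file_catalog": "pool_xml_catalog.xml",
#                   "xml_summary_file": "summaryDaVinci_00012345_00006789_1.xml"},
#         "output": {"prefix": "00012345_00006789_1", "types": ["DATA.ROOT"]}
#     }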


def prepare_reproduce(pipeline_id, production_name, job_name, test_id):
    click.secho(
        f"Reproducing test for test {pipeline_id} {production_name} {job_name}",
        fg="green",
    )

    prod_url = f"{LBAPI_HOST}/pipelines/{pipeline_id}/{production_name}/"
    response = requests.get(prod_url, **get_auth_headers())
    prod_info = check_status_code(response).json()

    job_url = f"{prod_url}jobs/{job_name}/"
    if test_id != "latest":
        job_url += f"tests/{test_id}"
    response = requests.get(job_url, **get_auth_headers())
    job_info = check_status_code(response).json()
    if test_id == "latest":
        job_url += f"tests/{job_info['test']['attempt']}"

    tmp_dir = tempfile.mkdtemp()
    click.secho(f"Cloning {prod_info['repo_url']}", fg="green")
    subprocess.check_call(["git", "clone", prod_info["repo_url"], tmp_dir])

    click.secho(f"Running test in {tmp_dir}", fg="green")
    os.chdir(tmp_dir)

    click.secho(f"Fetching and checking out {prod_info['commit']}", fg="green")
    subprocess.check_call(["git", "fetch", "origin", prod_info["commit"]])
    subprocess.check_call(["git", "checkout", prod_info["commit"]])

    check_production(production_name)

    require_cmt = project_uses_cmt(
        job_info["application_name"], job_info["application_version"]
    )
    dynamic_dir, out_dir = create_output_dir(production_name, require_cmt)

    # Download the job's output sandbox
    response = requests.get(f"{job_url}/zip", **get_auth_headers())
    zip_data = io.BytesIO(check_status_code(response).content)
    with zipfile.ZipFile(zip_data) as zf:
        zf.extractall(out_dir)
    prodrun_config_path = max(
        out_dir.glob("prodConf*.json"), key=lambda x: x.name.split("_")[-1]
    )

    # Write automatic configuration if requested
    if job_info["automatically_configure"]:
        filename = dynamic_dir / job_info["dynamic_options_path"]
        # Prevent directory traversal
        if str(filename.relative_to(dynamic_dir)).startswith("."):
            raise NotImplementedError(dynamic_dir, filename)
        filename.parent.mkdir(parents=True)
        filename.write_text(job_info["autoconf_options"])

    # Download the job input
    input_paths = [x["path"] for x in job_info["test"]["input_files"]]
    if not input_paths:
        raise NotImplementedError("Failed to find any input files")
    if all("/" in x for x in input_paths):
        # It's an LFN so write the pool_xml_catalog.xml
        (out_dir / "pool_xml_catalog.xml").write_text(pool_xml_catalog(input_paths))
    elif all("/" not in x for x in input_paths):
        if len(input_paths) != 1:
            raise NotImplementedError(input_paths)
        filename = out_dir / input_paths[0]
        # It's from another job so download from S3
        response = requests.get(
            f"{prod_url}jobs/{job_info['input']['job_name']}/tests/"
            f"{job_info['test']['attempt']}/files/{filename.name}/url",
            **get_auth_headers(),
        )
        url = check_status_code(response).json()["url"]
        click.secho(f"Downloading {filename.name}", fg="green")
        with requests.get(url, stream=True) as resp:
            check_status_code(resp)
            with filename.open("wb") as fp:
                shutil.copyfileobj(resp.raw, fp)
        prodrun_conf = json.loads(prodrun_config_path.read_text())
        prodrun_conf["input"]["files"] = [filename.name]
        prodrun_config_path.write_text(json.dumps(prodrun_conf))
    else:
        raise NotImplementedError(input_paths)

    return out_dir, prodrun_config_path.name


def enter_debugging(out_dir, prodrun_config_path, dependent_job=None):
    # This means the requested job depends on another job and has not been told
    # the location of that job's output
    if dependent_job is not None:
        raise NotImplementedError(
            "Automatic input creation for job-dependent jobs is not implemented in debug mode \n"
            f"The requested job depends on {dependent_job['job_name']}, please provide the location "
            f"of the output file of a local test of {dependent_job['job_name']} by appending "
            '" -i <file_location>" to the debug command'
        )
    cmd = ["lb-prod-run", prodrun_config_path, "--interactive"]
    click.secho(f"Running: {shlex.join(cmd)}")
    os.chdir(out_dir)
    os.execlp(cmd[0], *cmd)
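
# Illustrative usage (not part of the original module): prepare a local test for
# a job and then drop into an interactive lb-prod-run session. The production
# and job names below are hypothetical.
#
#     out_dir, config_name = prepare_test("my_production", "my_job")
#     enter_debugging(out_dir, config_name)
#
# Note that enter_debugging replaces the current process via os.execlp, so it
# does not return.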


def do_options_parsing(env_cmd, out_dir, pkl_file, root_file, job_name, prod_data):
    json_file = join(out_dir, "output.json")
    with importlib.resources.path(
        "LbAPCommon.options_parsing", "gaudi_pickle_to_json.py"
    ) as script_path:
        subprocess.run(
            env_cmd
            + [
                "python",
                script_path,
                "--pkl",
                pkl_file,
                "--output",
                json_file,
                "--debug",
            ],
            text=True,
        )
    return validate_options(json_file, root_file, job_name, prod_data)
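
# Illustrative call (not part of the original module): check the options a Gaudi
# job actually picked up after it has run locally. All file names below are
# hypothetical; an empty env_cmd runs the helper script in the current
# environment, whereas in practice env_cmd would be the command prefix that sets
# up the application environment.
#
#     results = do_options_parsing(
#         env_cmd=[],
#         out_dir=str(out_dir),
#         pkl_file=str(out_dir / "options.pkl"),
#         root_file=str(out_dir / "00012345_00006789_1.root"),
#         job_name="my_job",
#         prod_data=prod_data,
#     )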