# Source code for LbProdRun

###############################################################################
# (c) Copyright 2021 CERN for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import os
import shlex
import shutil
from pathlib import Path
from pprint import pformat

import typer
import yaml
from LbEnv.ProjectEnv.lookup import findDataPackage  # type: ignore
from LbEnv.ProjectEnv.script import decodePkg  # type: ignore

from .models import JobSpecV1, read_jobspec


def run_job(
    spec_file: Path,
    *,
    dry_run=False,
    gaudi_dry_run=False,
    verbose=False,
    interactive=False,
    prmon=False,
    perf=False,
):
    """Read a job specification file and hand it to :func:`execute`.

    :param spec_file: path to the job specification (parsed by ``read_jobspec``)
    :param dry_run: forwarded to :func:`execute` (print command, don't run)
    :param gaudi_dry_run: forwarded to :func:`execute`
    :param verbose: echo the fully expanded specification before executing
    :param interactive: forwarded to :func:`execute`
    :param prmon: forwarded to :func:`execute`
    :param perf: forwarded to :func:`execute`
    """
    parsed_spec = read_jobspec(spec_file)
    if verbose:
        typer.secho("Expanded spec file as:", fg=typer.colors.GREEN)
        typer.secho(pformat(parsed_spec.model_dump()))
    execute(
        parsed_spec,
        dry_run=dry_run,
        gaudi_dry_run=gaudi_dry_run,
        interactive=interactive,
        prmon=prmon,
        perf=perf,
    )
def _write_prod_conf_options(job_spec, path: Path, verbose=False): """Write an options file for the ProdConf data package""" data = { "Application": job_spec.application.name, "AppVersion": job_spec.application.version, "OptionFormat": job_spec.options.format, "InputFiles": job_spec.input.files, "OutputFilePrefix": job_spec.output.prefix, "OutputFileTypes": job_spec.output.types, "XMLSummaryFile": job_spec.input.xml_summary_file, "XMLFileCatalog": job_spec.input.xml_file_catalog, "HistogramFile": job_spec.output.histogram_file, "DDDBTag": job_spec.db_tags.dddb_tag, "CondDBTag": job_spec.db_tags.conddb_tag, "DQTag": job_spec.db_tags.dq_tag, "NOfEvents": job_spec.input.n_of_events, "TCK": job_spec.input.tck, "ProcessingPass": job_spec.options.processing_pass, } if job_spec.input.seeds: if job_spec.application.name.lower() != "gauss": raise NotImplementedError( "ProdConf simulation seeds are only supported for Gauss applications" ) max_n_events = job_spec.input.seeds.max_n_events or job_spec.input.n_of_events data["RunNumber"] = ( job_spec.input.seeds.production_id * 100 + job_spec.input.seeds.prod_job_id ) data["FirstEventNumber"] = ( max_n_events * (job_spec.input.seeds.prod_job_id - 1) + 1 ) if ( job_spec.input.run_number is not None and job_spec.input.run_number != data["RunNumber"] ): raise ValueError( "Run number in input does not match the one derived from seeds" f" {job_spec.input.run_number} != {data['RunNumber']}" ) if ( job_spec.input.first_event_number is not None and job_spec.input.first_event_number != data["FirstEventNumber"] ): raise ValueError( "First event number in input does not match the one derived from seeds" f" {job_spec.input.first_event_number} != {data['FirstEventNumber']}" ) else: data["RunNumber"] = job_spec.input.run_number data["FirstEventNumber"] = job_spec.input.first_event_number if job_spec.application.uses_gaudi_mp: data["NThreads"] = 1 else: data["NThreads"] = job_spec.application.number_of_processors lines = ["from ProdConf import 
ProdConf", ""] lines += ["ProdConf("] lines += [f" {k}={v!r}," for k, v in data.items() if v is not None] lines += [")"] string = "\n".join(lines) if verbose: typer.secho(f"Going to write in {path}", fg=typer.colors.GREEN) typer.secho(string) path.write_text(string)
[docs]def execute( job_spec, *, dry_run=False, gaudi_dry_run=False, interactive=False, prmon=False, perf=False, ): command = [] if not prmon: pass elif interactive: typer.secho("Not using prmon as this is an interactive run!", fg="yellow") elif shutil.which("prmon") is None: typer.secho("Not using prmon as it wasn't found on $PATH!", fg="yellow") else: command += ["prmon"] command += ["--interval", os.environ.get("LBPRODRUN_PRMON_INTERVAL", "60")] command += ["--filename", f"prmon_{job_spec.output.prefix}.txt"] command += ["--json-summary", f"prmon_{job_spec.output.prefix}.json"] command += ["--"] perf = os.environ.get("LBPRODRUN_PERF", str(perf)).lower() in ( "1", "true", "yes", ) perf_command = [] perf_executable = os.environ.get("LBPRODRUN_PERF_EXE", "perf") if not perf: pass elif not (os.path.isfile(perf_executable) or shutil.which(perf_executable)): typer.secho( f"Not using perf as '{perf_executable}' wasn't found on $PATH or as an executable file!", fg="yellow", ) else: perf_command += [ perf_executable, "record", "-o", f"perf_{job_spec.output.prefix}.data", "-F", os.environ.get("LBPRODRUN_PERF_FREQUENCY", "99"), "-g", "--call-graph", os.environ.get("LBPRODRUN_PERF_CALL_GRAPH", "dwarf,16384"), "--", ] if isinstance(job_spec.application, JobSpecV1.FullDevApplication): command += [str(job_spec.application.run_script.absolute())] elif ( isinstance(job_spec.application, JobSpecV1.ReleaseApplication) and job_spec.application.is_lbconda ): env_name, env_version = job_spec.application.is_lbconda command += ["lb-conda"] command += [f"{env_name}/{env_version}"] if job_spec.application.data_pkgs: command += ["xenv"] for pkg_name, pkg_vers in map(decodePkg, job_spec.application.data_pkgs): xml_name = pkg_name.replace("/", "_") + ".xenv" xml_path = os.path.join(findDataPackage(pkg_name, pkg_vers), xml_name) if not os.path.exists(xml_path): # fall back on the old conventional name xml_path = xml_path[:-5] + "Environment.xml" # FIXME: xenv has got problems with unicode 
filenames command += [f"--xml={xml_path}"] else: command += ["lb-run"] command += ["--siteroot=/cvmfs/lhcb.cern.ch/lib/"] command += ["-c", f"{job_spec.application.binary_tag}"] for ep in job_spec.application.data_pkgs: command += [f"--use={ep}"] if isinstance(job_spec.options, JobSpecV1.LegacyOptions): command += ["--use=ProdConf"] if isinstance(job_spec.application, JobSpecV1.ReleaseApplication): typer.secho( f"Executing application {job_spec.application.name} " f"{job_spec.application.version} for binary tag configuration " f"{job_spec.application.binary_tag}" ) if job_spec.application.nightly: command += [f"--nightly={job_spec.application.nightly}"] app = job_spec.application.name if job_spec.application.version: app += "/" + job_spec.application.version command += [app] elif isinstance(job_spec.application, JobSpecV1.LbDevApplication): typer.secho( f"Executing custom application with {job_spec.application.run_script}" ) command += ["--path-to-project"] command += [str(job_spec.application.project_base.absolute())] else: raise NotImplementedError(type(job_spec.application)) if job_spec.application.name == "Franklin": # Franklin is a special case, we don't know what it's based on so # assume the options are correctly defined is_lbexec = isinstance(job_spec.options, JobSpecV1.LbExecOptions) else: is_lbexec = job_spec.application.is_lbexec if is_lbexec: inner_command = _make_lbexec_command(job_spec, dry_run=gaudi_dry_run) else: inner_command = _make_gaudirun_command(job_spec, dry_run=gaudi_dry_run) inner_command = perf_command + inner_command if interactive: command += ["bash", "--norc", "--noprofile"] typer.secho("Starting application environment with:") typer.secho(shlex.join(command)) typer.secho("#" * 80) typer.secho("Entering interactive mode, now run:", fg="green") typer.secho(shlex.join(inner_command)) # add some examples of useful commands like running valgrind # perf, etc: typer.secho("\nExamples of useful commands:", fg="green") typer.secho("-" * 80) 
typer.secho("Run the application normally:") typer.secho(f" {shlex.join(inner_command)}") typer.secho() typer.secho("Run under valgrind:") typer.secho( " export VALGRIND_LIB=$(which valgrind | xargs dirname | xargs dirname)/libexec/valgrind" ) typer.secho( f" valgrind --tool=memcheck --trace-children=yes --leak-check=full --num-callers=250 --show-leak-kinds=all --track-origins=yes {shlex.join(inner_command)} 2>&1 | tee valgrind_{job_spec.output.prefix}.log" ) typer.secho() def lbconda_which(cmd): try: import subprocess return ( subprocess.check_output( f"lb-conda default which {cmd}", shell=True, text=True ) .strip() .splitlines()[0] ) except Exception: return None heaptrack_found = lbconda_which("heaptrack") if heaptrack_found: typer.secho("Run with heaptrack:") typer.secho(f" {shlex.join(inner_command)} &") typer.secho(" ATTACH_TO_PID=$!; sleep 2;") typer.secho(f" {heaptrack_found} -p $ATTACH_TO_PID") typer.secho() typer.secho("Run with perf:") typer.secho( f" perf record -o perf_interactive.data -F 99 -g --call-graph=dwarf,16384 -- {' '.join(inner_command)}" ) typer.secho("#" * 80) else: typer.secho("Executing command:", fg="green") command += inner_command typer.secho(shlex.join(command)) if dry_run: typer.secho("Exiting early as this is a dry run!", fg="yellow") return os.execvpe(command[0], command, _prepare_env())
def _make_lbexec_command(job_spec, dry_run=False): if not isinstance(job_spec.options, JobSpecV1.LbExecOptions): raise NotImplementedError( "New-style options spec is required for " f"{job_spec.application.name}/{job_spec.application.version}" ) options_yaml_fn = Path(f"lbexec_options_{job_spec.output.prefix}.yaml") _write_options_yaml(job_spec, options_yaml_fn, verbose=False) command = ["lbexec"] if dirname := os.environ.get("LBPRODRUN_OPTIONS_DUMP_DIR"): options_dump = Path(dirname) / f"{job_spec.output.prefix}_dump.json" command += ["--export", str(options_dump)] if dry_run: command += ["--dry-run"] command += [job_spec.options.entrypoint, str(options_yaml_fn)] return command + job_spec.options.extra_args def _write_options_yaml(job_spec, path: Path, verbose=False): """Write an options file for lbexec""" options = job_spec.options.extra_options.copy() # FIXME: Can't actually use this option yet as it doesn't fail if the keys are missing # options["write_decoding_keys_to_git"] = False if job_spec.input.files: options["input_files"] = job_spec.input.files if job_spec.input.run_number and job_spec.application.supports_input_run_number: options["input_run_number"] = job_spec.input.run_number if job_spec.db_tags.dddb_tag: options["dddb_tag"] = job_spec.db_tags.dddb_tag if job_spec.db_tags.conddb_tag: options["conddb_tag"] = job_spec.db_tags.conddb_tag if job_spec.db_tags.dq_tag: options["dq_tag"] = job_spec.db_tags.dq_tag if job_spec.input.n_of_events > 0: options["evt_max"] = job_spec.input.n_of_events if job_spec.input.first_event_number: options["first_evt"] = job_spec.input.first_event_number if job_spec.input.seeds: options["seeds"] = job_spec.input.seeds.model_dump() if job_spec.output.histogram_file: options["histo_file"] = job_spec.output.histogram_file if job_spec.application.event_timeout: options["event_timeout"] = job_spec.application.event_timeout if job_spec.input.xml_summary_file: options["xml_summary_file"] = job_spec.input.xml_summary_file if 
job_spec.input.xml_file_catalog: options["xml_file_catalog"] = job_spec.input.xml_file_catalog if job_spec.output.compression: options["compression"] = job_spec.output.compression if job_spec.application.number_of_processors: options["n_threads"] = job_spec.application.number_of_processors if not job_spec.application.is_lbconda: options["msg_svc_format"] = "%u % F%18W%S %7W%R%T %0W%M" options["msg_svc_time_format"] = "%Y-%m-%dT%H:%M:%S.%fZ" prefix = job_spec.output.prefix output_filetypes = [] for filetype in job_spec.output.types: if filetype.lower().endswith("hist"): if "histo_file" in options: raise NotImplementedError("Multiple histogram files found") options["histo_file"] = f"{prefix}.{filetype}" elif filetype.lower().endswith(".root") and not job_spec.application.is_lbconda: ntuple_file = f"{prefix}.{filetype}" if "ntuple_file" in options and ntuple_file != options["ntuple_file"]: raise NotImplementedError("Multiple ntuple files found") options["ntuple_file"] = ntuple_file else: output_filetypes.append(filetype) if len(output_filetypes) == 1: options["output_file"] = f"{prefix}.{output_filetypes[0]}" elif len(output_filetypes) > 1: split = [tuple(x.rsplit(".", 1)) for x in output_filetypes] lens = {len(s) for s in split} if len(lens) > 1: raise ValueError("Inconsistent values in OutputFileTypes") if lens.pop() != 2: # no '.' 
in the OutputFileTypes raise ValueError("Different OutputFileTypes not supported") _, ext = list(zip(*split)) if len(set(ext)) != 1: raise ValueError("Inconsistent extensions in OutputFileTypes") options["output_file"] = f"{prefix}.{{stream}}.{ext[0].lower()}" string = yaml.safe_dump(options) if verbose: typer.secho(f"Going to write in {path}", fg=typer.colors.GREEN) typer.secho(string) path.write_text(string) def _make_gaudirun_command(job_spec, dry_run=False): if not isinstance(job_spec.options, JobSpecV1.LegacyOptions): raise NotImplementedError( "Old-style options spec is required for " f"{job_spec.application.name}/{job_spec.application.version}" ) prod_conf_fn = Path(f"prodConf_{job_spec.output.prefix}.py") _write_prod_conf_options(job_spec, prod_conf_fn, verbose=False) command = [] command += job_spec.options.command if job_spec.options.command[0] == "gaudirun.py": if dirname := os.environ.get("LBPRODRUN_OPTIONS_DUMP_DIR"): options_dump = Path(dirname) / f"{job_spec.output.prefix}_dump.py" command += ["--output", str(options_dump)] if dry_run: if job_spec.options.command[0] != "gaudirun.py": raise ValueError("Dry run is only supported for gaudirun.py") command += ["--dry-run"] if job_spec.application.uses_gaudi_mp: if job_spec.application.number_of_processors > 1: command += ["--ncpus", f"{job_spec.application.number_of_processors}"] command += job_spec.options.files command += [str(prod_conf_fn)] extra_options = job_spec.options.gaudi_extra_options or "" if job_spec.application.event_timeout: extra_options = "\n".join( [ extra_options, "from Configurables import StalledEventMonitor", f"StalledEventMonitor(EventTimeout={job_spec.application.event_timeout})", ] ) if extra_options: extra_options_path = Path("gaudi_extra_options.py") extra_options_path.write_text(extra_options, encoding="utf-8") command += [str(extra_options_path)] return command def _prepare_env(): """Get a dictionary containing the environment that should be used for the job""" env = 
os.environ.copy() # Versions of Brunel used for 2018 data use XGBoost which uses OpenMP to # provide parallelism and automatically spawns one thread for each CPU. # Use OMP_NUM_THREADS to force it to only use one thread env["OMP_NUM_THREADS"] = "1" return env