Source code for LbAPCommon.parsing

###############################################################################
# (c) Copyright 2021-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################

import re
from collections import OrderedDict
from os.path import isfile, join, relpath

import jinja2
import yaml
from strictyaml import (
    Any,
    Bool,
    Enum,
    Float,
    Int,
    Map,
    MapPattern,
    Optional,
    Regex,
    Seq,
    Str,
    load,
)

from LbAPCommon import config
from LbAPCommon.linting.bk_paths import validate_bk_query

RE_APPLICATION = r"^(([A-Za-z]+/)+v\d+r\d+(p\d+)?(\@[a-z0-9_\-\+]+)?)|(lb\-conda/[A-Za-z0-9_]+/(\d\d\d\d\-\d\d-\d\d))"
RE_JOB_NAME = r"^[a-zA-Z0-9][a-zA-Z0-9_\-]+$"
RE_OUTPUT_FILE_TYPE = (
    r"^([A-Za-z][A-Za-z0-9_]+\.)+((ROOT|root|HIST|hist)|.?(DST|dst|mdf|MDF))$"
)
RE_OPTIONS_FN = r"^\$?[a-zA-Z0-9/\.\-\+\=_]+$"
RE_INFORM = r"^(?:[a-zA-Z]{3,}|[^@\s]+@[^@\s]+\.[^@\s]+)$"

RE_ROOT_IN_TES = r"^\/.+$"
RE_DDDB_TAG = r"^.{1,50}$"
RE_CONDDB_TAG = r"^.{1,50}$"

RE_COMMENT = r"(.{1,5000})"
DQ_FLAGS_SCHEMA = Seq(Enum(["BAD", "OK", "EXPRESS_OK", "UNCHECKED"]))

RE_INCLUSIVE_RUN_RANGE = r"\d+:\d+"
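# Illustrative strings accepted by the patterns above (hypothetical values,
# shown only to document the expected shapes, not taken from a real production):
#   RE_APPLICATION:         "DaVinci/v46r5", "lb-conda/default/2023-07-27"
#   RE_JOB_NAME:            "bu2jpsik_2018_magup"
#   RE_OUTPUT_FILE_TYPE:    "DATA.ROOT", "D02HH.LDST"
#   RE_INCLUSIVE_RUN_RANGE: "123456:123999"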

LEGACY_OPTIONS = {
    Optional("command"): Seq(Str()),
    "files": Seq(Regex(RE_OPTIONS_FN)),
}
LBEXEC_OPTIONS = {
    "entrypoint": Regex(r".+:.+"),
    Optional("extra_options"): MapPattern(Str(), Any()),
    Optional("extra_args"): Seq(Str()),
}

BASE_JOB_SCHEMA = {
    "application": Regex(RE_APPLICATION),
    "input": MapPattern(Str(), Any()),
    "output": Regex(RE_OUTPUT_FILE_TYPE) | Seq(Regex(RE_OUTPUT_FILE_TYPE)),
    "options": Regex(RE_OPTIONS_FN)
    | Seq(Regex(RE_OPTIONS_FN))
    | MapPattern(Str(), Any()),
    "wg": Enum(config.known_working_groups),
    "inform": Regex(RE_INFORM) | Seq(Regex(RE_INFORM)),
    # Automatic configuration
    "automatically_configure": Bool(),
    "turbo": Bool(),
    Optional("root_in_tes"): Regex(RE_ROOT_IN_TES),
    Optional("simulation"): Bool(),
    Optional("luminosity"): Bool(),
    Optional("data_type"): Enum(config.known_data_types),
    Optional("input_type"): Enum(config.known_input_types),
    Optional("dddb_tag"): Regex(RE_DDDB_TAG),
    Optional("conddb_tag"): Regex(RE_CONDDB_TAG),
    Optional("checks"): Seq(Str()),  # TODO: replace this with a regex
    Optional("extra_checks"): Seq(
        Str()
    ),  # TODO: replace this with a regex or with something like the line below
    # Production submission metadata
    Optional("comment"): Regex(RE_COMMENT),
    Optional("tags"): MapPattern(Str(), Str()),
    "priority": Enum(config.allowed_priorities),
    "completion_percentage": Float(),
}
INPUT_SCHEMAS = {
    "bk_query": Map(
        {
            "bk_query": Str(),
            Optional("n_test_lfns"): Int(),
            Optional("dq_flags"): DQ_FLAGS_SCHEMA,
            Optional("smog2_state"): Seq(Str()),
            Optional("extended_dq_ok"): Seq(Str()),
            Optional("runs"): Seq(Int() | Regex(RE_INCLUSIVE_RUN_RANGE)),
            Optional("start_run"): Int(),
            Optional("end_run"): Int(),
            Optional("input_plugin", default="default"): Enum(["default", "by-run"]),
            Optional("keep_running", default=True): Bool(),
            Optional("sample_fraction"): Float(),
            Optional("sample_seed"): Str(),
        }
    ),
    "sample": Map(
        {
            "wg": Str(),
            "analysis": Str(),
            Optional("name"): Str(),
            Optional("version"): Str(),
            Optional("tags"): MapPattern(Str(), Str()),
            Optional("n_test_lfns"): Int(),
            Optional("input_plugin", default="default"): Enum(["default", "by-run"]),
            Optional("keep_running", default=True): Bool(),
        }
    ),
    "job_name": Map(
        {"job_name": Str(), Optional("filetype"): Regex(RE_OUTPUT_FILE_TYPE)}
    ),
    "transform_ids": Map(
        {
            "transform_ids": Seq(Int()),
            "filetype": Regex(RE_OUTPUT_FILE_TYPE),
            Optional("n_test_lfns"): Int(),
            Optional("dq_flags"): DQ_FLAGS_SCHEMA,
            Optional("smog2_state"): Seq(Str()),
            Optional("runs"): Seq(Int() | Regex(RE_INCLUSIVE_RUN_RANGE)),
            Optional("start_run"): Int(),
            Optional("end_run"): Int(),
            Optional("sample_fraction"): Float(),
            Optional("sample_seed"): Str(),
        }
    ),
}
DEFAULT_JOB_VALUES = {
    "automatically_configure": False,
    "turbo": False,
    "completion_percentage": 100,
    "priority": "1b",
}
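# For orientation, a hypothetical job entry of the shape the schemas above are
# meant to accept (all names and values below are placeholders):
#
#   My_Job_2018_MagUp:
#     application: DaVinci/v46r5
#     wg: <one of config.known_working_groups>
#     inform: someone@cern.ch
#     options:
#       entrypoint: my_package.my_module:main
#     output: DATA.ROOT
#     input:
#       bk_query: <a bookkeeping path>
#
# Keys listed in DEFAULT_JOB_VALUES (and any production-level "defaults" block)
# are filled in by parse_yaml() when they are omitted.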

CHECK_TYPE_SCHEMAS = {
    "range": {
        Optional("mode"): Enum(config.validation_modes),
        "expression": Str(),  # TODO: replace this with a regex
        "limits": Map({"min": Float(), "max": Float()}),
        Optional("n_bins"): Int(),
        Optional("blind_ranges"): Map({"min": Float(), "max": Float()})
        | Seq(Map({"min": Float(), "max": Float()})),
        Optional("exp_mean"): Float(),
        Optional("exp_std"): Float(),
        Optional("mean_tolerance"): Float(),
        Optional("std_tolerance"): Float(),
    },
    "range_nd": {
        Optional("mode"): Enum(config.validation_modes),
        "expressions": Map(
            {  # TODO: replace Str() with a regex
                "x": Str(),
                "y": Str(),
                Optional("z"): Str(),
            }
        ),
        "limits": Map(
            {
                "x": Map({"min": Float(), "max": Float()}),
                "y": Map({"min": Float(), "max": Float()}),
                Optional("z"): Map({"min": Float(), "max": Float()}),
            }
        ),
        Optional("n_bins"): Map(
            {
                "x": Int(),
                "y": Int(),
                Optional("z"): Int(),
            }
        ),
        Optional("blind_ranges"): Seq(
            Map(
                {
                    "x": Map({"min": Float(), "max": Float()}),
                    "y": Map({"min": Float(), "max": Float()}),
                    Optional("z"): Map({"min": Float(), "max": Float()}),
                }
            )
        ),
    },
    "num_entries": {
        Optional("mode"): Enum(config.validation_modes),
        "count": Int(),
    },
    "num_entries_per_invpb": {
        Optional("mode"): Enum(config.validation_modes),
        "count_per_invpb": Float(),
        Optional("lumi_pattern"): Str(),
    },
    "range_bkg_subtracted": {
        Optional("mode"): Enum(config.validation_modes),
        "expression": Str(),
        "limits": Map({"min": Float(), "max": Float()}),
        "expr_for_subtraction": Str(),
        "mean_sig": Float(),
        "background_shift": Float(),
        "background_window": Float(),
        "signal_window": Float(),
        Optional("n_bins"): Int(),
        Optional("blind_ranges"): Map({"min": Float(), "max": Float()})
        | Seq(Map({"min": Float(), "max": Float()})),
    },
    "branches_exist": {
        Optional("mode"): Enum(config.validation_modes),
        "branches": Seq(Str()),
    },
}
BASIC_VALIDATION_SCHEMAS = {
    validation_type: {"mode": Enum(config.validation_modes)}
    for validation_type in config.validation_types
}
CHECK_TYPE_SCHEMAS = {**CHECK_TYPE_SCHEMAS, **BASIC_VALIDATION_SCHEMAS}
BASE_CHECK_SCHEMA = {
    "type": Enum(list(CHECK_TYPE_SCHEMAS)),
    Optional("tree_pattern"): Str(),
}
BASE_CHECK_DEFAULT_VALUES = {
    "tree_pattern": r"(.*/DecayTree)|(.*/MCDecayTree)",
}
CHECK_TYPE_DEFAULT_VALUES = {
    "mode": "Strict",
    "num_entries": {},
    "range": {
        "n_bins": 50,
    },
    "range_nd": {
        "n_bins": {
            "x": 50,
            "y": 50,
            "z": 50,
        },
    },
    "num_entries_per_invpb": {
        "lumi_pattern": r"(.*/LumiTuple)",
    },
    "range_bkg_subtracted": {
        "n_bins": 50,
    },
    "branches_exist": {},
}
BASIC_VALIDATION_DEFAULT_VALUES = {
    validation_type: {"mode": "Strict"} for validation_type in config.validation_types
}
CHECK_TYPE_DEFAULT_VALUES = {
    **CHECK_TYPE_DEFAULT_VALUES,
    **BASIC_VALIDATION_DEFAULT_VALUES,
}
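# A hypothetical "range" check definition of the shape accepted by the check
# schemas above (the expression and limit values are placeholders):
#
#   checks:
#     jpsi_mass_window:
#       type: range
#       expression: Jpsi_M
#       limits:
#         min: 3000.0
#         max: 3200.0
#
# Unspecified keys such as n_bins and tree_pattern are filled from
# CHECK_TYPE_DEFAULT_VALUES / BASE_CHECK_DEFAULT_VALUES during parsing.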


def _ordered_dict_to_dict(a):
    if isinstance(a, (OrderedDict, dict)):
        return {k: _ordered_dict_to_dict(v) for k, v in a.items()}
    elif isinstance(a, (list, tuple)):
        return [_ordered_dict_to_dict(v) for v in a]
    else:
        return a
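
# For example (tuples are converted to lists in the same pass):
#   _ordered_dict_to_dict(OrderedDict(a=[OrderedDict(b=1)])) == {"a": [{"b": 1}]}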


def render_yaml(raw_yaml):
    """Render a "raw" YAML jinja template.

    Accepts LbAP yaml configuration jinja template and renders it into a full
    YAML configuration.

    Args:
        raw_yaml (str): YAML jinja-template string

    Raises:
        ValueError: raised if jinja2 couldn't render the raw_yaml string.

    Returns:
        str: a jinja-rendered YAML string.
    """
    try:
        rendered_yaml = jinja2.Template(
            raw_yaml, undefined=jinja2.StrictUndefined
        ).render()
    except jinja2.TemplateError as e:
        raise ValueError(
            "Failed to render with jinja2 on line %s: %s"
            % (getattr(e, "lineno", "unknown"), e)
        ) from e
    return rendered_yaml

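# A minimal illustration of render_yaml (hypothetical template, not from a real
# production): jinja blocks are expanded and, because of StrictUndefined, any
# undefined variable raises ValueError instead of rendering as an empty string.
#
#     >>> render_yaml(
#     ...     "{% set pols = ['MagUp', 'MagDown'] %}"
#     ...     "{% for p in pols %}job_{{ p }}: {}\n{% endfor %}"
#     ... )
#     'job_MagUp: {}\njob_MagDown: {}\n'
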
def _validate_proc_pass_map(job_names, proc_pass_map):
    """Build processing paths and validate them from a processing pass map.

    Given a list of step job names (in correct order), and the processing pass
    map, build the processing path for each step and verify the length is
    below 100.

    Args:
        job_names (list[str]): a list containing step job names.
        proc_pass_map (dict): A dictionary mapping job names to processing pass

    Raises:
        ValueError: raised if the processing path length is over 100 characters
    """
    for i, job_name in enumerate(job_names):
        proc_passes = map(proc_pass_map.get, job_names[:i] + [job_name])
        pro_path = "/".join(proc_passes)
        if len(pro_path) >= 100:
            proc_pass = proc_pass_map[job_name]
            step_jobs_list = " - " + "\n - ".join(job_names)
            raise ValueError(
                f"The expected processing path length for the job {job_name!r} is too long.\n"
                "DIRAC requires this to be less than 100 characters.\n\n"
                f"'Step' jobs:\n{step_jobs_list!r}\n"
                f"Job name: {job_name!r}\n"
                f"Processing pass for this step: {proc_pass!r}\n"
                f"Processing path for this step ({len(pro_path)} chars): {pro_path}\n\n"
                "To recover from this issue, consider:\n"
                " - Removing redundant information from your job name.\n"
                " - Shortening your job names.\n"
                " - If the offending job depends on output from other jobs, ensure that they have a common prefix.\n"
            )


def create_proc_pass_map(job_names, version, default_proc_pass="default"):
    """Create a job name to processing pass map.

    Given a list of step job names and the production version, produce a
    job_name --> processing pass mapping.

    Args:
        job_names (list): step job names
        version (str): LbAP production version
        default_proc_pass (str, optional): the default processing pass.
            Defaults to "default".

    Returns:
        dict: a step job name to processing pass map
    """
    proc_pass_prefix = f"AnaProd-{version}-"
    proc_pass_map = {}
    # dummy_version = "v0r0p00000000"

    def clean_proc_pass(i, original_job_name):
        # attempt to remove redundant information from the job name
        job_name = re.sub(
            r"([0-9]{8})|(MagUp|MagDown|MU|MD)|((^|[^0*9])201[125678]($|[^0*9]))",
            "",
            original_job_name,
        )
        # Remove repeated separator characters
        job_name = re.sub(r"([-_])[-_]+", r"\1", job_name).strip("_-")

        if i == 0:
            return f"{proc_pass_prefix}{job_name}"

        proc_pass = job_name
        for previous_job_name in job_names[:i]:
            size = 0
            previous_proc_pass = proc_pass_map[previous_job_name]
            # Remove the prefix if this is the first job
            if previous_proc_pass.startswith(proc_pass_prefix):
                previous_proc_pass = previous_proc_pass[len(proc_pass_prefix) :]
            # Look for a common prefix and remove it
            for last, this in zip(previous_proc_pass, proc_pass):
                if last != this:
                    break
                size += 1
            proc_pass = proc_pass[size:].strip("_-+")
        # If the processing pass has been entirely stripped use a default
        if not proc_pass:
            proc_pass = default_proc_pass
        return proc_pass

    for i, job_name in enumerate(job_names):
        proc_pass_map[job_name] = clean_proc_pass(i, job_name)

    _validate_proc_pass_map(job_names, proc_pass_map)

    return proc_pass_map


def is_simulation_job(prod_data: dict, job_name: str):
    """Determine if a job is using MC input or not.

    Args:
        prod_data (dict): Entire production information from yaml parsing
        job_name (str): Name of the job to determine if it's using MC input or not

    Raises:
        NotImplementedError: No bookkeeping location or job name provided.

    Returns:
        bool: True if the job is using MC input, False if it is not
    """
    job_dict = prod_data[job_name]
    if "simulation" not in job_dict:
        if "bk_query" in job_dict["input"]:
            if "/mc/" in job_dict["input"]["bk_query"].lower():
                return True
            else:
                return False
        elif "job_name" in job_dict["input"]:
            dependent_job = prod_data[job_name]["input"]["job_name"]
            return is_simulation_job(prod_data, dependent_job)
        else:
            raise NotImplementedError(
                "Input requires either a bookkeeping location or a previous job name"
            )

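# Illustrative behaviour with hypothetical parsed data: a job whose bk_query
# contains "/MC/" is treated as simulation, e.g.
#
#     >>> is_simulation_job(
#     ...     {"my_job": {"input": {"bk_query": "/MC/2018/SomePath"}}}, "my_job"
#     ... )
#     True
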
def parse_yaml(rendered_yaml):
    """Parse rendered YAML text.

    Args:
        rendered_yaml (str): The rendered YAML jinja template.

    Raises:
        ValueError: raised if errors occurred during parsing.

    Returns:
        tuple: a tuple of the parsed configuration data2 (dict) and checks data (dict).
    """
    data1 = load(
        rendered_yaml, schema=MapPattern(Regex(RE_JOB_NAME), Any(), minimum_keys=1)
    )

    data_checks = {}
    if "checks" in data1:
        # apply the appropriate schema to each different type of check
        for _check_name, check_data in data1["checks"].items():
            check_type = str(check_data["type"])
            check_schema = {
                **BASE_CHECK_SCHEMA,
                **CHECK_TYPE_SCHEMAS[check_type],
            }
            check_defaults = {
                **BASE_CHECK_DEFAULT_VALUES,
                **CHECK_TYPE_DEFAULT_VALUES[check_type],
            }
            # apply default values common to all check types
            for key, value in check_schema.copy().items():
                if isinstance(key, Optional):
                    key = key.key
                if key not in check_data and key in check_defaults:
                    key = Optional(key, default=check_defaults.get(key))
                check_schema[key] = value
            check_data.revalidate(Map(check_schema))

        # if checks pass validation, store elsewhere & delete from main data
        # so that normal jobs aren't impacted
        data_checks = data1.data["checks"]

        # special case: range_nd defaults with variable number of axes
        # if only 2D, remove the defaults for the 3rd axis
        for chk, chk_data in data_checks.items():
            if chk_data.get("type") == "range_nd":
                if (
                    chk_data.get("n_bins")
                    == CHECK_TYPE_DEFAULT_VALUES["range_nd"]["n_bins"]
                ):
                    if len(chk_data.get("expressions")) == 2:
                        del data_checks[chk]["n_bins"]["z"]

        del data1["checks"]

    if "defaults" in data1:
        defaults_schema = {}
        for key, value in BASE_JOB_SCHEMA.items():
            if isinstance(key, Optional):
                key = key.key
            key = Optional(key, default=DEFAULT_JOB_VALUES.get(key))
            defaults_schema[key] = value
        data1["defaults"].revalidate(Map(defaults_schema))
        defaults = data1.data["defaults"]
        # Remove the defaults data from the snippet
        del data1["defaults"]
    else:
        defaults = DEFAULT_JOB_VALUES.copy()

    job_names = list(data1.data.keys())
    if len(set(n.lower() for n in job_names)) != len(job_names):
        raise ValueError(
            "Found multiple jobs with the same name but different capitalisation"
        )
    job_name_schema = Regex(r"(" + r"|".join(map(re.escape, job_names)) + r")")

    # StrictYAML has non-linear complexity when parsing many keys
    # Avoid extremely slow parsing by doing each key individually
    data2 = {}
    for k, v in data1.items():
        k = k.data
        v = _ordered_dict_to_dict(v.data)

        production_schema = {}
        if "comment" in v:
            raise ValueError(
                "comment is only allowed to be set in the defaults of the production!"
            )
        for key, value in BASE_JOB_SCHEMA.items():
            if isinstance(key, Optional):
                key = key.key
                production_schema[Optional(key, default=defaults.get(key))] = value
            elif key in defaults:
                production_schema[Optional(key, default=defaults[key])] = value
            else:
                production_schema[key] = value

        data = load(
            yaml.safe_dump({k: v}),
            MapPattern(job_name_schema, Map(production_schema), minimum_keys=1),
        )

        for input_key, input_schema in INPUT_SCHEMAS.items():
            if input_key in data.data[k]["input"]:
                data[k]["input"].revalidate(input_schema)
                break
        else:
            raise ValueError(
                (
                    "Failed to find a valid schema for %s's input. "
                    "Allowed values are: %s"
                )
                % (k, set(INPUT_SCHEMAS))
            )

        if isinstance(data.data[k]["options"], dict):
            if "files" in data.data[k]["options"]:
                data[k]["options"].revalidate(Map(LEGACY_OPTIONS))
            else:
                data[k]["options"].revalidate(Map(LBEXEC_OPTIONS))

        # move contents of extra_checks to checks
        data_dict = data.data
        if "checks" not in data_dict[k]:
            data_dict[k]["checks"] = []
        if "extra_checks" in data_dict[k]:
            data_dict[k]["checks"] += data_dict[k]["extra_checks"]
            del data_dict[k]["extra_checks"]

        # Ensure runs is not used with start_run/end_run
        if runs := data_dict[k]["input"].get("runs"):
            if (
                "start_run" in data_dict[k]["input"]
                or "end_run" in data_dict[k]["input"]
            ):
                raise ValueError(
                    f"Both inclusive run ranges and start/end runs are specified for {k}"
                )
            # If a single run range is specified, convert it to start/end runs
            if len(runs) == 1 and isinstance(runs[0], str):
                del data_dict[k]["input"]["runs"]
                start_run, end_run = map(int, runs[0].split(":"))
                data_dict[k]["input"]["start_run"] = start_run
                data_dict[k]["input"]["end_run"] = end_run

        if "start_run" in data_dict[k]["input"] and "end_run" in data_dict[k]["input"]:
            start_run = data_dict[k]["input"]["start_run"]
            end_run = data_dict[k]["input"]["end_run"]
            if start_run >= end_run:
                raise ValueError(
                    f"Start run {start_run} must be less than end run {end_run} for {k}"
                )

        data2.update(data_dict)

    return data2, data_checks

def _normalise_filetype(prod_name, job_name, filetype):
    filetype = filetype.upper()

    errors = []

    if len(filetype) >= 50:
        errors += ["The filetype is excessively long"]

    if re.findall(r"[0-9]{8}", filetype, re.IGNORECASE):
        errors += ["It appears the event type is included"]

    if re.findall(r"Mag(Up|Down)", filetype, re.IGNORECASE):
        errors += ["It appears the magnet polarity is included"]

    if re.findall(r"(^|[^0*9])201[125678]($|[^0*9])", filetype, re.IGNORECASE):
        errors += ["It appears the data taking year is included"]

    if errors:
        _errors = "\n * ".join(errors)
        raise ValueError(
            f"Output filetype {filetype} for {prod_name}/{job_name} is invalid "
            f"as it appears to contain redundant information.\n\n"
            f" * {_errors}"
        )

    return filetype


def _check_name_magnet_polarity(bk_query, job_name):
    match = re.search(r"-mag(up|down)[-/]", bk_query)
    if not match:
        return [f"Failed to find magnet polarity in {bk_query}"]
    good_pol = match.groups()[0]
    bad_pol = {"down": "up", "up": "down"}[good_pol]
    if f"mag{bad_pol}" in job_name:
        raise ValueError(
            f"Found 'mag{bad_pol}' in job name {job_name!r} with "
            f"'mag{good_pol}' input ({bk_query!r}). "
            "Has the wrong magnet polarity been used?"
        )
    match = re.search(r"([^a-z0-9]|\b)m(u|d)([^a-z0-9]|\b)", job_name)
    if match and match.groups()[1] == bad_pol[0]:
        raise ValueError(
            f"Found 'm{bad_pol[0]}' in job name {job_name!r} with"
            f" 'mag{good_pol}' input ({bk_query!r}). "
            "Has the wrong magnet polarity been used?"
        )
    return []

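# Illustrative behaviour of _normalise_filetype above (hypothetical names):
# filetypes are upper-cased, and ones that embed redundant information such as
# the magnet polarity are rejected, e.g.
#
#     >>> _normalise_filetype("MyProd", "my_job", "data.root")
#     'DATA.ROOT'
#     >>> _normalise_filetype("MyProd", "my_job", "BU2JPSIK_MAGUP.ROOT")
#     Traceback (most recent call last):
#         ...
#     ValueError: ...
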
def validate_yaml(jobs_data, checks_data, repo_root, prod_name):
    """Validate YAML configuration for anything that would definitely break a job or the production.

    Other validations that aren't essential for at least testing a job are done
    via the checks framework.

    Args:
        jobs_data (dict): Parsed job configuration.
        checks_data (dict): Parsed checks configuration.
        repo_root (str): Repository location.
        prod_name (str): Production name.

    Raises:
        ValueError: Raised if there are showstopper issues in the parsed job
            configuration.
    """
    # Ensure all values that can be either a list or a string are lists of strings
    for job_name, _ in jobs_data.items():
        try:
            _validate_job_data(repo_root, prod_name, job_name, jobs_data, checks_data)
        except Exception as e:
            raise ValueError(f"Failed to validate {job_name!r} with error {e!r}") from e

    # Ensure job name inputs are unambiguous
    for job_name, job_data in jobs_data.items():
        if "job_name" in job_data["input"]:
            if job_data["input"]["job_name"] not in jobs_data:
                raise ValueError(
                    f"Unrecognised job name in input: {job_data['input']['job_name']}"
                )
            input_job_data = jobs_data[job_data["input"]["job_name"]]
            input_filetype = job_data["input"].get("filetype", "").upper()
            if len(input_job_data["output"]) == 1:
                if input_filetype not in [""] + input_job_data["output"]:
                    raise ValueError(
                        f"Unrecognised {input_filetype=} for {job_name=} input, "
                        f"expected one of: {input_job_data['output']}"
                    )
            elif input_filetype == "":
                raise ValueError(
                    f"{job_name} gets its input from a job with multiple outputs. "
                    "The 'filetype' key must be specified in the 'input' section."
                )
            elif input_filetype.upper() not in input_job_data["output"]:
                raise ValueError(
                    f"Unrecognised {input_filetype=} for {job_name=} input, "
                    f"expected one of: {input_job_data['output']}"
                )

    # Validate checks
    try:
        _validate_checks_data(checks_data, jobs_data)
    except Exception as e:
        raise ValueError(f"Failed to validate checks with error {e!r}") from e

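# Typical call sequence for this module (a sketch; the variable names are
# placeholders, not a prescribed API):
#
#     rendered = render_yaml(raw_yaml_text)
#     jobs_data, checks_data = parse_yaml(rendered)
#     validate_yaml(jobs_data, checks_data, repo_root, prod_name)
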
def _validate_job_data(repo_root, prod_name, job_name, jobs_data, checks_data):
    job_data = jobs_data[job_name]

    # Normalise list/str fields to always be lists
    for prop in ["output", "options", "inform", "checks", "extra_checks"]:
        if not isinstance(job_data.get(prop, []), (list, dict)):
            job_data[prop] = [job_data[prop]]

    # Validate the input data
    if "bk_query" in job_data["input"]:
        validate_bk_query(job_data["input"]["bk_query"])

    # Validate the output filetype
    job_data["output"] = [
        _normalise_filetype(prod_name, job_name, s) for s in job_data["output"]
    ]

    # Normalise the options filenames if we're using a non-PyConf application
    if isinstance(job_data["options"], list):
        job_data["options"] = {"files": job_data["options"]}
    if "files" in job_data["options"]:
        normalised_options = []
        for fn in job_data["options"]["files"]:
            if fn.startswith("$"):
                normalised_options.append(fn)
                continue

            fn_normed = (
                fn
                if repo_root is None
                else relpath(join(repo_root, fn), start=repo_root)
            )
            if fn_normed.startswith("../"):
                raise ValueError(f"{fn} not found inside {repo_root}")
            if repo_root is not None and not isfile(
                join(repo_root, prod_name, fn_normed)
            ):
                raise FileNotFoundError(
                    f"Production {job_name!r} has a missing options file: "
                    f"{join(prod_name, fn_normed)!r}",
                )
            normalised_options.append(
                join("$ANALYSIS_PRODUCTIONS_BASE", prod_name, fn_normed)
            )
        job_data["options"]["files"] = normalised_options

    # Validate the check names
    # All checks listed for jobs should match a check defined in checks_data
    if "checks" in job_data:
        for ck in job_data["checks"]:
            if ck not in list(checks_data.keys()):
                raise ValueError(f"Check {ck} not found in list of defined checks")

    # Validate the completion percentage
    if not (10 <= job_data["completion_percentage"] <= 100):
        raise ValueError(
            f"Validation failed for job {job_name!r}, completion_percentage "
            f"was set to {job_data['completion_percentage']!r}, allowed "
            "values are in the interval [10, 100]."
        )


def _validate_checks_data(checks_data, jobs_data):
    # All checks defined in checks_data should be used by at least 1 job
    checks_used = {ck: False for ck in checks_data}
    for _job_name, job_data in jobs_data.items():
        if "checks" in job_data:
            for ck in job_data["checks"]:
                if ck in checks_used:
                    checks_used[ck] = True
    for ck, ck_used in checks_used.items():
        if not ck_used:
            raise ValueError(f"Check {ck} is defined but not used")

    # Requirements for numbers of bins in checks that create histograms
    # Must have 1<n<=100 bins on each axis
    n_bins_maximum = 100
    for _cname, cdata in checks_data.items():
        if cdata.get("type") == "range" or cdata.get("type") == "range_bkg_subtracted":
            if cdata.get("n_bins") <= 1 or cdata.get("n_bins") > n_bins_maximum:
                raise ValueError(
                    f"Number of bins for a histogram must be in the interval [2, {n_bins_maximum}]"
                )
        elif cdata.get("type") == "range_nd":
            for _axis, n_bins in cdata.get("n_bins").items():
                if n_bins <= 1 or n_bins > n_bins_maximum:
                    raise ValueError(
                        f"Number of bins for a histogram must be in the interval [2, {n_bins_maximum}] on each axis"
                    )

    # For mean/stddev in range checks: can't specify tolerance without a central value (& vice versa)
    for _cname, cdata in checks_data.items():
        if cdata.get("type") == "range":
            req_pair_tuples = [
                ("exp_mean", "mean_tolerance"),
                ("exp_std", "std_tolerance"),
            ]
            for pair in req_pair_tuples + [tup[::-1] for tup in req_pair_tuples]:
                if (cdata.get(pair[0]) is not None) and (cdata.get(pair[1])) is None:
                    raise ValueError(
                        f"When using {pair[0]!r}, must also specify {pair[1]!r}"
                    )