LbAPLocal

Checks

Utils

LbAPLocal.utils.available_productions()

Find all production folders that contain an info.yaml file.
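The idea of discovering production folders by their info.yaml can be illustrated with a minimal sketch. This is not the LbAPLocal implementation: the helper name `find_productions`, the `root` parameter, and the assumption that productions are immediate subfolders of the directory being searched are all hypothetical.

```python
from pathlib import Path


def find_productions(root="."):
    """Return the sorted names of immediate subfolders of `root`
    that contain a file called info.yaml (hypothetical helper)."""
    root = Path(root)
    # "*/info.yaml" matches info.yaml exactly one directory level below root.
    return sorted(p.parent.name for p in root.glob("*/info.yaml") if p.is_file())
```

Calling `find_productions(".")` from the top of a checkout would list every folder that carries an info.yaml, which is the kind of result the docstring above describes.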

LbAPLocal.utils.check_cvmfs()
LbAPLocal.utils.check_production(production_name)
LbAPLocal.utils.check_production_existed(available_versions: dict, production_name: str)
LbAPLocal.utils.check_proxy()
LbAPLocal.utils.check_status_code(response)
LbAPLocal.utils.create_output_dir(production_name, require_cmt)

Create the directory structure for local testing.

LbAPLocal.utils.inside_ap_datapkg()

Check whether the script is being run from the main directory.

LbAPLocal.utils.logging_subprocess_run(args, *, cwd=None, printing=True)
LbAPLocal.utils.pool_xml_catalog(lfns)
LbAPLocal.utils.production_name_type(production_name: str) -> str
LbAPLocal.utils.production_to_versions(working_group: str, production_name: str) -> list

List the AnalysisProductions tags corresponding to the specified production.

LbAPLocal.utils.recursively_create_input(production_name, job_name, filetype, prod_data, n_events)
LbAPLocal.utils.resolve_input_overrides(prod_data, job_name)
LbAPLocal.utils.validate_environment()

Testing

LbAPLocal.testing.do_options_parsing(env_cmd, out_dir, pkl_file, root_file, job_name, prod_data)
LbAPLocal.testing.enter_debugging(out_dir, prodrun_config_path, dependent_job=None)
LbAPLocal.testing.prepare_reproduce(pipeline_id, production_name, job_name, test_id)
LbAPLocal.testing.prepare_test(production_name, job_name, dependent_input=None, n_events=None)

Run a local test job for a specific production job.

CLI

class LbAPLocal.cli.NaturalOrderGroup(name: Optional[str] = None, commands: Optional[Union[MutableMapping[str, Command], Sequence[Command]]] = None, **attrs: Any)

Group that shows subcommands in the correct order.

list_commands(ctx)

Return a list of subcommand names in the order they should appear.
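The constructor signature above matches that of `click.Group`, so the sketch below assumes the class is built on the click library. It shows one common way to make a group list its subcommands in registration order instead of click's default alphabetical order. The names `OrderedGroup`, `cli`, `zeta`, and `alpha` are hypothetical, and the real `NaturalOrderGroup` may define "the correct order" differently.

```python
import click


class OrderedGroup(click.Group):
    """Hypothetical stand-in: list subcommands in registration order."""

    def list_commands(self, ctx):
        # click.Group sorts the names alphabetically by default; self.commands
        # is a dict, so iterating it preserves the order commands were added.
        return list(self.commands)


@click.group(cls=OrderedGroup)
def cli():
    """Hypothetical top-level command."""


@cli.command()
def zeta():
    """Registered first, so listed first."""


@cli.command()
def alpha():
    """Registered second, so listed second."""
```

With this override, the help output of `cli` lists `zeta` before `alpha`, whereas a plain `click.Group` would sort them alphabetically.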

LbAPLocal.cli.print_version(ctx, param, value)

Log Parsing

LbAPLocal.log_parsing.show_log_advice(log_text, suppress=5)