LbAPI#

N.B. This reference is incomplete: some modules have not been added yet (TODO).

class LbAPI.PrintTimings[source]#
logger = <Logger LbAPI.timing (WARN)>#
timing(metric_name, timing, tags)[source]#
async LbAPI.dirac_exception_handler(request: Request, exc: SErrorException)[source]#
async LbAPI.no_result_handler(request: Request, exc: NoResultFound)[source]#
async LbAPI.shutdown()[source]#
async LbAPI.startup()[source]#

Models#

class LbAPI.models.FileURL(*, url: str, name: str, validity: int)[source]#
name: str#
url: str#
validity: int#
class LbAPI.models.JobInfo(*, id: int, name: str, keep_output: bool, application_name: str, application_version: str, wg: str, data_type: Optional[str] = None, simulation: Optional[bool] = None, input_type: Optional[str] = None, conddb_tag: Optional[str] = None, dddb_tag: Optional[str] = None, priority: str, automatically_configure: bool, autoconf_options: Optional[str] = None, dynamic_options_path: Optional[str] = None, input: dict[str, Any], test: TestInfo)[source]#
class TestInfo(*, attempt: int, status: str, statistics: TestStatistics, input_files: list[LbAPI.models.JobInfo.TestInfo.TestInputFile], output_files: list[LbAPI.models.JobInfo.TestInfo.TestOutputFile], n_log: TestLogSummary)[source]#
class TestInputFile(*, path: str, dataset_size: int, size: int)[source]#
dataset_size: int#
path: str#
size: int#
class TestLogSummary(*, warning: Optional[int] = None, error: Optional[int] = None, fatal: Optional[int] = None)[source]#
error: Optional[int]#
fatal: Optional[int]#
warning: Optional[int]#
class TestOutputFile(*, path: str, dataset_size: int, size: int)[source]#
dataset_size: int#
path: str#
size: int#
class TestStatistics(*, events_processed: Optional[int] = None, events_requested: Optional[int] = None, run_time: Optional[float] = None, cpu_norm: Optional[float] = None, mem_footprint: Optional[int] = None)[source]#
cpu_norm: Optional[float]#
events_processed: Optional[int]#
events_requested: Optional[int]#
mem_footprint: Optional[int]#
run_time: Optional[float]#
attempt: int#
input_files: list[LbAPI.models.JobInfo.TestInfo.TestInputFile]#
n_log: TestLogSummary#
output_files: list[LbAPI.models.JobInfo.TestInfo.TestOutputFile]#
statistics: TestStatistics#
status: str#
application_name: str#
application_version: str#
autoconf_options: Optional[str]#
automatically_configure: bool#
conddb_tag: Optional[str]#
data_type: Optional[str]#
dddb_tag: Optional[str]#
dynamic_options_path: Optional[str]#
id: int#
input: dict[str, Any]#
input_type: Optional[str]#
keep_output: bool#
name: str#
priority: str#
simulation: Optional[bool]#
test: TestInfo#
wg: str#
class LbAPI.models.PipelineProductionInfo(*, id: int, pipeline_id: int, name: str, repo_url: str, commit: str, commit_message: str, triggering_user: str, pipeline_url: str, comment: str)[source]#
comment: str#
commit: str#
commit_message: str#
id: int#
name: str#
pipeline_id: int#
pipeline_url: str#
repo_url: str#
triggering_user: str#
class LbAPI.models.ProductionSummaryStats(*, full_input_size: int, full_est_output_size: int, test_input_size: int, test_output_size: int, test_total_processed: int, mem_footprint: int)[source]#
full_est_output_size: int#
full_input_size: int#
mem_footprint: int#
test_input_size: int#
test_output_size: int#
test_total_processed: int#

Database#

async LbAPI.database.create_engine(*, poolclass=None)[source]#
async LbAPI.database.dispose_engine()[source]#
async LbAPI.database.get_db()[source]#
LbAPI.database.retryable_query(*, n_retries=5)[source]#

Auth#

class LbAPI.auth.AuthTypes(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
CERN_SSO = 'CERN_SSO'#
GITLAB_TOKEN = 'GITLAB_TOKEN'#
USER_TOKEN = 'USER_TOKEN'#
class LbAPI.auth.Roles(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
ADMIN = 'admin'#
DEFAULT = 'default-role'#
LIASON = 'liason'#
class LbAPI.auth.User(*, username: str, roles: list[LbAPI.auth.Roles] = [], email: Optional[str] = None, name: Optional[str] = None, auth_type: AuthTypes, token_id: Optional[int] = None)[source]#
auth_type: AuthTypes#
email: Optional[str]#
name: Optional[str]#
roles: list[LbAPI.auth.Roles]#
token_id: Optional[int]#
username: str#
async LbAPI.auth.get_apc(user=Depends(get_untrusted_user))[source]#
async LbAPI.auth.get_current_user(request: Request, authorization: str = Depends(OpenIdConnect)) → User[source]#

Get the current user, as authenticated using the CERN SSO.

async LbAPI.auth.get_untrusted_user(request: Request, authorization: str = Depends(OpenIdConnect), db: AsyncSession = Depends(get_db)) → User[source]#

Get the current user regardless of authentication method.

This method is similar to get_current_user, except that it also accepts long-lived access tokens. It should only be used for low-risk endpoints.

Responses#

Utils#

class LbAPI.utils.Token(raw_token: str)[source]#
classmethod generate(purpose: str) → tuple[str, LbAPI.utils.Token][source]#
LbAPI.utils.eos_artifact_path(eos_artifact_path: str, pipeline_id: int, ci_run_name: str, job_id: int, job_name: str) → tuple[str, str][source]#
async LbAPI.utils.eos_ls(path: str) → list[str][source]#
async LbAPI.utils.generate_eos_token(directory: str, allow_write: bool, owner: str, expires: datetime, *, tree=True)[source]#
async LbAPI.utils.get_krb5_creds(username: str, password: str, realm: str) → NamedTemporaryFile[source]#
LbAPI.utils.get_package_version(pkg_name)[source]#
LbAPI.utils.is_cern_ip(ip: str) → bool[source]#
async LbAPI.utils.parse_jwt(authorization: str, audience: str, oauth2_app: StarletteOAuth2App)[source]#
async LbAPI.utils.run(*command: str, extra_env: Optional[dict[str, str]] = None, cwd: Optional[PathLike] = None, input: Optional[str] = None, timeout=60, log_cmd: bool = True, log_stdout: bool = False) → tuple[int, str, str][source]#
LbAPI.utils.set_content_range(response, unit, offset, results, n_total)[source]#
async LbAPI.utils.to_eos(filename: str, eos_path: str)[source]#

Routers#

class LbAPI.routers.productions.DatasetSpec(*, version: str, name: str, sample_id: int)[source]#
name: str#
sample_id: int#
version: str#
class LbAPI.routers.productions.DatasetTagModSpec(*, dataset: DatasetSpec, old_tags: dict[str, str], new_tags: dict[str, str])[source]#
dataset: DatasetSpec#
new_tags: dict[str, str]#
old_tags: dict[str, str]#
class LbAPI.routers.productions.GenericDatasetActionStatus(*, status: str)[source]#
status: str#
class LbAPI.routers.productions.Sample(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], tags: dict[str, str], merge_request: Optional[str] = None, jira_task: Optional[str] = None, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None, transformations: list[LbAPI.routers.productions.TransformationInfo], lfns: dict[str, list[str]], available_bytes: int, total_bytes: int)[source]#
available_bytes: int#
lfns: dict[str, list[str]]#
total_bytes: int#
transformations: list[TransformationInfo]#
class LbAPI.routers.productions.SampleSummary(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], tags: dict[str, str], merge_request: Optional[str] = None, jira_task: Optional[str] = None, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None, transformations: Optional[list[LbAPI.routers.productions.TransformationInfo]] = None, lfns: Union[None, list[str], dict[str, list[str]]] = None)[source]#
analysis: str#
jira_task: Optional[str]#
last_state_update: datetime#
lfns: Union[None, list[str], dict[str, list[str]]]#
merge_request: Optional[str]#
name: str#
owners: list[str]#
request_id: int#
sample_id: int#
state: str#
tags: dict[str, str]#
transformations: Optional[list[TransformationInfo]]#
validity_end: Optional[datetime]#
validity_start: datetime#
version: str#
wg: str#
class LbAPI.routers.productions.StepInfo(*, stepID: int, application: str, extras: list[str], options: Union[list[str], dict[str, Any]])[source]#
application: str#
extras: list[str]#
options: Union[list[str], dict[str, Any]]#
stepID: int#
class LbAPI.routers.productions.TransformationInfo(*, id: int, status: str, steps: list[LbAPI.routers.productions.StepInfo], used: bool)[source]#
id: int#
status: str#
steps: list[StepInfo]#
used: bool#
LbAPI.routers.productions.add_samples(wg: str, analysis: str, request_ids: list[int], user=Depends(get_current_user))[source]#
async LbAPI.routers.productions.archive(wg: str, analysis: str, datasets: list[LbAPI.routers.productions.DatasetSpec], user=Depends(get_current_user), at_time: Optional[datetime] = None)[source]#
LbAPI.routers.productions.get_elevated_apc(wg: str, analysis: str, user: User, *, allow_create_analysis=False)[source]#

Get an AnalysisProductionsClient with elevated permissions.

Raises an HTTP 403 if the user does not have permission to act upon the given analysis.

LbAPI.routers.productions.get_owners(wg: str, analysis: str, apc=Depends(get_apc))[source]#
async LbAPI.routers.productions.get_sample(wg: str, analysis: str, version: str, name: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
LbAPI.routers.productions.get_sample_(wg, analysis, version, name, apc=None, with_lfns=False, with_pfns=False, with_transformations=True, tags=False, at_time=None)[source]#
async LbAPI.routers.productions.get_sample_ancestors(wg: str, analysis: str, version: str, name: str, user=Depends(get_current_user), apc=Depends(get_apc))[source]#
async LbAPI.routers.productions.get_samples(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), with_lfns: bool = Query(False), with_pfns: bool = Query(False), with_transformations: bool = Query(False), apc=Depends(get_apc))[source]#
async LbAPI.routers.productions.get_summary_XML(wg: str, analysis: str, version: str, name: str, summaryFilename: str, apc=Depends(get_apc))[source]#
async LbAPI.routers.productions.list_analyses(new_style: bool = False, apc=Depends(get_apc))[source]#
async LbAPI.routers.productions.list_requests(apc=Depends(get_apc))[source]#
LbAPI.routers.productions.set_owners(wg: str, analysis: str, new_owners: list[str], user=Depends(get_current_user))[source]#
async LbAPI.routers.productions.tags_update(wg: str, analysis: str, dataset_tagmod_specs: list[LbAPI.routers.productions.DatasetTagModSpec], user=Depends(get_current_user))[source]#
LbAPI.routers.productions.verify_sample_ids_in_analysis(wg: str, analysis: str, datasets: list[LbAPI.routers.productions.DatasetSpec], user: User) → list[int][source]#

Select samples under an analysis that the user has permissions to modify.

TODO: This should be done in LHCbDIRAC instead of here…

Parameters:
  • wg (str) – Analysis WG.

  • analysis (str) – Analysis name.

  • datasets (list) – Datasets requested to be selected

  • user (User) – The user whose permission to modify the requested datasets is checked.

Raises:

HTTPException – Raised if sample(s) cannot be found, or user does not have appropriate permissions.

Returns:

List of selected datasets the user is allowed to touch.

Return type:

list[int]

class LbAPI.routers.stable.Sample(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], lfns: dict[str, list[str]], available_bytes: int, total_bytes: int, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None)[source]#
analysis: str#
available_bytes: int#
last_state_update: datetime#
lfns: dict[str, list[str]]#
name: str#
owners: list[str]#
request_id: int#
sample_id: int#
state: str#
total_bytes: int#
validity_end: Optional[datetime]#
validity_start: datetime#
version: str#
wg: str#
async LbAPI.routers.stable.list_analyses(at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
async LbAPI.routers.stable.samples(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
async LbAPI.routers.stable.tags(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#

Pipelines#

async LbAPI.routers.pipelines.get_pipeline_specific(pipeline_id: int, db: AsyncSession = Depends(get_db))[source]#
async LbAPI.routers.pipelines.get_pipelines(response: Response, page: int = Query(1), per_page: int = Query(100), search: Optional[str] = Query(None), db: AsyncSession = Depends(get_db))[source]#
async LbAPI.routers.pipelines.production(pipeline_id: int, production_name: str, db: AsyncSession = Depends(get_db))[source]#
async LbAPI.routers.pipelines.raw_yaml(pipeline_id: int, production_name: str, db: AsyncSession = Depends(get_db))[source]#
async LbAPI.routers.pipelines.rendered_yaml(pipeline_id: int, production_name: str, db: AsyncSession = Depends(get_db))[source]#