LbAPI#
N.B. Some modules have not yet been added to this page (TODO!)
Models#
- class LbAPI.models.FileURL(*, url: str, name: str, validity: int)[source]#
- name: str#
- url: str#
- validity: int#
- class LbAPI.models.JobInfo(*, id: int, name: str, keep_output: bool, application_name: str, application_version: str, wg: str, data_type: Optional[str] = None, simulation: Optional[bool] = None, input_type: Optional[str] = None, conddb_tag: Optional[str] = None, dddb_tag: Optional[str] = None, priority: str, automatically_configure: bool, autoconf_options: Optional[str] = None, dynamic_options_path: Optional[str] = None, input: dict[str, Any], test: TestInfo)[source]#
- class TestInfo(*, attempt: int, status: str, statistics: TestStatistics, input_files: list[LbAPI.models.JobInfo.TestInfo.TestInputFile], output_files: list[LbAPI.models.JobInfo.TestInfo.TestOutputFile], n_log: TestLogSummary)[source]#
- class TestInputFile(*, path: str, dataset_size: int, size: int)[source]#
- dataset_size: int#
- path: str#
- size: int#
- class TestLogSummary(*, warning: Optional[int] = None, error: Optional[int] = None, fatal: Optional[int] = None)[source]#
- error: Optional[int]#
- fatal: Optional[int]#
- warning: Optional[int]#
- class TestOutputFile(*, path: str, dataset_size: int, size: int)[source]#
- dataset_size: int#
- path: str#
- size: int#
- class TestStatistics(*, events_processed: Optional[int] = None, events_requested: Optional[int] = None, run_time: Optional[float] = None, cpu_norm: Optional[float] = None, mem_footprint: Optional[int] = None)[source]#
- cpu_norm: Optional[float]#
- events_processed: Optional[int]#
- events_requested: Optional[int]#
- mem_footprint: Optional[int]#
- run_time: Optional[float]#
- attempt: int#
- input_files: list[LbAPI.models.JobInfo.TestInfo.TestInputFile]#
- n_log: TestLogSummary#
- output_files: list[LbAPI.models.JobInfo.TestInfo.TestOutputFile]#
- statistics: TestStatistics#
- status: str#
- application_name: str#
- application_version: str#
- autoconf_options: Optional[str]#
- automatically_configure: bool#
- conddb_tag: Optional[str]#
- data_type: Optional[str]#
- dddb_tag: Optional[str]#
- dynamic_options_path: Optional[str]#
- id: int#
- input: dict[str, Any]#
- input_type: Optional[str]#
- keep_output: bool#
- name: str#
- priority: str#
- simulation: Optional[bool]#
- wg: str#
- class LbAPI.models.PipelineProductionInfo(*, id: int, pipeline_id: int, name: str, repo_url: str, commit: str, commit_message: str, triggering_user: str, pipeline_url: str, comment: str)[source]#
- comment: str#
- commit: str#
- commit_message: str#
- id: int#
- name: str#
- pipeline_id: int#
- pipeline_url: str#
- repo_url: str#
- triggering_user: str#
- class LbAPI.models.ProductionSummaryStats(*, full_input_size: int, full_est_output_size: int, test_input_size: int, test_output_size: int, test_total_processed: int, mem_footprint: int)[source]#
- full_est_output_size: int#
- full_input_size: int#
- mem_footprint: int#
- test_input_size: int#
- test_output_size: int#
- test_total_processed: int#
Database#
Auth#
- class LbAPI.auth.AuthTypes(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
- CERN_SSO = 'CERN_SSO'#
- GITLAB_TOKEN = 'GITLAB_TOKEN'#
- USER_TOKEN = 'USER_TOKEN'#
- class LbAPI.auth.Roles(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
- ADMIN = 'admin'#
- DEFAULT = 'default-role'#
- LIASON = 'liason'#
- class LbAPI.auth.User(*, username: str, roles: list[LbAPI.auth.Roles] = [], email: Optional[str] = None, name: Optional[str] = None, auth_type: AuthTypes, token_id: Optional[int] = None)[source]#
- auth_type: AuthTypes#
- email: Optional[str]#
- name: Optional[str]#
- roles: list[LbAPI.auth.Roles]#
- token_id: Optional[int]#
- username: str#
- async LbAPI.auth.get_current_user(request: Request, authorization: str = Depends(OpenIdConnect)) → User [source]#
Get the current user, as authenticated using the CERN SSO.
- async LbAPI.auth.get_untrusted_user(request: Request, authorization: str = Depends(OpenIdConnect), db: AsyncSession = Depends(get_db)) → User [source]#
Get the current user regardless of authentication method.
This method is similar to get_current_user, except that it also accepts long-lived access tokens. This method should only be used for low-risk endpoints.
Responses#
Utils#
- class LbAPI.utils.Token(raw_token: str)[source]#
- classmethod generate(purpose: str) → tuple[str, LbAPI.utils.Token] [source]#
- LbAPI.utils.eos_artifact_path(eos_artifact_path: str, pipeline_id: int, ci_run_name: str, job_id: int, job_name: str) → tuple[str, str] [source]#
- async LbAPI.utils.generate_eos_token(directory: str, allow_write: bool, owner: str, expires: datetime, *, tree=True)[source]#
- async LbAPI.utils.get_krb5_creds(username: str, password: str, realm: str) → NamedTemporaryFile [source]#
- async LbAPI.utils.parse_jwt(authorization: str, audience: str, oauth2_app: StarletteOAuth2App)[source]#
Routers#
- class LbAPI.routers.productions.DatasetSpec(*, version: str, name: str, sample_id: int)[source]#
- name: str#
- sample_id: int#
- version: str#
- class LbAPI.routers.productions.DatasetTagModSpec(*, dataset: DatasetSpec, old_tags: dict[str, str], new_tags: dict[str, str])[source]#
- dataset: DatasetSpec#
- new_tags: dict[str, str]#
- old_tags: dict[str, str]#
- class LbAPI.routers.productions.Sample(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], tags: dict[str, str], merge_request: Optional[str] = None, jira_task: Optional[str] = None, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None, transformations: list[LbAPI.routers.productions.TransformationInfo], lfns: dict[str, list[str]], available_bytes: int, total_bytes: int)[source]#
- available_bytes: int#
- lfns: dict[str, list[str]]#
- total_bytes: int#
- transformations: list[TransformationInfo]#
- class LbAPI.routers.productions.SampleSummary(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], tags: dict[str, str], merge_request: Optional[str] = None, jira_task: Optional[str] = None, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None, transformations: Optional[list[LbAPI.routers.productions.TransformationInfo]] = None, lfns: Union[None, list[str], dict[str, list[str]]] = None)[source]#
- analysis: str#
- jira_task: Optional[str]#
- last_state_update: datetime#
- lfns: Union[None, list[str], dict[str, list[str]]]#
- merge_request: Optional[str]#
- name: str#
- owners: list[str]#
- request_id: int#
- sample_id: int#
- state: str#
- tags: dict[str, str]#
- transformations: Optional[list[TransformationInfo]]#
- validity_end: Optional[datetime]#
- validity_start: datetime#
- version: str#
- wg: str#
- class LbAPI.routers.productions.StepInfo(*, stepID: int, application: str, extras: list[str], options: Union[list[str], dict[str, Any]])[source]#
- application: str#
- extras: list[str]#
- options: Union[list[str], dict[str, Any]]#
- stepID: int#
- class LbAPI.routers.productions.TransformationInfo(*, id: int, status: str, steps: list[LbAPI.routers.productions.StepInfo], used: bool)[source]#
- id: int#
- status: str#
- used: bool#
- LbAPI.routers.productions.add_samples(wg: str, analysis: str, request_ids: list[int], user=Depends(get_current_user))[source]#
- async LbAPI.routers.productions.archive(wg: str, analysis: str, datasets: list[LbAPI.routers.productions.DatasetSpec], user=Depends(get_current_user), at_time: Optional[datetime] = None)[source]#
- LbAPI.routers.productions.get_elevated_apc(wg: str, analysis: str, user: User, *, allow_create_analysis=False)[source]#
Get an AnalysisProductionsClient with elevated permissions.
Raises an HTTP 403 if the user does not have permission to act upon the given analysis.
- async LbAPI.routers.productions.get_sample(wg: str, analysis: str, version: str, name: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
- LbAPI.routers.productions.get_sample_(wg, analysis, version, name, apc=None, with_lfns=False, with_pfns=False, with_transformations=True, tags=False, at_time=None)[source]#
- async LbAPI.routers.productions.get_sample_ancestors(wg: str, analysis: str, version: str, name: str, user=Depends(get_current_user), apc=Depends(get_apc))[source]#
- async LbAPI.routers.productions.get_samples(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), with_lfns: bool = Query(False), with_pfns: bool = Query(False), with_transformations: bool = Query(False), apc=Depends(get_apc))[source]#
- async LbAPI.routers.productions.get_summary_XML(wg: str, analysis: str, version: str, name: str, summaryFilename: str, apc=Depends(get_apc))[source]#
- async LbAPI.routers.productions.list_analyses(new_style: bool = False, apc=Depends(get_apc))[source]#
- LbAPI.routers.productions.set_owners(wg: str, analysis: str, new_owners: list[str], user=Depends(get_current_user))[source]#
- async LbAPI.routers.productions.tags_update(wg: str, analysis: str, dataset_tagmod_specs: list[LbAPI.routers.productions.DatasetTagModSpec], user=Depends(get_current_user))[source]#
- LbAPI.routers.productions.verify_sample_ids_in_analysis(wg: str, analysis: str, datasets: list[LbAPI.routers.productions.DatasetSpec], user: User) → list[int] [source]#
Select samples under an analysis that the user has permissions to modify.
TODO: This should be done in LHCbDIRAC instead of here…
- Parameters:
wg (str) – Analysis working group (WG).
analysis (str) – Analysis name.
datasets (list[DatasetSpec]) – Datasets requested to be selected.
user (User) – User whose permissions on the analysis are checked.
- Raises:
HTTPException – Raised if sample(s) cannot be found, or user does not have appropriate permissions.
- Returns:
List of selected datasets the user is allowed to touch.
- Return type:
list[int]
- class LbAPI.routers.stable.Sample(*, wg: str, analysis: str, version: str, name: str, request_id: int, sample_id: int, state: str, owners: list[str], lfns: dict[str, list[str]], available_bytes: int, total_bytes: int, last_state_update: datetime, validity_start: datetime, validity_end: Optional[datetime] = None)[source]#
- analysis: str#
- available_bytes: int#
- last_state_update: datetime#
- lfns: dict[str, list[str]]#
- name: str#
- owners: list[str]#
- request_id: int#
- sample_id: int#
- state: str#
- total_bytes: int#
- validity_end: Optional[datetime]#
- validity_start: datetime#
- version: str#
- wg: str#
- async LbAPI.routers.stable.list_analyses(at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
- async LbAPI.routers.stable.samples(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
- async LbAPI.routers.stable.tags(wg: str, analysis: str, at_time: Optional[datetime] = Query(None), apc=Depends(get_apc))[source]#
Pipelines#
- async LbAPI.routers.pipelines.get_pipeline_specific(pipeline_id: int, db: AsyncSession = Depends(get_db))[source]#
- async LbAPI.routers.pipelines.get_pipelines(response: Response, page: int = Query(1), per_page: int = Query(100), search: Optional[str] = Query(None), db: AsyncSession = Depends(get_db))[source]#
- async LbAPI.routers.pipelines.production(pipeline_id: int, production_name: str, db: AsyncSession = Depends(get_db))[source]#