# Source code for LbAPCommon.validators.bookkeeping_xml

###############################################################################
# (c) Copyright 2020-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################

__all__ = ["parse_bookkeeping_xml"]

from collections import namedtuple
from datetime import timedelta
from os.path import basename

from lxml import etree

# Lightweight records for a job's input/output files.  ``dataset_size`` is the
# size of the whole dataset the file belongs to, used by
# ``_find_output_files`` to extrapolate output dataset sizes.
InputFile = namedtuple("InputFile", "path size dataset_size")
OutputFile = namedtuple("OutputFile", "path size dataset_size")


def parse_bookkeeping_xml(fp, expected_input_files):
    """Extract information from the bookkeeping XML.

    Parameters
    ----------
    fp : file-like
        File pointer of the XML summary
    expected_input_files
        Lookup for the files named in the XML; ``_find_input_files`` calls it
        with each ``InputFile`` element's ``Name`` attribute and expects an
        object with ``path``/``size``/``dataset_size`` attributes back.

    Returns
    -------
    tuple
        ``(events_processed, events_requested, run_time, cpu_norm,
        input_files, output_files)`` where ``run_time`` is a
        :obj:`datetime.timedelta`, ``cpu_norm`` is the HS06 CPU
        normalisation factor, and the last two are lists of
        :obj:`InputFile` / :obj:`OutputFile`.
    """
    xml_bk = etree.parse(fp, base_url="LbAnalysisProductions/data/xml")
    # Counters reported in the job's typed parameters
    events_processed = int(
        xml_bk.find('.//TypedParameter[@Name="NumberOfEvents"]').attrib["Value"]
    )
    events_requested = int(
        xml_bk.find('.//TypedParameter[@Name="StatisticsRequested"]').attrib["Value"]
    )
    run_time = timedelta(
        seconds=float(xml_bk.find('.//TypedParameter[@Name="CPUTIME"]').attrib["Value"])
    )
    cpu_norm = float(
        xml_bk.find('.//TypedParameter[@Name="WNCPUHS06"]').attrib["Value"]
    )
    input_files = _find_input_files(xml_bk, expected_input_files)
    output_files = _find_output_files(xml_bk, input_files)
    return (
        events_processed,
        events_requested,
        run_time,
        cpu_norm,
        input_files,
        output_files,
    )
def _find_input_files(xml_bk, expected_input_files):
    """Resolve every ``InputFile`` element in the XML to an :obj:`InputFile`.

    Parameters
    ----------
    xml_bk : :obj:`lxml.etree._ElementTree`
        Parsed bookkeeping XML
    expected_input_files
        Lookup called with each ``InputFile`` element's ``Name`` attribute;
        must return an object with ``path``/``size``/``dataset_size``
        attributes.

    Returns
    -------
    :obj:`list` of :obj:`InputFile`
    """
    input_files = []
    for element in xml_bk.findall(".//InputFile"):
        # NOTE(review): despite the plural name, ``expected_input_files`` is
        # used as a callable (name -> file metadata), not iterated — confirm
        # against callers before changing this contract.
        input_file = expected_input_files(element.attrib["Name"])
        input_files.append(
            InputFile(
                path=input_file.path,
                size=input_file.size,
                dataset_size=input_file.dataset_size,
            )
        )
    return input_files


def _find_output_files(xml_bk, input_files):
    """Collect the non-LOG output files and estimate their dataset sizes.

    The dataset size of each output is extrapolated by scaling its file size
    by ``dataset_size / total input size``, which only makes sense when all
    input files come from a single dataset.

    Parameters
    ----------
    xml_bk : :obj:`lxml.etree._ElementTree`
        Parsed bookkeeping XML
    input_files : :obj:`list` of :obj:`InputFile`
        Input files as returned by ``_find_input_files``

    Returns
    -------
    :obj:`list` of :obj:`OutputFile`

    Raises
    ------
    NotImplementedError
        If the input files span more than one dataset (only raised when a
        non-LOG output file is actually present).
    """
    # Loop invariants: hoisted out of the per-output-file loop.
    dataset_sizes = {in_file.dataset_size for in_file in input_files}
    total_input_size = sum(in_file.size for in_file in input_files)
    output_files = []
    for element in xml_bk.findall(".//OutputFile"):
        # Log files are not part of the physics output.
        if element.attrib["TypeName"] == "LOG":
            continue
        fn = basename(element.attrib["Name"])
        size = float(element.find('.//Parameter[@Name="FileSize"]').attrib["Value"])
        if len(dataset_sizes) != 1:
            raise NotImplementedError(
                "This estimation hasn't been implemented for "
                "jobs with multiple input datasets"
            )
        dataset_size = int(size * input_files[0].dataset_size / total_input_size)
        output_files.append(OutputFile(path=fn, size=size, dataset_size=dataset_size))
    return output_files