Source code for apd.analysis_data

###############################################################################
# (c) Copyright 2021-2023 for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
"""Interface to the Analysis Production data.

Provides:
    * the get_analysis_data method, the principal way to look up AP info. It
      returns an AnalysisData instance.
    * the AnalysisData class, which allows querying information about Analysis Productions.

"""
import copy
import itertools
import logging
import os
from enum import Enum
from pathlib import Path
from typing import Any, Optional

from apd.ap_info import (
    InvalidCacheError,
    SampleCollection,
    cache_ap_info,
    check_tag_value_possible,
    iterable,
    load_ap_info,
    safe_casefold,
)
from apd.data_cache import DataCache

logger = logging.getLogger("apd")

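# Names of the environment variables used to configure apd: the directory of
# the metadata cache, the lifetime of that cache, and the directory of the
# local data (file) cache.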
APD_METADATA_CACHE_DIR = "APD_METADATA_CACHE_DIR"
APD_METADATA_LIFETIME = "APD_METADATA_LIFETIME"
APD_METADATA_LIFETIME_DEFAULT = 600
APD_DATA_CACHE_DIR = "APD_DATA_CACHE_DIR"


class ApdReturnType(Enum):
    PFN = 0
    LFN = 1
    SAMPLE = 2
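
# The return type is selected when calling an AnalysisData object, e.g.
# (tag values hypothetical, with datasets = get_analysis_data(...)):
#   datasets(datatype="2018", polarity="magup")                                 # PFNs (default)
#   datasets(datatype="2018", polarity="magup", return_type=ApdReturnType.LFN)  # LFNs
#   datasets(datatype="2018", polarity="magup", return_type=ApdReturnType.SAMPLE)  # SampleCollection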


def _load_and_setup_cache(
    cache_dir, working_group, analysis, ap_date=None, api_url="https://lbap.app.cern.ch"
):
    """Utility function that checks whether the data for the analysis is
    already cached, and caches it if needed."""
    env_cache = os.environ.get(APD_METADATA_CACHE_DIR)
    if not cache_dir:
        if env_cache:
            cache_dir = env_cache
        else:
            cache_dir = Path.home() / ".cache" / "apd"
            logger.debug("Cache directory not set, using %s", cache_dir)

    samples = None
    try:
        # The environment variable is a string while the default is an int:
        # normalise to int before using it as a lifetime
        lifetime = int(
            os.environ.get(APD_METADATA_LIFETIME, APD_METADATA_LIFETIME_DEFAULT)
        )
        samples, _ = load_ap_info(
            cache_dir,
            working_group,
            analysis,
            ap_date=ap_date,
            maxlifetime=lifetime,
        )
    except FileNotFoundError:
        logger.debug(
            "Caching information for %s/%s to %s for time %s",
            working_group,
            analysis,
            cache_dir,
            ap_date,
        )
        samples = cache_ap_info(
            cache_dir, working_group, analysis, ap_date=ap_date, api_url=api_url
        )
    except InvalidCacheError:
        logger.debug(
            "Invalid cache, reloading information for %s/%s to %s for time %s",
            working_group,
            analysis,
            cache_dir,
            ap_date,
        )
        samples = cache_ap_info(
            cache_dir, working_group, analysis, ap_date=ap_date, api_url=api_url
        )
    assert samples is not None
    return samples


def _validate_tags(tags, default_tags=None, available_tags=None):
    """Check the dictionary of tag names and values that should be used
    to filter the data.

    Note:
        - Special cases are handled: the tags "name" and "version", as well as
          "data" and "mc" (which are converted to a "config" value).
        - Tag values cannot be None.
        - Tag values cannot be of type bytes.
        - Int tag values are converted to strings.

    Args:
        tags (dict): the dictionary of tags to be validated.
        default_tags (dict, optional): provide default tags. Defaults to None.
        available_tags (list, optional): provide a list of available tags. Defaults to None.

    Raises:
        ValueError: see the Note above.
        TypeError: see the Note above.

    Returns:
        dict: the validated tags.
    """
    # Merging the default tags with the ones passed
    effective_tags = tags
    if default_tags:
        for t, v in default_tags.items():
            if t not in effective_tags:
                effective_tags[t] = v

    # Final dict that will be returned
    cleaned = {}

    # Special handling for the data and mc tags to avoid having to
    # use the config tag.
    # The config tag is set according to the following table:
    #
    # | mc\data |    True    |   False    |      None      |
    # |:-------:|:----------:|:----------:|:--------------:|
    # |  True   | ValueError |     mc     |       mc       |
    # |  False  |    lhcb    | ValueError |      lhcb      |
    # |  None   |    lhcb    |     mc     | config not set |
    dataval = effective_tags.get("data", None)
    mcval = effective_tags.get("mc", None)
    config = None
    # We only set the config if one of the options data or mc was specified
    if dataval is None:
        # In this case we check whether mc has been specified and use that
        if mcval is not None:
            if mcval:
                config = "mc"
            else:
                config = "lhcb"
    # dataval has been explicitly set to True
    elif dataval:
        if mcval:
            raise ValueError("values of data= and mc= are inconsistent")
        config = "lhcb"
    # dataval has been explicitly set to False
    else:
        if mcval is not None and not mcval:
            # mcval explicitly set to False, in contradiction with dataval
            raise ValueError("values of data= and mc= are inconsistent")
        config = "mc"
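
    # For illustration, the table above maps (tag values hypothetical):
    #   _validate_tags({"mc": True})               -> {"config": "mc"}
    #   _validate_tags({"data": True})             -> {"config": "lhcb"}
    #   _validate_tags({"data": True, "mc": True}) -> ValueError
    #   _validate_tags({"polarity": "magup"})      -> {"polarity": "magup"}, config not set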

    # Check if config was set explicitly as well, and make sure it is
    # consistent with the data/mc tags
    if config:
        explicit_config = effective_tags.get("config", None)
        if explicit_config is not None:
            if explicit_config != config:
                raise ValueError("cannot specify data or mc as well as config")
        cleaned["config"] = config

    # Applying the other checks
    for t, v in effective_tags.items():
        # Ignore data and mc as we translated them to config already
        if t in ["data", "mc"]:
            continue
        if v is None:
            raise TypeError(f"{t} value is None")
        if isinstance(v, bytes):
            raise TypeError(f"{t} value is of type {type(v)}")
        if available_tags is not None:
            # NB this raises an exception if the tag is not in the list
            # or if the value does not match any samples
            check_tag_value_possible(t, v, available_tags)
        if isinstance(v, int) and not isinstance(v, bool):
            cleaned[t] = str(v)
        else:
            cleaned[t] = v
    return cleaned


def _sample_check(samples, tags):
    """Filter the SampleCollection and check that we have the samples
    that we expect."""
    # Fixing the dict to make sure each item is a list
    ltags = {}
    dimensions = tags.keys()
    for tag, value in tags.items():
        if not iterable(value):
            ltags[safe_casefold(tag)] = [safe_casefold(value)]
        else:
            ltags[safe_casefold(tag)] = [safe_casefold(v) for v in value]
    logger.debug("Checking samples for tags: %s", str(ltags))

    # Cartesian product of all the tag value lists, one bin per combination
    products = list(itertools.product(*ltags.values()))
    hist = {p: 0 for p in products}

    # Iterating on the samples and increasing the count of their bin
    for stags in samples.itertags():
        coordinates = tuple(safe_casefold(stags[d]) for d in dimensions)
        try:
            hist[coordinates] += 1
        except KeyError as ke:
            raise KeyError(
                f"Encountered sample with tags {str(coordinates)} which does not "
                f"match filtering criteria {str(dict(ltags))}"
            ) from ke

    # Now checking whether we have exactly one entry per bin
    errors = []
    for coordinate, sample_count in hist.items():
        if sample_count != 1:
            logger.debug("Error: %d samples for %s", sample_count, str(coordinate))
            errors.append((dict(zip(dimensions, coordinate)), sample_count))
    return errors
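
# Illustration (tag values hypothetical): filtering with
#   {"datatype": ["2017", "2018"], "polarity": ["magup", "magdown"]}
# builds 4 (datatype, polarity) bins; _sample_check reports an error for every
# bin with 0 matches (no such sample) or more than 1 match (ambiguous request).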

# Map containing the AnalysisData objects already loaded in this process
__analysis_map: dict[tuple, Any] = {}


def get_analysis_data(
    working_group,
    analysis,
    metadata_cache=None,
    data_cache=None,
    api_url="https://lbap.app.cern.ch",
    ap_date=None,
    **kwargs,
):
    """Main method to get Analysis Production information.

    Reuses the AnalysisData information from the same process if possible.
    If not loaded already, it is loaded from the disk cache; if that is
    missing or invalid, it is fetched from the REST API.
    """
    key = (working_group, analysis, ap_date)
    if key in __analysis_map:
        # As we keep an instance for each WG/analysis, we need to copy it and
        # apply our own defaults, using the same attributes as the constructor
        ad = copy.deepcopy(__analysis_map[key])
        ad._data_cache = (
            DataCache(data_cache) if isinstance(data_cache, str) else data_cache
        )
        ad._default_tags = _validate_tags(kwargs, available_tags=ad._available_tags)
        return ad
    ad = AnalysisData(
        working_group, analysis, metadata_cache, data_cache, api_url, ap_date, **kwargs
    )
    __analysis_map[key] = ad
    return ad
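
# Example usage (analysis and tag values are illustrative):
#   from apd.analysis_data import get_analysis_data
#   datasets = get_analysis_data("b2oc", "b02dkpi", polarity="magdown")
#   pfns = datasets(datatype="2018")  # PFNs of the 2018 magdown dataset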


class AnalysisData:
    """Class allowing to access the metadata for a specific analysis.

    Default values for the tags used to filter the data can be passed as
    arguments to the constructor, as are the required working group and
    analysis names, e.g.

        datasets = AnalysisData("b2oc", "b02dkpi", polarity="magdown")

    Invoking the instance returns a list of PFNs corresponding to the
    requested dataset. Keyword arguments are interpreted as tags, and
    combining all of the tags must identify a unique dataset, else an
    error is raised.

    To get the PFNs from multiple datasets, lists can be passed as
    arguments. The single call

        datasets(eventtype="27163904", datatype=[2017, 2018], polarity=["magup", "magdown"])

    is equivalent to

        datasets(eventtype="27163904", datatype=2017, polarity="magup")
        + datasets(eventtype="27163904", datatype=2017, polarity="magdown")
        + datasets(eventtype="27163904", datatype=2018, polarity="magup")
        + datasets(eventtype="27163904", datatype=2018, polarity="magdown")
    """

    def __init__(
        self,
        working_group,
        analysis,
        metadata_cache=None,
        data_cache=None,
        api_url="https://lbap.app.cern.ch",
        ap_date=None,
        **kwargs,
    ):
        """Constructor that can either fetch the data from the AP service or
        load it from a local cache. Analysis Production tags can be specified
        as keyword arguments to select the data to be analyzed.
        """
        self._working_group = working_group
        self._analysis = analysis

        # self._samples is a SampleCollection filled in with the values.
        # Only for internal use as the default filters are NOT applied.
        self._samples = None

        # Special case when the metadata cache is passed directly as
        # a SampleCollection
        if metadata_cache:
            if isinstance(metadata_cache, SampleCollection):
                logger.debug("Using SampleCollection passed to constructor")
                self._samples = metadata_cache
        else:
            # We use the env variable if it is set
            envcache = os.environ.get(APD_METADATA_CACHE_DIR, None)
            if envcache:
                metadata_cache = envcache

        if self._samples is None:
            # In this case the metadata cache was not a SampleCollection or
            # not set at all, so set up the cache
            self._samples = _load_and_setup_cache(
                metadata_cache, working_group, analysis, ap_date, api_url=api_url
            )

        # "available_tags" is the list of tags that can be used to restrict
        # the samples that will be used
        self._available_tags = self._samples.available_tags()
        self._default_tags = _validate_tags(kwargs, available_tags=self._available_tags)

        # Now dealing with the data cache
        data_cache = data_cache or os.environ.get(APD_DATA_CACHE_DIR, None)
        if isinstance(data_cache, str):
            self._data_cache = DataCache(data_cache)
        else:
            self._data_cache = data_cache
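
    # Illustration (paths hypothetical): metadata_cache can be given in three ways.
    #   AnalysisData("b2oc", "b02dkpi", metadata_cache=my_sample_collection)  # used directly
    #   AnalysisData("b2oc", "b02dkpi", metadata_cache="/tmp/apd-cache")      # cache directory
    #   AnalysisData("b2oc", "b02dkpi")  # $APD_METADATA_CACHE_DIR or ~/.cache/apd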

    def __call__(
        self,
        *,
        return_type=ApdReturnType.PFN,
        check_data=True,
        use_local_cache=True,
        showmax=10,
        **tags,
    ):  # pylint: disable-msg=too-many-locals
        """Main method that returns the dataset info.

        The normal behaviour is to return the PFNs for the samples, but setting
        return_type to ApdReturnType.SAMPLE returns the SampleCollection."""
        # Establishing the list of samples to run on
        samples = self._samples

        # Merge the current tags with the defaults passed to the constructor
        # and check that they are consistent
        effective_tags = _validate_tags(tags, self._default_tags, self._available_tags)
        for tagname, tagvalue in effective_tags.items():
            logger.debug("Filtering for %s = %s", tagname, tagvalue)

        # Applying the filters in one go
        samples = samples.filter(**effective_tags)
        logger.debug("Matched %d samples", len(samples))

        # Check that we have what we expect, i.e. exactly one sample per
        # combination of tag values
        if check_data:
            errors = _sample_check(samples, effective_tags)
            if len(errors) > 0:
                error_txt = f"{len(errors)} problem(s) found\n"
                for etags, ecount in errors:
                    if etags:
                        error_txt += f"{str(etags)}: "
                    if ecount > 0:
                        error_txt += (
                            f"{ecount} samples for the same configuration found, "
                            "this is ambiguous:"
                        )
                        error_txt += (
                            f" (only the first {showmax} samples printed)"
                            if ecount > showmax
                            else ""
                        )
                        match_list = [
                            str(m)
                            for m in itertools.islice(
                                samples.filter(**etags).itertags(), 0, showmax
                            )
                        ]
                        error_txt += "".join(
                            ["\n" + " " * 5 + str(m) for m in match_list]
                        )
                    else:
                        error_txt += "No matching sample found"
                logger.debug("Error loading data: %s", error_txt)
                raise ValueError("Error loading data: " + error_txt)

        if return_type == ApdReturnType.SAMPLE:
            return samples

        if return_type == ApdReturnType.LFN:
            logger.debug("Returning LFNs")
            return samples.LFNs()

        # By default we return the PFNs
        if use_local_cache:
            return self._transform_pfns(samples.PFNs())
        return samples.PFNs()

    def _transform_pfns(self, pfns):
        """Transform the list of PFNs before returning it; useful as it can
        be overridden in inheriting classes."""
        if not self._data_cache:
            return pfns
        return [self._data_cache(pfn) for pfn in pfns]

    def __str__(self):
        """User-friendly representation of the AnalysisData instance."""
        txt = f"AnalysisProductions: {self._working_group} / {self._analysis}\n"
        txt += str(self._samples)
        return txt

    def __repr__(self):
        """String representation of the AnalysisData instance."""
        return (
            f"<AnalysisData: WG={self._working_group}, "
            f"analysis={self._analysis}, n_samples={len(self._samples)}>"
        )
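
    # Sketch (not part of the production code): an inheriting class could
    # override _transform_pfns, e.g. to prefer replicas at a hypothetical
    # preferred site:
    #
    #   class MyAnalysisData(AnalysisData):
    #       def _transform_pfns(self, pfns):
    #           local = [p for p in pfns if "eoslhcb.cern.ch" in p]
    #           return local or pfns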

    def summary(self, tags: Optional[list] = None) -> dict:
        """Prepare a summary of the Analysis Production info."""
        # Deal with the tags first
        tag_summary = {}
        if tags:
            for tag in tags:
                if tag in self._available_tags:
                    try:
                        values = sorted(self._available_tags[tag])
                    except TypeError as exc:
                        raise ValueError(
                            f"Could not sort the values for tag ({tag}). "
                            "Please check that the values are sensible.\n"
                        ) from exc
                    tag_summary[tag] = values
                else:
                    raise ValueError(
                        f"Requested tag ({tag}) not valid for the given production "
                        f"(wg: {self._working_group}, analysis: {self._analysis})!"
                    )
        else:
            tag_summary = dict(self._available_tags)

        summary = {}
        summary["tags"] = tag_summary

        # If specific tags were requested, we assume the general information
        # should not be included
        if not tags:
            summary["analysis"] = self._analysis
            summary["working_group"] = self._working_group
            summary["Number_of_files"] = self._samples.file_count()
            summary["Bytecount"] = self._samples.byte_count()
        return summary
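
    # Illustrative output shape (tag names and values hypothetical):
    #   datasets.summary(["polarity"])
    #   -> {"tags": {"polarity": ["magdown", "magup"]}}
    # Without arguments, the analysis name, working group, file count and
    # byte count are included as well.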

    def all_samples(self):
        """Return all the samples in this Analysis Production,
        i.e. without filtering by the default tags."""
        return self._samples
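
    # For example (tags hypothetical), compare the filtered and unfiltered views:
    #   datasets = AnalysisData("b2oc", "b02dkpi", polarity="magdown")
    #   datasets(return_type=ApdReturnType.SAMPLE, check_data=False)  # magdown only
    #   datasets.all_samples()                                        # every sample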