Source code for apd.eos
###############################################################################
# (c) Copyright 2022-2023 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
__all__ = ("auth", "authw")
import json
import os
import re
import urllib.parse as urlparse
from pathlib import Path, PurePosixPath
def _find_suitable_token(path: PurePosixPath, allow_write: bool) -> str:
if not hasattr(_find_suitable_token, "tokens"):
_find_suitable_token.tokens = json.loads( # type: ignore[attr-defined]
Path(os.environ["LBAP_TOKENS_FILE"]).read_text()
)
if path.root == "//":
path = PurePosixPath(str(path)[1:])
for eos_token in _find_suitable_token.tokens["eos_tokens"]: # type: ignore[attr-defined]
if allow_write and not eos_token["allow_write"]:
continue
if path.is_relative_to(eos_token["path"]):
return eos_token["token"]
msg = "No matching token"
if allow_write:
msg += " with write access"
msg += f" found for path: {path}\n"
msg += "Available tokens:\n"
for eos_token in _find_suitable_token.tokens["eos_tokens"]: # type: ignore[attr-defined]
msg += f" * {eos_token['path']}"
if not eos_token["allow_write"]:
msg += " (readonly)"
msg += "\n"
raise ValueError(msg)
def add_token_to_url(url: str, allow_write: bool, ignore_nonroot: bool = True) -> str:
original_url = url
url = urlparse.urlparse(url) # type: ignore[assignment]
# skip the files not on root if requested
if ignore_nonroot:
if not url.scheme or url.scheme == "file": # type: ignore[attr-defined]
return original_url
token = _find_suitable_token(PurePosixPath(url.path), allow_write) # type: ignore[attr-defined]
token = urlparse.unquote(token)
url_parts = list(url)
url_parts[4] = urlparse.urlencode(
dict(urlparse.parse_qsl(url_parts[4]))
| {"xrd.wantprot": "unix", "authz": token}
)
url_with_token = urlparse.urlunparse(url_parts)
# EOS currently doesn't accept percent encoded URLs despite ':' being a
# reserved character in URIs. This will be fixed in the next version of EOS
# but for now we need to unquote the URL.
url_with_token = url_with_token.replace("authz=zteos64%3A", "authz=zteos64:")
# EOS also doesn't understand the use of uppercase hex digits for escaped
# padding characters in base64 encoded tokens so replace them with lowercase
url_with_token = re.sub(
r"(&authz=[^&#]+?)((?:%3D){1,3})",
lambda x: x.groups()[0] + x.groups()[1].lower(),
url_with_token,
)
return url_with_token
[docs]def auth(url: str, ignore_nonroot: bool = True) -> str:
"""Take a PFN and return one with read-only credentials appended"""
if "LBAP_TOKENS_FILE" in os.environ:
return add_token_to_url(url, False, ignore_nonroot=ignore_nonroot)
return url
[docs]def authw(url: str, ignore_nonroot: bool = True) -> str:
"""Take a PFN and return one with read-write credentials appended"""
if "LBAP_TOKENS_FILE" in os.environ:
return add_token_to_url(url, True, ignore_nonroot=ignore_nonroot)
return url