Source code for LbExec.workflows.mdf_utils
###############################################################################
# (c) Copyright 2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""MDF file utilities."""
from __future__ import annotations
import ctypes
import time
from typing import BinaryIO, cast
from ..options import Options
from ..utils import read_xml_file_catalog, resolve_input_files, write_summary_xml
# Constants
MDF_MERGE_CHUNK_SIZE = 1024 * 1024 # 1MB chunk size for merging MDF files
def _get_mdf_event_size(buffer: ctypes.Array[ctypes.c_char]) -> int:
"""Extract the event size from the MDF buffer."""
raw_bytes = buffer.raw
size = int.from_bytes(raw_bytes[0:4], "little")
for i in range(4, 12, 4):
if int.from_bytes(raw_bytes[i : i + 4], "little") != size:
raise ValueError("MDF file does not have a valid header.")
return size
[docs]def merge_mdf(options: Options):
"""Merge MDF files into a single compressed output file."""
import ROOT # type: ignore[import-untyped]
import zstandard
input_files = options.input_files
if options.xml_file_catalog:
file_catalog = read_xml_file_catalog(options.xml_file_catalog)
input_files = resolve_input_files(options.input_files, file_catalog)
output_ctx: BinaryIO
if options.compression is None:
print("Merging MDF files without compression...")
output_ctx = open(options.output_file, "wb")
elif options.compression.algorithm == "ZSTD":
level = options.compression.level * 2
print(f"Using Zstandard compressor with level {level}...")
comp_params = zstandard.ZstdCompressionParameters.from_level(level)
comp = zstandard.ZstdCompressor(compression_params=comp_params)
raw_file = open( # pylint: disable=consider-using-with
options.output_file, "wb"
)
output_ctx = cast(BinaryIO, comp.stream_writer(raw_file))
else:
raise NotImplementedError(options.compression)
bytes_read = 0
next_progress_print = MDF_MERGE_CHUNK_SIZE * 100
buffer = ctypes.create_string_buffer(MDF_MERGE_CHUNK_SIZE)
start_time = time.time()
with output_ctx as fh:
for input_file in input_files:
input_file = input_file.removeprefix("mdf:")
print("Reading input file:", input_file)
header_checked = False
with ROOT.TFile.Open( # pylint: disable=no-member
f"{input_file}?filetype=raw"
) as rf:
to_read = rf.GetSize()
while to_read > 0:
requested = min(to_read, MDF_MERGE_CHUNK_SIZE)
failure = rf.ReadBuffer(buffer, requested)
if failure:
raise RuntimeError(f"Failed to read from file {input_file}.")
to_read -= requested
bytes_read += requested
# Safety check to make sure we're actually reading a valid MDF file
if not header_checked:
first_event_size = _get_mdf_event_size(buffer)
# If possible, check the second event header
if first_event_size + 12 < bytes_read:
second_buffer = ctypes.create_string_buffer(
buffer.raw[first_event_size:]
)
_get_mdf_event_size(second_buffer)
header_checked = True
if bytes_read >= next_progress_print:
print(
f"Status: {bytes_read / (1024 * 1024):.2f} MiB read and "
f"{fh.tell() / (1024 * 1024):.2f} MiB written "
f"in {time.time() - start_time:.2f} seconds."
)
next_progress_print += MDF_MERGE_CHUNK_SIZE * 100
fh.write(memoryview(buffer)[:requested])
fh.flush()
bytes_written = fh.tell()
end_time = time.time()
print(f"Total bytes read: {bytes_read} ({bytes_read / (1024 * 1024):.2f} MiB)")
print(
f"Total bytes written: {bytes_written} ({bytes_written / (1024 * 1024):.2f} MiB)"
)
print(f"Compression ratio: {bytes_written / bytes_read:.2f}")
print(f"Total time taken: {end_time - start_time:.2f} seconds.")
print(f"Finished merging MDF files into {options.output_file}.")
write_summary_xml(options, {options.output_file})