#!/usr/bin/env python
import argparse
import json
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

from higgs_dna.utils.logger_utils import setup_logger
# XRootD redirector URL prefixes, keyed by the region names accepted by the
# ``--where`` command-line option. The selected prefix is prepended to every
# file path returned for grid datasets.
xrootd_pfx = {
    "Americas": "root://cmsxrootd.fnal.gov/",
    "Eurasia": "root://xrootd-cms.infn.it/",
    "Yolo": "root://cms-xrd-global.cern.ch/",
}
def get_fetcher_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Build the command-line parser of the dataset fetcher and parse the arguments.

    :param argv: Argument strings to parse. When ``None`` (the default) argparse
        falls back to ``sys.argv[1:]``, which is the behaviour callers that pass
        no argument have always had.
    :return: Namespace with the attributes ``input``, ``where``, ``xrootd``,
        ``instance``, ``mode``, ``recursive`` and ``file_extension``.
    """
    parser = argparse.ArgumentParser(
        description="Generate a JSON file mapping dataset names to file paths."
    )
    parser.add_argument(
        "-i",
        "--input",
        help="Input dataset definition file to process.",
        required=True,
    )
    parser.add_argument(
        "-w",
        "--where",
        help="Specify the region for xrootd prefix (only for grid mode).",
        default="Eurasia",
        choices=["Americas", "Eurasia", "Yolo"],
    )
    parser.add_argument(
        "-x",
        "--xrootd",
        help="Override xrootd prefix with the one given.",
        default=None,
    )
    parser.add_argument(
        "--dbs-instance",
        dest="instance",
        help="The DBS instance to use for querying datasets (only for grid mode).",
        type=str,
        default="prod/global",
        choices=["prod/global", "prod/phys01", "prod/phys02", "prod/phys03"],
    )
    parser.add_argument(
        "--mode",
        help="Mode of operation: 'grid' to fetch remote datasets or 'local' to fetch local file paths.",
        choices=["grid", "local"],
        default="grid",
    )
    parser.add_argument(
        "--recursive",
        action="store_true",
        help="Recursively include files in subdirectories (only for local mode).",
    )
    # With nargs="*" the attribute is None when the option is absent and an
    # empty list when the option is given without values.
    parser.add_argument(
        "--file-extension",
        nargs="*",
        help="Filter files by extensions (e.g., .root .txt) (only for local mode). If not specified, all files are included.",
    )
    return parser.parse_args(argv)
def get_dataset_dict_grid(
    fset: Iterable[Iterable[str]],
    xrd: str,
    dbs_instance: str,
    logger,
    timeout: float = 10,
    max_retries: Optional[int] = None,
) -> Dict[str, List[str]]:
    """
    Fetch file lists for grid datasets using dasgoclient.

    The queries run in parallel in a thread pool. A query that exceeds
    ``timeout`` seconds is restarted.

    :param fset: Iterable of tuples (dataset-short-name, dataset-path). It is
        consumed exactly once, so one-shot iterators are supported.
    :param xrd: xrootd prefix prepended to every returned file path
    :param dbs_instance: DBS instance for dasgoclient
    :param logger: Logger instance
    :param timeout: Seconds to wait for one dasgoclient call before retrying
    :param max_retries: Maximum number of retries after a timeout; ``None``
        (the default) retries indefinitely
    :return: Dictionary mapping dataset names to list of file paths, keyed in
        the order in which the names first appear in ``fset``. A dataset whose
        query fails maps to an empty list.
    """

    def fetch_dataset(name: str, dataset: str) -> Tuple[str, List[str]]:
        """Query DAS for one dataset and return ``(name, prefixed file paths)``."""
        logger.info(f"Fetching files for dataset '{name}': '{dataset}'")
        # NOTE(review): for "/USER" datasets a second "instance=" term is added
        # on top of the one built from dbs_instance; kept as in the original
        # query -- confirm which one dasgoclient honours.
        private_appendix = "" if not dataset.endswith("/USER") else " instance=prod/phys03"
        # Argument list without a shell: dataset names come from a user-provided
        # file and must not be interpreted by a shell.
        cmd = [
            "/cvmfs/cms.cern.ch/common/dasgoclient",
            f"-query=instance={dbs_instance} file dataset={dataset}{private_appendix}",
        ]
        logger.debug(f"Executing command: {' '.join(cmd)}")

        retries = 0
        while True:
            try:
                output = subprocess.check_output(cmd, universal_newlines=True, timeout=timeout)
                break
            except subprocess.TimeoutExpired:
                if max_retries is not None and retries >= max_retries:
                    logger.error(f"Timeout reached for dataset '{dataset}', giving up after {retries} retries.")
                    return name, []
                retries += 1
                logger.warning(f"Timeout reached for dataset '{dataset}', retrying...")
            except subprocess.CalledProcessError as e:
                logger.error(f"dasgoclient command failed for dataset '{dataset}': {e}")
                return name, []
            except Exception as e:
                # Best effort: e.g. a missing dasgoclient executable must not
                # abort the queries of the other datasets.
                logger.error(f"Unexpected error while fetching files for dataset '{dataset}': {e}")
                return name, []

        flist = [xrd + f for f in output.splitlines() if f.strip()]
        logger.info(f"Found {len(flist)} files for dataset '{name}'.")
        return name, flist

    # Materialise once so a generator is not exhausted before the results are collected.
    datasets = [(name, dataset) for name, dataset in fset]

    fdict: Dict[str, List[str]] = {}
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(fetch_dataset, name, dataset) for name, dataset in datasets]
        # All queries are already running; collecting in submission order keeps
        # both the key order and the file order of repeated names deterministic.
        for future in futures:
            name, flist = future.result()
            fdict.setdefault(name, []).extend(flist)
    return fdict
def get_dataset_dict_local(
    fset: Iterable[Iterable[str]], recursive: bool, extensions: List[str], logger
) -> Dict[str, List[str]]:
    """
    Collect file lists for local directories.

    :param fset: Iterable of tuples (dataset-short-name, directory-path)
    :param recursive: Whether to search directories recursively
    :param extensions: List of file extensions to filter (case-insensitive,
        with or without the leading dot). ``None`` or an empty list keeps all files.
    :param logger: Logger instance
    :return: Dictionary mapping dataset names to sorted lists of resolved local
        file paths. Entries whose directory does not exist or cannot be read
        are logged and skipped.
    """
    # Normalise the filter once: lower-case, and add the leading dot that
    # Path.suffix always carries. An empty string is kept as-is so it still
    # selects files without any suffix.
    wanted = {
        ext.lower() if (not ext or ext.startswith(".")) else "." + ext.lower()
        for ext in (extensions or [])
    }
    pattern = "**/*" if recursive else "*"

    fdict: Dict[str, List[str]] = {}
    for name, dir_path in fset:
        logger.info(f"Collecting files for local dataset '{name}': '{dir_path}'")
        directory = Path(dir_path)
        if not directory.is_dir():
            logger.error(f"Directory '{dir_path}' does not exist or is not a directory.")
            continue
        try:
            # Sorted so the generated JSON does not depend on filesystem order.
            files = sorted(
                str(path.resolve())
                for path in directory.glob(pattern)
                if path.is_file() and (not wanted or path.suffix.lower() in wanted)
            )
        except Exception as e:
            # Best effort: an unreadable directory must not abort the other datasets.
            logger.error(f"Error while collecting files from directory '{dir_path}': {e}")
            continue
        fdict.setdefault(name, []).extend(files)
        logger.info(f"Found {len(files)} files for local dataset '{name}'.")
    return fdict
def main():
    """
    Command-line entry point: build a dataset-name -> file-list JSON file.

    Parses the command line, reads the dataset definitions from the ``.txt``
    input file, collects the file paths (from the grid or from local
    directories, depending on ``--mode``) and writes the result next to the
    input file with a ``.json`` suffix. Exits with status 1 when the input is
    invalid, when no file was collected at all, or when the JSON cannot be
    written.
    """
    args = get_fetcher_args()
    logger = setup_logger(level="INFO")

    if not args.input.endswith(".txt"):
        logger.error("Input file must have a '.txt' extension and be a text file!")
        sys.exit(1)

    # Read and parse the input file.
    # NOTE(review): read_input_file is not defined in the part of the module
    # visible here; it is expected to be provided elsewhere in this file.
    fset = read_input_file(args.input, args.mode, logger)
    if not fset:
        logger.error(f"No valid entries found in '{args.input}'. Exiting.")
        sys.exit(1)
    logger.info(f"Using the following dataset names and paths: {fset}")

    if args.mode == "grid":
        # An explicit --xrootd prefix overrides the regional default.
        xrd = args.xrootd or xrootd_pfx.get(args.where, "")
        logger.info(f"Using xrootd prefix: '{xrd}'")
        fdict = get_dataset_dict_grid(fset, xrd, args.instance, logger)
    elif args.mode == "local":
        fdict = get_dataset_dict_local(fset, args.recursive, args.file_extension, logger)
    else:
        # Unreachable through argparse's choices; guards against an unbound fdict.
        logger.error(f"Unknown mode '{args.mode}'. Exiting.")
        sys.exit(1)

    # Grid mode keeps a key with an empty list for every failed dataset, so the
    # dictionary being non-empty does not mean that any file was found.
    empty = [name for name, files in fdict.items() if not files]
    if not fdict or len(empty) == len(fdict):
        logger.error("No files were collected. Exiting without creating JSON.")
        sys.exit(1)
    for name in empty:
        logger.warning(f"No files were found for dataset '{name}'.")

    # The output file sits next to the input file, with a .json suffix.
    output_json = Path(args.input).with_suffix(".json")
    try:
        with open(output_json, "w") as fp:
            json.dump(fdict, fp, indent=4)
        logger.info(f"Successfully wrote data to JSON file '{output_json}'.")
    except Exception as e:
        logger.error(f"Error writing to JSON file '{output_json}': {e}")
        sys.exit(1)
# Run the fetcher only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()