# Source code for higgs_dna.submission.lxplus

import json
import logging
import os
import subprocess
import sys
from copy import deepcopy
from datetime import datetime
from pathlib import Path

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Name of the environment variable exported by the generated stage-out job scripts; it is set to
# a per-job directory under ${_CONDOR_SCRATCH_DIR} whose content is packed into the tarball.
SCRATCH_DUMP_ENV = "HIGGSDNA_STAGEOUT_DIR"
# File name of the tarball each stage-out job creates and HTCondor transfers back.
STAGE_TARBALL_NAME = "higgsdna_stageout.tar.gz"


class LXPlusVanillaSubmitter:
    """
    Submit analysis jobs to HTCondor on CERN's LXPlus cluster.

    The files of every sample are split into chunks of ``files_per_job`` files and one
    HTCondor job is generated per chunk. By default all jobs of a sample are submitted
    to the same cluster; pass ``cluster_per_sample=False`` to submit every job as its
    own cluster instead.

    The constructor creates ``.higgs_dna_vanilla_lxplus/<analysis_name>_<YYYYmmdd_HHMMSS>``
    in the current working directory (the timestamp avoids overwriting previous
    submissions). Inside it, ``inputs`` holds the split JSON files and ``jobs`` holds
    the generated HTCondor ``.sh``/``.sub`` files.

    Note:
        This submission method works well on lxplus, but has also been tested on other
        infrastructures like RWTH Aachen because it does not rely on any specific EOS
        or HTCondor features. It is simply a generic HTCondor submitter that relies on
        an automated submission of automatically generated sh and sub files.

    Args:
        analysis_name: Label of the submission; a timestamp is appended to it.
        analysis_dict: Content of the analysis JSON; copied into every per-job JSON
            with its ``samplejson`` entry replaced.
        original_analysis_path: Path of the analysis JSON as it appears in
            ``args_string``; replaced by the per-job JSON in each job command.
        sample_dict: Mapping ``sample name -> list of input files``.
        args_string: Command-line arguments of the original ``run_analysis.py`` call.
        queue: Value written as HTCondor ``+JobFlavour``.
        memory: Memory request, e.g. ``"10GB"``, ``"8000MB"`` or a number of MB.
        files_per_job: Number of input files per job (>= 1).
        cluster_per_sample: Submit one cluster per sample (ignored, with an info
            message, when ``stage_output`` is set).
        max_materialize: ``None``, a fraction in ``(0, 1]`` of the jobs of a cluster,
            or an absolute number (> 1) of jobs to materialize at once.
        dump_dir: Output directory (local path or ``root://`` URL).
        stage_output: Write outputs to the job scratch area and transfer them back
            as a tarball into ``dump_dir``.

    Raises:
        ValueError: On invalid ``memory``, ``files_per_job`` or ``max_materialize``,
            or when ``stage_output`` is requested without ``dump_dir``.
        RuntimeError: When a valid VOMS proxy is needed for the generated job scripts
            but cannot be found.
    """

    def __init__(
        self,
        analysis_name,
        analysis_dict,
        original_analysis_path,
        sample_dict,
        args_string,
        queue="longlunch",
        memory="10GB",
        files_per_job=1,
        cluster_per_sample=True,
        max_materialize=None,
        dump_dir=None,
        stage_output=False,
    ):
        # Basic assignments
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.analysis_name = f"{analysis_name}_{timestamp}"
        self.analysis_dict = analysis_dict
        self.sample_dict = sample_dict
        self.args_string = args_string
        self.queue = queue
        self.memory = memory
        self.cluster_per_sample = cluster_per_sample
        self.stage_output = stage_output
        self.transfer_inputs_by_analysis = {}

        if stage_output and not dump_dir:
            raise ValueError("Staging output on vanilla lxplus requires --dump to be specified.")
        self.dump_dir = self._prepare_dump_dir(dump_dir)

        # Parse the memory string (e.g. "10GB", "8000MB") into an integer number of MB
        self.memory_mb = self._parse_memory_mb(memory)

        # Early parse of raw max_materialize so that bad values fail before any file is written
        self._max_materialize_raw = max_materialize
        self._parse_max_materialize()

        # Setup directories
        self.current_dir = os.getcwd()
        self.base_dir = os.path.join(self.current_dir, ".higgs_dna_vanilla_lxplus")
        self.analysis_dir = os.path.join(self.base_dir, self.analysis_name)
        self.input_dir = os.path.join(self.analysis_dir, "inputs")
        self.jobs_dir = os.path.join(self.analysis_dir, "jobs")
        Path(self.input_dir).mkdir(parents=True, exist_ok=True)
        Path(self.jobs_dir).mkdir(parents=True, exist_ok=True)
        self.job_files = []

        # Create input JSON files and job submission files
        self._make_input_json(files_per_job)
        self._write_sub_files(original_analysis_path)

    @staticmethod
    def _prepare_dump_dir(dump_dir):
        """Return the normalized dump directory, creating it when it is a local path.

        ``root://`` URLs are returned untouched: they must neither be made absolute
        nor created with ``mkdir`` (which would create a local ``root:/...`` tree).
        """
        if not dump_dir:
            return dump_dir
        if str(dump_dir).startswith("root://"):
            return dump_dir
        local_dir = os.path.abspath(dump_dir)
        Path(local_dir).mkdir(parents=True, exist_ok=True)
        return local_dir

    @staticmethod
    def _parse_memory_mb(memory):
        """Convert a memory request such as ``"10GB"``, ``"8000MB"`` or ``2048`` to integer MB.

        GB values use the binary convention (1 GB = 1024 MB). A bare number is taken as MB.

        Raises:
            ValueError: If the value cannot be parsed or is not a positive finite number.
        """
        text = str(memory).upper().replace(" ", "")
        number, factor = text, 1
        for suffix, suffix_factor in (("GB", 1024), ("MB", 1), ("G", 1024), ("M", 1)):
            if text.endswith(suffix):
                number, factor = text[: -len(suffix)], suffix_factor
                break
        try:
            value = float(number)
        except ValueError as err:
            raise ValueError(f"Invalid memory request: {memory!r}") from err
        # The chained comparison also rejects nan and inf.
        if not 0 < value < float("inf"):
            raise ValueError(f"Memory request must be a positive finite number, got {memory!r}")
        memory_mb = int(value * factor)
        if memory_mb < 1:
            raise ValueError(f"Memory request must be at least 1 MB, got {memory!r}")
        return memory_mb

    def _should_stage_input(self, file_path):
        """Return True when ``file_path`` is a local file that must be transferred with the job.

        Remote inputs (``root:`` prefix or any ``scheme://`` URL) and non-string entries
        are left untouched.
        """
        if not isinstance(file_path, str):
            return False
        return not (file_path.startswith("root:") or "://" in file_path)

    def _prepare_staged_sample_files(self, files):
        """Split ``files`` into the names written to the sample JSON and the files to transfer.

        Local files are referenced by basename in the JSON (they land in the job scratch
        directory) and their absolute paths are returned for ``transfer_input_files``.
        Remote files, and local files whose basename clashes with an earlier one, keep
        their original path.

        Returns:
            tuple: ``(staged_files, transfer_inputs)``.
        """
        staged_files = []
        transfer_inputs = []
        seen_names = set()
        for file_path in files:
            if not self._should_stage_input(file_path):
                staged_files.append(file_path)
                continue
            abs_path = os.path.abspath(file_path)
            local_name = os.path.basename(abs_path)
            if not local_name:
                staged_files.append(file_path)
                continue
            if local_name in seen_names:
                logger.warning(
                    "Duplicate basename '%s' in staged inputs; keeping original path '%s'.",
                    local_name,
                    file_path,
                )
                staged_files.append(file_path)
                continue
            seen_names.add(local_name)
            staged_files.append(local_name)
            transfer_inputs.append(abs_path)
        return staged_files, transfer_inputs

    def _parse_max_materialize(self):
        """Validate ``self._max_materialize_raw`` and store it as ``self._max_materialize_val``.

        Raises:
            ValueError: If the value is not a number or not a finite number > 0.
        """
        raw = self._max_materialize_raw
        if raw is None:
            self._max_materialize_val = None
            return
        try:
            val = float(raw)
        except (TypeError, ValueError) as err:
            raise ValueError(f"Invalid --vlxp-max-materialize: {raw!r}") from err
        # The chained comparison also rejects nan and inf, which would break int() later.
        if not 0 < val < float("inf"):
            raise ValueError(f"--vlxp-max-materialize must be a finite number > 0, got {val}")
        self._max_materialize_val = val

    def _compute_max_materialize(self, n_jobs):
        """Return the ``max_materialize`` value for a cluster of ``n_jobs`` jobs, or None.

        Values <= 1 are interpreted as a fraction of ``n_jobs`` (at least 1 job);
        larger values are absolute job counts capped at ``n_jobs``.
        """
        val = self._max_materialize_val
        if val is None:
            return None
        if val <= 1:
            return max(1, int(val * n_jobs))
        return min(n_jobs, int(val))

    def _make_input_json(self, files_per_job):
        """Write one sample JSON and one analysis JSON per chunk of ``files_per_job`` files.

        Fills ``self.json_sample_files`` and ``self.json_analysis_files`` (both keyed by
        sample) and, when staging output, ``self.transfer_inputs_by_analysis`` (keyed by
        analysis JSON path).

        Raises:
            ValueError: If ``files_per_job`` is not an integer >= 1.
        """
        try:
            files_per_job = int(files_per_job)
        except (TypeError, ValueError) as err:
            raise ValueError(f"files_per_job must be an integer >= 1, got {files_per_job!r}") from err
        if files_per_job < 1:
            raise ValueError(f"files_per_job must be an integer >= 1, got {files_per_job!r}")

        self.json_analysis_files = {}
        self.json_sample_files = {}
        for sample, files in self.sample_dict.items():
            self.json_analysis_files[sample] = []
            self.json_sample_files[sample] = []
            for i in range(0, len(files), files_per_job):
                sub_files = files[i : i + files_per_job]
                transfer_inputs = []
                sample_json_contents = sub_files
                if self.stage_output:
                    sample_json_contents, transfer_inputs = self._prepare_staged_sample_files(sub_files)

                sample_json = os.path.join(self.input_dir, f"{sample}-{i}.json")
                with open(sample_json, "w", encoding="utf-8") as jf:
                    json.dump({sample: sample_json_contents}, jf, indent=4)
                self.json_sample_files[sample].append(sample_json)

                analysis_json = os.path.join(self.input_dir, f"AN-{sample}-{i}.json")
                an_to_dump = deepcopy(self.analysis_dict)
                # Staged jobs run in the scratch directory, where the transferred sample
                # JSON is available under its basename.
                if self.stage_output:
                    an_to_dump["samplejson"] = os.path.basename(sample_json)
                else:
                    an_to_dump["samplejson"] = sample_json
                with open(analysis_json, "w", encoding="utf-8") as jf:
                    json.dump(an_to_dump, jf, indent=4)
                self.json_analysis_files[sample].append(analysis_json)

                if self.stage_output:
                    self.transfer_inputs_by_analysis[analysis_json] = [
                        os.path.abspath(analysis_json),
                        os.path.abspath(sample_json),
                        *transfer_inputs,
                    ]

    def _stage_tarball_destination(self, base_name):
        """Return the remap target of the stage-out tarball inside ``self.dump_dir``."""
        dest_base = self.dump_dir.rstrip("/")
        return os.path.join(dest_base, f"{base_name}_$(Cluster).$(ProcId).tar.gz")

    def _write_output_target(self, sub_file, target_dir):
        """Write the output location line: ``output_destination`` for ``root://`` URLs,
        ``output_directory`` otherwise."""
        if str(target_dir).startswith("root://"):
            sub_file.write(f"output_destination = {target_dir}\n")
        else:
            sub_file.write(f"output_directory = {target_dir}\n")

    def _locate_proxy(self, required):
        """Return the path of a VOMS proxy valid for at least 5 hours.

        ``subprocess.getstatusoutput`` never raises on a failing command, so the exit
        status is checked explicitly.

        Args:
            required: Whether the proxy path is written into a job script. If False,
                a missing proxy only triggers a warning and None is returned.

        Raises:
            RuntimeError: If ``required`` and no sufficiently valid proxy is found.
        """
        status, out = subprocess.getstatusoutput("voms-proxy-info -e --valid 5:00")
        if status != 0:
            if required:
                logger.error("voms proxy not found or validity less than 5 hours:\n%s", out)
                raise RuntimeError("voms proxy not found or validity less than 5 hours")
            logger.warning("voms proxy not found or validity less than 5 hours:\n%s", out)
            return None

        status, out = subprocess.getstatusoutput("voms-proxy-info -p")
        proxy = out.strip().split("\n")[-1]
        if status != 0 or not proxy:
            if required:
                logger.error("Unable to locate voms proxy:\n%s", out)
                raise RuntimeError("Unable to locate voms proxy")
            logger.warning("Unable to locate voms proxy:\n%s", out)
            return None
        return proxy

    def _resolve_jobs_dirs(self):
        """Return ``(jobs_dir, output_target)``.

        ``jobs_dir`` is the real path of the jobs directory with ``/eos/home-`` mapped
        to ``/eos/user/``; ``output_target`` is the same location prefixed with the
        matching EOS ``root://`` endpoint when it lives on EOS.
        """
        jobs_dir = os.path.realpath(self.jobs_dir).replace("/eos/home-", "/eos/user/")
        if "/eos/user" in jobs_dir:
            output_target = "root://eosuser.cern.ch/" + jobs_dir
        elif "/eos/cms" in jobs_dir:
            output_target = "root://eoscms.cern.ch/" + jobs_dir
        else:
            output_target = jobs_dir
        return jobs_dir, output_target

    def _analysis_command(self, original_analysis_path, analysis_json):
        """Return the ``run_analysis.py`` command line for one per-job analysis JSON.

        The original analysis path is replaced by ``analysis_json`` and the executor
        ``vanilla_lxplus`` by ``iterative`` so that the job runs locally on the worker.
        """
        args_ = self.args_string.replace(original_analysis_path, analysis_json).replace(
            " vanilla_lxplus", " iterative"
        )
        return f"{sys.prefix}/bin/run_analysis.py {args_}"

    def _write_resource_lines(self, sub, output_target):
        """Write the output target, memory request and environment lines of a submit file."""
        self._write_output_target(sub, output_target)
        sub.write(
            f"RequestMemory = ifThenElse(isUndefined(MemoryUsage), {self.memory_mb}, int(MemoryUsage * 1.1))\n"
        )
        sub.write("getenv = True\n")

    def _write_policy_and_queue(self, sub, max_mat, n_jobs):
        """Write job flavour, hold/release/retry policy and the ``queue`` statement."""
        sub.write(f'+JobFlavour = "{self.queue}"\n')
        # Failed jobs are held instead of leaving the queue ...
        sub.write("on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n")
        sub.write("on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)\n")
        # ... jobs suspended (JobStatus 7) for more than 5 minutes are held as well ...
        sub.write("periodic_hold = ( JobStatus == 7 ) && ((CurrentTime - EnteredCurrentStatus) > 300)\n")
        sub.write('periodic_hold_reason = "Job stuck suspended >5m — requeueing"\n')
        # ... and held jobs (JobStatus 5) are released again after one minute.
        sub.write("periodic_release = ( JobStatus == 5 ) && ((CurrentTime - EnteredCurrentStatus) > 60)\n")
        sub.write("max_retries = 10\n")
        if max_mat is not None:
            sub.write(f"max_materialize = {max_mat}\n")
        # Do not rerun on the machine the previous attempt ran on.
        sub.write("requirements = Machine =!= LastRemoteHost\n")
        sub.write(f"queue {n_jobs}\n")

    def _write_stage_script(self, script_path, base_name, proxy, command):
        """Write the executable of a stage-out job.

        The script runs ``command`` with ``SCRATCH_DUMP_ENV`` pointing to a fresh
        directory in the HTCondor scratch area, packs that directory into the stage-out
        tarball and exits with 107 if the analysis failed or 108 if only the tarball
        creation failed. An empty tarball is created as a fallback so that the output
        transfer does not fail on a missing file.
        """
        env = SCRATCH_DUMP_ENV
        lines = [
            "#!/bin/sh\n",
            f"export X509_USER_PROXY={proxy}\n",
            f'export {env}="${{_CONDOR_SCRATCH_DIR}}/{base_name}_stage"\n',
            f'rm -rf "${{{env}}}"\n',
            f'mkdir -p "${{{env}}}"\n',
            f'STAGEOUT_TARBALL="${{_CONDOR_SCRATCH_DIR}}/{STAGE_TARBALL_NAME}"\n',
            "RUN_RC=0\n",
            f"/usr/bin/env {command} || RUN_RC=107\n",
            f'if ! tar -C "${{{env}}}" -czf "${{STAGEOUT_TARBALL}}" .; then\n',
            " echo 'Warning: stage-out tarball creation failed.' 1>&2\n",
            ' tar -czf "${STAGEOUT_TARBALL}" --files-from /dev/null 2>/dev/null || true\n',
            " if [ ${RUN_RC} -eq 0 ]; then RUN_RC=108; fi\n",
            "fi\n",
            f'rm -rf "${{{env}}}"\n',
            "exit ${RUN_RC}\n",
        ]
        with open(script_path, "w", encoding="utf-8") as exe:
            exe.writelines(lines)
        os.chmod(script_path, 0o775)

    def _write_sample_cluster(self, sample, analysis_list, original_analysis_path, proxy, jobs_dir, output_target):
        """Write the ``.sh``/``.sub`` pair submitting all jobs of ``sample`` as one cluster.

        The script receives ``$(ProcId)`` as first argument and runs the analysis JSON
        with the matching index. Stage-output is never active here because it forces
        one cluster per analysis JSON.
        """
        base_name = f"AN-{sample}"
        n_jobs = len(analysis_list)
        max_mat = self._compute_max_materialize(n_jobs)

        # Executable script
        job_file_executable = os.path.join(jobs_dir, f"{base_name}.sh")
        with open(job_file_executable, "w", encoding="utf-8") as exe:
            exe.write("#!/bin/sh\n")
            exe.write(f"export X509_USER_PROXY={proxy}\n")
            for idx, json_file in enumerate(analysis_list):
                command = self._analysis_command(original_analysis_path, json_file)
                exe.write(f"if [ $1 -eq {idx} ]; then\n")
                exe.write(f" /usr/bin/env {command} || exit 107\n")
                exe.write(" exit 0\n")
                exe.write("fi\n")
        os.chmod(job_file_executable, 0o775)

        # Submit file
        job_file_submit = os.path.join(jobs_dir, f"{base_name}.sub")
        with open(job_file_submit, "w", encoding="utf-8") as sub:
            sub.write(f"executable = {job_file_executable}\n")
            sub.write("arguments = $(ProcId)\n")
            sub.write(f"output = {base_name}.$(ClusterId).$(ProcId).out\n")
            sub.write(f"error = {base_name}.$(ClusterId).$(ProcId).err\n")
            sub.write(f"log = {jobs_dir}/{base_name}.$(ClusterId).log\n")
            self._write_resource_lines(sub, output_target)
            self._write_policy_and_queue(sub, max_mat, n_jobs)
        self.job_files.append(job_file_submit)

    def _write_single_job(self, analysis_json, original_analysis_path, proxy, jobs_dir, output_target):
        """Write the submit file (and, for stage-output, the script) of one analysis JSON."""
        base_name = os.path.splitext(os.path.basename(analysis_json))[0]
        job_file_submit = os.path.join(jobs_dir, f"{base_name}.sub")
        max_mat = self._compute_max_materialize(1)

        # Staged jobs find the transferred analysis JSON under its basename.
        analysis_json_argument = os.path.basename(analysis_json) if self.stage_output else analysis_json
        command = self._analysis_command(original_analysis_path, analysis_json_argument)

        job_file_executable = None
        if self.stage_output:
            job_file_executable = os.path.join(jobs_dir, f"{base_name}.sh")
            self._write_stage_script(job_file_executable, base_name, proxy, command)

        with open(job_file_submit, "w", encoding="utf-8") as sub:
            if self.stage_output:
                sub.write(f"executable = {job_file_executable}\n")
            else:
                sub.write("executable = /usr/bin/env\n")
                # HTCondor hands the arguments to the executable without a shell, so
                # shell operators such as "|| exit 107" must not be appended here.
                sub.write(f"arguments = {command}\n")
            sub.write(f"output = {base_name}.out\n")
            sub.write(f"error = {base_name}.err\n")
            sub.write(f"log = {jobs_dir}/{base_name}.log\n")
            self._write_resource_lines(sub, output_target)
            if self.stage_output:
                transfer_inputs = self.transfer_inputs_by_analysis.get(analysis_json, [])
                if transfer_inputs:
                    sub.write(f"transfer_input_files = {','.join(transfer_inputs)}\n")
                sub.write("should_transfer_files = YES\n")
                sub.write("when_to_transfer_output = ON_EXIT\n")
                sub.write(f"transfer_output_files = {STAGE_TARBALL_NAME}\n")
                destination = self._stage_tarball_destination(base_name)
                sub.write(f'transfer_output_remaps = "{STAGE_TARBALL_NAME} = {destination}"\n')
            self._write_policy_and_queue(sub, max_mat, 1)
        self.job_files.append(job_file_submit)

    def _write_sub_files(self, original_analysis_path):
        """Generate all HTCondor job files and record the ``.sub`` files in ``self.job_files``.

        Raises:
            RuntimeError: If job scripts exporting ``X509_USER_PROXY`` are generated and
                no VOMS proxy valid for at least 5 hours is found.
        """
        cluster_per_sample = self.cluster_per_sample
        if self.stage_output and cluster_per_sample:
            logger.info(
                "Stage-output input transfer enabled: submitting one cluster per analysis JSON to keep transfer_input_files job-specific."
            )
            cluster_per_sample = False

        # The proxy path is only written into generated job scripts, i.e. for
        # per-sample clusters and for stage-output jobs.
        proxy = self._locate_proxy(required=bool(cluster_per_sample or self.stage_output))
        jobs_dir, output_target = self._resolve_jobs_dirs()

        if cluster_per_sample:
            for sample, analysis_list in self.json_analysis_files.items():
                if not analysis_list:
                    logger.warning("Sample '%s' has no input files; no job is generated for it.", sample)
                    continue
                self._write_sample_cluster(
                    sample, analysis_list, original_analysis_path, proxy, jobs_dir, output_target
                )
        else:
            for analysis_list in self.json_analysis_files.values():
                for analysis_json in analysis_list:
                    self._write_single_job(analysis_json, original_analysis_path, proxy, jobs_dir, output_target)

    def submit(self):
        """
        Submit all the generated .sub files to HTCondor.

        When the submission was prepared from a directory under ``/eos`` the jobs are
        submitted with ``-spool``. A failing ``condor_submit`` is logged and the
        remaining files are still submitted.
        """
        spool = self.current_dir.startswith("/eos")
        for jf in self.job_files:
            command = ["condor_submit", "-spool", jf] if spool else ["condor_submit", jf]
            result = subprocess.run(command)
            if result.returncode != 0:
                logger.error("condor_submit failed with exit code %s for %s", result.returncode, jf)