from higgs_dna.workflows.skeleton import HggSkeletonProcessor
from higgs_dna.tools.SC_eta import add_photon_SC_eta
from higgs_dna.tools.EELeak_region import veto_EEleak_flag
from higgs_dna.tools.EcalBadCalibCrystal_events import remove_EcalBadCalibCrystal_events
from higgs_dna.tools.gen_helpers import get_fiducial_flag, get_higgs_truth_attributes
from higgs_dna.tools.sigma_m_tools import compute_sigma_m
from higgs_dna.tools.jetID import add_jetId
from higgs_dna.selections.photon_selections import photon_preselection
from higgs_dna.selections.diphoton_selections import build_diphoton_candidates, apply_fiducial_cut_det_level
from higgs_dna.selections.lepton_selections import select_electrons, select_muons
from higgs_dna.selections.jet_selections import select_jets, jetvetomap
from higgs_dna.selections.lumi_selections import select_lumis
from higgs_dna.utils.dumping_utils import (
diphoton_ak_array,
dump_ak_array,
diphoton_list_to_pandas,
dump_pandas,
get_obj_syst_dict,
)
from higgs_dna.utils.misc_utils import choose_jet
from higgs_dna.tools.flow_corrections import apply_flow_corrections_to_photons
from higgs_dna.tools.mass_decorrelator import decorrelate_mass_resolution
# from higgs_dna.utils.dumping_utils import diphoton_list_to_pandas, dump_pandas
from higgs_dna.systematics import object_systematics as available_object_systematics
from higgs_dna.systematics import object_corrections as available_object_corrections
from higgs_dna.systematics import weight_systematics as available_weight_systematics
from higgs_dna.systematics import weight_corrections as available_weight_corrections
from higgs_dna.systematics import apply_systematic_variations_object_level
from typing import Any, Dict, List, Optional
import awkward as ak
import numpy
import vector
from coffea.analysis_tools import Weights
import copy
import logging
logger = logging.getLogger(__name__)
vector.register_awkward()
[docs]class HggBaseProcessor(HggSkeletonProcessor): # type: ignore
def __init__(
self,
metaconditions: Dict[str, Any],
systematics: Dict[str, List[Any]] = None,
corrections: Dict[str, List[Any]] = None,
apply_trigger: bool = False,
output_location: Optional[str] = None,
taggers: Optional[List[Any]] = None,
nano_version: int = None,
bTagEffFileName: Optional[str] = None,
trigger_group: str = ".*DoubleEG.*",
analysis: str = "mainAnalysis",
applyCQR: bool = False,
skipJetVetoMap: bool = False,
year: Dict[str, List[str]] = None,
fiducialCuts: str = "classical",
doDeco: bool = False,
Smear_sigma_m: bool = False,
doFlow_corrections: bool = False,
validate_with_electrons: bool = False,
output_format: str = "parquet",
) -> None:
super().__init__(
metaconditions,
systematics=systematics,
corrections=corrections,
apply_trigger=apply_trigger,
nano_version=nano_version,
bTagEffFileName=bTagEffFileName,
output_location=output_location,
taggers=taggers,
trigger_group=trigger_group,
analysis=analysis,
applyCQR=applyCQR,
skipJetVetoMap=skipJetVetoMap,
year=year,
fiducialCuts=fiducialCuts,
doDeco=doDeco,
Smear_sigma_m=Smear_sigma_m,
doFlow_corrections=doFlow_corrections,
validate_with_electrons=validate_with_electrons,
output_format=output_format
)
[docs] def process(self, events: ak.Array) -> Dict[Any, Any]:
self.resolve_nano_version(events)
dataset_name = events.metadata["dataset"]
# data or monte carlo?
self.data_kind = "mc" if hasattr(events, "GenPart") else "data"
# here we start recording possible coffea accumulators
# most likely histograms, could be counters, arrays, ...
histos_etc = {}
histos_etc[dataset_name] = {}
if self.data_kind == "mc":
histos_etc[dataset_name]["nTot"] = int(
ak.num(events.genWeight, axis=0)
)
histos_etc[dataset_name]["nPos"] = int(ak.sum(events.genWeight > 0))
histos_etc[dataset_name]["nNeg"] = int(ak.sum(events.genWeight < 0))
histos_etc[dataset_name]["nEff"] = int(
histos_etc[dataset_name]["nPos"] - histos_etc[dataset_name]["nNeg"]
)
histos_etc[dataset_name]["genWeightSum"] = float(
numpy.sum(events.genWeight.to_numpy())
)
else:
histos_etc[dataset_name]["nTot"] = int(len(events))
histos_etc[dataset_name]["nPos"] = int(histos_etc[dataset_name]["nTot"])
histos_etc[dataset_name]["nNeg"] = int(0)
histos_etc[dataset_name]["nEff"] = int(histos_etc[dataset_name]["nTot"])
histos_etc[dataset_name]["genWeightSum"] = float(len(events))
# lumi mask
if self.data_kind == "data":
try:
lumimask = select_lumis(self.year[dataset_name][0], events, logger)
events = events[lumimask]
except KeyError:
logger.warning(
f"[ lumimask ] Skip now! Unable to find year info of {dataset_name}"
)
# metadata array to append to higgsdna output
metadata = {}
if self.data_kind == "mc":
# Add sum of gen weights before selection for normalisation in postprocessing
metadata["sum_genw_presel"] = str(numpy.sum(events.genWeight.to_numpy()))
else:
metadata["sum_genw_presel"] = "Data"
# apply filters and triggers
events = self.apply_filters_and_triggers(events)
# remove events affected by EcalBadCalibCrystal
if self.data_kind == "data":
excluded_years = ["2018", "2017", "2016preVFP", "2016postVFP"]
if self.year[dataset_name][0] not in excluded_years:
events = remove_EcalBadCalibCrystal_events(events)
# add zero photon mass and charge
# TODO: remove this temporary fix when https://github.com/scikit-hep/vector/issues/498 is resolved
events["Photon"] = self.add_zero_photon_mass_and_charge(events.Photon)
# we need ScEta for corrections and systematics, it is present in NanoAODv13+ and can be calculated using PV for older versions
events["Photon"] = add_photon_SC_eta(events.Photon, events.PV)
if self.validate_with_electrons:
# select photons with an associated electron and a pixel seed
photons_mask = (events.Photon.electronIdx != -1) & (events.Photon.pixelSeed)
events["Photon"] = events.Photon[photons_mask]
events = events[ak.num(events.Photon) >= 2]
# add veto EE leak branch for photons, could also be used for electrons
if (
self.year[dataset_name][0] == "2022EE"
or self.year[dataset_name][0] == "2022postEE"
):
events["Photon"] = veto_EEleak_flag(self, events.Photon)
# read which systematics and corrections to process
try:
correction_names = self.corrections[dataset_name]
except KeyError:
correction_names = []
try:
systematic_names = self.systematics[dataset_name]
except KeyError:
systematic_names = []
for correction_name in correction_names:
if correction_name in available_object_corrections.keys():
logger.info(
f"Applying correction {correction_name} to dataset {dataset_name}"
)
varying_function = available_object_corrections[correction_name]
events = varying_function(
events=events, year=self.year[dataset_name][0]
)
elif correction_name in available_weight_corrections:
# event weight corrections will be applied after photon preselection / application of further taggers
continue
else:
logger.error(f"The correction '{correction_name}' does not exist.")
raise ValueError(f"The correction '{correction_name}' does not exist.")
# apply jetvetomap: only retain events that without any jets in the veto region
if not self.skipJetVetoMap:
events = jetvetomap(
self, events, logger, dataset_name, year=self.year[dataset_name][0]
)
original_photons = events.Photon
# NOTE: jet jerc systematics are added in the correction functions and handled later
original_jets = events.Jet
# Computing the normalizing flow correction
if self.data_kind == "mc" and self.doFlow_corrections:
original_photons = apply_flow_corrections_to_photons(
original_photons,
events,
self.meta,
self.year[dataset_name][0],
self.add_photonid_mva_run3,
logger
)
# Add additional collections if object systematics should be applied
collections = {
"Photon": original_photons,
}
# Apply the systematic variations.
collections = apply_systematic_variations_object_level(
systematic_names,
events,
self.year[dataset_name][0],
logger,
available_object_systematics,
available_weight_systematics,
collections
)
original_photons = collections["Photon"]
# Write systematic variations to dictss
photons_dct = {}
photons_dct["nominal"] = original_photons
logger.debug(original_photons.systematics.fields)
for systematic in original_photons.systematics.fields:
for variation in original_photons.systematics[systematic].fields:
photons_dct[f"{systematic}_{variation}"] = original_photons.systematics[systematic][variation]
# NOTE: jet jerc systematics are added in the corrections, now extract those variations and create the dictionary
jerc_syst_list, jets_dct = get_obj_syst_dict(original_jets, ["pt", "mass"])
# object systematics dictionary
logger.debug(f"[ jerc systematics ] {jerc_syst_list}")
# Build the flattened array of all possible variations
variations_combined = []
variations_combined.append(original_photons.systematics.fields)
# NOTE: jet jerc systematics are not added with add_systematics
variations_combined.append(jerc_syst_list)
# Flatten
variations_flattened = sum(variations_combined, []) # Begin with empty list and keep concatenating
# Attach _down and _up
variations = [item + suffix for item in variations_flattened for suffix in ['_down', '_up']]
# Add nominal to the list
variations.append('nominal')
logger.debug(f"[systematics variations] {variations}")
for variation in variations:
photons, jets = photons_dct["nominal"], events.Jet
if variation == "nominal":
pass # Do nothing since we already get the unvaried, but nominally corrected objets above
elif variation in [*photons_dct]: # [*dict] gets the keys of the dict since Python >= 3.5
photons = photons_dct[variation]
elif variation in [*jets_dct]:
jets = jets_dct[variation]
do_variation = variation # We can also simplify this a bit but for now it works
if self.chained_quantile is not None:
photons = self.chained_quantile.apply(photons, events)
# recompute photonid_mva on the fly
if self.photonid_mva_EB and self.photonid_mva_EE:
photons = self.add_photonid_mva(photons, events)
# photon preselection
if self.validate_with_electrons:
photons = photon_preselection(self, photons, events, year=self.year[dataset_name][0], electron_veto=False, revert_electron_veto=True)
else:
photons = photon_preselection(self, photons, events, year=self.year[dataset_name][0])
diphotons = build_diphoton_candidates(photons, self.min_pt_lead_photon)
# Apply the fiducial cut at detector level with helper function
diphotons = apply_fiducial_cut_det_level(self, diphotons)
if self.data_kind == "mc":
# Add the fiducial flags for particle level
diphotons['fiducialClassicalFlag'] = get_fiducial_flag(events, flavour='Classical')
diphotons['fiducialGeometricFlag'] = get_fiducial_flag(events, flavour='Geometric')
# baseline modifications to diphotons
if self.diphoton_mva is not None:
diphotons = self.add_diphoton_mva(diphotons, events)
# workflow specific processing
events, process_extra = self.process_extra(events)
histos_etc.update(process_extra)
# jet_variables
jets = ak.zip(
{
"pt": jets.pt,
"eta": jets.eta,
"phi": jets.phi,
"mass": jets.mass,
"charge": ak.zeros_like(jets.pt),
"jetId": add_jetId(jets, self.nano_version, self.year[dataset_name][0], flattenUnflatten=True), # add jet ID based on nano version
**(
{"neHEF": jets.neHEF, "neEmEF": jets.neEmEF, "chEmEF": jets.chEmEF, "muEF": jets.muEF} if self.nano_version == 12 else {}
),
**(
{"neHEF": jets.neHEF, "neEmEF": jets.neEmEF, "chMultiplicity": jets.chMultiplicity, "neMultiplicity": jets.neMultiplicity, "chEmEF": jets.chEmEF, "chHEF": jets.chHEF, "muEF": jets.muEF} if self.nano_version >= 13 else {}
),
}
)
jets = ak.with_name(jets, "PtEtaPhiMCandidate")
electrons = ak.zip(
{
"pt": events.Electron.pt,
"eta": events.Electron.eta,
"phi": events.Electron.phi,
"mass": events.Electron.mass,
"charge": events.Electron.charge,
"cutBased": events.Electron.cutBased,
"mvaIso_WP90": events.Electron.mvaIso_WP90,
"mvaIso_WP80": events.Electron.mvaIso_WP80,
}
)
electrons = ak.with_name(electrons, "PtEtaPhiMCandidate")
# Special cut for base workflow to replicate iso cut for electrons also for muons
events['Muon'] = events.Muon[events.Muon.pfRelIso03_all < 0.2]
muons = ak.zip(
{
"pt": events.Muon.pt,
"eta": events.Muon.eta,
"phi": events.Muon.phi,
"mass": events.Muon.mass,
"charge": events.Muon.charge,
"tightId": events.Muon.tightId,
"mediumId": events.Muon.mediumId,
"looseId": events.Muon.looseId,
"isGlobal": events.Muon.isGlobal,
"pfIsoId": events.Muon.pfIsoId
}
)
muons = ak.with_name(muons, "PtEtaPhiMCandidate")
# lepton cleaning
sel_electrons = electrons[
select_electrons(self, electrons, diphotons)
]
sel_muons = muons[select_muons(self, muons, diphotons)]
# jet selection and pt ordering
jets = jets[
select_jets(self, jets, diphotons, sel_muons, sel_electrons)
]
jets = jets[ak.argsort(jets.pt, ascending=False)]
# adding selected jets to events to be used in ctagging SF calculation
events["sel_jets"] = jets
n_jets = ak.num(jets)
Njets2p5 = ak.num(jets[(jets.pt > 30) & (numpy.abs(jets.eta) < 2.5)])
first_jet_pt = choose_jet(jets.pt, 0, -999.0)
first_jet_eta = choose_jet(jets.eta, 0, -999.0)
first_jet_phi = choose_jet(jets.phi, 0, -999.0)
first_jet_mass = choose_jet(jets.mass, 0, -999.0)
first_jet_charge = choose_jet(jets.charge, 0, -999.0)
second_jet_pt = choose_jet(jets.pt, 1, -999.0)
second_jet_eta = choose_jet(jets.eta, 1, -999.0)
second_jet_phi = choose_jet(jets.phi, 1, -999.0)
second_jet_mass = choose_jet(jets.mass, 1, -999.0)
second_jet_charge = choose_jet(jets.charge, 1, -999.0)
diphotons["PTJ0"] = first_jet_pt
diphotons["first_jet_eta"] = first_jet_eta
diphotons["first_jet_phi"] = first_jet_phi
diphotons["first_jet_mass"] = first_jet_mass
diphotons["first_jet_charge"] = first_jet_charge
diphotons["PTJ1"] = second_jet_pt
diphotons["second_jet_eta"] = second_jet_eta
diphotons["second_jet_phi"] = second_jet_phi
diphotons["second_jet_mass"] = second_jet_mass
diphotons["second_jet_charge"] = second_jet_charge
diphotons["n_jets"] = n_jets
diphotons["NJ"] = Njets2p5
with numpy.errstate(over='ignore', invalid='ignore'):
first_jet_pz = first_jet_pt * numpy.sinh(first_jet_eta)
first_jet_energy = numpy.sqrt((first_jet_pt**2 * numpy.cosh(first_jet_eta)**2) + first_jet_mass**2)
first_jet_y = 0.5 * numpy.log((first_jet_energy + first_jet_pz) / (first_jet_energy - first_jet_pz))
first_jet_y = ak.fill_none(first_jet_y, -999)
first_jet_y = ak.where(numpy.isnan(first_jet_y), -999, first_jet_y)
diphotons["YJ0"] = first_jet_y
# run taggers on the events list with added diphotons
# the shape here is ensured to be broadcastable
for tagger in self.taggers:
(
diphotons["_".join([tagger.name, str(tagger.priority)])],
tagger_extra,
) = tagger(
events, diphotons
) # creates new column in diphotons - tagger priority, or 0, also return list of histrograms here?
histos_etc.update(tagger_extra)
# if there are taggers to run, arbitrate by them first
# Deal with order of tagger priorities
# Turn from diphoton jagged array to whether or not an event was selected
if len(self.taggers):
counts = ak.num(diphotons.pt, axis=1)
flat_tags = numpy.stack(
(
ak.flatten(
diphotons[
"_".join([tagger.name, str(tagger.priority)])
]
)
for tagger in self.taggers
),
axis=1,
)
tags = ak.from_regular(
ak.unflatten(flat_tags, counts), axis=2
)
winner = ak.min(tags[tags != 0], axis=2)
diphotons["best_tag"] = winner
# lowest priority is most important (ascending sort)
# leave in order of diphoton pT in case of ties (stable sort)
sorted_gg = ak.argsort(diphotons.best_tag, stable=True)
diphotons = diphotons[sorted_gg]
diphotons = ak.firsts(diphotons)
original_diphotons = copy.copy(diphotons)
# annotate diphotons with event information
diphotons["event"] = events.event
diphotons["lumi"] = events.luminosityBlock
diphotons["run"] = events.run
# nPV just for validation of pileup reweighting
diphotons["nPV"] = events.PV.npvs
diphotons["fixedGridRhoAll"] = events.Rho.fixedGridRhoAll
# Beamspot variables
diphotons["BeamSpot_sigmaZ"] = events.BeamSpot.sigmaZ
diphotons["BeamSpot_sigmaZError"] = events.BeamSpot.sigmaZError
# annotate diphotons with dZ information (difference between z position of GenVtx and PV) as required by flashggfinalfits
if self.data_kind == "mc":
diphotons["genWeight"] = events.genWeight
diphotons["dZ"] = events.GenVtx.z - events.PV.z
TruthPTH, TruthYH = get_higgs_truth_attributes(events)
diphotons["TruthPTH"] = TruthPTH
diphotons["TruthYH"] = TruthYH
# Fill zeros for data because there is no GenVtx for data, obviously
else:
diphotons["dZ"] = ak.zeros_like(events.PV.z)
# drop events without a preselected diphoton candidate
# drop events without a tag, if there are tags
if len(self.taggers):
selection_mask = ~(
ak.is_none(diphotons)
| ak.is_none(diphotons.best_tag)
)
diphotons = diphotons[selection_mask]
else:
selection_mask = ~ak.is_none(diphotons)
diphotons = diphotons[selection_mask]
# return if there is no surviving events
if len(diphotons) == 0:
logger.info("No surviving events in this run!")
if self.data_kind == "mc":
# initiate Weight container here, after selection, since event selection cannot easily be applied to weight container afterwards
event_weights = Weights(size=len(events[selection_mask]),storeIndividual=True)
# set weights to generator weights
event_weights._weight = ak.to_numpy(events["genWeight"][selection_mask])
# corrections to event weights:
for correction_name in correction_names:
if correction_name in available_weight_corrections:
logger.info(
f"Adding correction {correction_name} to weight collection of dataset {dataset_name}"
)
common_args = {
"events": events[selection_mask],
"photons": original_diphotons[selection_mask],
"weights": event_weights,
"dataset_name": dataset_name,
"year": self.year[dataset_name][0],
}
varying_function = available_weight_corrections[correction_name]
event_weights = varying_function(**common_args)
elif correction_name in available_object_corrections:
continue
else:
logger.error(f"The correction '{correction_name}' does not exist.")
raise ValueError(f"The correction '{correction_name}' does not exist.")
# systematic variations of event weights go to nominal output dataframe:
if do_variation == "nominal":
for systematic_name in systematic_names:
if systematic_name in available_weight_systematics:
logger.info(
f"Adding systematic {systematic_name} to weight collection of dataset {dataset_name}"
)
if systematic_name == "LHEScale":
if hasattr(events, "LHEScaleWeight"):
diphotons["nweight_LHEScale"] = ak.num(
events.LHEScaleWeight[selection_mask],
axis=1,
)
diphotons[
"weight_LHEScale"
] = events.LHEScaleWeight[selection_mask]
else:
logger.info(
f"No {systematic_name} Weights in dataset {dataset_name}"
)
elif systematic_name == "LHEPdf":
if hasattr(events, "LHEPdfWeight"):
# two AlphaS weights are removed
diphotons["nweight_LHEPdf"] = (
ak.num(
events.LHEPdfWeight[selection_mask],
axis=1,
)
- 2
)
diphotons[
"weight_LHEPdf"
] = events.LHEPdfWeight[selection_mask][
:, :-2
]
else:
logger.info(
f"No {systematic_name} Weights in dataset {dataset_name}"
)
else:
common_args = {
"events": events[selection_mask],
"photons": original_diphotons[selection_mask],
"weights": event_weights,
"dataset_name": dataset_name,
"year": self.year[dataset_name][0],
}
varying_function = available_weight_systematics[systematic_name]
event_weights = varying_function(**common_args)
elif systematic_name in available_object_systematics:
continue
else:
logger.error(f"The systematic '{systematic_name}' does not exist.")
raise ValueError(f"The systematic '{systematic_name}' does not exist.")
diphotons["weight"] = event_weights.weight()
diphotons["weight_central"] = event_weights.weight() / events["genWeight"][selection_mask]
metadata["sum_weight_central"] = str(ak.sum(event_weights.weight()))
# Handle variations
if do_variation == "nominal":
if event_weights.variations:
logger.info(
"Adding systematic weight variations to nominal output file."
)
for modifier in event_weights.variations:
diphotons["weight_" + modifier] = event_weights.weight(modifier=modifier)
metadata["sum_weight_" + modifier] = str(ak.sum(event_weights.weight(modifier=modifier)))
# Add weight variables (=1) for data for consistent datasets
else:
diphotons["weight_central"] = ak.ones_like(
diphotons["event"]
)
diphotons["weight"] = ak.ones_like(diphotons["event"])
# Compute and store the different variations of sigma_m_over_m
diphotons = compute_sigma_m(diphotons, processor='base', flow_corrections=self.doFlow_corrections, smear=self.Smear_sigma_m, IsData=(self.data_kind == "data"))
# Decorrelating the mass resolution - Still need to supress the decorrelator noises
if self.doDeco:
# Decorrelate nominal sigma_m_over_m
diphotons["sigma_m_over_m_nominal_decorr"] = decorrelate_mass_resolution(diphotons, type="nominal", year=self.year[dataset_name][0])
# decorrelate smeared nominal sigma_m_overm_m
if (self.Smear_sigma_m):
diphotons["sigma_m_over_m_smeared_decorr"] = decorrelate_mass_resolution(diphotons, type="smeared", year=self.year[dataset_name][0])
# decorrelate flow corrected sigma_m_over_m
if (self.doFlow_corrections):
diphotons["sigma_m_over_m_corr_decorr"] = decorrelate_mass_resolution(diphotons, type="corr", year=self.year[dataset_name][0])
# decorrelate flow corrected smeared sigma_m_over_m
if (self.doFlow_corrections and self.Smear_sigma_m):
diphotons["sigma_m_over_m_corr_smeared_decorr"] = decorrelate_mass_resolution(diphotons, type="corr_smeared", year=self.year[dataset_name][0], IsSAS_ET_Dependent=True)
# Instead of the nominal sigma_m_over_m, we will use the smeared version of it -> (https://indico.cern.ch/event/1319585/#169-update-on-the-run-3-mass-r)
# else:
# warnings.warn("Smeamering need to be applied in order to decorrelate the (Smeared) mass resolution. -- Exiting!")
# sys.exit(0)
if self.output_location is not None:
if self.output_format == "root":
df = diphoton_list_to_pandas(self, diphotons)
else:
akarr = diphoton_ak_array(self, diphotons)
# Remove fixedGridRhoAll from photons to avoid having event-level info per photon
akarr = akarr[
[
field
for field in akarr.fields
if "lead_fixedGridRhoAll" not in field
]
]
fname = (
events.attrs[
"@events_factory"
]._partition_key.replace("/", "_")
+ ".%s" % self.output_format
)
fname = (fname.replace("%2F","")).replace("%3B1","")
subdirs = []
if "dataset" in events.metadata:
subdirs.append(events.metadata["dataset"])
subdirs.append(do_variation)
if self.output_format == "root":
dump_pandas(self, df, fname, self.output_location, subdirs)
else:
dump_ak_array(
self, akarr, fname, self.output_location, metadata, subdirs,
)
return histos_etc
[docs] def postprocess(self, accumulant: Dict[Any, Any]) -> Any:
pass