Source code for higgs_dna.workflows.dystudies

from higgs_dna.workflows.skeleton import HggSkeletonProcessor
from higgs_dna.systematics import object_systematics as available_object_systematics
from higgs_dna.systematics import object_corrections as available_object_corrections
from higgs_dna.systematics import weight_systematics as available_weight_systematics
from higgs_dna.systematics import weight_corrections as available_weight_corrections
from higgs_dna.systematics import apply_systematic_variations_object_level
from higgs_dna.selections.photon_selections import photon_preselection
from higgs_dna.selections.lumi_selections import select_lumis
from higgs_dna.utils.dumping_utils import diphoton_list_to_pandas, dump_pandas
from higgs_dna.utils.misc_utils import trigger_match, delta_r_with_ScEta
from higgs_dna.tools.SC_eta import add_photon_SC_eta
from higgs_dna.tools.flow_corrections import apply_flow_corrections_to_photons
from higgs_dna.tools.EcalBadCalibCrystal_events import remove_EcalBadCalibCrystal_events
from higgs_dna.tools.sigma_m_tools import compute_sigma_m
from typing import Any, Dict, List, Optional
import awkward as ak
import logging
import warnings
import numpy
import sys
from coffea.analysis_tools import Weights

logger = logging.getLogger(__name__)


[docs]class TagAndProbeProcessor(HggSkeletonProcessor): def __init__( self, metaconditions: Dict[str, Any], systematics: Dict[str, List[Any]] = None, corrections: Optional[Dict[str, List[str]]] = None, apply_trigger: bool = False, output_location: Optional[str] = None, taggers: Optional[List[Any]] = None, nano_version: int = None, bTagEffFileName: Optional[str] = None, trigger_group: str = ".*SingleEle.*", analysis: str = "tagAndProbe", applyCQR: bool = False, skipJetVetoMap: bool = False, year: Optional[Dict[str, List[str]]] = None, fiducialCuts: str = "classical", doDeco: bool = False, Smear_sigma_m: bool = False, doFlow_corrections: bool = False, validate_with_electrons: bool = False, output_format: str = "parquet", ) -> None: if validate_with_electrons: raise ValueError(f"Validation with electrons is not supported in {self.__class__.__name__}") super().__init__( metaconditions, systematics=systematics, corrections=corrections, apply_trigger=apply_trigger, nano_version=nano_version, bTagEffFileName=bTagEffFileName, output_location=output_location, taggers=taggers, trigger_group=".*SingleEle.*", analysis="tagAndProbe", applyCQR=applyCQR, skipJetVetoMap=False, year=year if year is not None else {}, fiducialCuts=fiducialCuts, doDeco=doDeco, Smear_sigma_m=Smear_sigma_m, doFlow_corrections=doFlow_corrections, validate_with_electrons=validate_with_electrons, output_format=output_format ) self.prefixes = {"tag": "tag", "probe": "probe"}
[docs] def process(self, events: ak.Array) -> Dict[Any, Any]: dataset_name = events.metadata["dataset"] # data or mc? self.data_kind = "mc" if "GenPart" in ak.fields(events) else "data" # lumi mask if self.data_kind == "data": try: lumimask = select_lumis(self.year[dataset_name][0], events, logger) events = events[lumimask] except: logger.info( f"[ lumimask ] Skip now! Unable to find year info of {dataset_name}" ) # apply filters and triggers events = self.apply_filters_and_triggers(events) # remove events affected by EcalBadCalibCrystal if self.data_kind == "data": events = remove_EcalBadCalibCrystal_events(events) # we need ScEta for corrections and systematics, it is present in NanoAODv13+ and can be calculated using PV for older versions events.Photon = add_photon_SC_eta(events.Photon, events.PV) # read which systematics and corrections to process try: correction_names = self.corrections[dataset_name] except KeyError: correction_names = [] try: systematic_names = self.systematics[dataset_name] except KeyError: systematic_names = [] # If --Smear-sigma_m == True and no Smearing correction in .json for MC throws an error, since the pt scpectrum need to be smeared in order to properly calculate the smeared sigma_m_m if ( self.data_kind == "mc" and self.Smear_sigma_m and ("Smearing_Trad" not in correction_names and "Smearing_IJazZ" not in correction_names and "Smearing2G_IJazZ" not in correction_names) ): warnings.warn( "Smearing_Trad or Smearing_IJazZ or Smearing2G_IJazZ should be specified in the corrections field in .json in order to smear the mass!" ) sys.exit(0) # save raw pt if we use scale/smearing corrections s_or_s_applied = False s_or_s_ele_applied = False for correction in correction_names: if "scale" or "smearing" in correction.lower(): if "Electron" in correction: s_or_s_ele_applied = True else: s_or_s_applied = True if s_or_s_applied: events.Photon = ak.with_field(events.Photon, ak.copy(events.Photon.pt), "pt_raw") if s_or_s_ele_applied: events.Electron = ak.with_field(events.Electron, ak.copy(events.Electron.pt), "pt_raw") # Since now we are applying Smearing term to the sigma_m_over_m i added this portion of code # specially for the estimation of smearing terms for the data events [data pt/energy] are not smeared! if self.data_kind == "data" and self.Smear_sigma_m: if "Scale_Trad" in correction_names: correction_name = "Smearing_Trad" elif "Scale_IJazZ" in correction_names: correction_name = "Smearing_IJazZ" elif "Scale2G_IJazZ" in correction_names: correction_name = "Smearing2G_IJazZ" else: logger.info('Specify a scale correction for the data in the corrections field in .json in order to smear the mass!') sys.exit(0) logger.info( f"\nApplying correction {correction_name} to dataset {dataset_name}\n" ) varying_function = available_object_corrections[correction_name] events = varying_function(events=events,year=self.year[dataset_name][0]) for correction_name in correction_names: if correction_name in available_object_corrections.keys(): logger.info( f"\nApplying correction {correction_name} to dataset {dataset_name}\n" ) varying_function = available_object_corrections[correction_name] events = varying_function(events=events, year=self.year[dataset_name][0]) elif correction_name in available_weight_corrections: # event weight corrections will be applied after photon preselection / application of further taggers continue else: # may want to throw an error instead, needs to be discussed warnings.warn(f"Could not process correction {correction_name}.") continue original_photons = events.Photon # Computing the normalizing flow correction if self.data_kind == "mc" and self.doFlow_corrections: original_photons = apply_flow_corrections_to_photons( original_photons, events, self.meta, self.year[dataset_name][0], self.add_photonid_mva_run3, logger ) # Add additional collections if object systematics should be applied collections = { "Photon": original_photons, } # Apply the systematic variations. collections = apply_systematic_variations_object_level( systematic_names, events, self.year[dataset_name][0], logger, available_object_systematics, available_weight_systematics, collections ) original_photons = collections["Photon"] # Write systematic variations to dicts photons_dct = {} photons_dct["nominal"] = original_photons logger.debug(original_photons.systematics.fields) for systematic in original_photons.systematics.fields: for variation in original_photons.systematics[systematic].fields: photons_dct[f"{systematic}_{variation}"] = original_photons.systematics[ systematic ][variation] for variation, photons in photons_dct.items(): logger.debug(f"Variation: {variation}") if variation == "nominal": do_variation = "nominal" if self.chained_quantile is not None: photons = self.chained_quantile.apply(photons, events) # recompute photonid_mva on the fly if self.photonid_mva_EB and self.photonid_mva_EE: photons = self.add_photonid_mva(photons, events) # photon preselection photons = photon_preselection( self, photons, events, electron_veto=False, revert_electron_veto=True, year=self.year[dataset_name][0] ) if self.data_kind == "mc": # TODO: add weight systs and corrections! (if needed) # need to annotate the photons already here with a weight since later, each photon can be tag and probe and this changes the length of the array photons["weight"] = events["genWeight"] # keep only photons matched to gen e+ or e- photons = photons[photons.genPartFlav == 11] # other event related variables need to be added before the tag&probe combination # nPV just for validation of pileup reweighting photons["nPV"] = events.PV.npvs photons["fixedGridRhoAll"] = events.Rho.fixedGridRhoAll # TODO: HLT matching for data # double the number of diphoton candidates (each item in the pair can be both a tag and a probe) tnp = ak.combinations(photons, 2, fields=["tag", "probe"]) pnt = ak.combinations(photons, 2, fields=["probe", "tag"]) tnp_candidates = ak.concatenate([tnp, pnt], axis=1) # Ensure gen matching in MC for origin from gamma star Z (equivalent when it comes to PDG ID assigment in CMS MC) # This means a) parent of tag and probe is the same b) parent is a Z if self.data_kind == "mc": logger.info("Matching to Z boson") mask = (tnp_candidates.tag.matched_gen.distinctParentIdxG == tnp_candidates.probe.matched_gen.distinctParentIdxG) & (tnp_candidates.tag.matched_gen.distinctParent.pdgId == 23) & (tnp_candidates.probe.matched_gen.distinctParent.pdgId == 23) tnp_candidates = tnp_candidates[ak.fill_none(mask, False)] # add ScEta to the matched electrons of the tags matched_electrons_tags = tnp_candidates.tag.matched_electron matched_electrons_tags["ScEta"] = matched_electrons_tags.eta + matched_electrons_tags.deltaEtaSC # check that the e+/e- matched to tag and probe are not the same particle if self.data_kind == "mc": tnp_candidates = tnp_candidates[ tnp_candidates.tag.genPartIdx != tnp_candidates.probe.genPartIdx ] # imply trigger threshold from year year = self.year[dataset_name][0] if "2016" in year: trigger_pt = 27 elif "2017" in year or "2018" in year: trigger_pt = 32 else: trigger_pt = 30 # find out if we're running on EGMNano samples. If so, filterbit for Ele*_WPTight_Gsf is 12, otherwise 1 try: eledoc = [x for x in events.TrigObj.filterBits.__doc__.split(";") if "for Electron" in x][0] except IndexError: eledoc = "" filterbit = 12 if "1e WPTight L1T match" in eledoc else 1 # tag selections tag_mask = ( (tnp_candidates.tag.pt > 40) & (tnp_candidates.tag.electronIdx != -1) & (tnp_candidates.tag.pixelSeed) & ( tnp_candidates.tag.pfChargedIsoPFPV < 20 ) # was: (tnp_candidates.tag.chargedHadronIso < 20) & ( tnp_candidates.tag.pfChargedIsoPFPV / tnp_candidates.tag.pt < 0.3 ) # was: (tnp_candidates.tag.chargedHadronIso / tnp_candidates.tag.pt < 0.3) & ( trigger_match( matched_electrons_tags, events.TrigObj, pdgid=11, pt=trigger_pt, filterbit=filterbit, metric=delta_r_with_ScEta, dr=0.1 ) ) # match tag with an HLT Ele30 object ) # No selection on the probe to not bias it! # apply selections tnp_candidates = tnp_candidates[tag_mask] # Since the Weights object accepts only flat masks, the tag and probe mask is flattened flat_tag_and_probe_mask = ak.any(tag_mask, axis=1) """ This n_event_tnp_cand array is created to keep track of how many tag and probe candidates we have at each event Since the pileup rw is calculated at a event level, we will have only one weight for event But since we are saving ak.flatten(tnp_candidates) , we need the n_event_tnp_cand to unroll the weights to each tnp candidate at the event """ n_event_tnp_cand = ak.to_numpy(ak.num(tnp_candidates[flat_tag_and_probe_mask])) # candidates need to be flattened since we have each photon as a tag and probe, otherwise it can't be exported to numpy tnp_candidates = ak.flatten(tnp_candidates) # return if there is no surviving events if len(tnp_candidates) == 0: logger.info("No surviving events in this run, return now!") return {} # performing the weight corrections after the preselctions if self.data_kind == "mc": event_weights = Weights(size=len(events[flat_tag_and_probe_mask])) event_weights._weight = numpy.array(events[flat_tag_and_probe_mask].genWeight) # corrections to event weights: for correction_name in correction_names: if correction_name in available_weight_corrections: logger.info( f"Adding correction {correction_name} to weight collection of dataset {dataset_name}" ) varying_function = available_weight_corrections[ correction_name ] event_weights = varying_function( events=events[flat_tag_and_probe_mask], photons=events.Photon[flat_tag_and_probe_mask], weights=event_weights, dataset_name=dataset_name, year=self.year[dataset_name][0], ) # systematic variations of event weights go to nominal output dataframe: if do_variation == "nominal": for systematic_name in systematic_names: if systematic_name in available_weight_systematics: logger.info( f"Adding systematic {systematic_name} to weight collection of dataset {dataset_name}" ) varying_function = available_weight_systematics[ systematic_name ] event_weights = varying_function( events=events[flat_tag_and_probe_mask], photons=events.Photon[flat_tag_and_probe_mask], weights=event_weights, dataset_name=dataset_name, year=self.year[dataset_name][0], ) # Compute and store the different variations of sigma_m_over_m tnp_candidates = compute_sigma_m(tnp_candidates, processor='tnp', flow_corrections=self.doFlow_corrections, smear=self.Smear_sigma_m, IsData=(self.data_kind == "data")) # Adding the tagandprobe pair mass. Based on the expression provided here for a massless pair of particles -> (https://en.wikipedia.org/wiki/Invariant_mass) tnp_candidates["mass"] = numpy.sqrt(2 * tnp_candidates["tag"].pt * tnp_candidates["probe"].pt * (numpy.cosh(tnp_candidates["tag"].eta - tnp_candidates["probe"].eta) - numpy.cos(tnp_candidates["tag"].phi - tnp_candidates["probe"].phi))) if self.output_location is not None: df = diphoton_list_to_pandas(self, tnp_candidates) # since we annotated the photons with event variables, these exist now for tag and probe. This concerns weights as well as nPV and fixedGridRhoAll Remove: if self.data_kind == "mc": # Store variations with respect to central weight if do_variation == "nominal": if len(event_weights.variations): logger.info( "Adding systematic weight variations to nominal output file." ) for modifier in event_weights.variations: df["weight_" + modifier] = numpy.repeat(event_weights.weight( modifier=modifier ), n_event_tnp_cand) # storing the central weights df["weight_central"] = numpy.repeat( (event_weights.weight() / numpy.array(events[flat_tag_and_probe_mask].genWeight)), n_event_tnp_cand ) # generated weights * other weights (pile up, SF, etc ...) df["weight"] = numpy.repeat(event_weights.weight(), n_event_tnp_cand) df["weight_no_pu"] = df["tag_weight"] # dropping the nominal and varitation weights df.drop(["tag_weight", "probe_weight"], axis=1, inplace=True) df["nPV"] = df["tag_nPV"] df.drop(["tag_nPV", "probe_nPV"], axis=1, inplace=True) df["fixedGridRhoAll"] = df["tag_fixedGridRhoAll"] df.drop( ["tag_fixedGridRhoAll", "probe_fixedGridRhoAll"], axis=1, inplace=True, ) fname = ( events.behavior["__events_factory__"]._partition_key.replace( "/", "_" ) + ".%s" % self.output_format ) subdirs = [] if "dataset" in events.metadata: subdirs.append(events.metadata["dataset"]) subdirs.append(variation) dump_pandas(self, df, fname, self.output_location, subdirs) return {}
[docs] def process_extra(self, events: ak.Array) -> ak.Array: return events, {}
[docs] def postprocess(self, accumulant: Dict[Any, Any]) -> Any: pass