@lgray
Last active October 18, 2019 12:02

import fast_curator
import fast_flow.v1 as fast_flow
import pprint
import copy
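
# Read the dataset descriptions (file lists, tree names, etc.) that
# fast-curator wrote to the curator YAML file.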
datasets = fast_curator.read.from_yaml('curator/file_list.yml')
pprint.pprint(datasets)
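
# Convert each fast-curator dataset into the fileset mapping that coffea's
# run_uproot_job expects: keyed by dataset name, with the tree name stored
# under 'treename'.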
coffea_datasets = {}
for ds in datasets:
    coffea_datasets[ds.name] = ds.__dict__.copy()
    coffea_datasets[ds.name].pop('name')
    coffea_datasets[ds.name]['treename'] = coffea_datasets[ds.name].pop('tree')
pprint.pprint(coffea_datasets)
from coffea import processor
from fast_carpenter.masked_tree import MaskedUprootTree
from collections import namedtuple
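
# Lightweight stand-ins for the chunk/config objects that fast-carpenter
# stages expect to be handed for each block of events.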
EventRanger = namedtuple("EventRanger", "start_entry stop_entry entries_in_block")
SingleChunk = namedtuple("SingleChunk", "tree config")
ChunkConfig = namedtuple("ChunkConfig", "dataset")
ConfigProxy = namedtuple("ConfigProxy", "name eventtype")
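
# Wraps a list of fast-carpenter stages so coffea can sum partial results:
# add() merges every stage that provides a merge() method.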
class stages_accumulator(processor.AccumulatorABC):
    def __init__(self, stages):
        self._zero = copy.deepcopy(stages)
        self._value = copy.deepcopy(stages)

    def identity(self):
        return stages_accumulator(self._zero)

    def __getitem__(self, idx):
        return self._value[idx]

    def add(self, other):
        for i, stage in enumerate(self._value):
            if not hasattr(stage, "merge"):
                continue
            stage.merge(other[i])
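
# A coffea processor that compiles a fast-flow stage sequence and runs every
# stage over each chunk of events, keeping one stages_accumulator per dataset.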
class FASTProcessor(processor.ProcessorABC):
    def __init__(self, seq_cfg):
        self._columns = list()
        self._sequence = fast_flow.compile_sequence_yaml(seq_cfg,
                                                         output_dir='./carpenter')()
        self._accumulator = processor.dict_accumulator(
            {'stages': processor.dict_accumulator({})}
        )

    @property
    def columns(self):
        return self._columns

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, df):
        output = self.accumulator.identity()
        # wrap this chunk's uproot tree in the masked-tree interface that
        # fast-carpenter stages operate on
        start = df._branchargs['entrystart']
        stop = df._branchargs['entrystop']
        tree = MaskedUprootTree(df._tree, EventRanger(start, stop, stop - start))
        dsname = df['dataset']
        cfg_proxy = ConfigProxy(dsname, 'data' if dsname == 'data' else 'mc')
        chunk = SingleChunk(tree, ChunkConfig(cfg_proxy))
        # run every stage of the sequence on this chunk
        output['stages'][dsname] = stages_accumulator(self._sequence)
        for work in output['stages'][dsname]._value:
            work.event(chunk)
        return output

    def postprocess(self, accumulator):
        # hand the merged per-dataset stages to each stage's collector to
        # build the final results
        stages = accumulator['stages']
        results = {}
        wf = copy.deepcopy(self._sequence)
        for i_step, step in enumerate(wf):
            if not hasattr(step, "collector"):
                continue
            collector = step.collector()
            output = collector.collect([(d, (s[i_step],)) for d, s in stages.items()])
            results[step.name] = output
        accumulator['results'] = results
        return accumulator
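
# Compile the fast-carpenter sequence config into a processor instance.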
seq_cfg = '../sequence_cfg.yml'
fp = FASTProcessor(seq_cfg)
from coffea.processor import iterative_executor, futures_executor, run_uproot_job
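# Run locally with the multiprocessing futures executor (4 worker processes).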
executor = futures_executor
exe_args = {'workers': 4,
            'function_args': {'flatten': False}}
out = run_uproot_job(coffea_datasets, 'events', fp, executor, executor_args=exe_args)
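# out['results'] maps each collecting stage's name to its collected output.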
print(out)