import fast_curator
import fast_flow.v1 as fast_flow
import pprint
import copy

# Read the fast_curator dataset description and pretty-print it.
datasets = fast_curator.read.from_yaml('curator/file_list.yml')
pprint.pprint(datasets)

# Convert each fast_curator dataset into the dict format coffea expects for a
# fileset: the dataset name becomes the dict key, and 'tree' is renamed to
# 'treename'.
coffea_datasets = {}
for ds in datasets:
    coffea_datasets[ds.name] = ds.__dict__.copy()
    coffea_datasets[ds.name].pop('name')
    coffea_datasets[ds.name]['treename'] = coffea_datasets[ds.name].pop('tree')
pprint.pprint(coffea_datasets)
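# Rough sketch of the resulting fileset shape (hypothetical dataset name and
# file paths; the actual keys and entries come from curator/file_list.yml):
# {'my_dataset': {'treename': 'events',
#                 'files': ['/path/to/file1.root', '/path/to/file2.root'],
#                 ...}}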
from coffea import processor
from fast_carpenter.masked_tree import MaskedUprootTree
from collections import namedtuple

# Lightweight stand-ins for the chunk and config objects that fast_carpenter
# stages normally receive from fast_carpenter's own event loop.
EventRanger = namedtuple("EventRanger", "start_entry stop_entry entries_in_block")
SingleChunk = namedtuple("SingleChunk", "tree config")
ChunkConfig = namedtuple("ChunkConfig", "dataset")
ConfigProxy = namedtuple("ConfigProxy", "name eventtype")
class stages_accumulator(processor.AccumulatorABC):
    """Coffea accumulator that wraps a list of fast_carpenter stages."""

    def __init__(self, stages):
        self._zero = copy.deepcopy(stages)
        self._value = copy.deepcopy(stages)

    def identity(self):
        return stages_accumulator(self._zero)

    def __getitem__(self, idx):
        return self._value[idx]

    def add(self, other):
        # Merge the per-chunk state of any stage that supports merging.
        for i, stage in enumerate(self._value):
            if not hasattr(stage, "merge"):
                continue
            stage.merge(other[i])
class FASTProcessor(processor.ProcessorABC):
    def __init__(self, seq_cfg):
        self._columns = list()
        # Build the fast_carpenter stage sequence from the YAML config.
        self._sequence = fast_flow.compile_sequence_yaml(seq_cfg,
                                                         output_dir='./carpenter')()
        self._accumulator = processor.dict_accumulator(
            {'stages': processor.dict_accumulator({})}
        )

    @property
    def columns(self):
        return self._columns

    @property
    def accumulator(self):
        return self._accumulator
    def process(self, df):
        output = self.accumulator.identity()
        # Wrap the chunk's uproot tree so fast_carpenter stages can mask it.
        start = df._branchargs['entrystart']
        stop = df._branchargs['entrystop']
        tree = MaskedUprootTree(df._tree, EventRanger(start, stop, stop - start))
        dsname = df['dataset']
        cfg_proxy = ConfigProxy(dsname, 'data' if dsname == 'data' else 'mc')
        chunk = SingleChunk(tree, ChunkConfig(cfg_proxy))
        # Run every stage of the sequence over this chunk.
        output['stages'][dsname] = stages_accumulator(self._sequence)
        for work in output['stages'][dsname]._value:
            work.event(chunk)
        return output
    def postprocess(self, accumulator):
        # Gather the merged per-dataset stage outputs through each stage's collector.
        stages = accumulator['stages']
        results = {}
        wf = copy.deepcopy(self._sequence)
        for i_step, step in enumerate(wf):
            if not hasattr(step, "collector"):
                continue
            collector = step.collector()
            output = collector.collect([(d, (s[i_step],)) for d, s in stages.items()])
            results[step.name] = output
        accumulator['results'] = results
        return accumulator
seq_cfg = '../sequence_cfg.yml'
fp = FASTProcessor(seq_cfg)

from coffea.processor import iterative_executor, futures_executor, run_uproot_job

# Run the processor over the fileset with 4 worker processes.
executor = futures_executor
exe_args = {'workers': 4,
            'function_args': {'flatten': False}}
out = run_uproot_job(coffea_datasets, 'events', fp, executor, executor_args=exe_args)
print(out)
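# Minimal sketch of inspecting the collected outputs: postprocess() stores each
# collector's result in out['results'], keyed by the stage name from the
# fast_flow sequence. The concrete contents depend on ../sequence_cfg.yml.
for stage_name, result in out['results'].items():
    print(stage_name)
    print(result)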