Skip to content

Instantly share code, notes, and snippets.

@lukasheinrich
Created September 7, 2022 20:32
Show Gist options
  • Save lukasheinrich/a7f23b71cf048727ca9012f5f6ea940a to your computer and use it in GitHub Desktop.
Save lukasheinrich/a7f23b71cf048727ca9012f5f6ea940a to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 9,
"id": "2ff23fe5",
"metadata": {},
"outputs": [],
"source": [
"import pyhf\n",
"from pyhf.parameters import ParamViewer\n",
"from pyhf import get_backend\n",
"from pyhf import events\n",
"\n",
"def add_custom_modifier(funcname,deps,newparams):\n",
" def make_func(expression):\n",
" def func(d):\n",
" import numexpr as ne\n",
" return ne.evaluate(expression,local_dict=dict(zip(deps,d)))\n",
" return func\n",
"\n",
" def _allocate_new_param(p):\n",
" return {\n",
" 'paramset_type': 'unconstrained',\n",
" 'n_parameters': 1,\n",
" 'is_shared': True,\n",
" 'inits': p['inits'],\n",
" 'bounds': p['bounds'],\n",
" 'is_scalar': True,\n",
" 'fixed': False,\n",
" }\n",
" \n",
" class _builder:\n",
" def __init__(self, config):\n",
" self.builder_data = {'funcs': {}}\n",
" self.config = config\n",
"\n",
" def collect(self, thismod, nom):\n",
" maskval = True if thismod else False\n",
" mask = [maskval] * len(nom)\n",
" return {'mask': mask}\n",
"\n",
" def append(self, key, channel, sample, thismod, defined_samp):\n",
" self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault(\n",
" 'data', {'mask': []}\n",
" )\n",
" nom = (\n",
" defined_samp['data']\n",
" if defined_samp\n",
" else [0.0] * self.config.channel_nbins[channel]\n",
" )\n",
" moddata = self.collect(thismod, nom)\n",
" self.builder_data[key][sample]['data']['mask'] += moddata['mask']\n",
" if thismod:\n",
" if thismod['name'] != funcname:\n",
" print(thismod)\n",
" self.builder_data['funcs'].setdefault(thismod['name'],thismod['data']['expr'])\n",
" self.required_parsets = {k:[_allocate_new_param(v)] for k,v in newparams.items()}\n",
"\n",
" def finalize(self):\n",
" return self.builder_data\n",
"\n",
" class _applier:\n",
" name = funcname\n",
" op_code = 'multiplication'\n",
"\n",
" def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):\n",
" self.funcs = [make_func(v) for v in builder_data['funcs'].values()]\n",
" \n",
" \n",
" \n",
" self.batch_size = batch_size\n",
" pars_for_applier = deps\n",
" _modnames = [f'{mtype}/{m}' for m, mtype in modifiers]\n",
"\n",
" parfield_shape = (\n",
" (self.batch_size, pdfconfig.npars)\n",
" if self.batch_size\n",
" else (pdfconfig.npars,)\n",
" )\n",
" self.param_viewer = ParamViewer(\n",
" parfield_shape, pdfconfig.par_map, pars_for_applier\n",
" )\n",
" self._custommod_mask = [\n",
" [[builder_data[modname][s]['data']['mask']] for s in pdfconfig.samples]\n",
" for modname in _modnames\n",
" ]\n",
" self._precompute()\n",
" events.subscribe('tensorlib_changed')(self._precompute)\n",
"\n",
" def _precompute(self):\n",
" tensorlib, _ = get_backend()\n",
" if not self.param_viewer.index_selection:\n",
" return\n",
" self.custommod_mask = tensorlib.tile(\n",
" tensorlib.astensor(self._custommod_mask), (1, 1, self.batch_size or 1, 1)\n",
" )\n",
" self.custommod_mask_bool = tensorlib.astensor(\n",
" self.custommod_mask, dtype=\"bool\"\n",
" )\n",
" self.custommod_default = tensorlib.ones(self.custommod_mask.shape)\n",
"\n",
" def apply(self, pars):\n",
" \"\"\"\n",
" Returns:\n",
" modification tensor: Shape (n_modifiers, n_global_samples, n_alphas, n_global_bin)\n",
" \"\"\"\n",
" if not self.param_viewer.index_selection:\n",
" return\n",
" tensorlib, _ = get_backend()\n",
" if self.batch_size is None:\n",
" deps = self.param_viewer.get(pars)\n",
" print('deps',deps.shape)\n",
" results = tensorlib.astensor([f(deps) for f in self.funcs])\n",
" results = tensorlib.einsum(\n",
" 'msab,m->msab', self.custommod_mask, results\n",
" )\n",
" else:\n",
" deps = self.param_viewer.get(pars)\n",
" print('deps',deps.shape)\n",
" results = tensorlib.astensor([f(deps) for f in self.funcs])\n",
" results = tensorlib.einsum(\n",
" 'msab,ma->msab', self.custommod_mask, results\n",
" )\n",
" results = tensorlib.where(\n",
" self.custommod_mask_bool, results, self.custommod_default\n",
" )\n",
" return results\n",
" \n",
" modifier_set = {_applier.name: (_builder, _applier)}\n",
" modifier_set.update(**pyhf.modifiers.histfactory_set)\n",
" return modifier_set"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "45717981",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'name': 'f1', 'type': 'customfunc', 'data': {'expr': 'm1+(m2**2)'}}\n",
"{'name': 'f2', 'type': 'customfunc', 'data': {'expr': 'm1'}}\n",
"deps (2, 1)\n"
]
},
{
"data": {
"text/plain": [
"array([[510., 510., 510., 510., 510., 510., 510., 510., 510., 510., 510.,\n",
" 510., 510., 510., 510., 510., 510., 510., 510., 510.]])"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"new_params = {\n",
" 'm1': {'inits': (1.0,), 'bounds': ((-5.0,5.0),)},\n",
" 'm2': {'inits': (1.0,), 'bounds': ((-5.0,5.0),)}\n",
"}\n",
"\n",
"expanded_pyhf = add_custom_modifier('customfunc', ['m1','m2'], new_params)\n",
"model = pyhf.Model(\n",
" {\n",
" 'channels': [\n",
" {\n",
" 'name': 'singlechannel',\n",
" 'samples': [\n",
" {'name': 'signal', 'data': [10] * 20,\n",
" 'modifiers': [\n",
" {'name': 'f2','type': 'customfunc','data': {'expr': 'm1'}},\n",
" ],\n",
" },\n",
" {'name': 'background', 'data': [100] * 20, 'modifiers': [\n",
" {'name': 'f1','type': 'customfunc','data': {'expr': 'm1+(m2**2)'}},\n",
" ]},\n",
" ],\n",
" }\n",
" ]\n",
" },\n",
" modifier_set=expanded_pyhf,\n",
" poi_name='m1',\n",
" validate=False,\n",
" batch_size = 1\n",
")\n",
"\n",
"\n",
"model.expected_actualdata([[1.0,2.0]])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "938c8fdf",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@matthewfeickert
Copy link

matthewfeickert commented Sep 26, 2022

@lukasheinrich I'm not sure how current this still is, but if this is the most up to date draft we need to update it to work with pyhf v0.7.0 as this currently fails to run (just need to add the self.is_shared).

edit: I've patched this on my fork. We can't make PRs to Gists, but you can do the following:

git remote add matthewfeickert git@gist.github.com:6fdd0e06c4a224edeca041cb0851e754.git
git fetch --all
git show matthewfeickert/main  # See what I did before merge
git merge matthewfeickert/main
git push origin HEAD
git remote remove matthewfeickert

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment