Skip to content

Instantly share code, notes, and snippets.

@kne42
Last active December 10, 2018 03:04
Show Gist options
  • Save kne42/4ad99940ec518611de02fcdba7fd7f08 to your computer and use it in GitHub Desktop.
Save kne42/4ad99940ec518611de02fcdba7fd7f08 to your computer and use it in GitHub Desktop.
delayed updates
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import inspect\n",
"import weakref\n",
"\n",
"\n",
"def _normalize_callback(callback):\n",
" # adapted from:\n",
" # https://github.com/vispy/vispy/blob/master/vispy/util/event.py\n",
" \"\"\"dereference methods into a (self, method_name) pair so that we can\n",
" make the connection without making a strong reference to the\n",
" instance.\"\"\"\n",
" if isinstance(callback, Callback):\n",
" callback = callback._callback\n",
" \n",
" if inspect.ismethod(callback):\n",
" callback = (callback.__self__, callback.__name__)\n",
"\n",
" # always use a weak ref\n",
" if (isinstance(callback, tuple) and not\n",
" isinstance(callback[0], weakref.ref)):\n",
" callback = (weakref.ref(callback[0]),) + callback[1:]\n",
"\n",
" return callback\n",
"\n",
"\n",
"def _iscallback(obj):\n",
" return inspect.ismethod(obj) or inspect.isfunction(obj) or isinstance(obj, Callback)\n",
"\n",
"\n",
"class UpdateBlocker:\n",
" \"\"\"Context manager to prevent updates from being triggered.\n",
" \n",
" Parameters\n",
" ----------\n",
" manager : UpdateManager\n",
" Update manager.\n",
" delayed : bool, optional\n",
" Whether to call the manager upon exit, running any callbacks queued while blocked.\n",
" \"\"\"\n",
" def __init__(self, manager, delayed=False):\n",
" self.manager = manager\n",
" self.delayed = delayed\n",
" \n",
" def __enter__(self):\n",
" self.manager.blocked = True\n",
" return self.manager\n",
" \n",
" def __exit__(self, *excinfo):\n",
" self.manager.blocked = False\n",
" if self.delayed:\n",
" self.manager()\n",
"\n",
" \n",
"class Callback:\n",
" def __init__(self, manager, callback, name=None, doc=None):\n",
" self._manager = manager\n",
" \n",
" self._callback = callback\n",
" if isinstance(callback, tuple):\n",
" self._bound = True\n",
" else:\n",
" self._bound = False\n",
" \n",
" if doc is None:\n",
" doc = ''\n",
" \n",
" self.__name__ = name\n",
" self.__doc__ = doc\n",
" \n",
" self._args = None\n",
" self._kwargs = None\n",
" \n",
" def __str__(self):\n",
" return f\"'{self.__name__}' callback\"\n",
" \n",
" def __repr__(self):\n",
" return str(self)\n",
" \n",
" def __call__(self, *args, **kwargs):\n",
" self._args, self._kwargs = args, kwargs\n",
" \n",
" manager = self._manager\n",
" if isinstance(manager, str):\n",
" manager = getattr(args[0], manager)\n",
" \n",
" queue = manager._queue\n",
" \n",
" try:\n",
" queue.remove(self)\n",
" except ValueError:\n",
" pass\n",
" \n",
" queue.append(self)\n",
" manager()\n",
" \n",
" def _exec(self):\n",
" if self._bound:\n",
" wr, name = self._callback\n",
" func = getattr(wr(), name)\n",
" else:\n",
" func = self._callback\n",
" \n",
" return func(*self._args, **self._kwargs)\n",
" \n",
"\n",
"class UpdateManager:\n",
" def __init__(self, *args, **kwargs):\n",
" self._queue = []\n",
" self._callbacks = {}\n",
" \n",
" for callback in args:\n",
" self.add_callback(callback)\n",
" \n",
" for name, callback in kwargs.items():\n",
" self.add_callback(callback, name=name)\n",
" \n",
" self._blocked = 0\n",
" self._blocking = None\n",
" self._delaying = None\n",
" \n",
" def __dir__(self):\n",
" return list(self._callbacks.keys()) + super().__dir__()\n",
" \n",
" def __getattr__(self, name):\n",
" try:\n",
" return self._callbacks[name]\n",
" except KeyError:\n",
" raise AttributeError(f\"'{type(self).__name__}' object has no attribute '{name}'\")\n",
" \n",
" def __call__(self):\n",
" if self.blocked:\n",
" return\n",
" \n",
" queue = self._queue\n",
" \n",
" while queue:\n",
" callback = queue.pop(0)\n",
" callback._exec()\n",
" \n",
" def _add_callback(self, callback, name=None, doc=None, manager=None):\n",
" if doc is None:\n",
" doc = callback.__doc__\n",
" \n",
" callback = _normalize_callback(callback)\n",
" \n",
" if name is None:\n",
" if isinstance(callback, tuple):\n",
" name = callback[1]\n",
" else:\n",
" name = callback.__name__\n",
" \n",
" if manager is None:\n",
" manager = self\n",
" \n",
" callback = Callback(manager, callback, name=name, doc=doc)\n",
" self._callbacks[name] = callback\n",
" return callback\n",
" \n",
" @staticmethod\n",
" def _add_callback_parse_args(args, kwargs):\n",
" func = kwargs.pop('func', None)\n",
" name = kwargs.pop('name', None)\n",
" doc = kwargs.pop('doc', None)\n",
" manager = kwargs.pop('manager', None)\n",
" \n",
" if kwargs:\n",
" raise TypeError('invalid keyword arguments: '\n",
" + ', '.join(kwargs))\n",
" \n",
" args = list(args)\n",
" \n",
" try:\n",
" if func is None:\n",
" if _iscallback(args[0]):\n",
" func = args.pop(0)\n",
" \n",
" name = name or args.pop(0)\n",
" doc = doc or args.pop(0)\n",
" manager = manager or args.pop(0)\n",
" \n",
" except IndexError:\n",
" pass\n",
" \n",
" if args:\n",
" raise TypeError(f'expected at most 4 arguments; got {4 + len(args)}')\n",
" \n",
" if func is not None: assert _iscallback(func), type(func)\n",
" if name is not None: assert isinstance(name, str), type(name)\n",
" if doc is not None: assert isinstance(doc, str), type(doc)\n",
" if manager is not None: assert isinstance(manager, (UpdateManager, str))\n",
" \n",
" return func, name, doc, manager\n",
" \n",
" def add_callback(self, *args, **kwargs):\n",
" func, name, doc, manager = self._add_callback_parse_args(args, kwargs)\n",
" \n",
" if func:\n",
" return self._add_callback(func, name, doc, manager)\n",
" \n",
" def inner(func):\n",
" return self._add_callback(func, name, doc, manager)\n",
" \n",
" return inner\n",
" \n",
" @property\n",
" def blocked(self):\n",
" return self._blocked > 0\n",
" \n",
" @blocked.setter\n",
" def blocked(self, blocked):\n",
" if blocked:\n",
" self._blocked += 1\n",
" elif self._blocked > 0:\n",
" self._blocked -= 1\n",
" \n",
" @property\n",
" def blocking(self):\n",
" self._blocking = self._blocking or UpdateBlocker(self)\n",
" return self._blocking\n",
" \n",
" @property\n",
" def delaying(self):\n",
" self._delaying = self._delaying or UpdateBlocker(self, delayed=True)\n",
" return self._delaying\n",
" \n",
" def copy(self):\n",
" cls = type(self)\n",
" return cls(**self._callbacks)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def foo():\n",
" print('SPAM!')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"update = UpdateManager(foo)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SPAM!\n"
]
}
],
"source": [
"update.foo()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def bar(x):\n",
" print(x)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'bar' callback"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"update.add_callback(bar)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2\n"
]
}
],
"source": [
"update.bar(2)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"with update.blocking:\n",
" update.foo()\n",
" update.bar(42)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SPAM!\n",
"42\n"
]
}
],
"source": [
"update()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"@update.add_callback\n",
"def baz(*args, **kwargs):\n",
" print(args, kwargs)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"() {}\n",
"('aliiiens',) {}\n",
"() {'SPAM': 42}\n",
"(0,) {'aliiiens': 3}\n"
]
}
],
"source": [
"update.baz()\n",
"update.baz('aliiiens')\n",
"update.baz(SPAM=42)\n",
"update.baz(0, aliiiens=3)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SPAM!\n",
"aliiiens\n"
]
}
],
"source": [
"with update.delaying:\n",
" update.foo()\n",
" update.bar('aliens')\n",
" update.bar('aliiiens')"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"class Foo:\n",
" def bar(self):\n",
" print(42)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"f = Foo()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'foobar' callback"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"update.add_callback(f.bar, name='foobar')"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"42\n"
]
}
],
"source": [
"update.foobar()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'foobarwr' callback"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"update.add_callback(Foo().bar, name='foobarwr')"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" update.foobarwr() # ref should've been garbage collected\n",
"except AttributeError:\n",
" pass\n",
"else:\n",
" raise AssertionError()"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"m = update.copy()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"import time"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SPAM!\n",
"2\n"
]
}
],
"source": [
"with m.delaying:\n",
" m.foo()\n",
" time.sleep(5)\n",
" m.bar(2)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"class Foo:\n",
" _manager = UpdateManager()\n",
" \n",
" def __init__(self):\n",
" self.update = self._manager.copy()\n",
" \n",
" @_manager.add_callback('bar')\n",
" def baz(a):\n",
" print(a)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"f = Foo()"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2\n"
]
}
],
"source": [
"f.update.bar(2)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"42\n"
]
}
],
"source": [
"f.baz(42)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"class Foo:\n",
" _manager = UpdateManager()\n",
" \n",
" def __init__(self):\n",
" self.update = self._manager.copy()\n",
" self._bar = 42\n",
" \n",
" def no_updates(self):\n",
" return self.update.blocking\n",
" \n",
" def update_at_end(self):\n",
" return self.update.delaying\n",
" \n",
" @property\n",
" def bar(self):\n",
" return self._bar\n",
" \n",
" @bar.setter\n",
" @_manager.add_callback('_bar', manager='update')\n",
" def bar(self, bar):\n",
" self._bar = bar"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"f = Foo()"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"42"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"f.bar"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'SPAM'"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"f.bar = 'SPAM'; f.bar"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'SPAM'"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with f.no_updates():\n",
" f.bar = 'aliiiens'\n",
" \n",
"f.bar"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'aliiiens'"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"f.update(); f.bar"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"aliiiens\n"
]
},
{
"data": {
"text/plain": [
"'kne42'"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with f.update_at_end():\n",
" f.bar = 'kne42'\n",
" print(f.bar)\n",
" \n",
"f.bar"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"class Bar(Foo):\n",
" _manager = Foo._manager.copy()\n",
" \n",
" def __init__(self):\n",
" super().__init__()\n",
" self._baz = 'SPAM!'\n",
" \n",
" @property\n",
" def baz(self):\n",
" return self._baz\n",
" \n",
" @_manager.add_callback('_baz', manager='update')\n",
" def baz(self, baz):\n",
" self._baz = baz"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" f.update._baz # should not have new callback\n",
"except AttributeError:\n",
" pass\n",
"else:\n",
" raise AssertionError()"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'_baz' callback"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b = Bar()\n",
"b.update._baz"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:py3.6]",
"language": "python",
"name": "conda-env-py3.6-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment