Skip to content

Instantly share code, notes, and snippets.

@AmatanHead
Created October 14, 2017 15:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AmatanHead/6059ee6f6e9a6572934a36199f153122 to your computer and use it in GitHub Desktop.
Save AmatanHead/6059ee6f6e9a6572934a36199f153122 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
This module implements a threadlocal-like class that is local
for asyncio tasks. This is suboptimal and we're fine with it
(at least for now).
Waiting for a native implementation proposed in pep-0550.
"""
import weakref
import threading
import _threading_local
import gino
# Marker class whose instances are stored inside the task-local storage dict.
# A plain dict can't be weakly referenced, so an instance of this class is
# kept in the dict instead: it can be the target of a weakref and it has a
# ``__dict__`` in which a per-localimpl weakref is stored.
_sentinel = type('_sentinel', (), {})
# noinspection PyProtectedMember,PyPep8Naming,PyShadowingNames
class _localimpl(_threading_local._localimpl):
    """
    Manage the per-task dicts backing a ``local`` object.

    When ``gino.get_local()`` yields a task-local storage dict, the data is
    keyed by the id of an anchor object (a ``_sentinel`` instance) kept in
    that storage under the ``'_sentinel'`` key. When no task-local storage is
    available, everything is delegated to the thread-based parent class.
    """

    @staticmethod
    def _task_storage():
        """
        Return the task-local storage dict, or None if there is none.
        """
        # NB: we have to use try/except here because we may be called in a
        # thread that doesn't have a loop associated with it. If this is the
        # case, asyncio.Task.current_task() will raise a RuntimeError.
        try:
            # NB: we can't use id(asyncio.Task.current_task()) as a
            # self.dicts key because task locals may inherit, e.g. if a task
            # was spawned in another task, both of them will have the same
            # local storage.
            return gino.get_local()
        except RuntimeError:
            return None

    @staticmethod
    def _anchor_for(storage):
        """
        Return the anchor object kept in `storage`, creating it if missing.
        """
        # We can't weakref a dict so we weakref an element of it.
        # The lookup is done before constructing anything so that no
        # throwaway anchor is allocated on every attribute access.
        anchor = storage.get('_sentinel')
        if anchor is None:
            anchor = storage['_sentinel'] = _sentinel()
        return anchor

    def get_dict(self):
        """
        Return the dict for the current task/thread.

        Raises KeyError if none defined.
        """
        storage = self._task_storage()
        if storage is None:
            # No task-local storage: behave like a regular threadlocal.
            return super().get_dict()
        return self.dicts[id(self._anchor_for(storage))][1]

    def create_dict(self):
        """
        Create a new dict for the current task/thread, and return it.
        """
        storage = self._task_storage()
        if storage is None:
            # No task-local storage: behave like a regular threadlocal.
            return super().create_dict()

        anchor = self._anchor_for(storage)
        anchor_id = id(anchor)
        key = self.key
        local_dict = {}

        # This mimics the original threadlocal implementation. We use two
        # weakrefs here to subscribe to object deletion: if the task-local
        # anchor gets deleted, we want to remove the dict associated with it
        # asap. If the localimpl object gets deleted, we want to remove the
        # weakref that triggers dict removal.

        def local_deleted(_, key=key):
            # When the localimpl is deleted, remove the attribute that was
            # stored on the anchor for it.
            live_anchor = wranchor()
            if live_anchor is not None:
                del live_anchor.__dict__[key]

        def task_deleted(_, anchor_id=anchor_id):
            # When the anchor is deleted, remove the local dict.
            # Note that this is suboptimal if the anchor gets caught in a
            # reference loop: the dict then stays around until the loop is
            # collected.
            impl = wrlocal()
            if impl is not None:
                impl.dicts.pop(anchor_id)

        wrlocal = weakref.ref(self, local_deleted)
        wranchor = weakref.ref(anchor, task_deleted)
        anchor.__dict__[key] = wrlocal
        self.dicts[anchor_id] = wranchor, local_dict
        return local_dict
# noinspection PyProtectedMember,PyPep8Naming
class local:
    """
    Object whose attributes are local to the current task/thread.

    Mirrors the ``local`` class from ``_threading_local``, but uses this
    module's ``_localimpl`` to decide which dict backs the instance. Every
    attribute operation first swaps the matching dict in as ``__dict__``
    (via ``_threading_local._patch``) and then proceeds normally.
    """
    __slots__ = '_local__impl', '__dict__'

    def __new__(cls, *args, **kwargs):
        """
        Create the object and the dict for the creating task/thread.

        Raises TypeError if arguments are given but the class does not
        define its own ``__init__``.
        """
        # The arguments are saved in ``impl.localargs`` so that ``_patch``
        # can pass them to ``__init__`` whenever a new dict is created.
        # They are therefore only meaningful when the class defines its own
        # ``__init__``; otherwise reject them up front.
        if (args or kwargs) and (cls.__init__ is object.__init__):
            raise TypeError('initialization arguments are not supported')
        self = object.__new__(cls)
        impl = _localimpl()
        impl.localargs = (args, kwargs)
        impl.locallock = threading.RLock()
        # Bypass our own __setattr__: the impl must be in place before any
        # patched attribute access can work.
        object.__setattr__(self, '_local__impl', impl)
        # We need to create the local/thread dict in anticipation of
        # __init__ being called right after __new__, so that the first
        # attribute access doesn't find the dict missing and call __init__
        # a second time.
        impl.create_dict()
        return self

    def __getattribute__(self, name):
        """
        Look `name` up in the dict of the current task/thread.
        """
        with _threading_local._patch(self):
            return object.__getattribute__(self, name)

    def __setattr__(self, name, value):
        """
        Set `name` in the dict of the current task/thread.

        Raises AttributeError on an attempt to replace ``__dict__``.
        """
        if name == '__dict__':
            cname = self.__class__.__name__
            raise AttributeError(f'{cname}.__dict__ is read-only')
        with _threading_local._patch(self):
            return object.__setattr__(self, name, value)

    def __delattr__(self, name):
        """
        Delete `name` from the dict of the current task/thread.

        Raises AttributeError on an attempt to delete ``__dict__``.
        """
        if name == '__dict__':
            cname = self.__class__.__name__
            raise AttributeError(f'{cname}.__dict__ is read-only')
        with _threading_local._patch(self):
            return object.__delattr__(self, name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment