Created
October 14, 2017 15:41
-
-
Save AmatanHead/6059ee6f6e9a6572934a36199f153122 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
This module implements a threadlocal-like class that is local | |
for asyncio tasks. This is suboptimal and we're fine with it | |
(at least for now). | |
Waiting for a native implementation proposed in pep-0550. | |
""" | |
import weakref | |
import threading | |
import _threading_local | |
import gino | |
# Weak-referenceable marker type. A plain dict cannot be the target of a weak
# reference, so one instance of this class is stored inside each task-local
# dict and stands in for that dict when its lifetime has to be tracked.
_sentinel = type('_sentinel', (), {})


# noinspection PyProtectedMember,PyPep8Naming,PyShadowingNames
class _localimpl(_threading_local._localimpl):
    """Storage manager keyed by task-local storage, with a thread fallback.

    When ``gino.get_local()`` yields a task-local dict, attribute dicts are
    stored in ``self.dicts`` under the ``id()`` of a sentinel object kept in
    that task-local dict.  Otherwise every call is delegated to the stock
    per-thread implementation from ``_threading_local``.
    """

    @staticmethod
    def _task_anchor():
        """
        Return the sentinel stored in the current task-local dict.

        Returns None when no task-local storage is available, in which case
        callers fall back to the per-thread behaviour.  The sentinel is
        created and stored on first use.
        """

        # NB: we have to use try/except here because we may be called in a
        # thread that doesn't have a loop associated with it. If this is the
        # case, asyncio.Task.current_task() will raise a RuntimeError.
        try:
            # NB: we can't use id(asyncio.Task.current_task()) as a self.dicts
            # key because task locals may inherit, e.g. if a task was spawned
            # in another task, both of them will have the same local storage.
            storage = gino.get_local()
        except RuntimeError:
            return None

        if storage is None:
            return None

        # We can't weakref a dict so we weakref an element of it.  This runs
        # on every attribute access, so look the sentinel up first and only
        # build a new one when it is really missing; setdefault keeps the
        # insertion itself a single dict operation.
        try:
            return storage['_sentinel']
        except KeyError:
            return storage.setdefault('_sentinel', _sentinel())

    def get_dict(self):
        """
        Return the dict for the current task/thread.

        Raises KeyError if none defined.
        """

        anchor = self._task_anchor()
        if anchor is None:
            return super().get_dict()
        return self.dicts[id(anchor)][1]

    def create_dict(self):
        """
        Create a new dict for the current task/thread, and return it.
        """

        anchor = self._task_anchor()
        if anchor is None:
            return super().create_dict()

        key = self.key
        anchor_id = id(anchor)
        task_dict = {}

        # This mimics the original threadlocal implementation. We use two
        # weakrefs here to subscribe to object deletion: if the sentinel (and
        # with it the task-local storage) gets deleted, we want to remove the
        # dict associated with it asap. If this storage manager gets deleted,
        # we want to remove the weakref that triggers dict removal.

        def local_deleted(_, key=key):
            # The storage manager is gone: drop the back-reference that was
            # stashed on the sentinel, if the sentinel is still alive.
            live_anchor = wranchor()
            if live_anchor is not None:
                del live_anchor.__dict__[key]

        def task_deleted(_, anchor_id=anchor_id):
            # The sentinel is gone: drop the attribute dict registered for
            # it, if the storage manager is still alive.
            impl = wrlocal()
            if impl is not None:
                impl.dicts.pop(anchor_id)

        wrlocal = weakref.ref(self, local_deleted)
        wranchor = weakref.ref(anchor, task_deleted)
        anchor.__dict__[key] = wrlocal
        self.dicts[anchor_id] = wranchor, task_dict
        return task_dict
# noinspection PyProtectedMember,PyPep8Naming | |
class local:
    """Attribute namespace whose contents are chosen per task/thread.

    Every attribute read, write and delete runs inside
    ``_threading_local._patch``, using the ``_localimpl`` instance stored in
    the ``_local__impl`` slot.
    """

    __slots__ = '_local__impl', '__dict__'

    def __new__(cls, *args, **kwargs):
        # The constructor arguments are saved on the impl as ``localargs``;
        # the stdlib ``_patch`` helper passes them to ``__init__`` again
        # whenever it has to create a fresh dict.  They are therefore only
        # acceptable when the class provides its own ``__init__``.
        if cls.__init__ is object.__init__ and (args or kwargs):
            raise TypeError('initialization arguments are not supported')

        instance = object.__new__(cls)
        state = _localimpl()
        state.localargs = args, kwargs
        state.locallock = threading.RLock()
        object.__setattr__(instance, '_local__impl', state)

        # We need to create the local/thread dict in anticipation of
        # __init__ being called, to avoid infinite recursion.
        state.create_dict()
        return instance

    def __getattribute__(self, name):
        """Read ``name`` while the current task/thread dict is patched in."""
        patched = _threading_local._patch(self)
        with patched:
            return object.__getattribute__(self, name)

    def __setattr__(self, name, value):
        """Set ``name``; rebinding ``__dict__`` raises AttributeError."""
        if name != '__dict__':
            with _threading_local._patch(self):
                return object.__setattr__(self, name, value)
        owner = self.__class__.__name__
        raise AttributeError(f'{owner}.__dict__ is read-only')

    def __delattr__(self, name):
        """Delete ``name``; deleting ``__dict__`` raises AttributeError."""
        if name != '__dict__':
            with _threading_local._patch(self):
                return object.__delattr__(self, name)
        owner = self.__class__.__name__
        raise AttributeError(f'{owner}.__dict__ is read-only')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment