Created
June 3, 2021 16:29
-
-
Save zzzeek/a2adcac4e659302ed5ad05d2beb8b57d to your computer and use it in GitHub Desktop.
python 3.10.0b2 segfault
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import greenlet | |
import sqlite3 | |
class _ErrorContainer(object): | |
error = None | |
def _expect_raises_fn(fn): | |
ec = _ErrorContainer() | |
try: | |
fn() | |
except Exception as err: | |
assert str(err) == "this is a test" | |
# assign the exception context outside of the except | |
# is necessary | |
ec.error = err | |
# don't del the exception context is necessary | |
# del ec | |
def greenlet_spawn(fn, *args, **kwargs): | |
# spawning a greenlet is necessary | |
context = greenlet.greenlet(fn, greenlet.getcurrent()) | |
# assignment to "result" is necessary | |
result = context.switch(*args, **kwargs) | |
# raising exception is necessary | |
raise Exception("this is a test") | |
class OuterConnectionWrapper: | |
def __init__(self, connection): | |
self.connection = connection | |
def go(self, stmt): | |
sqlite_connection = self.connection | |
cursor = sqlite_connection.cursor() | |
cursor.execute("select 1") | |
return cursor | |
def execute(self, stmt): | |
return greenlet_spawn(self.go, stmt) | |
def _do_close(self): | |
self.connection.close() | |
self.connection = None | |
def close(self): | |
self._do_close() | |
class InnerConnectionWrapper: | |
def __init__(self, connection): | |
self.connection = connection | |
def create_function(self, *arg, **kw): | |
self.connection.create_function(*arg, **kw) | |
def cursor(self): | |
return self.connection.cursor() | |
def close(self): | |
self.connection = None | |
class ConnectionPool: | |
def __init__(self): | |
self.conn = sqlite3.connect(":memory:") | |
def regexp(a, b): | |
return None | |
self.conn.create_function("regexp", 2, regexp) | |
def connect(self): | |
return InnerConnectionWrapper(self.conn) | |
def do_test(): | |
pool = ConnectionPool() | |
def go(): | |
c1 = pool.connect() | |
conn = OuterConnectionWrapper(c1) | |
try: | |
conn.execute("test") | |
finally: | |
conn.close() | |
_expect_raises_fn(go) | |
do_test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment