Skip to content

Instantly share code, notes, and snippets.

@abersheeran
Created March 12, 2024 02:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abersheeran/ebf59686b60e692534a52eee0572c9dc to your computer and use it in GitHub Desktop.
Save abersheeran/ebf59686b60e692534a52eee0572c9dc to your computer and use it in GitHub Desktop.
Use interpreters module in python3.12
import _xxsubinterpreters
from typing import List
from inspect import cleandoc
SharedValue = int | float | bool | bytes | str | None | tuple["SharedValue", ...]
class Interpreter:
def __init__(self, id: int):
self._id = id
self._subinterpreter = None
@property
def id(self) -> int:
return self._id
def is_running(self) -> bool:
return _xxsubinterpreters.is_running(self._id)
def close(self):
_xxsubinterpreters.destroy(self._id)
def run(self, src_str: str, /, shared: dict[str, SharedValue] | None = None):
_xxsubinterpreters.run_string(self._id, cleandoc(src_str), shared=shared)
def list_all() -> List[Interpreter]:
return [Interpreter(x) for x in _xxsubinterpreters.list_all()]
def get_current() -> Interpreter:
return Interpreter(_xxsubinterpreters.get_current())
def get_main() -> Interpreter:
return Interpreter(_xxsubinterpreters.get_main())
def create() -> Interpreter:
subinterp = _xxsubinterpreters.create(isolated=1)
interp = Interpreter(int(subinterp))
interp._subinterpreter = (
subinterp # Add refference to ensure that they have the same lifetime.
)
return interp
if __name__ == "__main__":
import threading
def task():
intp = create()
intp.run(
"""
for i in range(99999999):
a += i
print(a)
""",
{"a": 0},
)
ts = []
for _ in range(8):
t = threading.Thread(target=task)
t.start()
ts.append(t)
for t in ts:
t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment