Skip to content

Instantly share code, notes, and snippets.

@tonybaloney
Last active June 5, 2023 22:13
Show Gist options
  • Save tonybaloney/262986212e1061b97908657a53a605d6 to your computer and use it in GitHub Desktop.
Save tonybaloney/262986212e1061b97908657a53a605d6 to your computer and use it in GitHub Desktop.
import time
import _xxsubinterpreters as subinterpreters
import _xxinterpchannels as interpchannels
from threading import Thread
import textwrap as tw
from queue import Queue
timeout = 1 # in seconds..
def run(host: str, port: int, results: Queue):
# Create a communication channel
channel_id = interpchannels.create()
interpid = subinterpreters.create()
subinterpreters.run_string(
interpid,
tw.dedent(
"""
import socket
import _xxsubinterpreters as subinterpreters
import _xxinterpchannels as interpchannels
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
interpchannels.send(channel_id, result)
sock.close()
"""),
shared=dict(
channel_id=channel_id,
host=host,
port=port,
timeout=timeout
))
print("completed")
output = interpchannels.recv(channel_id)
interpchannels.release(channel_id)
if output == 0:
results.put(port)
if __name__ == '__main__':
start = time.time()
host = "localhost" # pick a friend
threads = []
results = Queue()
for port in range(80, 100):
t = Thread(target=run, args=(host, port, results))
t.start()
threads.append(t)
for t in threads:
t.join()
while not results.empty():
print("Port {0} is open".format(results.get()))
print("Completed scan in {0} seconds".format(time.time() - start))
@tonybaloney
Copy link
Author

Assertion failed: (PyType_Check(Assertion failed: (PyType_Check(base)), function find_name_in_mrbase)), function find_name_in_mro, file typeobject.c, line 4153.o, file typeobject.c, line 4153.
Assertion failed: (PyType_Check(base)), function find_name_in_mro, file typeobject.c, line 4153.
  * frame #4: 0x00000001002e5a93 python.exe`find_name_in_mro.cold.6 at typeobject.c:4153:26 [opt]
    frame #5: 0x00000001001062dd python.exe`find_name_in_mro(type=0x00000001004711d0, name=0x0000000102af7830, error=0x00007000054b52a4) at typeobject.c:4153:26 [opt]
    frame #6: 0x000000010010283e python.exe`_PyType_Lookup(type=0x00000001004711d0, name=0x0000000102af7830) at typeobject.c:4212:11 [opt]
    frame #7: 0x00000001000ed867 python.exe`_PyObject_GenericSetAttrWithDict(obj=0x0000000102ae24b0, name=0x0000000102af7830, value=0x0000000102ae0bf0, dict=0x0000000000000000) at object.c:1515:13 [opt]
    frame #8: 0x00000001000edc0b python.exe`PyObject_GenericSetAttr(obj=<unavailable>, name=<unavailable>, value=<unavailable>) at object.c:1588:12 [opt]
    frame #9: 0x00000001000ec73f python.exe`PyObject_SetAttr(v=0x0000000102ae24b0, name=<unavailable>, value=0x0000000102ae0bf0) at object.c:1145:15 [opt]
    frame #10: 0x00000001000ec632 python.exe`PyObject_SetAttrString(v=0x0000000102ae24b0, name=<unavailable>, w=0x0000000102ae0bf0) at object.c:941:11 [opt]
    frame #11: 0x00000001000e8794 python.exe`_add_methods_to_object(module=0x0000000102ae24b0, name=0x0000000100553690, functions=<unavailable>) at moduleobject.c:172:13 [opt]
    frame #12: 0x00000001000e8593 python.exe`PyModule_FromDefAndSpec2(def=0x000000010047d3f0, spec=0x0000000102af5fa0, module_api_version=1013) at moduleobject.c:348:15 [opt]
    frame #13: 0x00000001001f3044 python.exe`create_builtin(tstate=0x0000000102a90798, name=0x0000000100553690, spec=0x0000000102af5fa0) at import.c:1354:24 [opt]
    frame #14: 0x00000001001f3c53 python.exe`_imp_create_builtin(module=<unavailable>, spec=0x0000000102af5fa0) at import.c:3357:21 [opt]

@ericsnowcurrently
Copy link

FYI, I have the script mostly running-ish now. :) There were a few errors in the script that I've corrected:

import time
import _xxsubinterpreters as subinterpreters
import _xxinterpchannels as interpchannels
from threading import Thread
import textwrap as tw
from queue import Queue

timeout = 1  # in seconds..


def run(host: str, port: int, results: Queue):
    # Create a communication channel
    channel_id = interpchannels.create()
    interpid = subinterpreters.create()
    subinterpreters.run_string(
        interpid,
        tw.dedent(
            """
            import socket
            import _xxsubinterpreters as subinterpreters
            import _xxinterpchannels as interpchannels
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            interpchannels.send(channel_id, result)
            sock.close()
            """),
        shared=dict(
            channel_id=channel_id,
            host=host,
            port=port,
            timeout=timeout
        ))
    print("completed")
    output = interpchannels.recv(channel_id)
    interpchannels.release(channel_id)
    if output == 0:
        results.put(port)


if __name__ == '__main__':
    start = time.time()
    host = "localhost"  # pick a friend
    threads = []
    results = Queue()
    for port in range(80, 100):
        t = Thread(target=run, args=(host, port, results))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    while not results.empty():
        print("Port {0} is open".format(results.get()))
    print("Completed scan in {0} seconds".format(time.time() - start))

@ericsnowcurrently
Copy link

I've updated my per-interpreter-gil-new branch with some fixes, but there are still races causing havoc.

@tonybaloney
Copy link
Author

It worked! (but also had an issue at the end)

completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
completed
Port 88 is open
Completed scan in 0.46167707443237305 seconds
Assertion failed: (keys != Py_EMPTY_KEYS), function free_keys_object, file dictobject.c, line 654.
Fatal Python error: Aborted

Current thread 0x00007ff85e59a340 (most recent call first):
  Garbage-collecting
  <no Python frame>
zsh: abort      ./python.exe -X dev test_scanner.py

@ericsnowcurrently
Copy link

Yeah, I'll be looking into that.

@tonybaloney
Copy link
Author

>>> timeit.timeit("p=Process();p.start();p.join()", setup="from multiprocessing import Process", number=100)
9.623484174000623
>>> timeit.timeit("id=subinterpreters.create();subinterpreters.run_string(id, '')", setup="import _xxsubinterpreters as subinterpreters", number=100)
1.7007676030043513

Ooh nice!

@ericsnowcurrently
Copy link

I definitely wasn't expecting as much of a performance improvement. Here's one of the slides from my upcoming PyCon talk, where I mostly benchmark using the same code Dave Beazley did in his PyCon 2015 talk, using a resource-constrained VM on my laptop:

Comparison: requests/second (fib(1)) → long request (fib(30)):

  1 client 2 clients 3 clients vs. fib(40)
Plain Threaded 13045 reqs/sec → 0.924 sec 11863 reqs/sec → 1.792 sec 8143 reqs/sec → 2.488 sec 121 reqs/sec → 1.779 sec
Threaded + Subprocesses 683 reqs/sec → 0.939 sec 441 reqs/sec → 1.022 sec 314 reqs/sec → 1.055 sec 607 reqs/sec → 1.009 sec
Async 6526 reqs/sec → 0.907 sec 3707 reqs/sec → 1.779 sec 2550 reqs/sec → 2.614 sec 0 reqs/sec → ∞ sec
Interpreters (shared GIL) 13198 reqs/sec → 0.976 sec ??? reqs/sec ??? reqs/sec ??? reqs/sec
Interpreters (own GIL) 13238 reqs/sec → 0.868 sec 12595 reqs/sec → 0.896 sec 11767 reqs/sec → 0.915 sec 11522 reqs/sec → 0.872 sec

@ericsnowcurrently
Copy link

FWIW, I'm pretty sure the crashes are much fewer now, though non-zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment