Skip to content

Instantly share code, notes, and snippets.

@tonybaloney
Last active June 5, 2023 22:13
Show Gist options
  • Save tonybaloney/262986212e1061b97908657a53a605d6 to your computer and use it in GitHub Desktop.
Save tonybaloney/262986212e1061b97908657a53a605d6 to your computer and use it in GitHub Desktop.
import time
import _xxsubinterpreters as subinterpreters
import _xxinterpchannels as interpchannels
from threading import Thread
import textwrap as tw
from queue import Queue
# Module-level timeout read by run(): it is shared into each sub-interpreter
# and applied to the probing socket via sock.settimeout().
timeout = 1 # in seconds..
def run(host: str, port: int, results: Queue):
    """Probe a single TCP port from a dedicated sub-interpreter.

    A new sub-interpreter opens a TCP socket, applies the module-level
    ``timeout`` to it, calls ``connect_ex((host, port))`` and sends the
    returned error code back over a channel. When that code is ``0`` the
    port is put on ``results``.

    The channel is released and the sub-interpreter destroyed even if the
    injected script or the receive fails, so neither leaks.

    Args:
        host: Host name or address to connect to.
        port: TCP port number to probe.
        results: Queue that collects the ports whose ``connect_ex`` result
            was ``0``.
    """
    # Create a communication channel
    channel_id = interpchannels.create()
    interpid = subinterpreters.create()
    try:
        subinterpreters.run_string(
            interpid,
            tw.dedent(
                """
                import socket
                import _xxsubinterpreters as subinterpreters
                import _xxinterpchannels as interpchannels
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(timeout)
                    result = sock.connect_ex((host, port))
                interpchannels.send(channel_id, result)
                """),
            shared=dict(
                channel_id=channel_id,
                host=host,
                port=port,
                timeout=timeout,
            ))
        print("completed")
        output = interpchannels.recv(channel_id)
    finally:
        # Release the channel while the sub-interpreter still exists (the
        # same order as the success path always used), then tear the
        # interpreter down so it does not outlive this probe.
        try:
            interpchannels.release(channel_id)
        finally:
            subinterpreters.destroy(interpid)
    if output == 0:
        results.put(port)
if __name__ == '__main__':
    # Scan ports 80-99 of the target host, one worker thread per port, then
    # report every port that run() put on the queue and the elapsed time.
    began = time.time()
    target = "localhost" # pick a friend
    found = Queue()
    workers = [
        Thread(target=run, args=(target, candidate, found))
        for candidate in range(80, 100)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # Every producer has been joined, so the queue size is final here.
    for _ in range(found.qsize()):
        print("Port {0} is open".format(found.get()))
    elapsed = time.time() - began
    print("Completed scan in {0} seconds".format(elapsed))
@ericsnowcurrently
Copy link

FWIW, I'm pretty sure the crashes are much fewer now, though non-zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment