Skip to content

Instantly share code, notes, and snippets.

@tonybaloney
Last active March 9, 2020 05:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonybaloney/fdbc2f9b096bb75946eb9505920cf882 to your computer and use it in GitHub Desktop.
Save tonybaloney/fdbc2f9b096bb75946eb9505920cf882 to your computer and use it in GitHub Desktop.
A TCP port scanner using Python 3.9 subinterpreters and multithreading
import time
import _xxsubinterpreters as subinterpreters
from threading import Thread
import textwrap as tw
import pickle
from queue import Queue
timeout = 1 # in seconds..
def run(host: str, port: int, results: Queue):
# Create a communication channel
channel_id = subinterpreters.channel_create()
interpid = subinterpreters.create()
subinterpreters.run_string(
interpid,
tw.dedent(
"""
import socket; import _xxsubinterpreters as subinterpreters
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
subinterpreters.channel_send(channel_id, port)
else:
subinterpreters.channel_send(channel_id, -1)
sock.close()
"""),
shared=dict(
channel_id=channel_id,
host=host,
port=port,
timeout=timeout
))
output = subinterpreters.channel_recv(channel_id)
subinterpreters.channel_release(channel_id)
if output != -1:
results.put(port)
if __name__ == '__main__':
start = time.time()
host = "127.0.0.1" # or pick something else
threads = []
results = Queue()
for port in range(80, 100):
t = Thread(target=run, args=(host, port, results))
t.start()
threads.append(t)
for t in threads:
t.join()
while not results.empty():
print("Port {0} is open".format(results.get()))
print("Completed scan in {0} seconds".format(time.time() - start))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment