Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple and lightweight multi-threaded port scanner written in Python that can work with port and IP address ranges. At the moment this script only supports full-connection TCP scans. For usage help, run: python pyscan.py -h
import sys
class ProgressPrinter:
    """Draw a one-line textual progress bar on standard output.

    Each update after the first starts with a carriage return so the bar is
    redrawn in place, e.g. ``[===>    ] - Finished 4/8 scans``.
    """

    def __init__(self, units, unit_type='', max_length=50, pre='', post='', fill='=', head='>', empty=' '):
        """Configure the bar.

        Args:
            units: Total number of work units (the bar is full at this value).
            unit_type: Label printed after the ``current/total`` counter.
            max_length: Maximum bar width in characters. With more units
                than this, several units share one character.
            pre: Text printed on its own line before the very first
                (``current_unit == 0``) draw; ``''`` disables it.
            post: Text printed on its own line after the final draw;
                ``''`` disables it.
            fill: Character used for the completed part of the bar.
            head: Character used for the leading edge; ``''`` disables it.
            empty: Character used for the not yet completed part.
        """
        if units > max_length:
            # More units than columns: each unit advances a fraction of a cell.
            self.steps = max_length / units
            self.max_length = max_length
        else:
            # One column per unit; the bar is only as wide as needed.
            self.steps = 1
            self.max_length = units
        self.units = units
        self.unit_type = unit_type
        self.pre = pre
        self.post = post
        self.fill = fill
        self.head = head
        self.empty = empty

    def _bar_line(self, current_unit):
        """Return the bracketed bar followed by the ``Finished x/y`` counter."""
        filled = round(current_unit * self.steps)
        if self.head != '' and current_unit != 0:
            # The head replaces the last filled cell instead of adding one.
            progress = self.fill * (filled - 1) + self.head
        else:
            progress = self.fill * filled
        padding = self.empty * (self.max_length - len(progress))
        counter = '] - Finished {}/{} {}'.format(current_unit, self.units, self.unit_type)
        return '[' + progress + padding + counter

    def print_progress(self, current_unit=0, pre=''):
        """Write the bar for ``current_unit`` to standard output.

        Args:
            current_unit: Number of units finished so far. ``0`` performs the
                initial draw (no carriage return, optional ``self.pre``).
            pre: Optional message printed on the line above the bar. It is
                padded with ``max_length`` spaces before the newline.
        """
        out = sys.stdout
        bar = self._bar_line(current_unit)

        if current_unit == 0:
            if self.pre != '':
                out.write('\n' + self.pre + '\n')
            if pre != '':
                out.write(pre + ' ' * self.max_length + '\n')
            out.write(bar)
        elif pre != '':
            # Replace the current bar line with the message, then draw the
            # bar afresh on the following line.
            out.write('\r' + pre + ' ' * self.max_length + '\n')
            out.write(bar)
        else:
            out.write('\r' + bar)

        if current_unit == self.units:
            out.write('\n')
            if self.post != '':
                out.write(self.post + '\n')
        out.flush()
#!/usr/bin/env python3
import optparse
from socket import *
from threading import Lock, Thread
from queue import Queue
import datetime
from progress_printer import ProgressPrinter
# Guards every mutation of the shared ``results`` structure made by the
# scanning threads (conn_scan / port_scan).
scan_lock = Lock()
# Guards the ``finished_scans`` counter and the progress-bar redraw that
# follows each increment.
print_lock = Lock()
# Number of hosts whose scan has ended (resolved or not); passed to the
# progress printer as the current unit.
finished_scans = 0
# Number of host-level worker threads started by main().
num_worker_threads = 10
# Number of connect-scan worker threads port_scan() starts for each host.
num_port_threads = 50
def conn_scan(host, port, results):
    """Try a full TCP connect to ``host:port`` and record the outcome.

    After a successful connect a short probe is sent and up to 100 bytes of
    the reply are read. Any socket error along the way (refused connection,
    timeout while connecting or reading, ...) classifies the port as closed.

    Args:
        host: Key of the host entry in ``results['resolved']``; also used as
            the connect target.
        port: TCP port number to probe.
        results: Shared result structure. ``results['resolved'][host]`` must
            already contain the ``'open'`` and ``'closed'`` lists.
    """
    conn_sock = socket(AF_INET, SOCK_STREAM)
    try:
        conn_sock.settimeout(4)
        conn_sock.connect((host, port))
        conn_sock.send(b'HTTP_Request - 404\r\n')
        response = conn_sock.recv(100)
    except error as e:
        bucket, entry = 'closed', {'port': port, 'error': e}
    else:
        bucket, entry = 'open', {'port': port, 'response': response}
    finally:
        conn_sock.close()

    # Only the shared-structure update happens under the lock; the context
    # manager guarantees release even if the append raises.
    with scan_lock:
        results['resolved'][host][bucket].append(entry)
def port_scan(host, ports, results, progress_printer):
    """Resolve ``host`` and connect-scan every port in ``ports``.

    Args:
        host: Host name or IP address as given by the user.
        ports: Iterable of TCP port numbers to probe.
        results: Shared result structure with the keys ``'resolved'`` (dict)
            and ``'unresolved'`` (list); it is updated in place.
        progress_printer: Object whose ``print_progress(current_unit, pre=...)``
            is called once when this host is done.

    Returns:
        ``False`` when the host name could not be resolved, ``True`` once
        all ports have been processed.
    """
    global finished_scans

    try:
        tgt_ip = gethostbyname(host)
    except error as e:
        with scan_lock:
            results['unresolved'].append({'host': host, 'error': e})
        with print_lock:
            finished_scans += 1
            pre = "[*] Scan for host {} failed!".format(host)
            progress_printer.print_progress(finished_scans, pre=pre)
        return False

    # The reverse lookup is best effort: fall back to the name we were given.
    try:
        hostname = gethostbyaddr(tgt_ip)[0]
    except error:
        hostname = host
    with scan_lock:
        results['resolved'][host] = {'hostname': hostname, 'ip': tgt_ip, 'initial': host, 'open': [], 'closed': []}

    port_jobs = Queue()
    job_count = 0
    for port in ports:
        port_jobs.put(port)
        job_count += 1

    def worker():
        # Runs until it pulls a ``None`` sentinel, so the thread ends with
        # this host's scan instead of blocking on the queue forever.
        while True:
            port = port_jobs.get()
            if port is None:
                return
            conn_scan(host, port, results)

    threads = [Thread(target=worker) for _ in range(min(num_port_threads, job_count))]
    for _ in threads:
        port_jobs.put(None)
    for t in threads:
        t.daemon = True
        t.start()
    # Joining the threads (rather than the queue) cannot hang if a worker
    # dies from an unexpected exception.
    for t in threads:
        t.join()

    with print_lock:
        finished_scans += 1
        pre = "[*] Scan for host {} finished!".format(host)
        progress_printer.print_progress(finished_scans, pre=pre)
    return True
def validate_ip_or_range(s):
    """Return True if ``s`` is a dotted IPv4 address or IPv4 address range.

    ``s`` must consist of exactly four dot-separated parts. Each part is
    either a decimal octet (``0``-``255``) or an ascending ``low-high`` range
    of two octets, e.g. ``192.168.0.1-124``. Everything else, including host
    names, yields ``False``; the function never raises for string input.
    """
    def octet(text):
        # Value of a decimal octet, or None when ``text`` is not in 0..255.
        if not text or any(ch not in '0123456789' for ch in text):
            return None
        value = int(text)
        return value if value <= 255 else None

    parts = s.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        bounds = [octet(piece) for piece in part.split('-')]
        if None in bounds:
            return False
        if len(bounds) == 1:
            continue
        # A range needs exactly two bounds in strictly ascending order.
        if len(bounds) != 2 or not bounds[0] < bounds[1]:
            return False
    return True
def parse_host_args(host_args):
    """Expand a comma separated host specification into single targets.

    Entries that ``validate_ip_or_range`` accepts and that contain a ``-``
    are expanded into every address of the range (the first octet varies
    slowest). All other entries are passed through unchanged so they can be
    resolved as host names later. Surrounding whitespace is stripped and
    empty entries (e.g. from a trailing comma) are ignored.

    Args:
        host_args: String such as ``"google.com,127.0.0.1,192.168.192.1-124"``.

    Returns:
        List of host strings in the order they were specified.
    """
    from itertools import product

    tgt_hosts = []
    for host_arg in host_args.split(","):
        host_arg = host_arg.strip()
        if not host_arg:
            continue
        try:
            is_range = "-" in host_arg and validate_ip_or_range(host_arg)
        except ValueError:
            # A ValueError from the validator means the text is not numeric,
            # so treat the entry as a plain host name.
            is_range = False
        if not is_range:
            tgt_hosts.append(host_arg)
            continue

        # One list of candidate strings per octet; ranges contribute several.
        octet_choices = []
        for addr_part in host_arg.split("."):
            if "-" in addr_part:
                bounds = [int(bound) for bound in addr_part.split("-")]
                octet_choices.append([str(value) for value in range(bounds[0], bounds[1] + 1)])
            else:
                octet_choices.append([addr_part])
        tgt_hosts.extend(".".join(combo) for combo in product(*octet_choices))
    return tgt_hosts
def parse_port_args(port_args):
    """Expand a comma separated port specification into a list of ports.

    Each entry is a single port (``"80"``) or an inclusive ascending range
    (``"100-110"``). Surrounding whitespace is ignored and empty entries
    (e.g. from a trailing comma) are skipped. Order and duplicates are kept
    as given.

    Args:
        port_args: String such as ``"80,100-110,90"``.

    Returns:
        List of integer port numbers.

    Raises:
        ValueError: If an entry is not numeric, has more than two bounds, is
            a descending range, or lies outside ``0``-``65535``.
    """
    tgt_ports = []
    for port_arg in port_args.split(","):
        port_arg = port_arg.strip()
        if not port_arg:
            continue
        bounds = [int(bound) for bound in port_arg.split("-")]
        if len(bounds) > 2:
            raise ValueError("invalid port range: {!r}".format(port_arg))
        first, last = bounds[0], bounds[-1]
        # Reject values the socket layer cannot use before any thread starts.
        if not 0 <= first <= last <= 65535:
            raise ValueError(
                "port(s) must be ascending and within 0-65535: {!r}".format(port_arg))
        tgt_ports.extend(range(first, last + 1))
    return tgt_ports
def print_results(results, only_open):
    """Print the scan report to standard output.

    Unresolved hosts are listed first, followed by one section per resolved
    host with its open ports and, unless ``only_open`` is true, its closed
    ports.

    Args:
        results: Dict with ``'resolved'`` (host -> result dict) and
            ``'unresolved'`` (list of ``{'host', 'error'}`` dicts).
        only_open: When true, closed ports are left out of the report.
    """
    print('\n----- SHOWING SCAN RESULTS -----')
    resolved = results['resolved']
    unresolved = results['unresolved']

    for failure in unresolved:
        print("[-] Couldn't resolve host {} - {}".format(failure['host'], failure['error']))

    for entry in resolved.values():
        print("[+] Scan results for {} [{} -> {}]".format(entry['ip'], entry['initial'], entry['hostname']))
        for hit in entry['open']:
            print(" [+] {}/tcp open - {}".format(hit['port'], hit['response']))
        if only_open:
            continue
        for miss in entry['closed']:
            print(" [-] {}/tcp closed - {}".format(miss['port'], miss['error']))
def save_to_file(results, only_open):
    """Write the scan report to a timestamped text file in the working directory.

    The file is named ``<year>-<month>-<day>_<hour>-<minute>-<second>_PortScan.txt``
    (values are not zero padded) and uses ``\\r\\n`` line endings.

    Args:
        results: Dict with ``'resolved'`` (host -> result dict) and
            ``'unresolved'`` (list of ``{'host', 'error'}`` dicts).
        only_open: When true, closed ports are left out of the report.
    """
    now = datetime.datetime.now()
    name = "{}-{}-{}_{}-{}-{}_PortScan".format(
        now.year, now.month, now.day, now.hour, now.minute, now.second)
    resolved = results['resolved']
    unresolved = results['unresolved']

    lines = ["{} - Scanned {} hosts: \r\n".format(str(now), len(resolved) + len(unresolved))]
    for failure in unresolved:
        lines.append("[-] Couldn't resolve host {} - {}\r\n".format(failure['host'], failure['error']))
    for entry in resolved.values():
        lines.append("[+] Scan results for {} [{} -> {}]\r\n".format(
            entry['ip'], entry['initial'], entry['hostname']))
        for hit in entry['open']:
            lines.append(" [+] {}/tcp open - {}\r\n".format(hit['port'], hit['response']))
        if not only_open:
            for miss in entry['closed']:
                lines.append(" [-] {}/tcp closed - {}\r\n".format(miss['port'], miss['error']))

    # newline='' disables newline translation so the explicit "\r\n" is
    # written as is (otherwise Windows would emit "\r\r\n"); the context
    # manager closes the file even if a write fails.
    with open(name + '.txt', 'w', encoding='utf-8', newline='') as f:
        f.writelines(lines)
def main():
    """Command line entry point: parse options, scan all hosts, report.

    Hosts are scanned concurrently by up to ``num_worker_threads`` threads,
    each of which delegates to ``port_scan``. Missing or malformed ``-H`` /
    ``-p`` arguments end the program through ``parser.error`` (usage message
    on stderr, exit status 2).
    """
    parser = optparse.OptionParser('pyscan.py -H <target_host(s)> -p <target_port(s)> [-o -f]')
    parser.add_option('-H', '--Hosts', dest='tgtHosts', type="string",
                      help='specify target host(s) as a comma separated '
                           'list of hostname(s) or IP addresses or ip '
                           'address ranges '
                           '[eg. google.com,127.0.0.1,192.168.192.1-124]')
    parser.add_option('-p', '--ports', dest='tgtPorts', type="string",
                      help='specify target port(s) separated trough commas '
                           'and/or port ranges [e.g. 80,100-110,90]')
    parser.add_option('-o', '--open-only', dest='openOnly', action="store_true", default=False,
                      help='Only show open ports in console and file output')
    parser.add_option('-f', '--to-file', dest='toFile', action="store_true", default=False,
                      help='Saves a copy of the scan results to a file on disk.')
    options, _ = parser.parse_args()
    host_args = options.tgtHosts
    port_args = options.tgtPorts
    open_only = options.openOnly
    to_file = options.toFile

    if host_args is None or port_args is None:
        parser.error('both -H <target_host(s)> and -p <target_port(s)> are required')

    target_hosts = parse_host_args(host_args)
    try:
        target_ports = parse_port_args(port_args)
    except ValueError as exc:
        # Report a malformed port list as a usage error, not a traceback.
        parser.error('invalid port specification: {}'.format(exc))

    results = {'resolved': {}, 'unresolved': []}
    pre = "Running scan for {} host(s) with {} port(s) per host. This can take some time...\n" \
          "Concurrent scans are ENABLED: Scanning with a max of {} host and {} port threads!\n" \
        .format(len(target_hosts), len(target_ports), num_worker_threads, num_port_threads)
    pp = ProgressPrinter(len(target_hosts), unit_type='scans', pre=pre)
    pp.print_progress()

    scan_jobs = Queue()
    for host in target_hosts:
        scan_jobs.put(host)

    def worker():
        # Runs until it pulls a ``None`` sentinel from the queue.
        while True:
            host = scan_jobs.get()
            if host is None:
                return
            port_scan(host, target_ports, results, pp)

    threads = [Thread(target=worker) for _ in range(min(num_worker_threads, len(target_hosts)))]
    for _ in threads:
        scan_jobs.put(None)
    for t in threads:
        t.daemon = True
        t.start()
    # Joining the threads (rather than the queue) cannot block forever when
    # a worker dies from an unexpected exception.
    for t in threads:
        t.join()

    print_results(results, open_only)
    if to_file:
        save_to_file(results, open_only)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment