- TIME_OUT: The timeout period, in seconds, for testing each port.
- PORT_RANGE: The range of ports to be tested.
- THREAD_CNT: Thread count for speeding up the test process. The ports are separated into this many groups.
- HOST: Target IP/domain
Several files contain the port test results.
import socket
import threading
from random import randint
import glob
import os
import datetime
# Seconds to wait on each connection attempt before giving up on the port.
TIME_OUT = 0.5
# Ports to probe: every TCP port number from 0 through 65535.
PORT_RANGE = range(65536)
# Number of worker threads; the port range is split into this many groups.
THREAD_CNT = 10
# Target IP/domain; empty until one is supplied.
HOST = ''
# Directory that receives the result files.
OUTPUT_DIR = '.'
# When true, the scan is repeated until a pass finds at least one open port.
LOOP_SCAN = False
def split_list(alist, wanted_parts=1):
    """Cut a sequence into consecutive slices of near-equal length.

    Args:
        alist: Any sliceable sequence with a length (list, range, ...).
        wanted_parts: Number of slices to produce.

    Returns:
        A list holding ``wanted_parts`` slices of ``alist``, in order. Some
        slices are empty when there are more parts than elements.
    """
    total = len(alist)
    pieces = []
    for part in range(wanted_parts):
        # Integer boundaries spread any remainder across the parts.
        start = part * total // wanted_parts
        stop = (part + 1) * total // wanted_parts
        pieces.append(alist[start:stop])
    return pieces
def testPort(url, port):
    """Probe one TCP port on a host and report whether it accepts connections.

    Args:
        url: Host name or IPv4 address to probe.
        port: TCP port number to connect to.

    Returns:
        ``'Open: <port>'`` when the connection attempt succeeds, otherwise
        ``'Close: <port>'``.

    Raises:
        OSError: If the host name cannot be resolved (for example
            ``socket.gaierror``).
    """
    # Resolve before creating the socket so a failed lookup cannot leave an
    # unclosed descriptor behind.
    ip = socket.gethostbyname(url)
    # The context manager closes the socket even if the attempt raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(TIME_OUT)
        # connect_ex returns an error code instead of raising; 0 means connected.
        result = sock.connect_ex((ip, port))
    if result == 0:
        return 'Open: ' + str(port)
    return 'Close: ' + str(port)
def testRangePort(idx, url, lport, rport, retDict):
    """Scan an inclusive range of ports and record the outcome.

    Every probe result is printed and written as one line to this worker's
    result file; the ports found open are stored in ``retDict[idx]``.

    Args:
        idx: Worker index; selects the result file name and the key used in
            ``retDict``.
        url: Host to scan.
        lport: First port of the range (inclusive).
        rport: Last port of the range (inclusive).
        retDict: Dict that receives the list of open ports under ``idx``.
    """
    openedPorts = []
    filename = generateFileName(idx)
    with open(filename, 'w') as output:
        for num in range(lport, rport + 1):
            ret = testPort(url, num)
            print(ret)
            output.write(ret + '\n')
            # Flush on roughly one write in six -- presumably so the file can
            # be followed during a long scan without flushing on every port.
            if randint(0, 5) == 0:
                output.flush()
            if ret.startswith('Open'):
                openedPorts.append(num)
        # Leaving the with-block flushes and closes the file; no manual
        # flush()/close() is needed.
    retDict[idx] = openedPorts
def writeResult(result):
    """Write the final scan result to ``openedPorts.txt`` inside OUTPUT_DIR.

    Args:
        result: Value to record; its ``str()`` form is written followed by a
            newline. Any existing file is overwritten.
    """
    # The with-block flushes and closes the file on exit, so no explicit
    # flush()/close() is required.
    with open(os.path.join(OUTPUT_DIR, 'openedPorts.txt'), 'w') as output:
        output.write(str(result) + '\n')
def generateOuputDirName():
    """Create a timestamped output directory and return its path.

    Returns:
        A relative path of the form ``'./output-YYYYmmdd_HHMMSS/'`` built from
        the current local time. The directory exists when this returns.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    dirname = './output-{0}/'.format(stamp)
    # exist_ok avoids the check-then-create race of exists() + makedirs() and
    # makes a second call within the same second harmless.
    os.makedirs(dirname, exist_ok=True)
    return dirname
def generateFileName(idx):
    """Return the result-file path for one worker, creating its directory.

    Args:
        idx: Worker index embedded in the file name.

    Returns:
        Path ``<OUTPUT_DIR>/testResult<idx>.txt``.
    """
    filename = os.path.join(OUTPUT_DIR, 'testResult{0}.txt'.format(str(idx)))
    parent = os.path.dirname(filename)
    # An empty parent means the current directory; os.makedirs('') would raise.
    if parent:
        # exist_ok keeps concurrent callers from failing when another thread
        # creates the directory between an existence check and makedirs().
        os.makedirs(parent, exist_ok=True)
    return filename
def cleanFolder():
    """Delete every regular ``.txt`` file in the current working directory.

    Prints the list of matched paths, then for each path prints whether it is
    a regular file before removing it. Matches that are not regular files
    (e.g. directories) are left in place.
    """
    candidates = glob.glob('./*.txt')
    print(candidates)
    for path in candidates:
        is_regular = os.path.isfile(path)
        print(is_regular)
        if not is_regular:
            continue
        os.remove(path)
def askArgument():
    """Prompt on stdin for the scan settings and store them in module globals.

    Updates:
        HOST: Asked repeatedly until a non-blank value is entered (skipped if
            HOST is already set).
        THREAD_CNT: Replaced only by a positive integer answer.
        TIME_OUT: Replaced only by a positive, finite number of seconds.
        LOOP_SCAN: True when the answer is 'y' (case-insensitive).

    Unparseable or out-of-range numeric answers keep the current value.
    """
    global HOST
    global THREAD_CNT
    global LOOP_SCAN
    global TIME_OUT
    while not HOST:
        # Strip so a whitespace-only answer does not count as a host.
        HOST = input("Please enter the host you want to scan: ").strip()
    # Catch only ValueError: a bare except would also swallow Ctrl-C
    # (KeyboardInterrupt) and SystemExit raised at the prompt.
    try:
        paraCnt = int(input("Parallel count: "))
    except ValueError:
        paraCnt = 0
    if paraCnt > 0:
        THREAD_CNT = paraCnt
    try:
        timeOut = float(input("Time out for port scan in second: "))
    except ValueError:
        timeOut = 0.0
    # The chained comparison also rejects nan and inf, which float() accepts
    # but which are not usable socket timeouts.
    if 0 < timeOut < float('inf'):
        TIME_OUT = timeOut
    LOOP_SCAN = input("Enable loop scan mode(y/n): ").strip().lower() == 'y'
# Script body: collect the scan settings, then scan the port range with one
# worker thread per group of ports.
askArgument()
while True:
    # Every pass writes into its own timestamped directory.
    OUTPUT_DIR = generateOuputDirName()
    portArr = PORT_RANGE
    taskList = []
    retDict = {}
    for idx, item in enumerate(split_list(portArr, THREAD_CNT)):
        # More threads than ports leaves some groups empty; skip those.
        if not item:
            continue
        lowest, highest = item[0], item[-1]
        print('{0}:{1}'.format(lowest, highest))
        t = threading.Thread(
            target=testRangePort,
            args=(idx, HOST, lowest, highest, retDict),
        )
        t.start()
        taskList.append(t)
    # Wait for every worker before reading the shared result dict.
    for item in taskList:
        item.join()
    # Merge the per-worker lists in worker-index order.
    openedPorts = []
    for key in sorted(retDict):
        openedPorts.extend(retDict[key])
    print("Opened ports: " + str(openedPorts))
    writeResult(openedPorts)
    # Without loop mode one pass is enough; in loop mode keep scanning until
    # a pass finds at least one open port.
    if not LOOP_SCAN or openedPorts:
        break