Skip to content

Instantly share code, notes, and snippets.

@xycui
Last active August 10, 2018 18:27
Show Gist options
  • Save xycui/3ec67277e232b07e20c04f357354ddbb to your computer and use it in GitHub Desktop.
Save xycui/3ec67277e232b07e20c04f357354ddbb to your computer and use it in GitHub Desktop.
python port scan for given url/ip

portScan.py

Parameters:

  • TIME_OUT: The time out period in seconds for test each port
  • PORT_RANGE: The port range need to be tested.
  • THREAD_CNT: Thread count for boosting the test process. The ports will be seperated to groups with this count.
  • HOST: Target IP/domain

Output:

Several files contain the port test results.

import socket
import threading
from random import randint
import glob
import os
import datetime

TIME_OUT = 0.5
PORT_RANGE = range(0,65536)
THREAD_CNT = 10
HOST = ''
OUTPUT_DIR = '.'
LOOP_SCAN = False

def split_list(alist, wanted_parts=1):
    length = len(alist)
    return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts] 
            for i in range(wanted_parts) ]

def testPort(url, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(TIME_OUT)
    ip = socket.gethostbyname(url)
    result = sock.connect_ex((ip ,port))
    sock.close()
    if result == 0:
       return 'Open: ' + str(port)
    else:
       return 'Close: ' + str(port)

def testRangePort(idx, url, lport, rport, retDict):
    openedPorts = []
    filename = generateFileName(idx)
    with open(filename,'w') as output:
        for num in range(lport, rport+1):            
            ret = testPort(url, num)
            print(ret)
            output.write(ret + '\n')
            if randint(0, 5) == 0:
                output.flush()
            if ret.startswith('Open'):
                openedPorts.append(num)
        output.flush()
        output.close()
    retDict[idx] = openedPorts

def writeResult(result):
    with open(os.path.join(OUTPUT_DIR,'openedPorts.txt'),'w') as output:
        output.write(str(result) +'\n')
        output.flush()
        output.close()

def generateOuputDirName():
    currentTime = datetime.datetime.now()
    dirname = './output-{0}/'.format(currentTime.strftime("%Y%m%d_%H%M%S"))
    if not os.path.exists(dirname):
        os.makedirs(dirname)
        return dirname

def generateFileName(idx):
    filename = os.path.join(OUTPUT_DIR, 'testResult{0}.txt'.format(str(idx)))
    if not os.path.exists(os.path.dirname(filename)):
        os.makedirs(os.path.dirname(filename))
    return filename

def cleanFolder():
    fileList = glob.glob('./*.txt')
    print(fileList)
    for item in fileList:
        print(os.path.isfile(item))
        if os.path.isfile(item):
            os.remove(item)

def askArgument():
    global HOST
    global THREAD_CNT
    global LOOP_SCAN
    global TIME_OUT
    while not HOST:
        HOST = input("Please enter the host you want to scan: ")
    try:
        paraCnt = int(input("Parallel count: "))
        THREAD_CNT = THREAD_CNT if paraCnt <=0 else paraCnt
    except:
        pass
    try:
        timeOut = float(input("Time out for port scan in second: "))
        TIME_OUT = TIME_OUT if timeOut<=0 else timeOut
    except:
        pass
    LOOP_SCAN = input("Enable loop scan mode(y/n): ") == 'y'

askArgument()
while True:
    OUTPUT_DIR = generateOuputDirName()
    portArr = PORT_RANGE
    taskList = []
    retDict = {}
    for idx, item in enumerate(split_list(portArr , THREAD_CNT)):
        if len(item) ==0:
            continue
        print(str(item[0]) +':'+str(item[len(item)-1]))
        t = threading.Thread(target=testRangePort, args=(idx, HOST ,item[0],item[len(item)-1],retDict))
        t.start()
        taskList.append(t)
    for item in taskList:
        item.join()

    openedPorts = []
    for key in sorted(retDict):
        openedPorts += retDict[key]
    
    print("Opened ports: " + str(openedPorts))
    writeResult(openedPorts)
    if not LOOP_SCAN:
        break

    if len(openedPorts) !=0:
        break
import socket
import threading
from random import randint
import glob
import os
import datetime
TIME_OUT = 0.5
PORT_RANGE = range(0,65536)
THREAD_CNT = 10
HOST = ''
OUTPUT_DIR = '.'
LOOP_SCAN = False
def split_list(alist, wanted_parts=1):
length = len(alist)
return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts]
for i in range(wanted_parts) ]
def testPort(url, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(TIME_OUT)
ip = socket.gethostbyname(url)
result = sock.connect_ex((ip ,port))
sock.close()
if result == 0:
return 'Open: ' + str(port)
else:
return 'Close: ' + str(port)
def testRangePort(idx, url, lport, rport, retDict):
openedPorts = []
filename = generateFileName(idx)
with open(filename,'w') as output:
for num in range(lport, rport+1):
ret = testPort(url, num)
print(ret)
output.write(ret + '\n')
if randint(0, 5) == 0:
output.flush()
if ret.startswith('Open'):
openedPorts.append(num)
output.flush()
output.close()
retDict[idx] = openedPorts
def writeResult(result):
with open(os.path.join(OUTPUT_DIR,'openedPorts.txt'),'w') as output:
output.write(str(result) +'\n')
output.flush()
output.close()
def generateOuputDirName():
currentTime = datetime.datetime.now()
dirname = './output-{0}/'.format(currentTime.strftime("%Y%m%d_%H%M%S"))
if not os.path.exists(dirname):
os.makedirs(dirname)
return dirname
def generateFileName(idx):
filename = os.path.join(OUTPUT_DIR, 'testResult{0}.txt'.format(str(idx)))
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
return filename
def cleanFolder():
fileList = glob.glob('./*.txt')
print(fileList)
for item in fileList:
print(os.path.isfile(item))
if os.path.isfile(item):
os.remove(item)
def askArgument():
global HOST
global THREAD_CNT
global LOOP_SCAN
global TIME_OUT
while not HOST:
HOST = input("Please enter the host you want to scan: ")
try:
paraCnt = int(input("Parallel count: "))
THREAD_CNT = THREAD_CNT if paraCnt <=0 else paraCnt
except:
pass
try:
timeOut = float(input("Time out for port scan in second: "))
TIME_OUT = TIME_OUT if timeOut<=0 else timeOut
except:
pass
LOOP_SCAN = input("Enable loop scan mode(y/n): ") == 'y'
askArgument()
while True:
OUTPUT_DIR = generateOuputDirName()
portArr = PORT_RANGE
taskList = []
retDict = {}
for idx, item in enumerate(split_list(portArr , THREAD_CNT)):
if len(item) ==0:
continue
print(str(item[0]) +':'+str(item[len(item)-1]))
t = threading.Thread(target=testRangePort, args=(idx, HOST ,item[0],item[len(item)-1],retDict))
t.start()
taskList.append(t)
for item in taskList:
item.join()
openedPorts = []
for key in sorted(retDict):
openedPorts += retDict[key]
print("Opened ports: " + str(openedPorts))
writeResult(openedPorts)
if not LOOP_SCAN:
break
if len(openedPorts) !=0:
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment