Skip to content

Instantly share code, notes, and snippets.

@digreatbrian
Created January 23, 2023 08:00
Show Gist options
  • Save digreatbrian/f17b7d76bf5d470dfd77e4b04dd03037 to your computer and use it in GitHub Desktop.
Save digreatbrian/f17b7d76bf5d470dfd77e4b04dd03037 to your computer and use it in GitHub Desktop.
Flexible and Extensive Port Scanner
#port scanner
import socket
import threading
__author__='Brian Musakwa'
class Scanner():
    """Simple sequential TCP port scanner.

    Each port is probed with a blocking IPv4 TCP connect. Ports are scanned
    in this order: ``port``, then every entry of ``custom_ports``, then the
    inclusive range described by ``range_ports``.

    Args:
        host: IPv4 address or domain name of the target. A domain name is
            resolved once in the constructor; the original name is kept in
            ``self.hostname`` and the resolved address in ``self.host``.
        port: The first port to scan.
        range_ports: Optional dict with keys ``"from"`` and ``"to"``
            (inclusive), e.g. ``{"from": 8000, "to": 9000}``. ``None`` or an
            empty dict means no range is scanned.
        custom_ports: Optional list of extra ports to scan. ``None`` means
            no extra ports.
        interactive: When true, progress is printed to the terminal.
        require_response: When true, a probe is sent to every open port and
            up to 1024 bytes of the reply are recorded.
        timeout: Per-socket timeout in seconds for connect/send/recv.

    Raises:
        TypeError: ``range_ports`` is non-empty but lacks "from" or "to".
        socket.gaierror: ``host`` is a name that could not be resolved.
    """

    def __init__(self, host: str, port: int, range_ports=None, custom_ports=None,
                 interactive=True, require_response=False, timeout=.8):
        # None replaces the former mutable ``{}`` / ``[]`` defaults so that
        # instances never share one default container.
        if range_ports is None:
            range_ports = {}
        if custom_ports is None:
            custom_ports = []
        self.timeout = timeout
        self.host = host
        self.port = port
        self.range_ports = range_ports
        self.interactive = interactive
        self.require_response = require_response
        self.custom_ports = custom_ports
        if range_ports:
            try:
                self.from_port = range_ports['from']
                self.to_port = range_ports['to']
            except KeyError:
                raise TypeError('range_ports argument should be a dict containing keys ["from" ,"to"] which shows a range of ports to scan from and to .Like {"from" :1000,"to":2000} .')
        if not self.is_ip(host):
            # Keep the name for reporting; connect using the resolved address.
            self.hostname = host
            try:
                self.host = socket.gethostbyname(host)
            except socket.gaierror as err:
                raise socket.gaierror('No address associated with hostname ,make sure you are connected to the internet.') from err

    def is_ip(self, str_):
        """Return True if ``str_`` is a dotted-quad IPv4 address.

        Exactly four dot-separated fields are required, each made of ASCII
        digits only and with a value in 0..255.
        """
        fields = str_.split('.')
        if len(fields) != 4:
            return False
        # isascii() rejects Unicode digits (e.g. superscripts) that isdigit()
        # accepts but int() cannot parse.
        return all(
            field.isascii() and field.isdigit() and int(field) <= 255
            for field in fields
        )

    def log_to_file(self, port, response, state='open'):
        """Append one scan record to ``ScanResults.txt`` in the working directory.

        Args:
            port: Port number the record is about.
            response: Data received from the port; a falsy value is written
                as an empty string.
            state: Port state label, written upper-cased.
        """
        filename = "ScanResults.txt"
        host = self.host
        if not response:
            response = ''
        if hasattr(self, "hostname"):
            # Show both the name the user supplied and the resolved address.
            host = self.hostname + '/' + host
        data = "Host : %s\r\nPort : %s\r\nState : %s\r\nResponse : %s \r\n" % (host, port, state.upper(), response)
        # The context manager closes the file; no explicit close() is needed.
        with open(filename, "a") as fd:
            fd.write(data)

    def on_port_open(self, port, resp):
        '''Fired whenever an open port is found.'''
        self.log_to_file(port, resp)

    def on_port_closed(self, port, resp):
        '''Fired whenever a closed port is found.'''

    def scan(self):
        """Scan all configured ports and return the collected results.

        Returns:
            A dict with keys:
              * ``'open'``: ports that accepted a connection.
              * ``'closed'``: ports that refused the connection or timed out.
              * ``'other'``: ports whose connect attempt failed with any
                other ``OSError`` (state could not be determined).
              * ``'responses'``: maps ``str(port)`` to the bytes received,
                filled only when ``require_response`` is true and the
                exchange succeeded.
        """
        semaphore = threading.Semaphore(value=1)
        host = self.host
        open_ports = []
        closed_ports = []
        na_ports = []
        responses = {}

        def _mark_closed(port):
            # Shared bookkeeping for refused and timed-out connections.
            self.on_port_closed(port, None)
            if self.interactive:
                print('[ * ] Closed Port : %d' % port)
            closed_ports.append(port)

        def _scanport(port):
            # ``with`` guarantees the semaphore is released on every path.
            with semaphore:
                self.scanning_crnt = port
                if self.interactive:
                    print('[ * ] Scanning Port %d ...' % port)
                sock = socket.socket()
                try:
                    sock.settimeout(self.timeout)
                    # Only the connect decides open/closed; a later
                    # send/recv failure must not reclassify the port.
                    try:
                        sock.connect((host, port))
                    except (ConnectionRefusedError, socket.timeout):
                        _mark_closed(port)
                        return
                    except OSError:
                        # e.g. network/host unreachable: state is unknown.
                        if self.interactive:
                            print('[ * ] Unknown State Port : %d' % port)
                        na_ports.append(port)
                        return
                    response = None
                    if self.require_response:
                        if self.interactive:
                            print("[ * ] Acquiring response on port %d " % port)
                        try:
                            sock.send(b"HIE SERVER\r\n")
                            response = sock.recv(1024)
                        except OSError:
                            # The port is open but gave no usable reply.
                            response = None
                        else:
                            responses['%d' % port] = response
                    self.on_port_open(port, response)
                    open_ports.append(port)
                    if self.interactive:
                        print('[ * ] Opened Port : %d' % port)
                finally:
                    # Always release the socket, whatever the outcome.
                    sock.close()

        def scanport(port):
            try:
                _scanport(port)
            finally:
                # No port is being scanned once the probe is finished.
                self.scanning_crnt = None

        # Scan order: primary port, custom ports, then the inclusive range.
        scanport(self.port)
        for custom in self.custom_ports:
            scanport(custom)
        if self.range_ports:
            for ranged in range(self.from_port, self.to_port + 1):
                scanport(ranged)
        return {'open': open_ports, 'closed': closed_ports, 'other': na_ports, 'responses': responses}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment