Created
January 23, 2023 08:00
-
-
Save digreatbrian/f17b7d76bf5d470dfd77e4b04dd03037 to your computer and use it in GitHub Desktop.
Flexible and Extensive Port Scanner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Port scanner
import socket | |
import threading | |
__author__='Brian Musakwa' | |
class Scanner():
    """Simple sequential TCP port scanner.

    Arguments:
        host : Either an IPv4 address or a domain name of the host. Domain
            names are resolved once, at construction time.
        port : Default port, scanned first.
        range_ports : Inclusive range of ports to scan, as a dict with the
            keys "from" and "to", e.g. {"from": 8000, "to": 9000}.
            Defaults to an empty dict (no range).
        custom_ports : List of extra ports to scan. Defaults to an empty list.
        interactive : If true, progress is printed to the terminal.
            Defaults to True.
        require_response : If true, a short probe is sent after connecting
            and up to 1024 bytes of the reply are recorded. Defaults to False.
        timeout : Socket timeout in seconds. Defaults to 0.8.

    Raises:
        TypeError : ``range_ports`` is non-empty but lacks "from" or "to".
        socket.gaierror : ``host`` is not an IPv4 address and cannot be
            resolved.
    """

    def __init__(self, host: str, port: int, range_ports=None, custom_ports=None,
                 interactive=True, require_response=False, timeout=.8):
        self.timeout = timeout
        self.host = host
        self.port = port
        # None sentinels instead of `{}` / `[]` defaults: a mutable default is
        # created once and would be shared by every instance that relies on it.
        self.range_ports = range_ports if range_ports is not None else {}
        self.custom_ports = custom_ports if custom_ports is not None else []
        self.interactive = interactive
        self.require_response = require_response
        # Port currently being probed by scan(); None while idle.
        self.scanning_crnt = None
        if self.range_ports:
            try:
                self.from_port = self.range_ports['from']
                self.to_port = self.range_ports['to']
            except KeyError:
                raise TypeError('range_ports argument should be a dict containing keys ["from" ,"to"] which shows a range of ports to scan from and to .Like {"from" :1000,"to":2000} .') from None
        if not self.is_ip(host):
            # Keep the original name for log output and connect by address.
            self.hostname = host
            try:
                self.host = socket.gethostbyname(host)
            except socket.gaierror as err:
                raise socket.gaierror('No address associated with hostname ,make sure you are connected to the internet.') from err

    def is_ip(self, str_):
        """Return True if ``str_`` is a dotted-quad IPv4 address.

        Exactly four dot-separated fields are required, each made of one to
        three ASCII digits with a value of at most 255.
        """
        fields = str_.split('.')
        if len(fields) != 4:
            return False
        for field in fields:
            if not 0 < len(field) <= 3:
                return False
            # Explicit ASCII digit check: str.isnumeric()/isdigit() also accept
            # characters such as superscripts that int() cannot parse.
            if any(ch not in '0123456789' for ch in field):
                return False
            if int(field) > 255:
                return False
        return True

    def log_to_file(self, port, response, state='open'):
        """Append one result record to "ScanResults.txt" in the working directory.

        The host is written as "hostname/address" when the scanner was built
        from a domain name; a falsy ``response`` is written as an empty string.
        """
        filename = "ScanResults.txt"
        host = self.host
        if not response:
            response = ''
        if hasattr(self, "hostname"):
            host = self.hostname + '/' + host
        data = "Host : %s\r\nPort : %s\r\nState : %s\r\nResponse : %s \r\n" % (host, port, state.upper(), response)
        # The context manager closes the file; no explicit close() is needed.
        with open(filename, "a") as fd:
            fd.write(data)

    def on_port_open(self, port, resp):
        """Fired whenever an open port is found; logs the result to file."""
        self.log_to_file(port, resp)

    def on_port_closed(self, port, resp):
        """Fired whenever a closed port is found. Does nothing by default."""

    def _probe(self, port):
        """Try to connect to ``port`` and return ``(is_open, response)``.

        ``response`` is None unless ``require_response`` is set. A refused
        connection or a timeout (while connecting or while waiting for the
        response) reports the port as closed. Any other error propagates.
        The socket is always closed before returning or raising.
        """
        sock = socket.socket()
        try:
            sock.settimeout(self.timeout)
            sock.connect((self.host, port))
            if not self.require_response:
                return True, None
            if self.interactive:
                print("[ * ] Acquiring response on port %d " % port)
            sock.send(b"HIE SERVER\r\n")
            return True, sock.recv(1024)
        except (ConnectionRefusedError, socket.timeout):
            return False, None
        finally:
            sock.close()

    def scan(self):
        """Scan the default port, then the custom ports, then the port range.

        Ports are probed one at a time. ``on_port_open`` / ``on_port_closed``
        are called for each result.

        Returns:
            dict with the keys:
                'open'      : list of ports that accepted a connection
                'closed'    : list of ports that refused or timed out
                'other'     : list reserved for other states (always empty here)
                'responses' : dict mapping str(port) to the bytes received,
                              filled only when ``require_response`` is set
        """
        lock = threading.Semaphore(value=1)
        open_ports = []
        closed_ports = []
        na_ports = []
        responses = {}

        def scanport(port):
            # `with` + `finally` guarantee the lock is released and the
            # progress marker is cleared even if a probe or callback raises.
            with lock:
                self.scanning_crnt = port
                try:
                    if self.interactive:
                        print('[ * ] Scanning Port %d ...' % port)
                    is_open, response = self._probe(port)
                    if is_open:
                        if self.require_response:
                            responses['%d' % port] = response
                        self.on_port_open(port, response)
                        open_ports.append(port)
                        if self.interactive:
                            print('[ * ] Opened Port : %d' % port)
                    else:
                        self.on_port_closed(port, None)
                        if self.interactive:
                            print('[ * ] Closed Port : %d' % port)
                        closed_ports.append(port)
                finally:
                    self.scanning_crnt = None

        scanport(self.port)
        for extra_port in self.custom_ports:
            scanport(extra_port)
        if self.range_ports:
            for ranged_port in range(self.from_port, self.to_port + 1):
                scanport(ranged_port)
        return {'open': open_ports, 'closed': closed_ports, 'other': na_ports, 'responses': responses}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment