Instantly share code, notes, and snippets.
Created
April 7, 2020 02:45
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save droberson/29f5454880e520e73163698eb37f0998 to your computer and use it in GitHub Desktop.
Some horrible script I wrote to banner-grab a ton of FTP servers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import time | |
import fcntl | |
import errno | |
import socket | |
from threading import Thread | |
from queue import Queue | |
# Shared work queue of (host, port) tuples: filled by process_ip_list() and
# consumed by the thread_scan() worker threads.
Q = Queue()
def valid_ipv4_address(ip_address):
    """ valid_ipv4_address() - Validate an IPv4 address.

    Only the strict dotted-quad form ("a.b.c.d") is accepted.
    socket.inet_aton() is deliberately not used because it also accepts
    shorthand forms such as "1" or "127.1", which would let junk lines from
    an input file pass as addresses.

    Args:
        ip_address (str) - IP address to verify.

    Returns:
        True if ip_address is a valid IPv4 address.
        False if ip_address is not a valid IPv4 address (including
        non-string input).
    """
    try:
        socket.inet_pton(socket.AF_INET, ip_address)
    except (OSError, TypeError, ValueError):
        # OSError: malformed address; TypeError/ValueError: not a usable str.
        return False
    return True
def log(host, port, success, message):
    """ log() - Append one result line to the "ftp-logfile" file.

    The line has the form "<unix time> <1|0> <host> <port> <message>", where
    the second field is 1 when success is truthy and 0 otherwise.

    Args:
        host - Host the result belongs to.
        port - Port the result belongs to.
        success - Truthy if the scan succeeded.
        message - Banner or error to record; formatted with %s.

    Returns:
        Nothing.
    """
    with open("ftp-logfile", "a") as handle:
        fields = (time.time(), int(bool(success)), host, port, message)
        handle.write("%d %d %s %s %s\n" % fields)
def grab_ftp_banner(host, port, timeout=5):
    """ grab_ftp_banner() - Grab the banner of one FTP server and log it.

    Connects to host:port, sends "QUIT" and reads whatever the server sends
    until it closes the connection, an error occurs, or more than 8192 bytes
    have been received. The outcome is written with log().

    Args:
        host (str) - Address to connect to.
        port (int) - TCP port to connect to.
        timeout (int/float) - Per-operation socket timeout in seconds.

    Returns:
        True if the exchange completed or at least some data was received
        (the data is logged as a success).
        False if connecting or sending failed, or if reading failed before
        any data arrived (the error is logged as a failure).
    """
    max_banner_bytes = 8192  # stop reading once a server has sent more than this

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # settimeout() already puts the socket into Python's timeout mode, so
        # no manual O_NONBLOCK/EAGAIN handling is needed.
        sock.settimeout(timeout)

        try:
            sock.connect((host, port))
            sock.sendall(b"QUIT\r\n")
        except OSError as exc:
            # Covers refused/unreachable/reset/broken pipe and timeouts
            # (socket.timeout is a subclass of OSError).
            log(host, port, False, exc)
            return False

        chunks = []
        received = 0
        while received <= max_banner_bytes:
            try:
                msg = sock.recv(4096)
            except OSError as exc:
                if not chunks:
                    log(host, port, False, exc)
                    return False
                # A reset or timeout after data arrived: keep the partial
                # banner rather than throwing it away.
                break
            if not msg:
                # Peer closed the connection.
                break
            chunks.append(msg)
            received += len(msg)
    finally:
        # Always release the descriptor, including on every early return.
        sock.close()

    log(host, port, True, b"".join(chunks))
    return True
def thread_scan(thread_id, queue):
    """ thread_scan() - Worker loop: scan (host, port) items from a queue.

    Runs forever, so it is meant to be the target of a daemon thread.

    Args:
        thread_id (int) - Identifier of this worker (currently unused).
        queue (Queue) - Queue yielding (host, port) tuples.

    Returns:
        Never returns.
    """
    while True:
        host, port = queue.get()
        try:
            grab_ftp_banner(host, port, timeout=5)
        except Exception as exc:
            # Worker boundary: one bad host must not kill the thread.
            log(host, port, False, "unhandled error: %r" % (exc,))
        finally:
            # Must run for every get(), otherwise queue.join() blocks forever.
            queue.task_done()
_WORKER_COUNT = 32
_workers = []


def _start_workers():
    """Start the scanning threads once; later calls are no-ops."""
    if _workers:
        return
    for thread_id in range(_WORKER_COUNT):
        worker = Thread(target=thread_scan, args=(thread_id, Q), daemon=True)
        worker.start()
        _workers.append(worker)


def process_ip_list(ip_list, port=0):
    """ process_ip_list() - Scan a batch of IP addresses and wait for it.

    Valid addresses are queued on Q for the worker threads; invalid ones are
    reported on stdout. A progress line is printed using the module-level
    progress, ip_count and start_time values, then the call blocks until the
    queue has been drained.

    Args:
        ip_list (list) - IP address strings to scan.
        port (int) - TCP port to scan on every address.

    Returns:
        Nothing.
    """
    if not ip_list:
        # Nothing to queue, and ip_list[0] below would raise IndexError.
        return

    for ip in ip_list:
        if valid_ipv4_address(ip):
            Q.put((ip, port))
        else:
            print("[-] invalid ip: %s" % ip)

    percent = progress / ip_count * 100 if ip_count else 0
    print("[+] Scanning %s - %s (%d/%d %d%%) -- Elapsed: %s" %
          (ip_list[0], ip_list[-1],
           progress, ip_count, percent,
           display_time(round(time.time() - start_time))))

    # The workers loop forever, so they are started only once and reused by
    # every batch instead of starting 32 more threads per call.
    _start_workers()
    Q.join()
def linecount(filename):
    """ linecount() - Count the lines in a text file.

    Args:
        filename (str) - Path of the file to read.

    Returns:
        Number of lines in the file; 0 for an empty file.
    """
    with open(filename, "r") as file:
        # sum() over a generator also handles the empty file, where an
        # enumerate() loop variable would never be bound.
        return sum(1 for _ in file)
# Time units used by display_time(), largest first: (plural name, seconds).
intervals = (
    ("weeks", 604800),   # 60 * 60 * 24 * 7
    ("days", 86400),     # 60 * 60 * 24
    ("hours", 3600),     # 60 * 60
    ("minutes", 60),
    ("seconds", 1),
)


def display_time(seconds, granularity=2):
    """ display_time() - Format a duration as human-readable text.

    Args:
        seconds (int/float) - Duration in seconds. Fractions are truncated.
        granularity (int) - Maximum number of units to include, counted
                            from the largest non-zero unit.

    Returns:
        A string such as "1 hour, 5 minutes". Durations shorter than one
        second (including negative values) give "0 seconds".
    """
    # Whole seconds only: float input would otherwise be rendered as
    # "2.0 minutes", and sub-second input would produce an empty string.
    remaining = int(seconds)
    if remaining <= 0:
        return "0 seconds"

    parts = []
    for name, length in intervals:
        value, remaining = divmod(remaining, length)
        if not value:
            continue
        if value == 1:
            name = name.rstrip("s")  # singular form
        parts.append("{} {}".format(value, name))
    return ", ".join(parts[:granularity])
# Script body: read IP addresses (one per line) from the file named by the
# first command-line argument and scan them on port 21 in batches of 1000.
if len(sys.argv) < 2:
    sys.exit("usage: %s <ip-list-file>" % sys.argv[0])

print("[+] Scanning ips in %s" % sys.argv[1])

# ip_count, progress and start_time are read as globals by process_ip_list()
# for its progress line, so they must stay at module level.
ip_count = linecount(sys.argv[1])
progress = 0
start_time = time.time()

with open(sys.argv[1], "r") as fp:
    cset = []
    for line in fp:
        cset.append(line.rstrip())
        if len(cset) == 1000:
            progress += len(cset)
            process_ip_list(cset, port=21)
            cset = []
    # Flush the final, partially filled batch.
    if cset:
        progress += len(cset)
        process_ip_list(cset, 21)

# Round to whole seconds so display_time() receives an int, as it does for
# the progress line in process_ip_list().
print("[+] Finished. Elapsed: %s" % display_time(round(time.time() - start_time)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment