Skip to content

Instantly share code, notes, and snippets.

@michaelrinderle
Last active January 13, 2024 09:29
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaelrinderle/559e7ec6ba938ef00ad12161bdf4756b to your computer and use it in GitHub Desktop.
Save michaelrinderle/559e7ec6ba938ef00ad12161bdf4756b to your computer and use it in GitHub Desktop.
Python script to parse OpenCanary log file with broken JSON structure
import json
import socket
CANARY_FILE = "opencanary.log"
INFORMATION_LOG = 1001
BRUTE_FORCE_LOG = 6001
PORT_SCAN_LOG = 5001
HTTP_SCAN_LOG = 3000
log_data = []
information_logs = []
bruteforce_logs = []
portscan_logs = []
http_logs = []
def parse_logfile(file_path):
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
log_data.append(json.loads(line))
def sort_totals():
for a in log_data:
if a["logtype"] == INFORMATION_LOG:
information_logs.append(a)
elif a["logtype"] == BRUTE_FORCE_LOG:
bruteforce_logs.append(a)
elif a["logtype"] == PORT_SCAN_LOG:
portscan_logs.append(a)
else:
http_logs.append(a)
def print_information_logs():
print(f"[*] Information logs : {len(information_logs)}")
for info in information_logs:
if info['logdata']['msg'] == "Canary running!!!":
print(f"[*] T: {info['local_time']} > {info['logdata']['msg']}")
else:
print(f"[*] T: {info['local_time']} > {info['logdata']['msg']['logdata']}")
def print_bruteforce_logs():
print(f"[*] Bruteforce logs : {len(bruteforce_logs)}")
for brute in bruteforce_logs:
creds = brute['logdata']
print("[*] T: {0:25} > P: {1} Src: {2:20} U: {3:15} P: {4:15}"
.format(brute['local_time'],
brute['dst_port'],
brute['src_host'],
creds['USERNAME'],
creds['PASSWORD']))
def print_portscan_logs():
print(f"[*] Portscan logs : {len(portscan_logs)}")
for scan in portscan_logs:
print("[*] T: {0:25} > P: {1} Src: {2:20}"
.format(scan['local_time'],
scan['dst_port'],
scan['src_host']))
def print_http_logs():
print(f"[*] Miscellaneous logs : {len(http_logs)}")
for http in http_logs:
print("[*] T: {0:25} > P: {1} Src: {2:20} U: {3:15} P: {4:15}"
.format(http['local_time'],
http['dst_port'],
http['src_host'],
http['logdata']['HOSTNAME'],
http['logdata']['USERAGENT']))
def print_bruteforce_analytics():
list_of_ips = []
usernames = []
passwords = []
for brute in bruteforce_logs:
creds = brute['logdata']
if brute['src_host'] not in list_of_ips:
list_of_ips.append(brute['src_host'])
if creds["USERNAME"] not in usernames:
usernames.append(creds['USERNAME'])
if creds['PASSWORD'] not in passwords:
passwords.append(creds['PASSWORD'])
list_of_ips.sort()
usernames.sort()
passwords.sort()
print("[*] Bruteforce IP/Hosts")
for ip in list_of_ips:
try:
hostname = socket.gethostbyaddr(ip)
except:
hostname = 'host not found'
print(f"IP: {ip} > {hostname}")
print("[*] Usernames used")
for username in usernames:
print(username)
print("[*] Passwords used")
for password in passwords:
print(password)
def main():
parse_logfile(CANARY_FILE)
sort_totals()
print("[*] Log totals\n")
print_information_logs()
print_bruteforce_logs()
print_portscan_logs()
print_http_logs()
print_bruteforce_analytics()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment