Skip to content

Instantly share code, notes, and snippets.

@altilunium
Created December 25, 2023 15:26
Show Gist options
  • Save altilunium/c9c2993216c113a8457df14ba64220f0 to your computer and use it in GitHub Desktop.
Save altilunium/c9c2993216c113a8457df14ba64220f0 to your computer and use it in GitHub Desktop.
access log parser
import html
import os
import re
from datetime import datetime, timedelta

import mistune
import web
# Patterns for the pieces of an access-log line that the viewer displays.
_DATE_PATTERN = re.compile(r'\[([^\]]+)\]')
_IP_PATTERN = re.compile(r'(.*)\s\-\s\-')
_QUOTED_PATTERN = re.compile(r'\"([^"]*)\"')

# Hours added to every log timestamp before it is displayed.
# NOTE(review): presumably shifts UTC log times to UTC+7 local time -- confirm.
_DISPLAY_OFFSET = timedelta(hours=7)

_TABLE_HEADER = (
    "Date | IP | HTTP | Useragent | Referer \n"
    " - | - | - | - | - \n"
)


def _table_cell(value):
    """Return *value* made safe to embed in a single Markdown table cell."""
    # Log fields are attacker-controlled (user agent, referer, request line):
    # neutralise raw HTML, and keep a literal pipe from splitting the row.
    text = html.escape(str(value), quote=False)
    return text.replace("|", "&#124;")


def _format_log_line(line):
    """Return the Markdown table row for one log line, or None to skip it.

    None is returned when the line carries no parsable ``[timestamp]``, so
    blank or malformed lines are skipped instead of aborting the whole page.
    """
    date_match = _DATE_PATTERN.search(line)
    if date_match is None:
        return None
    try:
        log_date = datetime.strptime(date_match.group(1), '%d/%b/%Y:%H:%M:%S %z')
    except ValueError:
        return None
    shown_date = log_date + _DISPLAY_OFFSET

    ip_match = _IP_PATTERN.search(line)
    if ip_match is not None:
        client = ip_match.group(1)
    else:
        # No "<ip> - -" prefix (e.g. an identity/user field is filled in):
        # fall back to the first whitespace-separated token.
        tokens = line.split(None, 1)
        client = tokens[0] if tokens else ""

    # Quoted fields appear in the order request, referer, user agent; pad
    # with empty strings when a line has fewer than three of them.
    quoted = _QUOTED_PATTERN.findall(line)
    request, referer, user_agent = (quoted + ["", "", ""])[:3]

    cells = (
        shown_date.strftime('%d %B %Y, %H:%M'),
        client,
        request,
        user_agent,
        referer,
    )
    return " | ".join(_table_cell(cell) for cell in cells) + "\n"


def merge_and_process_txt_files(directory_path):
    """Render every access log in *directory_path* as one HTML table.

    Files whose names end in ``.txt`` or ``ss.log`` are read in sorted name
    order. Each parsable line becomes a row with the columns Date, IP, HTTP
    request, User agent and Referer; unparsable lines are skipped. The rows
    are assembled as a Markdown table and converted with ``mistune.html``.

    Args:
        directory_path: Directory to scan for log files.

    Returns:
        The HTML produced by mistune, or None (after printing a message)
        when the directory is missing or holds no matching files.
    """
    # isdir (rather than exists) so a plain file path is rejected here
    # instead of failing later inside os.listdir.
    if not os.path.isdir(directory_path):
        print(f"Directory '{directory_path}' does not exist.")
        return None

    # Sorted so the row order does not depend on the filesystem's listing order.
    log_files = sorted(
        name
        for name in os.listdir(directory_path)
        if name.endswith(('.txt', 'ss.log'))
        and os.path.isfile(os.path.join(directory_path, name))
    )
    if not log_files:
        print(f"No txt files found in '{directory_path}'.")
        return None

    rows = [_TABLE_HEADER]
    for name in log_files:
        file_path = os.path.join(directory_path, name)
        # Access logs can contain arbitrary bytes sent by clients; replace
        # undecodable ones rather than failing the whole page.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as handle:
            for line in handle:
                row = _format_log_line(line)
                if row is not None:
                    rows.append(row)

    return mistune.html("".join(rows))
def render(content):
    """Wrap *content* in the minimal "File Viewer" HTML page.

    The content is interpolated as-is (no escaping) inside a ``<pre>``
    element; the returned string begins and ends with a newline.
    """
    page_lines = [
        "",
        "<html>",
        "<head>",
        "<title>File Viewer</title>",
        "</head>",
        "<body>",
        f"<pre>{content}</pre>",
        "</body>",
        "</html>",
        "",
    ]
    return "\n".join(page_lines)
# URL routing table: alternating (path pattern, handler class name) entries.
urls = ("/", "FileViewer")
class FileViewer:
    """Request handler that serves the parsed access logs as an HTML page."""

    def GET(self):
        """Build the log table from the current directory and wrap it in the page shell."""
        log_table = merge_and_process_txt_files(".")
        return render(log_table)
class MyApplication(web.application):
    """``web.application`` subclass whose ``run`` accepts the listening port."""

    def run(self, port=8080, *middleware):
        """Serve the application on 0.0.0.0 at *port*.

        Any extra positional arguments are passed to ``wsgifunc`` as
        middleware. Returns whatever ``web.httpserver.runsimple`` returns.
        """
        wsgi_app = self.wsgifunc(*middleware)
        listen_address = ('0.0.0.0', port)
        return web.httpserver.runsimple(wsgi_app, listen_address)
if __name__ == "__main__":
    # Script entry point: build the application from the routing table and
    # this module's globals, then serve it on port 9191.
    MyApplication(urls, globals()).run(port=9191)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment