Separate Open edX CMS/LMS Large `tracking.log` file events by date.
""" | |
Author: Zachary Trabookis | |
Date: October 05, 2023 | |
Description: | |
Separates a larger Open edX `tracking.log` file into chunks by the number of threads defined, then goes through the | |
thread chunked files in order and sorts the log information into new files by date. Going in order | |
allows the events to be ordered as they occured. | |
This replicates what a logrotate would do by day. You would want to run this if you have a large | |
log file that continually writes multiple days. | |
Warning: | |
This does not work with the existing `all.log` file as that file doesn't include the YYYY-MM-DD | |
prefix that this script looks for every line in the log file. | |
Make sure that you have at least triple the space of the `tracking.log` input before running. | |
- The ./output/threads will be the same size as the `tracking.log`file. Gets removed at the end. | |
- The ./output/dates will be the same size as the `tracking.log` file. Gets removed at the end. | |
- The ./output/dates_gz will be a little bit smaller than the `tracking.log` file. | |
Input: | |
CMS/LMS `tracking.log` from Open edX platform. | |
Output: | |
Write out logs events per date and *.gz compress them. | |
Sources: | |
https://medium.com/@darshan_doshi/large-file-splitting-with-python-threading-for-improved-performance-616f03e4622c | |
https://www.pythonforbeginners.com/files/how-to-extract-a-date-from-a-txt-file-in-python | |
https://www.tutorialspoint.com/python-support-for-gzip-files-gzip | |
https://www.tutorialspoint.com/How-to-delete-all-files-in-a-directory-with-Python | |
""" | |
import gzip
import os
import re
import threading

# Define the input and output file paths
INPUT_FILE_PATH = './tracking.log'
OUTPUT_DIR_PATH = './output/'
OUTPUT_DIR_PATH_THREADS = f'{OUTPUT_DIR_PATH}threads/'
OUTPUT_DIR_PATH_DATES = f'{OUTPUT_DIR_PATH}dates/'
OUTPUT_DIR_PATH_DATES_GZ = f'{OUTPUT_DIR_PATH}dates_gz/'

# Constants letting the program know whether to remove the temporary log files created
# during the transformation.
REMOVE_OUTPUT_THREADS = True
REMOVE_OUTPUT_DATES = True

# Create the output directory if it doesn't exist
if not os.path.exists(OUTPUT_DIR_PATH):
    os.makedirs(OUTPUT_DIR_PATH)

# Define the number of threads
NUM_THREADS = 50
def delete_files_in_directory(directory_path):
    """
    Delete files in a directory given a path.
    """
    try:
        files = os.listdir(directory_path)
        for file in files:
            file_path = os.path.join(directory_path, file)
            if os.path.isfile(file_path):
                os.remove(file_path)
        print(f'All files deleted successfully for {directory_path}.')
    except OSError:
        print(f'Error occurred while deleting files for {directory_path}.')
def _write_chunk_to_file(chunk_lines, output_file_path):
    """
    Write a chunk of lines to a file.
    """
    with open(output_file_path, 'wb') as output_file:
        output_file.writelines(chunk_lines)
def write_chunks_in_parallel():
    """
    Separate a large log file into smaller log files using threads.
    """
    with open(INPUT_FILE_PATH, 'rb') as input_file:
        # Calculate the number of lines in the input file
        num_lines = sum(1 for _ in input_file)
        print(f'Total lines in {INPUT_FILE_PATH}: {num_lines}')

        # Calculate the number of lines per thread
        lines_per_thread = num_lines // NUM_THREADS

        # Initialize the line counter
        line_counter = 0

        # Initialize the chunk counter
        chunk_counter = 1

        # Initialize a list to store the threads
        threads = []

        # Go back to the beginning of the file
        input_file.seek(0)

        # Create a directory to hold the chunked thread logs
        if not os.path.exists(OUTPUT_DIR_PATH_THREADS):
            os.makedirs(OUTPUT_DIR_PATH_THREADS)
        else:
            # Remove existing files to start clean again
            delete_files_in_directory(OUTPUT_DIR_PATH_THREADS)

        # Loop through the threads and create them
        for _ in range(NUM_THREADS):
            # Calculate the output file path for this thread
            output_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')

            # Initialize the list of lines for this thread
            thread_lines = []

            # Loop through the lines and add them to this thread's list
            while len(thread_lines) < lines_per_thread:
                line = input_file.readline()
                if not line:
                    break
                thread_lines.append(line)

            # Create a thread to write this chunk of lines to the output file
            thread = threading.Thread(target=_write_chunk_to_file, args=(thread_lines, output_file_path))

            # Start the thread
            thread.start()

            # Add the thread to the list of threads
            threads.append(thread)

            # Increment the line counter
            line_counter += len(thread_lines)

            # Increment the chunk counter
            chunk_counter += 1

        # Wait for all the threads to finish
        for thread in threads:
            thread.join()

        # Check if there are any remaining lines
        remaining_lines = input_file.readlines()
        if remaining_lines:
            # Calculate the output file path
            output_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')

            # Write the remaining lines to the output file
            with open(output_file_path, 'wb') as output_file:
                output_file.writelines(remaining_lines)
def write_chunks_by_date():
    """
    Goes through all the chunks created by the threads and writes out files by date, then
    compresses them (*.gz format).
    """
    # Initialize the chunk counter
    chunk_counter = 1

    # Create a directory to hold the chunked date logs
    if not os.path.exists(OUTPUT_DIR_PATH_DATES):
        os.makedirs(OUTPUT_DIR_PATH_DATES)
    else:
        # Remove existing files to start clean again
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES)

    # Loop through the thread chunks in order
    for _ in range(NUM_THREADS):
        # Calculate the input file path from the chunked thread
        input_chunk_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')
        with open(input_chunk_file_path, 'r') as input_file:
            lines = input_file.readlines()

            # Regular expression pattern to match dates at the start of each log line
            # (e.g. 2023-01-25 format). Raw string avoids invalid escape sequences.
            pattern = r"^\d{4}[/-]\d{2}[/-]\d{2}"

            # Find the date string at the start of a line, if any
            locate_date = lambda pattern, line_string: re.search(pattern, line_string)

            # Store the dates in a set to eliminate duplicates. Lines without a date
            # prefix are skipped when collecting dates.
            matches = (locate_date(pattern, line) for line in lines)
            dates = [match.group(0) for match in matches if match]
            date_set = sorted(set(dates))
            print(f'input_chunk_file_path = {input_chunk_file_path}')
            print(f'Located the following dates {date_set}.\n')

            for date in date_set:
                # Calculate the output file path and append input from the chunked file.
                output_file_path = os.path.join(OUTPUT_DIR_PATH_DATES, f'tracking.log-{date}')
                with open(output_file_path, 'a') as output_file:
                    # Write out the date-specific lines found in the chunked file to the
                    # date log file.
                    pattern_date_line = f'^{date}.*$'
                    for line in lines:
                        date_located = locate_date(pattern_date_line, line)
                        if date_located:
                            output_file.write(line)

        # Increment the chunk counter
        chunk_counter += 1

    # At this point the `tracking.log-{date}` files have been created, so the original
    # thread-split files are no longer needed.
    if REMOVE_OUTPUT_THREADS:
        delete_files_in_directory(OUTPUT_DIR_PATH_THREADS)

    # Create a directory to hold the chunked date logs *.gz
    if not os.path.exists(OUTPUT_DIR_PATH_DATES_GZ):
        os.makedirs(OUTPUT_DIR_PATH_DATES_GZ)
    else:
        # Remove existing files to start clean again
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES_GZ)

    # Loop through and compress all date tracking logs
    for filename in os.listdir(OUTPUT_DIR_PATH_DATES):
        input_file_path = os.path.join(OUTPUT_DIR_PATH_DATES, filename)
        with open(input_file_path, 'rb') as input_file:
            # Calculate the output file path and write the compressed data
            output_file_path = os.path.join(OUTPUT_DIR_PATH_DATES_GZ, f'{filename}.gz')
            with gzip.open(output_file_path, 'wb') as output_file:
                output_file.write(input_file.read())

    # Remove the `tracking.log-{date}` files because at this point they have already been
    # gzip-compressed.
    if REMOVE_OUTPUT_DATES:
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES)
def main():
    """
    1. Separates a large log file into smaller chunked files by the number of threads.
    2. Traverses the thread-chunked files and builds new log files separated by the date of
       the events in the logs.
    """
    write_chunks_in_parallel()
    write_chunks_by_date()


if __name__ == "__main__":
    main()
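To run, a minimal invocation sketch, assuming the script is saved as `separate_tracking_log.py` (the filename is illustrative) alongside the input `tracking.log`:

    python3 separate_tracking_log.py

The per-date archives end up in `./output/dates_gz/`.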
Example of the folder structure:
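A minimal sketch of the resulting layout, assuming the default paths defined at the top of the script (chunk counts and dates are illustrative):

    ./tracking.log                      # large input log
    ./output/
        threads/                        # temporary thread chunks (removed at the end)
            chunk_1.txt ... chunk_50.txt
        dates/                          # temporary per-date logs (removed at the end)
            tracking.log-2023-01-25
            tracking.log-2023-01-26
        dates_gz/                       # final compressed per-date logs
            tracking.log-2023-01-25.gz
            tracking.log-2023-01-26.gz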
Example of what the output looks like:
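A hedged sketch of the console output for a run over a two-day log, based on the print statements in the script (line counts, chunk paths, and dates are illustrative):

    Total lines in ./tracking.log: 1000000
    input_chunk_file_path = ./output/threads/chunk_1.txt
    Located the following dates ['2023-01-25'].

    input_chunk_file_path = ./output/threads/chunk_2.txt
    Located the following dates ['2023-01-25', '2023-01-26'].
    ...
    All files deleted successfully for ./output/threads/.
    All files deleted successfully for ./output/dates/.

The resulting archives can be spot-checked with, for example, `zcat ./output/dates_gz/tracking.log-2023-01-25.gz | head`.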