Separate a large Open edX CMS/LMS `tracking.log` file's events by date.
"""
Author: Zachary Trabookis
Date: October 05, 2023
Description:
Splits a large Open edX `tracking.log` file into chunks (one per configured thread), then walks
the chunked files in order and sorts the log lines into new files by date. Processing the chunks
in order keeps the events in the sequence they occurred.
This replicates what a daily logrotate would do. Run it when you have a single large log file
that has accumulated events across multiple days.
Warning:
This does not work with the existing `all.log` file because that file does not include the
YYYY-MM-DD prefix that this script looks for at the start of every line.
Make sure that you have at least triple the size of the `tracking.log` input free before running:
- The ./output/threads files will be about the same size as the `tracking.log` file. Removed at the end.
- The ./output/dates files will be about the same size as the `tracking.log` file. Removed at the end.
- The ./output/dates_gz files will be a little smaller than the `tracking.log` file.
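For reference, every line is expected to begin with a date prefix like the one in this
synthetic sample line (not taken from a real log):
    2023-01-25 14:03:22,101 INFO [tracking] {"event_type": "problem_check", ...}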
Input:
CMS/LMS `tracking.log` from Open edX platform.
Output:
Log events written out to one file per date, then compressed (*.gz).
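Usage (illustrative; the filename `split_tracking_log.py` is an assumption, save the script
under any name):
    python split_tracking_log.py
This should leave compressed per-date logs such as:
    ./output/dates_gz/tracking.log-2023-01-25.gz
    ./output/dates_gz/tracking.log-2023-01-26.gz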
Sources:
https://medium.com/@darshan_doshi/large-file-splitting-with-python-threading-for-improved-performance-616f03e4622c
https://www.pythonforbeginners.com/files/how-to-extract-a-date-from-a-txt-file-in-python
https://www.tutorialspoint.com/python-support-for-gzip-files-gzip
https://www.tutorialspoint.com/How-to-delete-all-files-in-a-directory-with-Python
"""
import gzip
import os
import re
import threading
# Define the input and output file paths
INPUT_FILE_PATH = './tracking.log'
OUTPUT_DIR_PATH = './output/'
OUTPUT_DIR_PATH_THREADS = f'{OUTPUT_DIR_PATH}threads/'
OUTPUT_DIR_PATH_DATES = f'{OUTPUT_DIR_PATH}dates/'
OUTPUT_DIR_PATH_DATES_GZ = f'{OUTPUT_DIR_PATH}dates_gz/'
# Flags telling the program whether to remove the temporary log files created during transformation.
REMOVE_OUTPUT_THREADS = True
REMOVE_OUTPUT_DATES = True
# Create the output directory if it doesn't exist
if not os.path.exists(OUTPUT_DIR_PATH):
    os.makedirs(OUTPUT_DIR_PATH)
# Define the number of threads
NUM_THREADS = 50
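# Each thread writes roughly `num_lines // NUM_THREADS` lines of the input file; any
# remainder from the integer division is written out as one extra chunk after the
# threads finish.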
def delete_files_in_directory(directory_path):
    """
    Delete files in a directory given a path.
    """
    try:
        files = os.listdir(directory_path)
        for file in files:
            file_path = os.path.join(directory_path, file)
            if os.path.isfile(file_path):
                os.remove(file_path)
        print(f'All files deleted successfully for {directory_path}.')
    except OSError:
        print(f'Error occurred while deleting files for {directory_path}.')
def _write_chunk_to_file(chunk_lines, output_file_path):
    """
    Write a chunk of lines to the given output file.
    """
    with open(output_file_path, 'wb') as output_file:
        output_file.writelines(chunk_lines)
def write_chunks_in_parallel():
    """
    Separate a large log file into smaller log files using threads.
    """
    with open(INPUT_FILE_PATH, 'rb') as input_file:
        # Calculate the number of lines in the input file
        num_lines = sum(1 for _ in input_file)
        print(f'Total lines in {INPUT_FILE_PATH}: {num_lines}')

        # Calculate the number of lines per thread
        lines_per_thread = num_lines // NUM_THREADS

        # Initialize the chunk counter
        chunk_counter = 1

        # Initialize a list to store the threads
        threads = []

        # Go back to the beginning of the file
        input_file.seek(0)

        # Create a directory to hold the chunked thread logs
        if not os.path.exists(OUTPUT_DIR_PATH_THREADS):
            os.makedirs(OUTPUT_DIR_PATH_THREADS)
        else:
            # Remove existing files to start clean again
            delete_files_in_directory(OUTPUT_DIR_PATH_THREADS)

        # Loop through the threads and create them
        for _ in range(NUM_THREADS):
            # Calculate the output file path for this thread
            output_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')

            # Collect this thread's share of the input lines
            thread_lines = []
            while len(thread_lines) < lines_per_thread:
                line = input_file.readline()
                if not line:
                    break
                thread_lines.append(line)

            # Create and start a thread to write this chunk of lines to the output file
            thread = threading.Thread(target=_write_chunk_to_file, args=(thread_lines, output_file_path))
            thread.start()
            threads.append(thread)

            # Increment the chunk counter
            chunk_counter += 1

        # Wait for all the threads to finish
        for thread in threads:
            thread.join()

        # Write any remaining lines (the integer division remainder) to one extra chunk
        remaining_lines = input_file.readlines()
        if remaining_lines:
            output_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')
            with open(output_file_path, 'wb') as output_file:
                output_file.writelines(remaining_lines)
def write_chunks_by_date():
    """
    Goes through all the chunks created by the threads, writes out files by date, and then
    compresses them (.gz format).
    """
    # Create a directory to hold the per-date logs
    if not os.path.exists(OUTPUT_DIR_PATH_DATES):
        os.makedirs(OUTPUT_DIR_PATH_DATES)
    else:
        # Remove existing files to start clean again
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES)

    # Regular expression pattern to match the date at the start of a log line
    # (e.g. the 2023-01-25 format)
    pattern = r'^\d{4}[/-]\d{2}[/-]\d{2}'
    locate_date = lambda pattern, line_string: re.search(pattern, line_string)

    # Loop through the thread chunks in order. There may be one more chunk than NUM_THREADS
    # (the remainder lines), so keep going until no further chunk file exists.
    chunk_counter = 1
    input_chunk_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')
    while os.path.exists(input_chunk_file_path):
        with open(input_chunk_file_path, 'r') as input_file:
            lines = input_file.readlines()

            # Find all the date strings that match the pattern, skipping any line that lacks
            # a date prefix, and store them in a set to eliminate duplicates.
            matches = (locate_date(pattern, line) for line in lines)
            date_set = sorted(set(match.group(0) for match in matches if match))
            print(f'input_chunk_file_path = {input_chunk_file_path}')
            print(f'Located the following dates {date_set}.\n')

            for date in date_set:
                # Calculate the output file path and append lines from the chunked file.
                output_file_path = os.path.join(OUTPUT_DIR_PATH_DATES, f'tracking.log-{date}')
                with open(output_file_path, 'a') as output_file:
                    # Write out the lines for this specific date found in the chunked file
                    # to the per-date log file.
                    pattern_date_line = f'^{date}.*$'
                    for line in lines:
                        if locate_date(pattern_date_line, line):
                            output_file.write(line)

        # Move on to the next chunk
        chunk_counter += 1
        input_chunk_file_path = os.path.join(OUTPUT_DIR_PATH_THREADS, f'chunk_{chunk_counter}.txt')

    # Create a directory to hold the compressed per-date logs (*.gz)
    if not os.path.exists(OUTPUT_DIR_PATH_DATES_GZ):
        os.makedirs(OUTPUT_DIR_PATH_DATES_GZ)
    else:
        # Remove existing files to start clean again
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES_GZ)

    # At this point the `tracking.log-{date}` files have been created, so the intermediate
    # thread chunk files are no longer needed.
    if REMOVE_OUTPUT_THREADS:
        delete_files_in_directory(OUTPUT_DIR_PATH_THREADS)

    # Loop through and compress all the per-date tracking logs
    for filename in os.listdir(OUTPUT_DIR_PATH_DATES):
        input_file_path = os.path.join(OUTPUT_DIR_PATH_DATES, filename)
        with open(input_file_path, 'rb') as input_file:
            # Calculate the output file path and write the compressed contents.
            output_file_path = os.path.join(OUTPUT_DIR_PATH_DATES_GZ, f'{filename}.gz')
            with gzip.open(output_file_path, 'wb') as output_file:
                output_file.write(input_file.read())

    # Remove the uncompressed `tracking.log-{date}` files now that they are compressed.
    if REMOVE_OUTPUT_DATES:
        delete_files_in_directory(OUTPUT_DIR_PATH_DATES)
def main():
    """
    1. Separates a large log file into smaller chunked files, one per thread.
    2. Traverses the thread chunked files and builds new log files separated by the date of
       the events in the logs.
    """
    write_chunks_in_parallel()
    write_chunks_by_date()
if __name__ == "__main__":
    main()
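# Optional spot-check (a sketch, not part of the processing above): confirm that a
# compressed per-date file only contains lines for its own date. The date below is an
# assumed example.
#
#   import gzip
#   with gzip.open('./output/dates_gz/tracking.log-2023-01-25.gz', 'rt') as log:
#       assert all(line.startswith('2023-01-25') for line in log)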
ztraboo commented Oct 6, 2023

Example of the folder structure. [screenshot omitted]

Example of what the output looks like. [screenshot omitted]