-
-
Save shashank-sharma/781652094e69b0595b88b2c841ba1136 to your computer and use it in GitHub Desktop.
Parse points log from Dota compendium 2024 points history by video
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Record one video of the points history by scrolling it from bottom to top, then use this program
to convert the video frames to text and parse them.
Highly experimental, since accuracy is the issue but good starting point to fetch the data | |
For example: 50 can be interpreted as SO | |
Hence fetch all the data and fix any bug/mis-interpreted data by validating it | |
The validate_csv_totals method makes sure no data is missed; the rest of the validation
can check whether the level is an int, the points are ints, etc.
""" | |
import csv | |
import cv2 | |
import numpy as np | |
import pytesseract | |
import re | |
import argparse | |
from collections import OrderedDict | |
def represents_int(s):
    """Return True if *s* parses as a base-10 integer, False otherwise."""
    try:
        int(s)
        return True
    except ValueError:
        return False
def preprocess_frame(frame):
    """Isolate the top-most text row of *frame* so OCR sees one row only.

    Thresholds the frame, smears each text line into a single blob,
    removes thin dotted separators, then blanks out every blob except
    the one whose bounding box sits highest in the image.
    """
    # Otsu binarization, inverted so text is white on black.
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Dilate with a wide flat kernel: characters on one line merge into one blob.
    smear_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (25, 1))
    smeared = cv2.dilate(binary, smear_kernel, iterations=1)
    # Open with a tall thin kernel to erase the dotted separator lines.
    erase_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 7))
    cleaned = cv2.morphologyEx(smeared, cv2.MORPH_OPEN, erase_kernel)
    contours, _ = cv2.findContours(cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    # Keep the contour whose bounding box has the smallest Y (top-most row).
    topmost = min(contours, key=lambda c: cv2.boundingRect(c)[1], default=None)
    # Build a mask that blacks out the bounding box of every other contour.
    keep_mask = np.full(frame.shape[:2], 255, dtype=np.uint8)
    for contour in contours:
        if np.array_equal(contour, topmost):
            continue
        x, y, w, h = cv2.boundingRect(contour)
        cv2.rectangle(keep_mask, (x, y), (x + w, y + h), 0, -1)
    return cv2.bitwise_and(frame, frame, mask=keep_mask)
def extract_text(frame):
    """OCR *frame* with tesseract; PSM 6 treats it as a uniform text block."""
    return pytesseract.image_to_string(frame, config='--psm 6')
def clean_text(text):
    """Drop OCR artifact characters ('&', '@', ')', '|') and collapse
    every run of whitespace into a single space."""
    stripped = text.translate(str.maketrans('', '', '&@)|'))
    return ' '.join(stripped.split())
# Not required as error rate is high
def clean_level(level):
    """Map known OCR misreads of "50" back to the correct value;
    anything unrecognized passes through unchanged."""
    corrections = {"SO": "50", "ie": "50"}
    return corrections.get(level, level)
def process_text(text):
    """Parse raw OCR text into a list of row dicts.

    Finds the "LEVEL" column-header line, then parses each following
    line as: LEVEL DATE DESCRIPTION... POINTS TOTAL. Rows whose TOTAL
    is not an integer (an OCR misread) are printed and skipped, since
    TOTAL is later used as the dedup key.

    Returns a list of dicts with keys level/date/description/points/total;
    empty if no header was found.
    """
    lines = text.split('\n')
    processed_lines = []
    # Locate the "LEVEL" header; without it this frame has no table rows.
    try:
        level_index = next(i for i, line in enumerate(lines) if "LEVEL" in line)
    except StopIteration:
        return processed_lines
    # NOTE: the original code also split the header line into a variable
    # that was never used; that dead assignment has been removed.
    for ogline in lines[level_index + 1:]:
        line = clean_text(ogline)
        parts = line.split()
        if len(parts) >= 5:  # need at least LEVEL, DATE, DESCRIPTION, POINTS, TOTAL
            level = clean_level(parts[0])
            date = parts[1]
            points = parts[-2]
            total = parts[-1].replace(",", "")
            description = " ".join(parts[2:-2])  # everything between DATE and POINTS
            # Skip rows whose running total is not numeric — OCR garbage.
            if not represents_int(total):
                print("Issue with: ", ogline)
                continue
            processed_lines.append({
                "level": level,
                "date": date,
                "description": description,
                "points": points,
                "total": total,
            })
    return processed_lines
def process_video(video_path, test_mode=False, sample_rate=1):
    """OCR every sampled frame of *video_path*, deduping rows by total.

    Returns a dict mapping running-total string -> row dict; the total
    works as a natural unique key across overlapping scroll frames.
    In test mode only the first sample's worth of frames is processed.
    """
    capture = cv2.VideoCapture(video_path)
    seen = {}
    frame_index = 0
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_index % sample_rate == 0:
            # Upscale so tesseract gets enough pixels per glyph.
            frame = cv2.resize(frame, (2444, 1794), interpolation=cv2.INTER_AREA)
            prepared = preprocess_frame(frame)
            rows = process_text(extract_text(prepared))
            # First sighting of a given running total wins.
            for row in rows:
                seen.setdefault(row["total"], row)
            # Dump debug snapshots of the very first frame.
            if frame_index == 0:
                cv2.imwrite('debug_original.png', frame)
                cv2.imwrite('debug_preprocessed.png', prepared)
        frame_index += 1
        if test_mode and frame_index >= 1 * sample_rate:
            break
    capture.release()
    return seen
def main(video_path, test_mode=False):
    """Drive the pipeline: OCR the video, sort rows, write the CSV."""
    rows = process_video(video_path, test_mode)
    print(f"Processing completed. {len(rows)} unique lines have been saved.")
    # Order rows by numeric running total, highest first
    # (commas stripped defensively before converting).
    ordered = OrderedDict(
        sorted(rows.items(), key=lambda kv: int(kv[0].replace(',', '')), reverse=True)
    )
    with open('dota-points.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(
            csvfile, fieldnames=['level', 'date', 'description', 'points', 'total']
        )
        writer.writeheader()
        writer.writerows(ordered.values())
    print("Data has been sorted and written to 'dota-points.csv'")
# Validate that all points were captured correctly based on the running totals.
def validate_csv_totals(file_path):
    """Cross-check the CSV: walking oldest-to-newest, each row's total
    should equal the previous total plus the row's points.

    Non-integer levels are reported; rows with non-integer points or
    totals are reported and skipped. All mismatches are collected and
    printed at the end, resyncing the running total after each one.
    """
    with open(file_path, 'r') as csvfile:
        rows = list(csv.DictReader(csvfile))
    errors = []
    running = None
    # The CSV is sorted newest-first, so reverse to walk chronologically.
    for row in reversed(rows):
        level = row['level']
        if not represents_int(level):
            print("level not correct: ", row)
        date = row['date']
        points_text = row['points'].replace(',', '')
        total_text = row['total'].replace(',', '')
        if not represents_int(points_text) or not represents_int(total_text):
            print("Anomaly for ", row)
            continue
        points = int(points_text)
        total = int(total_text)
        if running is None:
            running = total  # anchor on the oldest valid row
        else:
            running += points
            if running != total:
                errors.append(f"Mismatch at level {level} on {date}: "
                              f"Expected {running}, but got {total}")
            running = total  # resync to the actual total
    if errors:
        print("Validation errors found:")
        for error in errors:
            print(error)
    else:
        print("All totals are valid.")
if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser(description="Process Dota 2 Points Log Video")
    arg_parser.add_argument("video_path", help="Path to the input video file")
    arg_parser.add_argument("--test", action="store_true",
                            help="Run in test mode (process only 1 frame)")
    cli = arg_parser.parse_args()
    main(cli.video_path, cli.test)
    validate_csv_totals("dota-points.csv")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment