Created
November 26, 2020 01:25
-
-
Save jeremy-rifkin/a21d559d0cd069854d3a751caf43ba14 to your computer and use it in GitHub Desktop.
Script to parse minecraft server logs and add up users' playtime.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
# Script to parse Minecraft server logs and add up users' playtime.
# Usage: python3 playtime.py path/to/logs [timezone]
#
# Rifkin 2020
import datetime | |
import gzip | |
import os | |
import pytz | |
import re | |
import sys | |
import traceback | |
# Name of a rotated, gzip-compressed log (e.g. 2020-11-26-1.log.gz).
# Captures: year, month, day. The trailing per-day index is matched but not captured.
FileRE = r"(\d{4})-(\d{2})-(\d{2})-\d+\.log\.gz"
# A bare HH:MM:SS time. Captures: hours, minutes, seconds.
timeRE = r"(\d{2}):(\d{2}):(\d{2})"
# The bracketed HH:MM:SS prefix at the start of a log line. Captures: hours, minutes, seconds.
entryRE = r"^\[(\d{2}):(\d{2}):(\d{2})\]"
# "UUID of player <name> is <uuid>" line. Captures: time, player name, uuid.
UserAuthRE = r"^\[(\d{2}:\d{2}:\d{2})\] \[User Authenticator #\d+\/INFO\]: UUID of player (\w{3,16}) is ([0-9a-f\-]+)$"
# "<name>[/a.b.c.d:port] logged in with entity id ..." line. Captures: time, player name.
# Only a dotted-quad (IPv4-style) address is accepted by this pattern.
LoggedInRE = r"^\[(\d{2}:\d{2}:\d{2})\] \[Server thread\/INFO\]: (\w{3,16})\[/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}\] logged in with entity id \d+ at \(\[\w+\]-?\d+\.\d+, -?\d+\.\d+, -?\d+\.\d+\)$"
# "<name> left the game" line, logged by either "Server thread" or "Thread-1".
# Captures: time, player name (any 3-16 non-space characters, looser than the \w used above).
LoggedOutRE = r"^\[(\d{2}:\d{2}:\d{2})\] \[(?:Server thread\/INFO|Thread-1\/INFO)\]: ([^ ]{3,16}) left the game$"
# "lost connection: You logged in from another location" line. No captures; process_logs uses
# it only to avoid flagging such lines as a missed login.
LoginFromOtherLocationRE = r"^\[\d{2}:\d{2}:\d{2}\] \[Server thread\/INFO\]: \w{3,16} lost connection: You logged in from another location$"
# "Stopping server" line, logged by either "Server thread" or "Thread-1". Captures: time.
StoppingServerRE = r"^\[(\d{2}:\d{2}:\d{2})\] \[(?:Server thread\/INFO|Thread-1\/INFO)\]: Stopping server$"
# Timezone used to localize log timestamps; main() replaces it when a timezone argument is given.
tz = pytz.timezone("America/Denver")
class Logger:
    """Echo warnings as they occur and keep them for a summary at the end."""

    def __init__(self):
        # Every message passed to warn(), in the order received.
        self.warnings = []

    def warn(self, msg):
        """Print *msg* immediately and remember it for status()."""
        self.warnings.append(msg)
        print(msg)

    def status(self):
        """Print the number of recorded warnings, then each warning on its own line."""
        count = len(self.warnings)
        print(str(count) + " warnings --")
        for message in self.warnings:
            print(message)
# Per-player records keyed by UUID:
#   uuid -> {"usernames": set of str, "playtime": seconds (float), "lastLogin": datetime or None}
class PlayerDatabase:
    """Track every username a UUID has used and accumulate that player's playtime.

    ``db`` is the public record store described above. A player is considered
    online while their ``"lastLogin"`` is not ``None``.
    """

    def __init__(self):
        self.db = {}
        # username -> uuid of the most recent register() call for that name.
        # Names get reused (a player renames and someone else takes the old
        # name), so the latest registration is the one a following login or
        # logout refers to.
        self._uuid_by_name = {}

    def check_and_init(self, uuid):
        """Create an empty record for *uuid* if none exists yet."""
        if uuid not in self.db:
            self.db[uuid] = {
                "usernames": set(),
                "playtime": 0.0,
                "lastLogin": None,
            }

    def search(self, username):
        """Return the UUID that *username* currently refers to.

        The most recently registered owner of the name wins. Records written
        directly into ``db`` (bypassing register()) are found by a fallback
        scan.

        Raises:
            Exception: if no record carries *username*.
        """
        uuid = self._uuid_by_name.get(username)
        if uuid is not None:
            return uuid
        for uuid, record in self.db.items():
            if username in record["usernames"]:
                return uuid
        raise Exception("Username not found")

    def register(self, uuid, username):
        """Record that *uuid* has been seen using *username*."""
        self.check_and_init(uuid)
        self.db[uuid]["usernames"].add(username)
        self._uuid_by_name[username] = uuid

    def login(self, username, time):
        """Mark *username* as online since *time*.

        Raises:
            Exception: if the username is unknown, or the player is already
                marked online (a login with no logout in between).
        """
        uuid = self.search(username)
        if self.db[uuid]["lastLogin"] is not None:
            raise Exception("Mismatch - login without registered logout")
        self.db[uuid]["lastLogin"] = time

    def logout(self, username, time):
        """Close *username*'s open session at *time* and add it to their playtime.

        Raises:
            Exception: if the username is unknown, or the player is not
                marked online.
        """
        uuid = self.search(username)
        if self.db[uuid]["lastLogin"] is None:
            raise Exception("Mismatch - logout without login")
        self._close_session(uuid, time)

    def user_has_logged_in(self, username):
        """Return True if *username* is currently marked online."""
        uuid = self.search(username)
        return self.db[uuid]["lastLogin"] is not None

    def logout_all(self, time):
        """Close every open session at *time* (e.g. when the server stops)."""
        for uuid in self.db:
            if self.db[uuid]["lastLogin"] is not None:
                self._close_session(uuid, time)

    def _close_session(self, uuid, time):
        """Add the seconds between the record's lastLogin and *time*, then mark it offline."""
        record = self.db[uuid]
        record["playtime"] += (time - record["lastLogin"]).total_seconds()
        record["lastLogin"] = None
def parseTime(t):
    """Split a string starting with HH:MM:SS into [hours, minutes, seconds] as ints."""
    parts = re.match(timeRE, t).groups()
    return list(map(int, parts))
# Rotated archives are named YYYY-MM-DD-N.log.gz, where N counts files within that day.
_ARCHIVE_NAME_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})-(\d+)\.log\.gz")


def _log_sort_key(filename):
    """Order archives by date and numeric index; all other files (e.g. latest.log) sort last."""
    match = _ARCHIVE_NAME_RE.match(filename)
    if match is None:
        return (1, 0, 0, 0, 0, filename)
    year, month, day, index = (int(part) for part in match.groups())
    return (0, year, month, day, index, filename)


def _localized(date, hour, minute, second):
    """Combine a file's date with a time of day and localize it to the module timezone."""
    naive = datetime.datetime(date.year, date.month, date.day, hour, minute, second)
    return tz.localize(naive)


def process_logs(log_dir):
    """Walk *log_dir*, replay login/logout events from every log file, and print playtime.

    Files ending in ``.gz`` are dated from their file name; any other file is
    dated from its modification time. Within a directory, archives are
    processed in (date, index) order so that ``-10`` follows ``-9`` rather than
    ``-1``, and non-archive files such as ``latest.log`` come last. A
    "Stopping server" line logs everyone out and ends processing of that file.
    After ``latest.log``, players still online are logged out at the time of
    its last timestamped line.

    Progress, collected warnings and the per-player totals (in hours) are
    printed to stdout. Mismatch exceptions raised by PlayerDatabase propagate
    to the caller.
    """
    DB = PlayerDatabase()
    logger = Logger()
    for root, _dirs, files in os.walk(log_dir):
        for filename in sorted(files, key=_log_sort_key):
            print(filename)
            full_path = os.path.join(root, filename)
            if os.path.splitext(filename)[1] == ".gz":
                match = re.match(FileRE, filename)
                if match is None:
                    # Without a date in the name the entries cannot be placed in time.
                    logger.warn("Warning: skipping archive with unexpected name: {}".format(full_path))
                    continue
                year, month, day = (int(part) for part in match.groups())
                date = datetime.date(year, month, day)
                f = gzip.open(full_path, "rt", encoding="utf-8", errors="replace")
            else:
                date = datetime.date.fromtimestamp(os.path.getmtime(full_path))
                # Same decoding as the archives; undecodable bytes must not abort the run.
                f = open(full_path, "r", encoding="utf-8", errors="replace")
            with f:
                lastTime = None
                for raw_line in f:
                    line = raw_line.strip()
                    match = re.match(LoggedInRE, line)
                    if match:
                        stamp, username = match.groups()
                        print("[{}] [login] {}".format(stamp, username))
                        DB.login(username, _localized(date, *parseTime(stamp)))
                    elif "logged in" in line:  # debug: catch regex anomalies
                        if not re.match(LoginFromOtherLocationRE, line):
                            logger.warn("Warning: possible regex miss: [{}] {}".format(date, line))
                    match = re.match(UserAuthRE, line)
                    if match:
                        _stamp, username, uuid = match.groups()
                        DB.register(uuid, username)
                    elif "UUID of player" in line:  # debug: catch regex anomalies
                        logger.warn("Warning: possible regex miss: [{}] {}".format(date, line))
                    match = re.match(LoggedOutRE, line)
                    if match:
                        stamp, username = match.groups()
                        print("[{}] [logout] {}".format(stamp, username))
                        DB.logout(username, _localized(date, *parseTime(stamp)))
                    elif "left the game" in line:  # debug: catch regex anomalies
                        logger.warn("Warning: possible regex miss: [{}] {}".format(date, line))
                    match = re.match(StoppingServerRE, line)
                    if match:
                        stamp = match.groups()[0]
                        print("[{}] [crash] =====".format(stamp))
                        DB.logout_all(_localized(date, *parseTime(stamp)))
                        break  # TODO
                    elif "Stopping server" in line:
                        logger.warn("Warning: possible regex miss: [{}] {}".format(date, line))
                    # update time of last log message
                    match = re.match(entryRE, line)
                    if match:
                        hour, minute, second = (int(part) for part in match.groups())
                        lastTime = _localized(date, hour, minute, second)
                if filename == "latest.log":
                    print("wrapping up")
                    if lastTime is not None:
                        DB.logout_all(lastTime)
                    else:
                        # No timestamp to close sessions at; leave them open rather than crash.
                        logger.warn("Warning: no timestamped entries in {}; open sessions not closed".format(full_path))
    logger.status()
    for uuid in DB.db:
        user = DB.db[uuid]
        if user["playtime"] > 0:
            print("{}: {:.2f} hours".format(", ".join(user["usernames"]), user["playtime"] / 60 / 60))
def main():
    """Command-line entry point: ``playtime.py <log directory> [timezone]``.

    Prints usage and exits with status 1 on a wrong argument count. Also exits
    with status 1 (message on stderr) when the log directory does not exist or
    the timezone name is not known to pytz. Otherwise optionally replaces the
    module-level ``tz`` and processes the logs.
    """
    global tz
    if len(sys.argv) not in (2, 3):
        print("arguments: <log directory> [timezone]")
        print("timezone shouldn't be horribly important")
        sys.exit(1)
    log_dir = sys.argv[1]
    # os.walk() silently yields nothing for a missing path, which would look like an empty report.
    if not os.path.isdir(log_dir):
        print("error: not a directory: {}".format(log_dir), file=sys.stderr)
        sys.exit(1)
    if len(sys.argv) == 3:
        try:
            tz = pytz.timezone(sys.argv[2])
        except pytz.UnknownTimeZoneError:
            print("error: unknown timezone: {}".format(sys.argv[2]), file=sys.stderr)
            sys.exit(1)
    process_logs(log_dir)


# Only run when executed as a script, so the module can be imported without side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment