Skip to content

Instantly share code, notes, and snippets.

@timraay
Last active October 9, 2022 21:53
Show Gist options
  • Save timraay/1a898eb6bc56469680bc572d48008907 to your computer and use it in GitHub Desktop.
Save timraay/1a898eb6bc56469680bc572d48008907 to your computer and use it in GitHub Desktop.
Battlemetrics log extraction script
"""
Battlemetrics log extraction script
Last updated October 9th 2022
The intention of this script is to allow pulling large number of log lines
from the Battlemetrics API and convert them to a text format that looks like
that of the Community RCON Tool. For the sake of simplicity and since
Battlemetrics doesn't store the same amount of information this may mean that
some events are ignored (eg. votekicks, bans) or have placeholder values
(Steam64ID in chat events).
Running this requires Python 3. Dependencies are automatically installed upon
starting the script.
"""
# Set to True to enable debug information
DEBUG = False
import subprocess
import sys
def log(message, *args):
if DEBUG:
print('[DEBUG]', message % args)
def install(package):
print(f"Installing '{package}' module...")
subprocess.check_call([sys.executable, "-m", "pip", "install", package])
from datetime import datetime
try:
import requests
except ImportError:
install('requests')
import requests
try:
import pyperclip
except ImportError:
install('pyperclip')
import pyperclip
try:
from dateutil.parser import parse
except ImportError:
install('python-dateutil')
from dateutil.parser import parse
init = False
while not init:
try:
SERVER_ID = int(input('\nInsert the ID of your server. This is the number you can find in the URL of your server\'s Battlemetrics page.\n\n Server ID: '))
ACCESS_TOKEN = input('\nInsert a temporary access token for the Battlemetrics API.\n\n Access token: ')
LOGS_START = parse(input('\nInsert the start time of the logs you want to capture in UTC. The parser will accept pretty lenient answers.\n\n Start time (UTC): '))
print(' -->', LOGS_START.isoformat())
LOGS_END = parse(input('\nInsert the end time of the logs you want to capture in UTC.\n\n End time (UTC): '))
print(' -->', LOGS_END.isoformat())
init = True
except:
print(' Invalid value! Please try again.')
else:
if LOGS_START > LOGS_END:
print(' Start time is later than the end time! Please try again.')
init = False
URL = "https://api.battlemetrics.com/activity"
PAGE_SIZE = 500
PARAMS = {
"version": "^0.1.0",
"tagTypeMode": "and",
"filter[types][blacklist]": "event:query",
"filter[servers]": SERVER_ID,
"filter[timestamp]": ':' + LOGS_END.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
"page[size]": PAGE_SIZE,
"access_token": ACCESS_TOKEN
}
MESSAGES = {
"playerMessage": "CHAT \t{playerName}: {message} (00000000000000000)",
"event:addPlayer": "CONNECTED \t{name}",
"event:removePlayer": "DISCONNECTED \t{name}",
"hll:kill": "KILL \t{killerName}({killerTeam}/{killerSteamID}) -> {victimName}({victimTeam}/{victimSteamID}) with {weapon}",
"hll:teamKill": "TEAM KILL \t{killerName}({killerTeam}/{killerSteamID}) -> {victimName}({victimTeam}/{victimSteamID}) with {weapon}",
"hll:enteredAdminCam": "CAMERA \t[{playerName} ({steamID})] Entered Admin Camera",
"hll:leftAdminCam": "CAMERA \t[{playerName} ({steamID})] Left Admin Camera",
"hll:matchEnded": "MATCH ENDED \tMATCH ENDED `{map}` {teamOne} ({teamOneScore} - {teamTwoScore}) {teamTwo}",
"hll:matchStart": "MATCH START \tMATCH START {map}",
"hll:teamSwitch": "TEAMSWITCH \tTEAMSWITCH {playerName} ({fromTeam} > {toTeam})",
}
def get_datetime(time_str):
return datetime.strptime(time_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
def pull_logs():
def _pull(url, params = dict(access_token=ACCESS_TOKEN)):
res = requests.get(url, params=params)
log('Got status %s from %s', res.status_code, res.url)
data = res.json()
if data.get('errors'):
err = data['errors'][0]
raise Exception(f"{err}")
# log('Response is %s', data)
logs = data['data']
next = data['links'].get('next')
filtered_logs = list()
for log_data in logs:
time = get_datetime(log_data["attributes"]['timestamp'])
if time < LOGS_START:
break
filtered_logs.append(log_data)
print('... Collected', len(filtered_logs), 'logs')
log('Num logs gathered: %s / %s - Request more? %s', len(filtered_logs), PAGE_SIZE, len(filtered_logs) == PAGE_SIZE)
return filtered_logs, next
all_logs = []
logs, url = _pull(URL, params=PARAMS)
all_logs += logs
while len(logs) == PAGE_SIZE:
logs, url = _pull(url)
all_logs += logs
log('Returning a total of %s logs', len(logs))
return all_logs
UNKNOWN_EVENTS = set()
def process_log(log):
log = log["attributes"]
type = log["messageType"]
timestamp = log["timestamp"]
data = log["data"]
time = get_datetime(timestamp)
message = MESSAGES.get(type)
if message:
return str(time) + '\t' + message.format(**data)
else:
UNKNOWN_EVENTS.add(type)
try:
print()
logs = list()
data = pull_logs()
for log_ in data:
text = process_log(log_)
if text:
logs.append(text)
if UNKNOWN_EVENTS:
log('Skipped %s events that were not recognized: %s', len(UNKNOWN_EVENTS), ', '.join(UNKNOWN_EVENTS))
pyperclip.copy('\n'.join(logs))
print('\nCopied', len(logs), 'logs to clipboard!')
except Exception as e:
print('\n####################################')
print('\n An error occured!')
print(f' {e.__class__.__name__}: {str(e)}\n')
if DEBUG:
import traceback
print(traceback.format_exc())
input('Press Enter to close.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment