Python scripts for pulling public keys from Steam's login mechanism and ingesting them into a SQLite database
# usage: python collect.py <username> <folder to save response to> | |
import requests | |
import sys | |
import os | |
import time | |
def main(): | |
ts = int(time.time()) | |
pk_dir = sys.argv[2] | |
if not os.path.exists(pk_dir): | |
os.mkdir(pk_dir) | |
steam_uri = "https://store.steampowered.com" | |
s = requests.Session() | |
s.headers.update({ | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36" | |
}) | |
rsa_req = s.post(steam_uri + "/login/getrsakey", data={ | |
"username": sys.argv[1] | |
}) | |
f = open(pk_dir + "/pubkey_" + str(ts) + ".json", "w", encoding="utf-8") | |
f.write(rsa_req.text) | |
f.close() | |
if __name__ == "__main__": | |
main() |
# usage: python ingest.py <folder containing saved responses> | |
import os, os.path | |
import sys | |
import sqlite3 | |
import json | |
def extract_timestamp_from_filename(pk_filename): | |
return int(pk_filename[pk_filename.index("_") + 1:pk_filename.index(".")]) | |
def main(): | |
pubkey_dir = sys.argv[1] | |
if not os.path.exists(pubkey_dir) or not os.path.isdir(pubkey_dir): | |
print("path doesn't exist or is not a directory") | |
exit() | |
pubkey_dir_name = os.path.basename(os.path.dirname(pubkey_dir)) | |
sqlite_file = os.path.join(pubkey_dir, "..", pubkey_dir_name + ".sqlite") | |
if os.path.exists(sqlite_file): | |
print("sqlite database already exists") | |
exit() | |
con = sqlite3.connect(sqlite_file) | |
con.execute(''' | |
CREATE TABLE pubkeys ( | |
timestamp NUMBER, | |
pk_mod TEXT, | |
pk_exp TEXT, | |
pk_timestamp NUMBER, | |
pk_token TEXT | |
) | |
''') | |
pubkey_filenames = os.listdir(pubkey_dir) | |
pubkey_tuples = [] | |
for pubkey_filename in pubkey_filenames: | |
pk = None | |
with open(os.path.join(pubkey_dir, pubkey_filename), "r") as f: | |
pk = json.load(f) | |
if pk is None: | |
print("failed to read file {}".format(pubkey_filename)) | |
continue | |
if not pk["success"]: | |
print("file {} contains failed request response".format(pubkey_filename)) | |
continue | |
pubkey_tuples.append(( | |
extract_timestamp_from_filename(pubkey_filename), | |
pk["publickey_mod"], | |
pk["publickey_exp"], | |
int(pk["timestamp"]), | |
pk["token_gid"] | |
)) | |
con.executemany("INSERT INTO pubkeys VALUES (?, ?, ?, ?, ?)", pubkey_tuples) | |
con.commit() | |
con.close() | |
print("successfully ingested {} pubkeys".format(len(pubkey_tuples))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment