Created
January 7, 2021 09:38
-
-
Save JoogsWasTaken/8a8e60859e1721255c57e9185eb6cb10 to your computer and use it in GitHub Desktop.
Python scripts for pulling public keys from Steam's login mechanism and ingesting them into a SQLite database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# usage: python collect.py <username> <folder to save response to>
import requests | |
import sys | |
import os | |
import time | |
def main():
    """Fetch Steam's RSA public key response for a username and save it.

    Command-line arguments:
        sys.argv[1]: username sent to the ``/login/getrsakey`` endpoint.
        sys.argv[2]: folder the response is saved to (created if missing).

    The raw response body is written unmodified to
    ``<folder>/pubkey_<unix timestamp>.json``; the timestamp is taken
    before the request is sent. Exits with status 1 if arguments are missing.
    """
    if len(sys.argv) < 3:
        print("usage: python collect.py <username> <folder to save response to>")
        sys.exit(1)

    username = sys.argv[1]
    pk_dir = sys.argv[2]
    ts = int(time.time())

    # makedirs(exist_ok=True) creates missing parent folders too and does not
    # fail if the folder appears between an existence check and the creation.
    os.makedirs(pk_dir, exist_ok=True)

    steam_uri = "https://store.steampowered.com"

    # The context manager releases the session's pooled connections.
    with requests.Session() as s:
        s.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36"
        })

        # Without a timeout a stalled connection would block the script forever.
        rsa_req = s.post(
            steam_uri + "/login/getrsakey",
            data={"username": username},
            timeout=30,
        )
        body = rsa_req.text

    out_path = os.path.join(pk_dir, "pubkey_" + str(ts) + ".json")

    # "with" guarantees the file is closed even if the write raises.
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(body)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# usage: python ingest.py <folder containing saved responses>
import os, os.path | |
import sys | |
import sqlite3 | |
import json | |
def extract_timestamp_from_filename(pk_filename):
    """Return the integer found between the first "_" and the first "." of a name.

    For a file named ``pubkey_1610008680.json`` this yields ``1610008680``.
    Raises ValueError if either delimiter is missing or the text between
    them is not a valid integer.
    """
    start = pk_filename.index("_") + 1
    stop = pk_filename.index(".")
    digits = pk_filename[start:stop]
    return int(digits)
def main():
    """Ingest saved public key responses from a folder into a new SQLite database.

    Command-line arguments:
        sys.argv[1]: folder containing the saved JSON responses.

    The database is created next to that folder and named after it
    (``<folder name>.sqlite``). Each successfully parsed response becomes one
    row of the ``pubkeys`` table: the timestamp taken from the filename, the
    key modulus and exponent, the timestamp reported in the response, and the
    token gid. Files that cannot be read or parsed, or that hold a failed
    response, are reported and skipped.

    Exits with status 1 if the argument is missing, the folder is not a
    directory, or the database file already exists.
    """
    if len(sys.argv) < 2:
        print("usage: python ingest.py <folder containing saved responses>")
        sys.exit(1)

    pubkey_dir = sys.argv[1]

    # isdir() is already False for paths that do not exist.
    if not os.path.isdir(pubkey_dir):
        print("path doesn't exist or is not a directory")
        sys.exit(1)

    # abspath() drops any trailing separator and resolves ".", so basename()
    # yields the folder's own name whether or not the argument ends in "/".
    pubkey_dir_name = os.path.basename(os.path.abspath(pubkey_dir))
    sqlite_file = os.path.join(pubkey_dir, "..", pubkey_dir_name + ".sqlite")

    if os.path.exists(sqlite_file):
        print("sqlite database already exists")
        sys.exit(1)

    # Parse everything before touching the database so that a failure while
    # reading does not leave behind an empty file that blocks the next run.
    pubkey_tuples = []

    for pubkey_filename in sorted(os.listdir(pubkey_dir)):
        pubkey_path = os.path.join(pubkey_dir, pubkey_filename)

        try:
            with open(pubkey_path, "r", encoding="utf-8") as f:
                pk = json.load(f)
        except (OSError, ValueError):
            # OSError: unreadable entry (e.g. a subdirectory);
            # ValueError: invalid JSON or undecodable bytes.
            print("failed to read file {}".format(pubkey_filename))
            continue

        if not isinstance(pk, dict):
            print("failed to read file {}".format(pubkey_filename))
            continue

        if not pk.get("success"):
            print("file {} contains failed request response".format(pubkey_filename))
            continue

        try:
            pubkey_tuples.append((
                extract_timestamp_from_filename(pubkey_filename),
                pk["publickey_mod"],
                pk["publickey_exp"],
                int(pk["timestamp"]),
                pk["token_gid"],
            ))
        except (KeyError, TypeError, ValueError):
            print("file {} has an unexpected name or is missing fields".format(pubkey_filename))
            continue

    con = sqlite3.connect(sqlite_file)

    try:
        con.execute('''
            CREATE TABLE pubkeys (
                timestamp NUMBER,
                pk_mod TEXT,
                pk_exp TEXT,
                pk_timestamp NUMBER,
                pk_token TEXT
            )
        ''')
        con.executemany("INSERT INTO pubkeys VALUES (?, ?, ?, ?, ?)", pubkey_tuples)
        con.commit()
    finally:
        # Close the connection even if table creation or insertion fails.
        con.close()

    print("successfully ingested {} pubkeys".format(len(pubkey_tuples)))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment