Last active
March 5, 2019 14:41
-
-
Save nosoop/6f8065df2aa81573ab8ba7d2cee65a86 to your computer and use it in GitHub Desktop.
Script that dumps only the Workshop maps from a collection to stdout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import requests, json, argparse, os, sys, shutil | |
def webapi_url(interf, version=1):
    '''
    Builds the Steam Web API endpoint URL for the given interface/method path and API version.
    '''
    endpoint_template = 'https://api.steampowered.com/{}/v{}/'
    return endpoint_template.format(interf, version)
def param_dict(key, values):
    '''
    Expands an iterable into indexed request fields, i.e. { 'key[0]': first, 'key[1]': second,
    ... }.
    '''
    indexed_fields = {}
    for position, item in enumerate(values):
        indexed_fields['{0}[{1}]'.format(key, position)] = item
    return indexed_fields
def get_collection_details(collections):
    '''
    POSTs the given collection IDs to ISteamRemoteStorage/GetCollectionDetails and returns the
    'response' object of the decoded JSON reply (an empty dict if that key is absent).

    Errors from the request or from JSON decoding are not handled here; they propagate to the
    caller.
    '''
    base_fields = {
        'format': 'json',
        'collectioncount': len(collections),
    }
    # fixed fields first, then the indexed IDs, so the form body keeps its field order
    form_fields = {**base_fields, **param_dict('publishedfileids', collections)}

    endpoint = webapi_url('ISteamRemoteStorage/GetCollectionDetails')
    reply = requests.post(endpoint, data=form_fields)
    return reply.json().get('response', {})
def get_published_file_details(fileids):
    '''
    POSTs the given published file IDs to ISteamRemoteStorage/GetPublishedFileDetails and
    returns the 'response' object of the decoded JSON reply (an empty dict if that key is
    absent).

    Errors from the request or from JSON decoding are not handled here; they propagate to the
    caller.
    '''
    base_fields = {
        'format': 'json',
        'itemcount': len(fileids),
    }
    # fileids may be a set; materialise it once so each ID gets a stable index
    id_fields = param_dict('publishedfileids', list(fileids))
    form_fields = {**base_fields, **id_fields}

    endpoint = webapi_url('ISteamRemoteStorage/GetPublishedFileDetails')
    reply = requests.post(endpoint, data=form_fields)
    return reply.json().get('response', {})
def resolve_workshopid_file_name(workshop_path, id):
    '''
    Checks the host's workshop directory and returns the map's display name, if the map is
    present.

    The map is looked up in '<workshop_path>/content/440/<id>'. The display name is the name of
    the (lexicographically first) entry in that directory with its extension removed.

    Returns None if workshop_path is None, if the map directory does not exist or is not a
    directory, or if it contains no entries.
    '''
    if workshop_path is None:
        return None

    # '440' is the per-app content subdirectory (presumably Team Fortress 2's app ID)
    workshop_map_path = os.path.join(workshop_path, 'content', '440', str(id))

    # isdir rather than exists: a stray plain file named after the ID would make listdir raise
    if not os.path.isdir(workshop_map_path):
        return None

    # listdir order is arbitrary; sort so the same name is picked on every run
    entries = sorted(os.listdir(workshop_map_path))
    if not entries:
        return None
    return os.path.splitext(entries[0])[0]
def fetch(collections: list, include_tags: list = None, exclude_tags: list = None, workshop_dir = None, **kwargs):
    '''
    Provides a generator containing strings corresponding to workshop maps.

    collections: Workshop collection IDs to read maps from.
    include_tags: if non-empty, only maps carrying at least one of these tags are kept.
    exclude_tags: maps carrying any of these tags are dropped (takes priority over
        include_tags).
    workshop_dir: optional workshop directory used to resolve IDs to display names.
    kwargs: ignored; lets callers pass a full argument namespace.

    Yields 'workshop/<name>.ugc<id>' when the display name can be resolved and 'workshop/<id>'
    otherwise, in the order the maps first appear in the collections.

    On a failed or malformed Web API reply, or an empty collection, a message is written to
    stderr and nothing is yielded.
    '''
    try:
        collectiondetails = get_collection_details(collections)
    except (requests.exceptions.ConnectionError, ValueError):
        print("Could not get collection details. Is Steam down?", file=sys.stderr)
        return

    # the helper returns {} when the reply has no 'response'; treat that like a failed request
    collection_entries = collectiondetails.get('collectiondetails')
    if collection_entries is None:
        print("Could not get collection details. Is Steam down?", file=sys.stderr)
        return

    include_tag_set = set(include_tags or ())
    exclude_tag_set = set(exclude_tags or ())

    # ordered, de-duplicated IDs: a list keeps collection order, the set gives O(1) membership
    workshop_map_ids = []
    seen_map_ids = set()

    for collection in collection_entries:
        if 'children' not in collection:
            print("Collection looks empty. Is the collection published?", file=sys.stderr)
            return

        for child in collection['children']:
            # filetype = 0 is map?
            if child['filetype'] != 0:
                continue
            fileid = child['publishedfileid']
            if fileid not in seen_map_ids:
                seen_map_ids.add(fileid)
                workshop_map_ids.append(fileid)

    if not workshop_map_ids:
        # nothing to look up, so skip the details request entirely
        print("No Workshop maps found in the given collection(s).", file=sys.stderr)
        return

    try:
        published_file_results = get_published_file_details(workshop_map_ids)
    except (requests.exceptions.ConnectionError, ValueError):
        print("Could not get Workshop map details. Is Steam down?", file=sys.stderr)
        return

    # without details the tag filters cannot be applied, so bail out instead of yielding
    # unfiltered maps
    file_details = published_file_results.get('publishedfiledetails')
    if file_details is None:
        print("Could not get Workshop map details. Is Steam down?", file=sys.stderr)
        return

    filtered_workshop_map_ids = set()
    for publishedfile in file_details:
        fileid = publishedfile['publishedfileid']

        if publishedfile.get('result') == 9:
            # remove map if the submission doesn't exist
            filtered_workshop_map_ids.add(fileid)
            continue

        # an entry without a 'tags' list is treated as having no tags
        map_tags = {tag['tag'] for tag in publishedfile.get('tags', ())}

        if not exclude_tag_set.isdisjoint(map_tags):
            # remove map if any of the 'exclude' tags is present in the map
            filtered_workshop_map_ids.add(fileid)
        elif include_tag_set and include_tag_set.isdisjoint(map_tags):
            # remove map if none of the 'include' tags is present in the map
            filtered_workshop_map_ids.add(fileid)

    for workshop_id in workshop_map_ids:
        if workshop_id in filtered_workshop_map_ids:
            continue

        display_name = resolve_workshopid_file_name(workshop_dir, workshop_id)
        if display_name is not None:
            yield 'workshop/{0}.ugc{1}'.format(display_name, workshop_id)
        else:
            yield 'workshop/{0}'.format(workshop_id)
# __main__
# Command-line entry point: parses the options, then prints one workshop map entry per line.
if __name__ == '__main__':
    # fix for columns not propagating on certain versions of Python
    os.environ['COLUMNS'] = str(shutil.get_terminal_size().columns)

    parser = argparse.ArgumentParser(
            description = "Returns a list of Workshop maps given a collection(s).",
            usage = "%(prog)s [options]")

    parser.add_argument('-c', '--collection', metavar='ID', action='append',
            help="Steam Workshop map collection(s) to retrieve maps from")

    parser.add_argument('-t', '--include-tag', metavar='TAG', action='append',
            dest='include_tags', help="restrict workshop maps to ones including tag(s)")

    parser.add_argument('-T', '--exclude-tag', metavar='TAG', action='append',
            dest='exclude_tags', help="ignore workshop entries including tag(s)")

    parser.add_argument('-W', '--workshop-dir', metavar='DIR',
            help=("the game's workshop directory (e.g., /tf/../steamapps/workshop) - "
            "optional; used to resolve IDs to display names"))

    args = parser.parse_args()

    # the command-line option wins; the environment variable is the fallback
    args.workshop_dir = args.workshop_dir or os.environ.get('STEAM_WORKSHOP_DIR')

    # 'append' actions leave the attribute as None when the option is never given
    args.include_tags = args.include_tags or []
    args.exclude_tags = args.exclude_tags or []

    if args.workshop_dir is None:
        print("no workshop directory provided: workshop map import tool will not resolve maps "
                "to full names (add --workshop-dir or STEAM_WORKSHOP_DIR environment variable)",
                file=sys.stderr)

    if args.collection is None:
        print("no collections provided", file=sys.stderr)
        sys.exit(1)

    # TODO expand mapcycle using os.path.expanduser?

    # fetch() takes the IDs as 'collections'; the extra 'collection' key lands in its **kwargs
    args.collections = args.collection

    # collect everything first so output stays all-or-nothing, and skip printing when there
    # are no results (print() with no values would emit a stray blank line on stdout)
    workshop_maps = list(fetch(**vars(args)))
    if workshop_maps:
        print(*workshop_maps, sep = '\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment