Skip to content

Instantly share code, notes, and snippets.

@nosoop
Last active March 5, 2019 14:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nosoop/6f8065df2aa81573ab8ba7d2cee65a86 to your computer and use it in GitHub Desktop.
Save nosoop/6f8065df2aa81573ab8ba7d2cee65a86 to your computer and use it in GitHub Desktop.
Script to only dump Workshop maps from a collection to stdout
#!/usr/bin/python3
import requests, json, argparse, os, sys, shutil
def webapi_url(interf, version = 1):
    '''
    Builds the URL of a Steam Web API endpoint.

    interf is the path segment naming the endpoint (callers in this file pass
    'Interface/Method') and version is the number placed after the 'v' prefix.  The
    returned URL always ends with a slash.
    '''
    url_template = 'https://api.steampowered.com/{interface}/v{api_version}/'
    return url_template.format(interface = interf, api_version = version)
def param_dict(key, values):
    '''
    Expands an iterable into indexed fields: the item at position i is stored under the
    name 'key[i]'.  An empty iterable gives an empty dict.
    '''
    indexed_params = {}
    for position, item in enumerate(values):
        field_name = '{0}[{1}]'.format(key, position)
        indexed_params[field_name] = item
    return indexed_params
def get_collection_details(collections):
    '''
    Posts the given collection IDs to ISteamRemoteStorage/GetCollectionDetails.

    Returns the 'response' entry of the JSON reply, or an empty dict when the reply has no
    such entry.  Nothing is caught here: requests.exceptions.ConnectionError and ValueError
    (the two errors callers are expected to handle) propagate unchanged.
    '''
    form_fields = {
        'format': 'json',
        'collectioncount': len(collections),
        **param_dict('publishedfileids', collections),
    }
    endpoint = webapi_url('ISteamRemoteStorage/GetCollectionDetails')
    reply = requests.post(endpoint, data = form_fields)
    return reply.json().get('response', {})
def get_published_file_details(fileids):
    '''
    Posts the given Workshop file IDs to ISteamRemoteStorage/GetPublishedFileDetails.

    fileids may be any sized collection (a set is fine); it is copied into a list to give
    each ID an index.  Returns the 'response' entry of the JSON reply, or an empty dict
    when the reply has no such entry.  Nothing is caught here:
    requests.exceptions.ConnectionError and ValueError propagate unchanged.
    '''
    form_fields = {
        'format': 'json',
        'itemcount': len(fileids),
        **param_dict('publishedfileids', list(fileids)),
    }
    endpoint = webapi_url('ISteamRemoteStorage/GetPublishedFileDetails')
    reply = requests.post(endpoint, data = form_fields)
    return reply.json().get('response', {})
def resolve_workshopid_file_name(workshop_path, id, app_id = 440):
    '''
    Checks the host's workshop directory and returns the map's display name, if the map is
    present.

    workshop_path is the workshop root (the directory that contains 'content'); passing
    None disables the lookup.  id is the Workshop item ID.  app_id selects the
    subdirectory of 'content' to search and defaults to 440, the value that used to be
    hard-coded here.

    The display name is the file name, minus its extension, of the alphabetically first
    entry in '<workshop_path>/content/<app_id>/<id>'.  Returns None when no workshop path
    was given, when that location is missing or is not a directory, or when it is empty.
    '''
    if workshop_path is None:
        return None

    workshop_map_path = '/'.join((workshop_path, 'content', str(app_id), str(id)))

    # isdir rather than exists: a regular file named after the item ID would pass an
    # existence check and then make os.listdir raise NotADirectoryError
    if not os.path.isdir(workshop_map_path):
        return None

    # os.listdir returns entries in arbitrary order; sort so the same directory always
    # resolves to the same name
    entries = sorted(os.listdir(workshop_map_path))
    if not entries:
        return None
    return os.path.splitext(entries[0])[0]
def fetch(collections: list, include_tags: list = None, exclude_tags: list = None, workshop_dir = None, **kwargs):
    '''
    Provides a generator containing strings corresponding to workshop maps.

    collections  -- Workshop collection IDs whose children are gathered
    include_tags -- if non-empty, only maps carrying at least one of these tags are kept
    exclude_tags -- maps carrying any of these tags are dropped
    workshop_dir -- optional local workshop directory used to resolve map display names
    kwargs       -- ignored; lets the caller pass a whole argparse namespace

    Yields 'workshop/<name>.ugc<id>' when the map's display name can be resolved locally
    and 'workshop/<id>' otherwise, in the order the maps first appear in the collections.
    If Steam cannot be queried, a reply lacks the expected payload, or a collection has no
    children, a message is printed to stderr and nothing is yielded.
    '''
    try:
        collection_list = get_collection_details(collections)['collectiondetails']
    except (requests.exceptions.ConnectionError, ValueError, KeyError):
        # KeyError: the reply was valid JSON but did not contain the collection payload
        print("Could not get collection details. Is Steam down?", file=sys.stderr)
        return

    # None (the default) and an empty list both mean "no tag restriction"
    include_tag_set = set(include_tags or ())
    exclude_tag_set = set(exclude_tags or ())

    # a list plus a 'seen' set de-duplicates while keeping first-seen order, so the output
    # is the same from one run to the next
    workshop_map_ids = []
    seen_ids = set()
    for collection in collection_list:
        if 'children' not in collection:
            print("Collection looks empty. Is the collection published?", file=sys.stderr)
            return

        for publishedfile in collection['children']:
            # filetype = 0 is map?
            if publishedfile['filetype'] != 0:
                continue
            fileid = publishedfile['publishedfileid']
            if fileid not in seen_ids:
                seen_ids.add(fileid)
                workshop_map_ids.append(fileid)

    if not workshop_map_ids:
        # nothing to look up; skip a request that would carry itemcount = 0
        return

    try:
        published_files = get_published_file_details(workshop_map_ids)['publishedfiledetails']
    except (requests.exceptions.ConnectionError, ValueError, KeyError):
        # KeyError: without the details payload the tag filters cannot be applied, so
        # treat it like any other failed lookup rather than emitting unfiltered maps
        print("Could not get Workshop map details. Is Steam down?", file=sys.stderr)
        return

    filtered_workshop_map_ids = set()
    for publishedfile in published_files:
        if publishedfile['result'] == 9:
            # remove map if the submission doesn't exist
            filtered_workshop_map_ids.add(publishedfile['publishedfileid'])
            continue

        # an entry without a 'tags' list is treated as having no tags
        map_tags = {tag['tag'] for tag in publishedfile.get('tags', ())}

        # remove map if any of the 'exclude' tags is present in the map
        has_excluded_tag = not exclude_tag_set.isdisjoint(map_tags)
        # remove map if 'include' tags were given and none of them is present in the map
        lacks_included_tag = bool(include_tag_set) and include_tag_set.isdisjoint(map_tags)

        if has_excluded_tag or lacks_included_tag:
            filtered_workshop_map_ids.add(publishedfile['publishedfileid'])

    for workshop_id in workshop_map_ids:
        if workshop_id in filtered_workshop_map_ids:
            continue

        display_name = resolve_workshopid_file_name(workshop_dir, workshop_id)
        if display_name is not None:
            yield 'workshop/{0}.ugc{1}'.format(display_name, workshop_id)
        else:
            yield 'workshop/{0}'.format(workshop_id)
# __main__
# Command-line entry point: parses the options, then writes one workshop map entry per
# line to stdout.  Diagnostics go to stderr so stdout can be redirected into a file.
if __name__ == '__main__':
    # fix for columns not propagating on certain versions of Python
    os.environ['COLUMNS'] = str(shutil.get_terminal_size().columns)

    parser = argparse.ArgumentParser(
            description = "Returns a list of Workshop maps given a collection(s).",
            usage = "%(prog)s [options]")

    parser.add_argument('-c', '--collection', metavar='ID', action='append',
            help="Steam Workshop map collection(s) to retrieve maps from")

    parser.add_argument('-t', '--include-tag', metavar='TAG', action='append',
            dest='include_tags', help="restrict workshop maps to ones including tag(s)")

    parser.add_argument('-T', '--exclude-tag', metavar='TAG', action='append',
            dest='exclude_tags', help="ignore workshop entries including tag(s)")

    parser.add_argument('-W', '--workshop-dir', metavar='DIR',
            help=("the game's workshop directory (e.g., /tf/../steamapps/workshop) - "
            "optional; used to resolve IDs to display names"))

    args = parser.parse_args()

    # the command-line option wins over the environment variable; 'append' options are
    # None when never given, so normalise them to empty lists
    args.workshop_dir = args.workshop_dir or os.environ.get('STEAM_WORKSHOP_DIR')
    args.include_tags = args.include_tags or []
    args.exclude_tags = args.exclude_tags or []

    if args.workshop_dir is None:
        print("no workshop directory provided: workshop map import tool will not resolve maps "
                "to full names (add --workshop-dir or STEAM_WORKSHOP_DIR environment variable)",
                file=sys.stderr)

    if args.collection is None:
        print("no collections provided", file=sys.stderr)
        sys.exit(1)

    # TODO expand mapcycle using os.path.expanduser?

    # fetch() takes the IDs under the name 'collections'; the leftover 'collection'
    # attribute is absorbed by its **kwargs
    args.collections = args.collection

    # print entry by entry: unpacking an empty generator into a single print() call would
    # write a lone blank line to stdout when there is nothing to output
    for map_entry in fetch(**vars(args)):
        print(map_entry)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment