Created
September 20, 2022 09:28
-
-
Save himynamesdave/2e2048790096e4e41e845c839fc02717 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bs4 import BeautifulSoup | |
import os, sys, re, json, requests | |
def get_mapillary_global_acccess_token(): | |
mly_token = "" | |
res = requests.get("https://www.mapillary.com/app/") | |
soup = BeautifulSoup(res.text, "html.parser") | |
for script in soup.find_all('script'): | |
src = script.get('src') | |
if src is not None: | |
src = "https://www.mapillary.com{}".format(src) | |
res = requests.get(src) | |
token = re.findall(r'(MLY\|[A-Za-z0-9]+\|[A-Za-z0-9]+)', res.text) | |
if len(token) > 0: | |
mly_token = token[0].strip() | |
break | |
return mly_token | |
def getLatestActivity(access_token, user_id): | |
status = None | |
cluster_id = None | |
url = 'https://graph.mapillary.com/graphql?doc=query getLatestActivity($id: ID!, $first: Int, $after: ID) { fetch__User(id: $id) { id feed(first: $first, after: $after) { page_info { start_cursor end_cursor has_next_page has_previous_page } nodes { cluster_id type created_at_seconds captured_at_seconds thumb_url item_count image_id status initial_processing_status anonymization_status tiler_status __typename } __typename } __typename } __typename }&query=query getLatestActivity($id: ID!, $first: Int, $after: ID) { fetch__User(id: $id) { id feed(first: $first, after: $after) { page_info { start_cursor end_cursor has_next_page has_previous_page __typename } nodes { cluster_id type created_at_seconds captured_at_seconds thumb_url item_count image_id status initial_processing_status anonymization_status tiler_status __typename } __typename } __typename } __typename } &operationName=getLatestActivity&variables={"id":"'+user_id+'","first":50,"after":null}' | |
headers = { | |
"Authorization": "OAuth {}".format(access_token) | |
} | |
res = requests.get(url, headers=headers) | |
data = res.json() | |
return data | |
def getUserInfo(access_token, username): | |
url = 'https://graph.mapillary.com/graphql?doc=query getNewSequences($username: String!) { user_by_username(username: $username) { id new_sequences { sequence_keys geojson __typename } __typename } __typename }&query=query getNewSequences($username: String!) { user_by_username(username: $username) { id new_sequences { sequence_keys geojson __typename } __typename } __typename } &operationName=getNewSequences&variables={"username":"'+username+'"}' | |
headers = { | |
"Authorization": "OAuth {}".format(access_token) | |
} | |
res = requests.get(url, headers=headers) | |
data = res.json() | |
return data | |
def getSequenceUploadStatus(username): | |
print("getting status for {}".format(username)) | |
access_token = get_mapillary_global_acccess_token() | |
data = getUserInfo(access_token, username) | |
with open("mapillary_user_info.json", "w") as f: | |
f.write(json.dumps(data)) | |
f.close() | |
user_id = data['data']['user_by_username']['id'] | |
data = getLatestActivity(access_token, user_id) | |
if 'data' in data: | |
for o in data['data']['fetch__User']['feed']['nodes']: | |
print("cluster_id: {} type: {} status: {}\n".format(o['cluster_id'], o['type'], o['status'])) | |
with open("mapillary_latest_activity.json", "w") as f: | |
f.write(json.dumps(data)) | |
f.close() | |
if __name__ == '__main__': | |
username = sys.argv[1] | |
getSequenceUploadStatus(username) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment