Skip to content

Instantly share code, notes, and snippets.

@himynamesdave
Created September 20, 2022 09:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save himynamesdave/2e2048790096e4e41e845c839fc02717 to your computer and use it in GitHub Desktop.
Save himynamesdave/2e2048790096e4e41e845c839fc02717 to your computer and use it in GitHub Desktop.
from bs4 import BeautifulSoup
import os, sys, re, json, requests
def get_mapillary_global_acccess_token():
mly_token = ""
res = requests.get("https://www.mapillary.com/app/")
soup = BeautifulSoup(res.text, "html.parser")
for script in soup.find_all('script'):
src = script.get('src')
if src is not None:
src = "https://www.mapillary.com{}".format(src)
res = requests.get(src)
token = re.findall(r'(MLY\|[A-Za-z0-9]+\|[A-Za-z0-9]+)', res.text)
if len(token) > 0:
mly_token = token[0].strip()
break
return mly_token
def getLatestActivity(access_token, user_id):
status = None
cluster_id = None
url = 'https://graph.mapillary.com/graphql?doc=query getLatestActivity($id: ID!, $first: Int, $after: ID) { fetch__User(id: $id) { id feed(first: $first, after: $after) { page_info { start_cursor end_cursor has_next_page has_previous_page } nodes { cluster_id type created_at_seconds captured_at_seconds thumb_url item_count image_id status initial_processing_status anonymization_status tiler_status __typename } __typename } __typename } __typename }&query=query getLatestActivity($id: ID!, $first: Int, $after: ID) { fetch__User(id: $id) { id feed(first: $first, after: $after) { page_info { start_cursor end_cursor has_next_page has_previous_page __typename } nodes { cluster_id type created_at_seconds captured_at_seconds thumb_url item_count image_id status initial_processing_status anonymization_status tiler_status __typename } __typename } __typename } __typename } &operationName=getLatestActivity&variables={"id":"'+user_id+'","first":50,"after":null}'
headers = {
"Authorization": "OAuth {}".format(access_token)
}
res = requests.get(url, headers=headers)
data = res.json()
return data
def getUserInfo(access_token, username):
url = 'https://graph.mapillary.com/graphql?doc=query getNewSequences($username: String!) { user_by_username(username: $username) { id new_sequences { sequence_keys geojson __typename } __typename } __typename }&query=query getNewSequences($username: String!) { user_by_username(username: $username) { id new_sequences { sequence_keys geojson __typename } __typename } __typename } &operationName=getNewSequences&variables={"username":"'+username+'"}'
headers = {
"Authorization": "OAuth {}".format(access_token)
}
res = requests.get(url, headers=headers)
data = res.json()
return data
def getSequenceUploadStatus(username):
print("getting status for {}".format(username))
access_token = get_mapillary_global_acccess_token()
data = getUserInfo(access_token, username)
with open("mapillary_user_info.json", "w") as f:
f.write(json.dumps(data))
f.close()
user_id = data['data']['user_by_username']['id']
data = getLatestActivity(access_token, user_id)
if 'data' in data:
for o in data['data']['fetch__User']['feed']['nodes']:
print("cluster_id: {} type: {} status: {}\n".format(o['cluster_id'], o['type'], o['status']))
with open("mapillary_latest_activity.json", "w") as f:
f.write(json.dumps(data))
f.close()
if __name__ == '__main__':
username = sys.argv[1]
getSequenceUploadStatus(username)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment