Code to pull activity data out of the Runkeeper API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import time | |
import requests | |
from urllib.parse import urlencode, quote | |
from _credentials import client_id, client_secret, access_token | |
DATA_DIR = os.path.join(os.getcwd(), "data") | |
def get_auth_url():
    """Build the Runkeeper authorisation URL.

    The returned URL is meant to be copy-pasted into a browser window so
    that an access token can be obtained and stored in _credentials.py.

    Returns:
        The authorisation URL, including its URL-encoded query string.
    """
    query = urlencode(
        {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": "http://www.martinjc.com",
        },
        quote_via=quote,
    )
    return "https://runkeeper.com/" + "apps/authorize" + "?" + query
def do_auth(code="code"):
    """Exchange an authorisation code for an access token and print the reply.

    Args:
        code: The authorisation code to send to the token endpoint. It
            defaults to the literal placeholder "code" that used to be
            hard-coded, so existing no-argument calls behave as before.
            Presumably the real value comes from the redirect that follows
            a visit to the authorisation URL -- confirm against the
            Runkeeper documentation.

    Returns:
        The decoded JSON body of the token response (also printed).
    """
    token_url = "https://runkeeper.com/" + "apps/token"
    payload = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": "http://www.martinjc.com",
    }
    # Sent as the request body (requests' ``data`` argument), exactly as the
    # second positional argument was before.
    r = requests.post(token_url, data=payload)
    token_response = r.json()
    print(token_response)
    return token_response
def get_data(endpoint, params=None):
    """Fetch a resource from the Runkeeper API and return its decoded JSON.

    Args:
        endpoint: Path of the resource relative to
            https://api.runkeeper.com/. A leading "/" is tolerated.
        params: Optional query parameters passed through to requests.get.

    Returns:
        The decoded JSON body when the response status is 200, otherwise
        None.
    """
    base_url = "https://api.runkeeper.com/"
    # Callers in this file pass URIs such as "/fitnessActivities/<id>";
    # strip the leading slash so the URL does not end up with "//".
    url = base_url + endpoint.lstrip("/")
    headers = {"Authorization": "Bearer " + access_token}
    r = requests.get(url, params=params, headers=headers)
    print(r.status_code)
    if r.status_code != 200:
        # An error body is not guaranteed to be JSON, so print the raw text
        # instead of parsing it (parsing could raise and mask the failure).
        print(r.text)
        return None
    payload = r.json()
    print(payload)
    return payload
def get_fitness_activities():
    """Download the list of fitness activities and save it to disk.

    Pages through the "fitnessActivities" feed with get_data, following each
    page's "next" link, then writes the collected items to "activities.json"
    and their URIs to "activity_uris.json" (relative paths).

    The output files are only opened once fetching has finished, so a failed
    first request does not truncate results from an earlier run. If a later
    page fails, the activities fetched so far are still written.

    Raises:
        RuntimeError: If the first page cannot be fetched.
    """
    page = get_data("fitnessActivities")
    if page is None:
        raise RuntimeError("could not fetch the first page of fitnessActivities")

    activities = []
    # Upper bound on follow-up requests, derived from the reported total;
    # assumes the API returns 25 items per page -- TODO confirm.
    num_calls = int(page["size"] / 25)
    call_count = 0
    while True:
        items = page.get("items")
        if items:
            activities.extend(items)
            print(call_count, len(items), len(activities))
        if not page.get("next") or call_count >= num_calls:
            break
        page = get_data(page["next"])
        call_count += 1
        if page is None:
            # get_data returns None on a non-200 response; stop paging and
            # keep what has been collected rather than crashing.
            print(
                "request %d failed; keeping the %d activities fetched so far"
                % (call_count, len(activities))
            )
            break

    # store activity URIs separately
    activity_uris = [activity["uri"] for activity in activities]

    with open("activities.json", "w") as activities_output_file, open(
        "activity_uris.json", "w"
    ) as uri_output_file:
        json.dump(activities, activities_output_file)
        json.dump(activity_uris, uri_output_file)
def download_activities(activity_list):
    """Fetch each activity in *activity_list* and store it as JSON in DATA_DIR.

    Each URI has its "/fitnessActivities/" prefix removed to form the file
    name "<id>.json". Activities whose file already exists are skipped, so
    the function can be re-run to resume an interrupted download.

    Args:
        activity_list: Iterable of activity URI strings.
    """
    # open(..., "w") does not create missing directories, so make sure the
    # target directory exists before the first write.
    os.makedirs(DATA_DIR, exist_ok=True)
    for activity_uri in activity_list:
        id_str = activity_uri.replace("/fitnessActivities/", "")
        filename = os.path.join(DATA_DIR, "%s.json" % id_str)
        if os.path.exists(filename):
            continue  # already downloaded on a previous run
        activity_data = get_data(activity_uri)
        # Pause after every request; presumably to stay under an API rate
        # limit -- confirm the required interval.
        time.sleep(9)
        if activity_data is not None:
            with open(filename, "w") as output_file:
                json.dump(activity_data, output_file)
if __name__ == "__main__":
    # Show the URL to visit for authorisation, then attempt the token
    # exchange (its response is printed by do_auth).
    auth_url = get_auth_url()
    print(auth_url)
    do_auth()

    # Refresh the activity index files, then download every activity listed
    # in the URI index.
    get_fitness_activities()
    with open("activity_uris.json", "r") as uri_file:
        saved_uris = json.load(uri_file)
    download_activities(saved_uris)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment