Skip to content

Instantly share code, notes, and snippets.

@theracingapi
Last active September 1, 2024 10:46
Show Gist options
  • Save theracingapi/ad930791391b398e0bf5b2d1b296ce05 to your computer and use it in GitHub Desktop.
Save theracingapi/ad930791391b398e0bf5b2d1b296ce05 to your computer and use it in GitHub Desktop.
Python script to create a CSV report of horses on tomorrow's racecards that are running for the first time since being gelded (Basic Plan)
import requests
from requests.auth import HTTPBasicAuth
from operator import itemgetter
import csv
from datetime import datetime, timedelta
import asyncio
import aiohttp
# API credentials for api.theracingapi.com, passed as HTTP Basic auth by the
# requests made in this script. Replace both placeholders before running.
username = "<USERNAME>"
password = "<PASSWORD>"
def get_gelding_last_race_tasks(session, gelding_last_race_rows):
    """Build one pending results request per gelding row.

    Args:
        session: Object exposing ``get(url, params=...)``; the caller passes
            an ``aiohttp.ClientSession``.
        gelding_last_race_rows: Iterable of dicts, each with a ``horse_id`` key.

    Returns:
        A list holding the return value of ``session.get`` for each row, in
        the same order as the input rows. Nothing is awaited here; the caller
        is expected to gather the returned objects.
    """
    # The end date is the same for every request, so format it once.
    today = datetime.today().strftime("%Y-%m-%d")
    # NOTE(review): the path is /racecards/{horse_id}/results -- confirm
    # against the API documentation that this is the intended endpoint for a
    # horse's past results.
    return [
        session.get(
            f"https://api.theracingapi.com/v1/racecards/{row['horse_id']}/results",
            # Query intended to return the gelding's last run: at most one
            # result, dated no later than today.
            params={
                "end_date": today,
                "limit": 1,
            },
        )
        for row in gelding_last_race_rows
    ]
# Function to get last race data for each gelding
async def get_gelding_last_race_list(gelding_race_rows):
    """Fetch the last-run request for every gelding and collect its runner entry.

    Opens one ``aiohttp.ClientSession`` authenticated with the module-level
    ``username``/``password``, gathers all requests built by
    ``get_gelding_last_race_tasks`` concurrently, and for each JSON response
    picks the runner whose ``horse_id`` matches the one echoed in the
    response's ``query`` list.

    Args:
        gelding_race_rows: Iterable of dicts, each with a ``horse_id`` key.

    Returns:
        A list of runner dicts taken from the first result of each response.
        Responses with no results, or whose first result does not contain the
        queried horse, contribute nothing, so the list never contains ``None``.
    """
    gelding_last_race_list = []
    auth = aiohttp.BasicAuth(username, password)
    async with aiohttp.ClientSession(auth=auth) as session:
        tasks = get_gelding_last_race_tasks(session, gelding_race_rows)
        responses = await asyncio.gather(*tasks)
        for response in responses:
            data = await response.json()
            # An error payload may lack "results" (or not be an object at
            # all); treat that the same as "no previous run found".
            results = data.get("results") if isinstance(data, dict) else None
            if not results:
                continue
            # Recover which horse this response belongs to from the echoed
            # query; each entry is indexed as (name, value).
            horse_id = None
            for q in data.get("query", []):
                if "horse_id" in q:
                    horse_id = q[1]
            gelding_run_data = next(
                (item for item in results[0]["runners"] if item["horse_id"] == horse_id),
                None,
            )
            # Skip unmatched horses: appending None would make the caller's
            # item["horse_id"] lookups raise TypeError.
            if gelding_run_data is not None:
                gelding_last_race_list.append(gelding_run_data)
    return gelding_last_race_list
async def get_first_time_gelded_report(racecards):
    """Write a CSV of geldings whose previous run was not as a gelding.

    Collects every runner with ``sex_code == "G"`` from ``racecards``, fetches
    each one's last-run data via ``get_gelding_last_race_list``, and keeps the
    runners whose last-run ``sex`` value is not ``"G"``. The result is written
    to ``first_time_gelding_list_<tomorrow's date>.csv`` in the current
    directory. When there are no matching rows, a header-only file is written.

    Args:
        racecards: Iterable of race dicts with ``region``, ``course``,
            ``off_time`` and ``runners`` keys; each runner needs ``sex_code``,
            ``horse``, ``horse_id``, ``trainer`` and ``trainer_id``.

    Returns:
        None.
    """
    # Fixed column order; also lets the header be written when no geldings
    # are declared (indexing the first row would raise IndexError).
    fieldnames = ("region", "course", "off_time", "horse", "horse_id", "trainer", "trainer_id")

    # Create initial gelded horse race rows, containing all declared geldings
    gelding_race_rows = []
    for race in racecards:
        for runner in race["runners"]:
            if runner["sex_code"] == "G":
                gelding_race_rows.append({
                    "region": race["region"],
                    "course": race["course"],
                    "off_time": race["off_time"],
                    "horse": runner["horse"],
                    "horse_id": runner["horse_id"],
                    "trainer": runner["trainer"],
                    "trainer_id": runner["trainer_id"],
                })
    # Sort the rows
    gelding_race_rows.sort(key=itemgetter("region", "course", "off_time"))

    # Get list containing the previous race data for each gelding
    gelding_last_race_list = await get_gelding_last_race_list(gelding_race_rows)

    # Index previous runs by horse for O(1) lookup. setdefault keeps the first
    # entry per horse; empty/None entries and entries without an id are skipped.
    last_race_by_horse = {}
    for item in gelding_last_race_list:
        if item and "horse_id" in item:
            last_race_by_horse.setdefault(item["horse_id"], item)

    # Keep rows where the runner was not a gelding in its previous race
    first_time_gelding_rows = []
    for row in gelding_race_rows:
        last_race_data = last_race_by_horse.get(row["horse_id"])
        if not last_race_data:
            continue
        try:
            previous_sex = last_race_data["sex"]
        except KeyError as e:
            # Best effort: report the malformed entry and carry on.
            print(e)
            continue
        if previous_sex != "G":
            first_time_gelding_rows.append(row)

    # Create a csv file containing the first time gelded list
    report_date = (datetime.today() + timedelta(days=1)).strftime('%Y-%m-%d')
    with open(f"first_time_gelding_list_{report_date}.csv", "w", newline="") as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(first_time_gelding_rows)
if __name__ == '__main__':
    # Get tomorrow's racecards
    params = {
        "day": "tomorrow",
    }
    response = requests.get(
        "https://api.theracingapi.com/v1/racecards",
        auth=HTTPBasicAuth(username, password),
        params=params,
        # Without a timeout requests can wait on the server indefinitely.
        timeout=30,
    )
    # Surface auth / rate-limit failures as an HTTP error instead of a
    # KeyError on the missing "racecards" key below.
    response.raise_for_status()
    racecards = response.json()["racecards"]
    # Build and write the first-time-gelded CSV report
    asyncio.run(get_first_time_gelded_report(racecards))
@Bonobo791
Copy link

Just so you're aware, the logic for gelded last race isn't working. I removed it and the code worked fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment