Last active
March 18, 2022 11:38
-
-
Save zed/d41457c752e2de867194fbcfe021c446 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*.json | |
/*.xml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Send telegram on each new goal in [Football] World Cup 2018. | |
Install dependencies: | |
$ pip install appdirs requests | |
Make sure `send-tg` command works and run the script. | |
See https://ru.stackoverflow.com/q/845845/23044 | |
""" | |
import datetime as DT | |
import json | |
import os | |
import subprocess | |
import time | |
from pathlib import Path | |
import appdirs | |
import requests | |
DONE, _, _, LIVE = range(4) # MatchStatus (a guess for the live fifa api) | |
appname = "livescore-2018.6" | |
update_delay = 30 # seconds between requests to the live url | |
user_data_dir = appdirs.user_data_dir("livescore-2018", "jfs") | |
os.makedirs(user_data_dir, exist_ok=True) | |
matches_path = Path(user_data_dir) / "matches.json" | |
calendar_url = ( | |
"https://api.fifa.com/api/v1/calendar/matches?idseason=254645&idcompetition=17&count=100" | |
) | |
live_url = "https://api.fifa.com/api/v1/live/football/recent/17/254645?language=ru-RU" | |
def parse_rfc3339(timestr): | |
# it is enough to support just Z-format here | |
return DT.datetime.strptime(timestr, "%Y-%m-%dT%H:%M:%SZ").replace( | |
tzinfo=DT.timezone.utc | |
) | |
def load_matches(): | |
try: | |
return json.loads(matches_path.read_text(encoding="utf-8")) | |
except FileNotFoundError: | |
return {} # no matches yet | |
def save_matches(matches): | |
matches_path.write_text(json.dumps(matches), encoding="utf-8") | |
def get_info_about_match(match): | |
has_penalty = match["HomeTeamPenaltyScore"] or match["AwayTeamPenaltyScore"] | |
return ( | |
match["HomeTeam"]["TeamName"][0]["Description"] | |
+ " " | |
+ str(match["HomeTeam"]["Score"]) | |
+ (f"({match['HomeTeamPenaltyScore']})" if has_penalty else "") | |
+ " — " | |
+ match["AwayTeam"]["TeamName"][0]["Description"] | |
+ " " | |
+ str(match["AwayTeam"]["Score"]) | |
+ (f"({match['AwayTeamPenaltyScore']})" if has_penalty else "") | |
) | |
subprocess.check_call(["send-tg", f"{appname}: started"]) # test sending a telegram | |
while True: | |
now = DT.datetime.now(DT.timezone.utc) | |
data = requests.get(live_url).json() | |
matches = [m for m in data["Results"] if parse_rfc3339(m["Date"]) <= now] | |
if matches: # during or after a recent match | |
saved_matches = load_matches() | |
for match in matches: | |
# find out whether a new goal is scored or a new match is started | |
saved_match = saved_matches.get(match["IdMatch"]) | |
if ( | |
not saved_match | |
or any( | |
match[team]["Score"] != saved_match[team]["Score"] | |
for team in ["HomeTeam", "AwayTeam"] | |
) | |
or match["HomeTeamPenaltyScore"] != saved_match["HomeTeamPenaltyScore"] | |
or match["AwayTeamPenaltyScore"] != saved_match["AwayTeamPenaltyScore"] | |
): | |
subprocess.call(["send-tg", get_info_about_match(match)]) | |
saved_matches[match["IdMatch"]] = match | |
save_matches(saved_matches) | |
matches[:] = [m for m in matches if m["MatchStatus"] != DONE] | |
if not matches: # no live matches: sleep until the next match | |
# find the next match | |
data = requests.get(calendar_url).json() | |
try: | |
dt = next( # there are 64 matches total (use linear search) | |
dt | |
for match in sorted( | |
data["Results"], key=lambda m: parse_rfc3339(m["Date"]) | |
) | |
for dt in [parse_rfc3339(match["Date"])] | |
if dt > now | |
) | |
except StopIteration: | |
subprocess.call(["send-tg", f"{appname}: no more matches"]) | |
break | |
else: | |
print(f"sleep until {dt}") | |
time.sleep((dt - now).total_seconds()) | |
time.sleep(update_delay) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment