Created
November 9, 2022 06:04
-
-
Save raimannma/1fe4ffbd5e946087356f9a35975ae0a7 to your computer and use it in GitHub Desktop.
Details Collector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
import pickle | |
import random | |
import time | |
from abc import ABC, abstractmethod | |
from functools import cached_property | |
from threading import Thread | |
from typing import Optional | |
import brotli | |
import requests | |
import valo_api | |
import valo_api_official | |
from munch import munchify | |
from ratelimit import limits, sleep_and_retry | |
from requests import RequestException | |
from valo_api.config import Config as ValoAPIConfig | |
from valo_api.endpoints.raw import EndpointType | |
from match_db.db_schemes.match import Match | |
from match_db.db_schemes.queue_entry import QueueEntry | |
from match_db.utils.db_manager import DBManager | |
from match_db.utils.health_check import send_healthy_signal | |
from match_db.utils.valorant_auth import Auth | |
logger = logging.getLogger(__name__) | |
ValoAPIConfig.USER_AGENT = f"ValoAPI/0.1.0+{random.random() * 1000:.0f}" | |
class DetailsCollector(Thread, ABC): | |
def __init__(self, db_manager: DBManager): | |
super().__init__(target=self._run) | |
self._db_manager = db_manager | |
def _run(self): | |
logger.info(f"{self.__class__.__name__} started") | |
while self.is_alive(): | |
try: | |
with db_manager.session_scope() as session: | |
entry = session.query(QueueEntry).offset(random.randint(0, 100)).first() | |
session.query(QueueEntry).filter_by(id=entry.id).delete() | |
if entry is None: | |
logger.info("No more matches to fetch, waiting 1 minute") | |
time.sleep(60) | |
continue | |
match_details = self._fetch(entry.region, entry.match_id) | |
with open(f"/match_queue/{entry.match_id}.pkl.lock", "wb") as f: | |
pickle.dump((entry.region, match_details), f) | |
os.rename( | |
f"/match_queue/{entry.match_id}.pkl.lock", | |
f"/match_queue/{entry.match_id}.pkl", | |
) | |
except Exception as e: | |
logger.error( | |
f"Error while fetching match details with {self.__class__.__name__}: " | |
f"{e}, sleeping 5 seconds" | |
) | |
time.sleep(5) | |
else: | |
send_healthy_signal(self.__class__.__name__.lower()) | |
@abstractmethod | |
def _fetch(self, region: str, match_id: str): | |
... | |
class DetailsCollectorOfficial(DetailsCollector): | |
@sleep_and_retry | |
@limits(calls=5, period=6) | |
def _fetch(self, region: str, match_id: str): | |
return valo_api_official.get_match_details_v1(region, match_id) | |
class DetailsCollectorUnofficial(DetailsCollector): | |
@sleep_and_retry | |
@limits(calls=1, period=1) | |
def _fetch(self, region: str, match_id: str): | |
return valo_api.get_raw_data_v1(EndpointType.MATCH_DETAILS, match_id, region) | |
class DetailsCollectorUnofficialWithoutKey(DetailsCollectorUnofficial): | |
@sleep_and_retry | |
@limits(calls=1, period=2) | |
def _fetch(self, region: str, match_id: str): | |
env_var = os.environ.get("VALO_API_KEY", None) | |
if env_var: | |
del os.environ["VALO_API_KEY"] | |
match_details = valo_api.get_raw_data_v1( | |
EndpointType.MATCH_DETAILS, match_id, region | |
) | |
if env_var: | |
os.environ["VALO_API_KEY"] = env_var | |
return match_details | |
class DetailsCollectorIngame(DetailsCollector): | |
riot_username: Optional[str] = os.environ.get("RIOT_USERNAME", None) | |
riot_password: Optional[str] = os.environ.get("RIOT_PASSWORD", None) | |
client_platform = ( | |
"ew0KCSJwbGF0Zm9ybVR5cGUiOiAiUEMiLA0KCSJwbGF0Zm9ybU9TIjo" | |
"gIldpbmRvd3MiLA0KCSJwbGF0Zm9ybU9TVmVyc2lvbiI6ICIxMC4wLjE" | |
"5MDQyLjEuMjU2LjY0Yml0IiwNCgkicGxhdGZvcm1DaGlwc2V0IjogIlVua25vd24iDQp9" | |
) | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._auth = Auth(self.riot_username, self.riot_password) | |
@cached_property | |
def current_version(self) -> str: | |
data = requests.get("https://valorant-api.com/v1/version").json()["data"] | |
return ( | |
f"{data['branch']}-" | |
f"shipping-" | |
f"{data['buildVersion']}-" | |
f"{data['version'].split('.')[3]}" | |
) | |
@property | |
def headers(self): | |
return { | |
"Authorization": f"Bearer {self._auth.access_token}", | |
"X-Riot-Entitlements-JWT": self._auth.entitlements_token, | |
"X-Riot-ClientPlatform": self.client_platform, | |
"X-Riot-ClientVersion": self.current_version, | |
} | |
@sleep_and_retry | |
@limits(calls=1, period=1) | |
def _fetch(self, region: str, match_id: str): | |
response = requests.get( | |
f"https://pd.{region}.a.pvp.net/match-details/v1/matches/{match_id}", | |
headers=self.headers, | |
) | |
if not response.ok: | |
self._auth.authenticate() | |
raise RequestException(response=response) | |
return munchify(response.json()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment