Skip to content

Instantly share code, notes, and snippets.

@raimannma
Created November 9, 2022 06:04
Show Gist options
  • Save raimannma/1fe4ffbd5e946087356f9a35975ae0a7 to your computer and use it in GitHub Desktop.
Save raimannma/1fe4ffbd5e946087356f9a35975ae0a7 to your computer and use it in GitHub Desktop.
Details Collector
import json
import logging
import os
import pickle
import random
import time
from abc import ABC, abstractmethod
from functools import cached_property
from threading import Thread
from typing import Optional
import brotli
import requests
import valo_api
import valo_api_official
from munch import munchify
from ratelimit import limits, sleep_and_retry
from requests import RequestException
from valo_api.config import Config as ValoAPIConfig
from valo_api.endpoints.raw import EndpointType
from match_db.db_schemes.match import Match
from match_db.db_schemes.queue_entry import QueueEntry
from match_db.utils.db_manager import DBManager
from match_db.utils.health_check import send_healthy_signal
from match_db.utils.valorant_auth import Auth
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Tag the valo_api user agent with a random integer suffix (0..1000) that is
# drawn once, when this module is imported.
ValoAPIConfig.USER_AGENT = "ValoAPI/0.1.0+" + format(random.random() * 1000, ".0f")
class DetailsCollector(Thread, ABC):
    """Worker thread that takes entries off the match queue and stores their details.

    Each loop iteration removes one ``QueueEntry`` row, asks the concrete
    subclass for the match details via ``_fetch`` and pickles
    ``(region, match_details)`` to ``/match_queue/<match_id>.pkl``.
    """

    def __init__(self, db_manager: DBManager):
        """Create the collector thread.

        Args:
            db_manager: Object whose ``session_scope()`` context manager is
                used for all queue access.
        """
        super().__init__(target=self._run)
        self._db_manager = db_manager

    def _run(self):
        """Thread target: process queue entries for as long as the thread is alive.

        Any exception raised while claiming, fetching or storing an entry is
        logged and followed by a 5 second pause; a successful iteration sends
        a health signal named after the lower-cased class name.
        """
        logger.info(f"{self.__class__.__name__} started")
        while self.is_alive():
            try:
                claimed = self._claim_entry()
                if claimed is None:
                    logger.info("No more matches to fetch, waiting 1 minute")
                    time.sleep(60)
                    # `continue` skips the `else` clause, so an idle pass
                    # does not send a health signal.
                    continue
                region, match_id = claimed
                match_details = self._fetch(region, match_id)
                self._store(region, match_id, match_details)
            except Exception as e:
                logger.error(
                    f"Error while fetching match details with {self.__class__.__name__}: "
                    f"{e}, sleeping 5 seconds"
                )
                time.sleep(5)
            else:
                send_healthy_signal(self.__class__.__name__.lower())

    def _claim_entry(self) -> Optional[tuple]:
        """Remove one queue entry and return its ``(region, match_id)``.

        Returns:
            ``None`` when the query yields no row. Because the row is picked
            at a random offset between 0 and 100, this can also happen while
            the queue still holds fewer rows than the chosen offset.
        """
        # Use the manager handed to __init__; there is no module-level
        # `db_manager` in this file.
        with self._db_manager.session_scope() as session:
            # Random offset: presumably so that collectors running in
            # parallel rarely pick the same row -- TODO confirm.
            entry = session.query(QueueEntry).offset(random.randint(0, 100)).first()
            if entry is None:
                # Must be checked before touching `entry.id` below.
                return None
            # Copy the needed fields while the session is still open, so the
            # caller never depends on the ORM object after the scope exits.
            region, match_id = entry.region, entry.match_id
            session.query(QueueEntry).filter_by(id=entry.id).delete()
        # NOTE(review): the entry is deleted before the fetch is attempted,
        # so a failing fetch does not put it back. This assumes
        # `session_scope()` commits on normal exit -- confirm.
        return region, match_id

    @staticmethod
    def _store(region: str, match_id: str, match_details) -> None:
        """Pickle ``(region, match_details)`` into the match queue directory."""
        # Write under a ".lock" suffix first and rename afterwards, so a
        # "*.pkl" file only ever appears once it has been fully written.
        with open(f"/match_queue/{match_id}.pkl.lock", "wb") as f:
            pickle.dump((region, match_details), f)
        os.rename(
            f"/match_queue/{match_id}.pkl.lock",
            f"/match_queue/{match_id}.pkl",
        )

    @abstractmethod
    def _fetch(self, region: str, match_id: str):
        """Return the match details for ``match_id`` in ``region``.

        Implemented by each concrete collector; the returned object is
        pickled as-is by ``_run``.
        """
        ...
class DetailsCollectorOfficial(DetailsCollector):
    """Collector that obtains match details through ``valo_api_official``."""

    @sleep_and_retry
    @limits(calls=5, period=6)
    def _fetch(self, region: str, match_id: str):
        """Return the v1 match details for ``match_id`` in ``region``.

        Decorated with a limit of 5 calls per 6-second period, wrapped in
        ``sleep_and_retry``.
        """
        details = valo_api_official.get_match_details_v1(region, match_id)
        return details
class DetailsCollectorUnofficial(DetailsCollector):
    """Collector that obtains raw match details through ``valo_api``."""

    @sleep_and_retry
    @limits(calls=1, period=1)
    def _fetch(self, region: str, match_id: str):
        """Return the raw ``MATCH_DETAILS`` payload for ``match_id`` in ``region``.

        Decorated with a limit of 1 call per 1-second period, wrapped in
        ``sleep_and_retry``.
        """
        endpoint = EndpointType.MATCH_DETAILS
        raw_details = valo_api.get_raw_data_v1(endpoint, match_id, region)
        return raw_details
class DetailsCollectorUnofficialWithoutKey(DetailsCollectorUnofficial):
    """Unofficial collector that performs its request with ``VALO_API_KEY`` unset."""

    @sleep_and_retry
    @limits(calls=1, period=2)
    def _fetch(self, region: str, match_id: str):
        """Return the raw ``MATCH_DETAILS`` payload, fetched without ``VALO_API_KEY``.

        The variable is removed from ``os.environ`` for the duration of the
        call and put back afterwards, including when the request raises.
        Decorated with a limit of 1 call per 2-second period, wrapped in
        ``sleep_and_retry``.
        """
        # NOTE: os.environ is shared by the whole process, so other threads
        # also see the key missing while this request is in flight.
        api_key = os.environ.pop("VALO_API_KEY", None)
        try:
            return valo_api.get_raw_data_v1(
                EndpointType.MATCH_DETAILS, match_id, region
            )
        finally:
            # Restore the key even on failure; otherwise one failed request
            # would leave it deleted for the rest of the process lifetime.
            if api_key is not None:
                os.environ["VALO_API_KEY"] = api_key
class DetailsCollectorIngame(DetailsCollector):
    """Collector that requests match details from ``pd.<region>.a.pvp.net``.

    Requests are authorised with tokens from an ``Auth`` object built from the
    ``RIOT_USERNAME`` / ``RIOT_PASSWORD`` environment variables.
    """

    # Credentials are read from the environment once, when the class body runs.
    riot_username: Optional[str] = os.environ.get("RIOT_USERNAME", None)
    riot_password: Optional[str] = os.environ.get("RIOT_PASSWORD", None)
    # Sent verbatim as the X-Riot-ClientPlatform header (base64-encoded JSON
    # with platformType / platformOS / platformOSVersion / platformChipset).
    client_platform = (
        "ew0KCSJwbGF0Zm9ybVR5cGUiOiAiUEMiLA0KCSJwbGF0Zm9ybU9TIjo"
        "gIldpbmRvd3MiLA0KCSJwbGF0Zm9ybU9TVmVyc2lvbiI6ICIxMC4wLjE"
        "5MDQyLjEuMjU2LjY0Yml0IiwNCgkicGxhdGZvcm1DaGlwc2V0IjogIlVua25vd24iDQp9"
    )
    # Seconds to wait on each HTTP request; without a timeout a stalled
    # connection would block the collector thread indefinitely.
    request_timeout: float = 30

    def __init__(self, *args, **kwargs):
        """Create the collector and its ``Auth`` helper.

        Positional and keyword arguments are forwarded unchanged to
        ``DetailsCollector.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._auth = Auth(self.riot_username, self.riot_password)

    @cached_property
    def current_version(self) -> str:
        """Client version string in ``<branch>-shipping-<buildVersion>-<n>`` form.

        Built from the ``data`` object returned by
        ``https://valorant-api.com/v1/version``, where ``<n>`` is the fourth
        dot-separated component of ``version``. The value is cached on the
        instance after the first successful lookup.

        Raises:
            requests.RequestException: If the request fails, times out or
                returns an error status.
        """
        response = requests.get(
            "https://valorant-api.com/v1/version", timeout=self.request_timeout
        )
        # Fail with a clear HTTP error instead of a KeyError/JSON error later.
        response.raise_for_status()
        data = response.json()["data"]
        return (
            f"{data['branch']}-"
            f"shipping-"
            f"{data['buildVersion']}-"
            f"{data['version'].split('.')[3]}"
        )

    @property
    def headers(self):
        """Request headers built from the current auth tokens and client info."""
        return {
            "Authorization": f"Bearer {self._auth.access_token}",
            "X-Riot-Entitlements-JWT": self._auth.entitlements_token,
            "X-Riot-ClientPlatform": self.client_platform,
            "X-Riot-ClientVersion": self.current_version,
        }

    @sleep_and_retry
    @limits(calls=1, period=1)
    def _fetch(self, region: str, match_id: str):
        """Return the match details for ``match_id`` as a munchified object.

        Decorated with a limit of 1 call per 1-second period, wrapped in
        ``sleep_and_retry``.

        Raises:
            requests.RequestException: If the request times out or the
                response status is not OK. In the latter case
                ``authenticate()`` is called on the auth helper first.
        """
        response = requests.get(
            f"https://pd.{region}.a.pvp.net/match-details/v1/matches/{match_id}",
            headers=self.headers,
            timeout=self.request_timeout,
        )
        if not response.ok:
            # Re-authenticate before reporting the failure; presumably so a
            # later request does not reuse rejected tokens -- TODO confirm.
            self._auth.authenticate()
            # Give the exception a message; with only `response=` its string
            # form is empty and the caller's log line carries no information.
            raise RequestException(
                f"Match details request for {match_id} in {region} failed "
                f"with HTTP {response.status_code}",
                response=response,
            )
        return munchify(response.json())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment