Skip to content

Instantly share code, notes, and snippets.

@getjump
Created November 2, 2021 09:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save getjump/0b9e953857ceff289296bf0c03a86fef to your computer and use it in GitHub Desktop.
Save getjump/0b9e953857ceff289296bf0c03a86fef to your computer and use it in GitHub Desktop.
import aiohttp
import asyncio
import logging
import time
from pyppeteer import launch
from urllib.parse import urlparse, parse_qs
from bs4 import BeautifulSoup
from enum import Enum
from typing import Optional, Tuple, Literal
from rostelecom_key.session import LOGGER, RtKeySession
_LOGGER = logging.getLogger(__name__)
class AuthenticationStep1Result(Enum):
AUTHENTICATED_BY_COOKIES = 1
NEED_TO_FOLLOW_NEXT_STEPS = 2
class AuthenticationStep2Result(Enum):
NEED_TO_FOLLOW_NEXT_STEPS = 1
class AuthenticationSendSmsResult(Enum):
SMS_WAS_SENT = 1
class AuthenticationDoubleRedirect(Enum):
SUCCESS = 1
class AuthenticationDoubleRedirectResponse():
def __init__(self, code: str, t: str, state: str):
self.code = code
self.t = t
self.state = state
class AuthenticationGetTokenResponse():
def __init__(self, bearer_token: str, auth_token: str):
self.bearer_token = bearer_token
self.auth_token = auth_token
class ElementNotFound(Exception):
pass
class Authentication:
BASE_AUTHENTICATION_ENDPOINT = 'https://b2c.passport.rt.ru/auth/realms/b2c/protocol/openid-connect/auth'
BASE_AUTHENTICATION_REDIRECT_URI = 'https://sso.key.rt.ru/api/v1/oauth/b2c/callback'
BASE_SIGNIN_WITH_TIME_TEMPLATE = 'https://key.rt.ru/main/signin?t={}'
HOUSEHOLD_GET_TOKEN_URI = 'https://household.key.rt.ru/api/v3/app/sso/oauth2/token'
CLIENT_ID = None
def __init__(self, session: RtKeySession, client_id = 'lk_dmh'):
self.session = session
self.CLIENT_ID = client_id
async def get_browser(self):
if self.browser is None:
self.browser = await launch({
'headless': True
})
self.page = await self.browser.newPage()
await self.page.setRequestInterception(True)
return self.browser
async def authenticate_via_sms(self, phone: str) -> Tuple[Literal[AuthenticationSendSmsResult.SMS_WAS_SENT], str]:
(step1_result, step1_followup) = await self.authentication_step1()
if step1_result is AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES:
# Goes directly to Get Token?
pass
elif step1_result is AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS:
(step2_result, step2_followup) = await self.authentication_step2(str(step1_followup))
if step2_result is AuthenticationStep2Result.NEED_TO_FOLLOW_NEXT_STEPS:
return await self.send_sms_with_code(step2_followup, phone)
raise Exception
async def send_sms_with_code(self, uri: str, phone: str) -> Tuple[Literal[AuthenticationSendSmsResult.SMS_WAS_SENT], str]:
r = await self.session.post(uri, data={ 'phone': phone, 'otp_get_code': 'Получить+код' })
html_text = await r.text()
soup = BeautifulSoup(html_text, 'html.parser')
collection = soup.select('#kc-form-login .page-right-container-header:nth-of-type(1)')
if len(collection) != 1:
raise Exception
if 'Код подтверждения отправлен' not in collection[0].text:
raise Exception
element = soup.find('form', id='kc-form-login')
if element is None:
raise ElementNotFound
return (AuthenticationSendSmsResult.SMS_WAS_SENT, str(element.get('action'))) # type: ignore
async def authenticate_with_code_from_sms(self, uri: str, phone: str, code: str):
r = await self.session.post(uri, data={
'username': phone,
'password': code,
'otp_login': 'Войти',
'otp_login': 'Войти'
})
html_text = await r.text()
soup = BeautifulSoup(html_text, 'html.parser')
errors = soup.select('.error-container')
# Assert there are no errors
assert(len(errors) == 0)
(_, data) = await self.authentication_double_redirect(r)
return await self.get_token(data)
async def get_token(self, data: AuthenticationDoubleRedirectResponse, grant_type: str = 'authorization_code'):
r = await self.session.post(self.HOUSEHOLD_GET_TOKEN_URI, data={
'code': data.code,
't': data.t,
'state': data.state,
'grant_type': grant_type
})
response_json = await r.json()
# Assert there are no errors
assert(response_json['error'] is not None)
# Two type of tokens, one for one group of methods, and second for another ones
self.session.set_auth_token(response_json['data']['vc']['access_token'])
self.session.set_bearer_token(response_json['data']['key']['access_token'])
return AuthenticationGetTokenResponse(response_json['data']['key']['access_token'], response_json['data']['vc']['access_token'])
async def authentication_step2(self, uri: str) -> Tuple[Literal[AuthenticationStep2Result.NEED_TO_FOLLOW_NEXT_STEPS], str]:
r = await self.session.get(uri)
# LOGGER.debug(uri)
if r.status != 302:
raise Exception(r.status)
location = str(r.headers.get('Location'))
async with self.session.get(location) as response:
html_text = await response.text()
soup = BeautifulSoup(html_text, 'html.parser')
element = soup.find('form', id='kc-form-login')
if element is None:
raise ElementNotFound
return (AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS, str(element.get('action'))) # type: ignore
async def authentication_double_redirect(self, request) -> Tuple[Literal[AuthenticationDoubleRedirect.SUCCESS], AuthenticationDoubleRedirectResponse]:
# That means we already authenticated with AUTHENTICATED_BY_COOKIES
# We should follow redirects and then catch params from Location header
location = str(request.headers.get('Location'))
async with self.session.get(location) as response:
if response.status == 302:
location = response.headers.get('Location')
async with self.session.get(str(location)) as response:
parsed_url = urlparse(location)
params = parse_qs(str(parsed_url.query))
return (AuthenticationDoubleRedirect.SUCCESS, AuthenticationDoubleRedirectResponse(params['code'][0], params['t'][0], params['state'][0]))
raise Exception
async def authentication_step1(self) -> Tuple[Literal[AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES, AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS], Optional[str]]:
r = await self.session.get(self.BASE_AUTHENTICATION_ENDPOINT, params={
'client_id': self.CLIENT_ID,
'redirect_uri': self.BASE_AUTHENTICATION_REDIRECT_URI,
'response_type': 'code',
'state': self.BASE_SIGNIN_WITH_TIME_TEMPLATE.format(round(time.time() * 1000))
}, allow_redirects=False)
if r.status == 302:
result = await self.authentication_double_redirect(r)
if result == AuthenticationDoubleRedirect.SUCCESS:
return (AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES, None)
else:
html_text = await r.text()
soup = BeautifulSoup(html_text, 'html.parser')
element = soup.find('form', id='kc-form-login')
if element is None:
raise ElementNotFound
return (AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS, str(element.get('action'))) # type: ignore
raise Exception
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment