Skip to content

Instantly share code, notes, and snippets.

@om2c0de
Created September 4, 2020 16:47
Show Gist options
  • Save om2c0de/d2bbc7f41875b2e7e68c7b330f24f1e9 to your computer and use it in GitHub Desktop.
Save om2c0de/d2bbc7f41875b2e7e68c7b330f24f1e9 to your computer and use it in GitHub Desktop.
Patched controller vega
import json
import logging
from http import HTTPStatus
from typing import Awaitable, Optional, Union
from auth.exceptions import UserNotFound
from auth.jwt_utils import jwt_decode
from services.projects import Projects
from services.users import User, Users
from utils.constants import BaseErrorCodes
from utils.error_response import Error
from tornado.escape import json_decode
from .kerberos import KerberosAuthHandler
# Logger named 'auth'; the request handlers in this module write their debug messages to it.
auth_logger = logging.getLogger('auth')
class Controller(KerberosAuthHandler):
    """Base request handler with CORS headers, a response helper and user lookup.

    The current user is resolved from the ``Authorization`` header: a
    ``Bearer`` header is decoded as a JWT, a ``Negotiate`` header falls back
    to the ``user_id`` secure cookie.
    """

    # So that pyrestful does not call get_current_user() on every request.
    current_user = None

    def set_default_headers(self):
        """Set the CORS response headers."""
        self.set_header("Access-Control-Allow-Origin", "*")
        # set_header() replaces any earlier value of the same header, so all
        # allowed request headers must be listed in a single call; two
        # separate calls would silently drop "x-requested-with".
        self.set_header(
            "Access-Control-Allow-Headers",
            "x-requested-with,access-control-allow-origin,authorization,content-type",
        )
        self.set_header('Access-Control-Allow-Methods', '*')
        # NOTE(review): browsers refuse credentialed CORS responses whose
        # Allow-Origin / Allow-Methods is the "*" wildcard. Left unchanged
        # here because the right origin list is deployment-specific - confirm.
        self.set_header('Access-Control-Allow-Credentials', 'true')
        self.set_header('Access-Control-Expose-Headers', 'Content-Disposition')

    def options(self):
        """Answer an OPTIONS request with 204 No Content and no body."""
        self.set_status(HTTPStatus.NO_CONTENT)
        self.finish()

    def send_response(self, status_code: int, body: Union[str, bytes, dict]) -> None:
        """Send an HTTP response with the given status code and body.

        Controllers are expected to call it as::

            return self.send_response(...)

        It is designed that way so that the ``return`` statement after the
        call is not forgotten.
        """
        self.set_status(status_code)
        self.finish(body)
        return None

    def _get_user_id_from_auth_header(self) -> Optional[str]:
        """Extract the user id according to the scheme in the auth header.

        Returns:
            The ``user_id`` claim of the decoded JWT for a ``Bearer`` header,
            the ``user_id`` secure cookie for a ``Negotiate`` header, and
            ``None`` for anything else (including a missing header).

        Raises:
            ValueError: if a ``Bearer`` header does not consist of exactly
                two space-separated parts.
        """
        # str() turns a missing header (None) into 'None', which matches
        # neither scheme below and therefore yields None.
        auth_header = str(self._get_auth_header())
        if 'Bearer' in auth_header:  # JWT authentication
            _, token_encoded = auth_header.split(' ')
            token_decoded = jwt_decode(token_encoded)
            return token_decoded['user_id']
        if 'Negotiate' in auth_header:  # Kerberos authentication
            # NOTE(review): Tornado's get_secure_cookie returns bytes, not
            # str - confirm that the user lookup accepts that.
            return self.get_secure_cookie('user_id')
        return None

    @staticmethod
    def _search_for_user_by_id(user_id):
        """Return the user with the given id.

        Raises:
            UserNotFound: if the lookup returns ``None``.
        """
        user = Users().read_by_filter({'user_id': user_id})
        if user is None:
            raise UserNotFound()
        return user

    def get_current_user(self) -> Optional[User]:
        """Return the user identified by the auth header, or ``None``.

        Raises:
            UserNotFound: if a user id was extracted but no such user exists.
        """
        user_id = self._get_user_id_from_auth_header()
        if user_id:
            user = self._search_for_user_by_id(user_id)
            auth_logger.debug(f'Got current user: {user}')
            return user
        return None

    def check_project_exists(self, project_id: str, logger_handler) -> Optional[bool]:
        """Check that the project exists; reply with 400 Bad Request if not.

        Relevant only for the controllers: calculation, imports, results.

        Arguments:
            project_id {str} -- id of the project to look up
            logger_handler {Logger} -- logger that receives the "not found" message

        Returns:
            True when the project exists; None after the error response
            has been sent.
        """
        if not Projects().exists(object_id=project_id):
            message = f"Project({project_id}) was not found."
            logger_handler.info(message)
            error = Error(code=BaseErrorCodes.BAD_REQUEST, message=message)
            return self.send_response(  # type: ignore
                status_code=HTTPStatus.BAD_REQUEST, body=error.get_full_error_dict()
            )
        return True

    def _get_auth_header(self) -> Optional[str]:
        """Return the ``Authorization`` request header.

        When the header is missing or empty, ``authenticate_redirect()`` is
        called and the empty value is still returned to the caller.
        """
        auth_header = self.request.headers.get('Authorization')
        if not auth_header:
            auth_logger.debug(f'Auth header not found. Headers: {self.request.headers}')
            self.authenticate_redirect()
        return auth_header

    def get_resource_id_from_body(self, resource_id_alias: str) -> Optional[str]:
        """Read ``resource_id_alias`` from the JSON request body.

        Returns:
            The value stored under ``resource_id_alias``, or ``None`` when the
            body is not valid JSON, is not a JSON object, or lacks the key.
        """
        try:
            body_dict = json_decode(self.request.body)
        except (json.decoder.JSONDecodeError, UnicodeDecodeError):
            # json_decode decodes the raw bytes as UTF-8 before parsing, so a
            # badly encoded body fails here as well as malformed JSON.
            auth_logger.debug(
                f'Not found resource id ({resource_id_alias}) in body: invalid json in body: {self.request.body}'
            )
            return None  # not json

        # Valid JSON is not necessarily an object: a list, string, number or
        # null would raise TypeError on the key lookup, so treat it as
        # "no resource id" instead.
        if not isinstance(body_dict, dict) or resource_id_alias not in body_dict:
            auth_logger.debug(f'Not found resource id ({resource_id_alias}) in body: {self.request.body}')
            return None  # no resource id

        resource_id = body_dict[resource_id_alias]
        auth_logger.debug(f'Found resource id ({resource_id_alias}) in body: {resource_id}')
        return resource_id
class ControllerException(Exception):
    """Controller error carrying a code plus optional data, source and reason.

    ``str(exc)`` renders a JSON object with the ``code`` and ``data`` fields.
    """

    def __init__(
        self, code: str, data: Optional[str] = None, source: Optional[str] = None, reason: Optional[str] = None
    ):
        # Forward the arguments to Exception so that ``args`` is populated:
        # copy and pickle rebuild an exception by calling its class with
        # ``args``, which fails on the required ``code`` if ``args`` is empty.
        super().__init__(code, data, source, reason)
        self.code = code
        self.data = data
        self.source = source
        self.reason = reason

    @property
    def extra(self):
        """Alias for ``data``."""
        return self.data

    @property
    def error_code(self):
        """Alias for ``code``."""
        return self.code

    def __str__(self):
        """Return ``{"code": ..., "data": ...}`` as JSON, or a fallback text.

        The fallback is used when the payload cannot be serialized, so that
        formatting the exception never raises.
        """
        json_data = {'code': str(self.code), 'data': self.data}
        try:
            result = json.dumps(json_data)
        except Exception as ex:
            result = f'ControllerException (Internal error) -> {ex}'
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment