Created
April 2, 2023 19:37
-
-
Save loathingKernel/627669966e1e6e4ae10f8f31e0b9df27 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from dataclasses import dataclass, field | |
from logging import getLogger | |
from typing import Callable, Dict, TypeVar, List, Tuple | |
from typing import Union | |
from PyQt5.QtCore import QObject, pyqtSignal, QUrl, QJsonParseError, QJsonDocument, QUrlQuery | |
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply | |
logger = getLogger("QtRequests") | |
REQUEST_LIMIT = 8 | |
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36" | |
RequestHandler = TypeVar("RequestHandler", bound=Callable[[Union[Dict, bytes]], None]) | |
@dataclass | |
class RequestQueueItem: | |
method: str = None | |
url: QUrl = None | |
payload: Dict = field(default_factory=dict) | |
params: Dict = field(default_factory=dict) | |
handlers: List[RequestHandler] = field(default_factory=list) | |
def __eq__(self, other): | |
return self.method == other.method and self.url == other.url | |
class QtRequestManager(QObject): | |
data_ready = pyqtSignal(object) | |
def __init__(self, doc_type: str = "json", authorization_token: str = None, parent=None): | |
super(QtRequestManager, self).__init__(parent=parent) | |
self.manager = QNetworkAccessManager(self) | |
self.doc_type = doc_type | |
self.authorization_token = authorization_token | |
self.__pending_requests = [] | |
self.__active_requests = [] | |
def __post(self, item: RequestQueueItem): | |
request = QNetworkRequest(item.url) | |
request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json;charset=UTF-8") | |
request.setHeader(QNetworkRequest.UserAgentHeader, USER_AGENT) | |
request.setAttribute(QNetworkRequest.RedirectPolicyAttribute, QNetworkRequest.NoLessSafeRedirectPolicy) | |
if self.authorization_token is not None: | |
request.setRawHeader(b"Authorization", self.authorization_token.encode()) | |
payload = json.dumps(item.payload).encode("utf-8") | |
reply = self.manager.post(request, payload) | |
active_request = (reply, item) | |
self.__active_requests.append(active_request) | |
reply.errorOccurred.connect(lambda: self.__on_error(active_request)) | |
reply.finished.connect(lambda: self.__on_finished(active_request)) | |
def post(self, handler: RequestHandler, url: str, payload: dict): | |
item = RequestQueueItem(method="post", url=QUrl(url), payload=payload, handlers=[handler]) | |
if len(self.__active_requests) < REQUEST_LIMIT: | |
self.__post(item) | |
else: | |
self.__pending_requests.append(item) | |
def __get(self, item: RequestQueueItem): | |
request = QNetworkRequest(item.url) | |
request.setHeader(QNetworkRequest.UserAgentHeader, USER_AGENT) | |
reply = self.manager.get(request) | |
active_request = (reply, item) | |
self.__active_requests.append(active_request) | |
reply.errorOccurred.connect(lambda: self.__on_error(active_request)) | |
reply.finished.connect(lambda: self.__on_finished(active_request)) | |
@staticmethod | |
def __prepare_query(url, params) -> QUrl: | |
url = QUrl(url) | |
query = QUrlQuery(url) | |
for k, v in params.items(): | |
query.addQueryItem(str(k), str(v)) | |
url.setQuery(query) | |
return url | |
def get(self, handler: RequestHandler, url: str, payload: Dict = None, params: Dict = None): | |
url = self.__prepare_query(url, params) if params is not None else QUrl(url) | |
item = RequestQueueItem(method="get", url=url, payload=payload, handlers=[handler]) | |
if len(self.__active_requests) < REQUEST_LIMIT: | |
self.__get(item) | |
else: | |
self.__pending_requests.append(item) | |
@staticmethod | |
def __on_error(reply_item: Tuple[QNetworkReply, RequestQueueItem]): | |
reply, item = reply_item | |
logger.error(reply.errorString()) | |
def __on_finished(self, reply_item: Tuple[QNetworkReply, RequestQueueItem]): | |
self.__active_requests.pop(self.__active_requests.index(reply_item)) | |
reply, item = reply_item | |
data = {} if self.doc_type == "json" else b"" | |
if reply: | |
try: | |
if reply.error() == QNetworkReply.NoError: | |
if self.doc_type == "json": | |
error = QJsonParseError() | |
json_data = QJsonDocument.fromJson( | |
reply.readAll().data(), error | |
) | |
if QJsonParseError.NoError == error.error: | |
data = json.loads(json_data.toJson().data().decode()) | |
else: | |
logger.error(error.errorString()) | |
else: | |
data = reply.readAll().data() | |
except RuntimeError as e: | |
logger.error(str(e)) | |
for handler in item.handlers: | |
handler(data) | |
reply.deleteLater() | |
if self.__pending_requests: | |
item = self.__pending_requests.pop(0) | |
if item.method == "post": | |
self.__post(item) | |
elif item.method == "get": | |
self.__get(item) | |
else: | |
raise NotImplementedError |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment