Skip to content

Instantly share code, notes, and snippets.

@loathingKernel
Created April 2, 2023 19:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loathingKernel/627669966e1e6e4ae10f8f31e0b9df27 to your computer and use it in GitHub Desktop.
Save loathingKernel/627669966e1e6e4ae10f8f31e0b9df27 to your computer and use it in GitHub Desktop.
import json
from dataclasses import dataclass, field
from logging import getLogger
from typing import Callable, Dict, TypeVar, List, Tuple
from typing import Union
from PyQt5.QtCore import QObject, pyqtSignal, QUrl, QJsonParseError, QJsonDocument, QUrlQuery
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
logger = getLogger("QtRequests")
REQUEST_LIMIT = 8
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36"
RequestHandler = TypeVar("RequestHandler", bound=Callable[[Union[Dict, bytes]], None])
@dataclass
class RequestQueueItem:
method: str = None
url: QUrl = None
payload: Dict = field(default_factory=dict)
params: Dict = field(default_factory=dict)
handlers: List[RequestHandler] = field(default_factory=list)
def __eq__(self, other):
return self.method == other.method and self.url == other.url
class QtRequestManager(QObject):
data_ready = pyqtSignal(object)
def __init__(self, doc_type: str = "json", authorization_token: str = None, parent=None):
super(QtRequestManager, self).__init__(parent=parent)
self.manager = QNetworkAccessManager(self)
self.doc_type = doc_type
self.authorization_token = authorization_token
self.__pending_requests = []
self.__active_requests = []
def __post(self, item: RequestQueueItem):
request = QNetworkRequest(item.url)
request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json;charset=UTF-8")
request.setHeader(QNetworkRequest.UserAgentHeader, USER_AGENT)
request.setAttribute(QNetworkRequest.RedirectPolicyAttribute, QNetworkRequest.NoLessSafeRedirectPolicy)
if self.authorization_token is not None:
request.setRawHeader(b"Authorization", self.authorization_token.encode())
payload = json.dumps(item.payload).encode("utf-8")
reply = self.manager.post(request, payload)
active_request = (reply, item)
self.__active_requests.append(active_request)
reply.errorOccurred.connect(lambda: self.__on_error(active_request))
reply.finished.connect(lambda: self.__on_finished(active_request))
def post(self, handler: RequestHandler, url: str, payload: dict):
item = RequestQueueItem(method="post", url=QUrl(url), payload=payload, handlers=[handler])
if len(self.__active_requests) < REQUEST_LIMIT:
self.__post(item)
else:
self.__pending_requests.append(item)
def __get(self, item: RequestQueueItem):
request = QNetworkRequest(item.url)
request.setHeader(QNetworkRequest.UserAgentHeader, USER_AGENT)
reply = self.manager.get(request)
active_request = (reply, item)
self.__active_requests.append(active_request)
reply.errorOccurred.connect(lambda: self.__on_error(active_request))
reply.finished.connect(lambda: self.__on_finished(active_request))
@staticmethod
def __prepare_query(url, params) -> QUrl:
url = QUrl(url)
query = QUrlQuery(url)
for k, v in params.items():
query.addQueryItem(str(k), str(v))
url.setQuery(query)
return url
def get(self, handler: RequestHandler, url: str, payload: Dict = None, params: Dict = None):
url = self.__prepare_query(url, params) if params is not None else QUrl(url)
item = RequestQueueItem(method="get", url=url, payload=payload, handlers=[handler])
if len(self.__active_requests) < REQUEST_LIMIT:
self.__get(item)
else:
self.__pending_requests.append(item)
@staticmethod
def __on_error(reply_item: Tuple[QNetworkReply, RequestQueueItem]):
reply, item = reply_item
logger.error(reply.errorString())
def __on_finished(self, reply_item: Tuple[QNetworkReply, RequestQueueItem]):
self.__active_requests.pop(self.__active_requests.index(reply_item))
reply, item = reply_item
data = {} if self.doc_type == "json" else b""
if reply:
try:
if reply.error() == QNetworkReply.NoError:
if self.doc_type == "json":
error = QJsonParseError()
json_data = QJsonDocument.fromJson(
reply.readAll().data(), error
)
if QJsonParseError.NoError == error.error:
data = json.loads(json_data.toJson().data().decode())
else:
logger.error(error.errorString())
else:
data = reply.readAll().data()
except RuntimeError as e:
logger.error(str(e))
for handler in item.handlers:
handler(data)
reply.deleteLater()
if self.__pending_requests:
item = self.__pending_requests.pop(0)
if item.method == "post":
self.__post(item)
elif item.method == "get":
self.__get(item)
else:
raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment