Skip to content

Instantly share code, notes, and snippets.

@aprilahijriyan
Last active July 23, 2021 13:01
Show Gist options
  • Save aprilahijriyan/33e0595460f16594ab1aa48d27466e40 to your computer and use it in GitHub Desktop.
Save aprilahijriyan/33e0595460f16594ab1aa48d27466e40 to your computer and use it in GitHub Desktop.
Simple Asynchronous Python HTTP requests are similar to jQuery.ajax requests
from concurrent.futures import Future
from requests_futures.sessions import FuturesSession
from requests.exceptions import ConnectionError
class AjaxSession(FuturesSession):
"""
Simple HTTP Request yang memudahkan anda berinteraksi dengan server
pada background-process melalui event handler yang tersedia.
Kelas ini di desain seperti ``jQuery.ajax``, yang menyediakan event handler (dengan tambahan sedikit).
Seperti::
* ``on_finish``: event ini adalah event utama untuk menghandle hasil dari http request.
* ``on_failure``: event ini adalah event utama untuk menghandle exception pada client / server.
* ``on_success``: event ini adalah event untuk menghandle jika http request berhasil.
* ``on_error``: sama seperti event ``on_error``, tapi ini akan menghandle exception dari koneksi error saja.
Example::
ajax = AjaxSession()
future = ajax.get("http://httpbin.org/get")
"""
_clb_on_error = None
_clb_on_success = None
def on_finish(self, f: Future):
ex = getattr(f, "_exception", None)
if ex:
self.on_failure(ex)
else:
resp = f._result
self.call_on_success(resp)
def on_failure(self, ex):
if isinstance(ex, ConnectionError):
self.call_on_error(ex)
def on_success(self, resp):
print("HTTP Response:", resp)
def on_error(self, ex):
print("HTTP Error:", ex)
def call_on_error(self, ex):
func = getattr(self, "_clb_on_error") or self.on_error
func(ex)
def call_on_success(self, ex):
func = getattr(self, "_clb_on_success") or self.on_success
func(ex)
def set_handler(self, on_succes=None, on_error=None):
assert callable(on_succes)
assert callable(on_error)
self._clb_on_success = on_succes
self._clb_on_error = on_error
def request(self, *args, **kwargs):
f = super().request(*args, **kwargs)
f.add_done_callback(self.on_finish)
return f
@zakirkun
Copy link

zakirkun commented Jun 3, 2020

good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment