Skip to content

Instantly share code, notes, and snippets.

@Justintime50
Last active June 14, 2023 22:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Justintime50/330d0c9da54c868807eedae7a7319d76 to your computer and use it in GitHub Desktop.
Save Justintime50/330d0c9da54c868807eedae7a7319d76 to your computer and use it in GitHub Desktop.
An example of how to set up event handlers in Python
import requests
class Event:
    """An event that gets triggered when an HTTP event occurs.

    An ``Event`` keeps an ordered list of callables. Handlers are added with
    ``event += handler``, removed with ``event -= handler``, and all of them
    are invoked by calling the event object itself.
    """

    def __init__(self):
        self._event_handlers = []

    def __iadd__(self, handler):
        """Subscribe ``handler``; return ``self`` so ``+=`` rebinds to this event."""
        self._event_handlers.append(handler)
        return self

    def __isub__(self, handler):
        """Unsubscribe ``handler``; return ``self`` so ``-=`` rebinds to this event.

        Raises:
            ValueError: If ``handler`` is not currently subscribed (propagated
                from ``list.remove``).
        """
        self._event_handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        """Call every subscribed handler, in subscription order, with the given arguments.

        An exception raised by a handler propagates to the caller and stops
        the remaining handlers from running.
        """
        # Iterate over a snapshot: a handler may subscribe or unsubscribe
        # (itself or others) while the event is firing, and mutating the list
        # being iterated would silently skip the next handler. Changes made
        # during dispatch take effect the next time the event fires.
        for event_handler in tuple(self._event_handlers):
            event_handler(*args, **kwargs)
class RequestEvent(Event):
    """Event type for hooks that run when an HTTP request begins."""
class ResponseEvent(Event):
    """Event type for hooks that run once an HTTP request has returned."""
class Client:
    """A client object for making HTTP calls.

    Callers can subscribe functions that are invoked just before a request is
    sent and just after its response is received.
    """

    def __init__(self, api_key=None):
        # Stored on the instance; nothing in this class reads it yet.
        self.api_key = api_key
        self._request_event = RequestEvent()
        self._response_event = ResponseEvent()

    def make_request(
        self,
        method="GET",
        url="https://example.com",
        data=None,
        *,
        timeout=30,
    ):
        """Make an HTTP request, fire off hooks for the request and response.

        Args:
            method: HTTP method to use.
            url: URL to send the request to.
            data: Optional payload, sent as the JSON body of the request.
            timeout: Value passed to ``requests.request`` as ``timeout`` so a
                stalled server cannot block the caller forever. Pass ``None``
                to wait indefinitely.

        Returns:
            The response body as text.

        Request subscribers are called with ``method``, ``url`` and ``data``
        keyword arguments; response subscribers additionally receive
        ``http_status``. Exceptions raised by a subscriber or by
        ``requests.request`` propagate to the caller, in which case the
        response event is not fired.
        """
        self._request_event(method=method, url=url, data=data)
        response = requests.request(
            method=method,
            url=url,
            json=data,
            timeout=timeout,
        )
        self._response_event(
            method=method,
            url=url,
            data=data,
            http_status=response.status_code,
        )
        return response.text

    def add_request_event_subscriber(self, function):
        """Add functions to fire when a request event occurs."""
        self._request_event += function

    def remove_request_event_subscriber(self, function):
        """Remove functions from firing when a request event occurs."""
        self._request_event -= function

    def add_response_event_subscriber(self, function):
        """Add functions to fire when a response event occurs."""
        self._response_event += function

    def remove_response_event_subscriber(self, function):
        """Remove functions from firing when a response event occurs."""
        self._response_event -= function
def custom_request_hook(**kwargs):
    """Example request subscriber: print the request details it was given.

    Reads the ``url``, ``method`` and ``data`` keyword arguments; any that
    are missing are printed as ``None``.
    """
    url = kwargs.get("url")
    method = kwargs.get("method")
    data = kwargs.get("data")
    print(
        "\nA request hook was fired!",
        f"URL used for request: {url}",
        f"Method used for request: {method}",
        f"Data used for request: {data}",
        sep="\n",
    )
def custom_response_hook(**kwargs):
    """Example response subscriber: print the status code it was given.

    Reads the ``http_status`` keyword argument; if it is missing, ``None``
    is printed.
    """
    status = kwargs.get("http_status")
    print(
        "\nA response hook was fired!",
        f"The status code of the response: {status}",
        sep="\n",
    )
def main():
    """Demonstrate subscribing to, and unsubscribing from, request/response hooks."""
    api_client = Client(123)

    # While subscribed, the hooks fire on every request made through this
    # client, so a single subscription covers any number of calls.
    api_client.add_request_event_subscriber(custom_request_hook)
    api_client.add_response_event_subscriber(custom_response_hook)
    api_client.make_request()  # returns the response body text; ignored here

    # After unsubscribing, the same call no longer fires either hook.
    api_client.remove_request_event_subscriber(custom_request_hook)
    api_client.remove_response_event_subscriber(custom_response_hook)
    api_client.make_request()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment