Created
November 12, 2022 09:22
-
-
Save melardev/d4611220bfbbc3a24d49fb0887973485 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This snippet illustrates the usage of the __call__() magic method, how it can be used to solve a real problem. | |
Please read code comments for further details. | |
""" | |
import asyncio | |
import binance | |
from autobahn.asyncio import WebSocketClientProtocol, WebSocketClientFactory | |
# noinspection PyPep8Naming | |
class WsClientProtocol(WebSocketClientProtocol): | |
name: str | |
arg2: float | |
def __init__(self, name: str, arg2: float): | |
super().__init__() | |
self.name = name | |
self.arg2 = arg2 | |
def onMessage(self, payload, isBinary): | |
if isBinary: | |
print(f"{self.name} onMessage Binary: {len(payload)} bytes received") | |
else: | |
print(f'{self.name} onMessage {payload}') | |
def onOpen(self): | |
print(f'{self.name} - Connection Opened') | |
def __call__(self, *args, **kwargs): | |
# __call__ is a magic method called when something is treated as callable (by using ()), for | |
# example self.onOpen is callable, because we can use () -> self.onOpen() | |
# to make an instance of this class callable we implement __call__() like we do here. | |
# This method will be called by the framework to create an object, we just return ourselves (self) | |
# as the object. | |
return self | |
loop = asyncio.get_event_loop() | |
endpoint = f"wss://stream.binance.com:9443/ws/btcusdt@kline_{binance.Client.KLINE_INTERVAL_1MINUTE}" | |
factory = WebSocketClientFactory(endpoint) | |
# protocol field must be a callable object | |
# since we don't want to provide the class as protocol field | |
# because it would create one object of such class but without giving us | |
# the opportunity to pass arguments, then we create an object ourselves, passing all | |
# arguments we want, and implementing the __call__ so when it is called by autobahn | |
# framework, we return the instance we already created ourselves | |
factory.protocol = WsClientProtocol('task1-controller', 3.0) | |
coroutine = loop.create_connection(factory, "stream.binance.com", 9443, ssl=True) | |
loop.run_until_complete(coroutine) | |
loop.run_forever() | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment