Faust app that redirects websocket traffic via a Kafka topic
"""
1. Launch Kafka:
- $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
- $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
run via faust -A <filename> worker -l info
2. Faust library:
faust.App.agent: - main processing actor in Faust App
"""
import faust
from database import DatabaseHandler
from config import DB_PATH
# Establish a connection to the database at DB_PATH; create it if it does not exist.
dbh = DatabaseHandler(DB_PATH)
# Create the 'trades' table (only if it does not exist yet).
dbh.create_table_trades()
# todo: deserialize incoming data and write it to the database (see the sketch below the agent)
app = faust.App(
    'client',
    broker='kafka://localhost:9092',
    value_serializer='raw',
    autodiscover=False,
    # topic_partitions=3,
    # broker_commit_every=100,
    # stream_buffer_maxsize=65536,
)
topic = app.topic('ticks')
@app.agent(topic)
async def echo(ticks: faust.Stream):
    async for tick in ticks:
        data_mock = []
        dbh.insert_many('trades', data_mock)
        print('echo from app 1 that wrote to db')
        print(f'also {tick} has been written')
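
One way to resolve the todo above is to deserialize each raw tick (the JSON string forwarded by the websocket client further down) into the 10-column tuple that insert_many expects. This is only a sketch: tick_to_row is a hypothetical helper, and the single-letter keys are assumed to follow the public Binance @trade payload.

import json

def tick_to_row(raw) -> tuple:
    """Map a raw Binance @trade JSON message to a row for the 'trades' table."""
    d = json.loads(raw)
    return (d['e'],          # event_type
            d['E'],          # event_time
            d['s'],          # symbol
            d['t'],          # trade_id
            float(d['p']),   # price (Binance sends it as a string)
            float(d['q']),   # quantity (also sent as a string)
            d['b'],          # buyer_order_id
            d['a'],          # seller_order_id
            int(d['m']),     # is_market_maker
            int(d['M']))     # ignore

Inside the agent, dbh.insert_many('trades', [tick_to_row(tick)]) would then replace the data_mock placeholder.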
import sqlite3
import sys
class DatabaseHandler:
    def __init__(self, abs_path: str):
        """Create or open the database located at {abs_path} and store its connection."""
        try:
            self.conn = sqlite3.connect(str(abs_path))
            assert self.conn is not None
        except sqlite3.Error as e:
            # database could not be created (or opened) -> abort
            print(e)
            sys.exit(1)
    def create_table_trades(self) -> None:
        try:
            sql = 'CREATE TABLE IF NOT EXISTS trades(' \
                  'event_type TEXT, ' \
                  'event_time INT, ' \
                  'symbol TEXT, ' \
                  'trade_id INT NOT NULL PRIMARY KEY, ' \
                  'price REAL, ' \
                  'quantity REAL, ' \
                  'buyer_order_id INT, ' \
                  'seller_order_id INT, ' \
                  'is_market_maker BOOLEAN CHECK (is_market_maker IN (0, 1)), ' \
                  'ignore BOOLEAN CHECK (ignore IN (0, 1)))'
            self.conn.cursor().execute(sql)
            # DDL statements like CREATE TABLE do not need an explicit commit
        except sqlite3.Error as e:
            print('creation failed:')
            print(e)
    def remove_table(self, tablename: str) -> None:
        sql = "DROP TABLE {}".format(tablename)
        try:
            self.conn.cursor().executescript(sql)
            self.conn.commit()
        except sqlite3.Error as e:
            print('removing failed:')
            print(e)
    def insert_many(self, tablename, data, mode='replace') -> bool:
        assert mode in ['ignore', 'replace']
        try:
            if mode == 'ignore':
                self.conn.cursor().executemany(
                    "INSERT OR IGNORE INTO {} VALUES (?,?,?,?,?,?,?,?,?,?)".format(tablename), data)
            elif mode == 'replace':
                self.conn.cursor().executemany(
                    "INSERT OR REPLACE INTO {} VALUES (?,?,?,?,?,?,?,?,?,?)".format(tablename), data)
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print('insertion failed:')
            print(e)
            return False
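
A minimal smoke test of the handler, assuming an in-memory database for illustration (the gist itself uses DB_PATH from config.py):

dbh = DatabaseHandler(':memory:')
dbh.create_table_trades()
row = ('trade', 1619449200000, 'BTCUSDT', 1, 50000.0, 0.5, 88, 50, 1, 0)
assert dbh.insert_many('trades', [row])
print(dbh.conn.execute('SELECT COUNT(*) FROM trades').fetchone())  # -> (1,)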
"""
1. Launch Kafka:
- $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
- $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
run via faust -A <filename> worker -l info
consumer1 run via faust -A <filename> worker -l info -p 6066
consumer2 run via faust -A <filename> worker -l info -p 6067
etc...
2. Faust library:
faust.App.agent: - main processing actor in Faust App
- unary async function - receives stream as its argument
faust.Stream: - async python generator
- abstractions over a kafka topic
- can apply operations on the stream (filter(), take(5))
faust.Record: - data transfer object: Represents events via python classes inheriting it
- serialization, deserialization
From: https://faust.readthedocs.io/en/latest/userguide/settings.html#guide-settings
app configuration:
broker: - only supported production transport is kafka://,
- uses the aiokafka client under the hood, for consuming and producing messages
- can specify multiple hosts, e.g. broker='kafka://kafka1.example.com:9092;kafka2.example.com:9092' [fault tolerance]
store: - default is memory://
- production should use rocksdb://
processing_guarantee: “at_least_once” (default) and “exactly_once”.
Note that if exactly-once processing is enabled consumers are configured with isolation.level="read_committed"
and producers are configured with retries=Integer.MAX_VALUE and enable.idempotence=true per default.
Note that by default exactly-once processing requires a cluster of at least three brokers what is the recommended setting for production.
For development you can change this, by adjusting broker setting transaction.state.log.replication.factor to the number of brokers you want to use.
autodiscover: set to false, see https://faust.readthedocs.io/en/latest/userguide/settings.html#autodiscover
"""
import asyncio

import websockets
from kafka import KafkaProducer
from websockets.client import WebSocketClientProtocol
from websockets.exceptions import ConnectionClosed

sock_addr = "wss://stream.binance.com:9443/ws/btcusdt@trade"


class WebSocketClient:
    def __init__(self, sock_addr, **kwargs):
        self.producer = KafkaProducer()
        self.sock_addr = sock_addr
        # todo: implement as a mode.Service to allow graceful shutdown (see the sketch at the end)
    async def connect(self):
        """Returns a WebSocketClientProtocol, used to send and receive messages."""
        self.connection = await websockets.client.connect(self.sock_addr)
        if self.connection.open:
            print('Connection established. Client correctly connected.')
        return self.connection
    async def sendMessage(self, message):
        await self.connection.send(message)

    async def receiveMessage(self, connection: WebSocketClientProtocol):
        while True:
            try:
                msg = await connection.recv()
                # forward each raw tick to the Kafka topic that the Faust app consumes
                self.producer.send(topic='ticks', value=f'{msg}'.encode())
            except ConnectionClosed:
                print('Connection with server closed')
                # todo: handle 24h disconnects
                break
if __name__ == '__main__':
    client = WebSocketClient(sock_addr)
    loop = asyncio.get_event_loop()
    # Start connection and get the client connection protocol
    connection = loop.run_until_complete(client.connect())
    # Start the listener
    tasks = [
        asyncio.ensure_future(client.receiveMessage(connection)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    # Leftover mock producer (needs `import time` if re-enabled):
    # producer = KafkaProducer()
    # i = 0
    # while True:
    #     producer.send(topic='ticks', value=str(i).encode())
    #     producer.send(topic='ticks_ml', value=str(i).encode())
    #     print(f'sent both msgs at iteration {i}')
    #     i += 1
    #     time.sleep(1)
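
Rough sketch of the graceful-shutdown idea from the constructor's todo: wrap the client in a mode.Service so the websocket connection and the Kafka producer are closed on shutdown. TickForwarder and its method names are illustrative, not part of the gist, and assume mode's Service/Worker API.

from mode import Service, Worker

class TickForwarder(Service):
    def __init__(self, sock_addr: str, **kwargs):
        super().__init__(**kwargs)
        self.client = WebSocketClient(sock_addr)

    @Service.task
    async def _forward(self):
        # connect and forward ticks until the service is stopped
        connection = await self.client.connect()
        await self.client.receiveMessage(connection)

    async def on_stop(self):
        # close the websocket and the Kafka producer
        await self.client.connection.close()
        self.client.producer.close()

# Worker(TickForwarder(sock_addr), loglevel='info').execute_from_commandline()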