Created
December 3, 2021 13:41
-
-
Save jonatas/0f1baa51e7e0ac194aadd11923dffee2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
websocket-client | |
psycopg2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#https://pypi.org/project/websocket_client/ | |
from datetime import datetime | |
import websocket | |
import json | |
import os | |
import psycopg2 | |
config = {'DB_USER': os.environ['DB_USER'], | |
'DB_PASS': os.environ['DB_PASS'], | |
'DB_HOST': os.environ['DB_HOST'], | |
'DB_PORT': os.environ['DB_PORT'], | |
'DB_NAME': os.environ['DB_NAME'], | |
'API_KEY': os.environ['API_KEY']} | |
conn = psycopg2.connect(database=config['DB_NAME'], | |
host=config['DB_HOST'], | |
user=config['DB_USER'], | |
password=config['DB_PASS'], | |
port=config['DB_PORT']) | |
conn.autocommit = True | |
cursor = conn.cursor() | |
cursor.execute( | |
f""" | |
CREATE TABLE IF NOT EXISTS trades ( | |
time TIMESTAMPTZ NOT NULL, | |
symbol TEXT, | |
price FLOAT4, | |
volume FLOAT4 | |
); | |
SELECT create_hypertable('trades', 'time', if_not_exists => true); | |
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlc_1m | |
WITH (timescaledb.continuous) AS | |
SELECT time_bucket('1m', time) AS bucket, | |
symbol, | |
FIRST(price, time) AS open, | |
MAX(price) AS high, | |
MIN(price) AS low, | |
LAST(price, time) AS close, | |
COUNT(1) as events, | |
SUM(volume) AS volume FROM trades | |
GROUP BY 1, 2 | |
WITH DATA; | |
SELECT add_continuous_aggregate_policy('ohlc_1m', | |
start_offset => INTERVAL '1 month', | |
end_offset => INTERVAL '1 minute', | |
schedule_interval => INTERVAL '1 minute', | |
if_not_exists => true); | |
""") | |
# CREATE MATERIALIZED VIEW trades_per_second | |
# WITH (timescaledb.continuous) | |
# AS SELECT symbol, | |
# time_bucket('1s', time) as bucket, | |
# counter_agg(time, price, time_bucket_range('1s'::interval, time)) | |
# FROM trades | |
# GROUP BY 1, 2 WITH DATA; | |
# | |
# SELECT symbol, time_bucket('1m', bucket), | |
# delta( rollup(counter_agg)) | |
# FROM trades_per_second | |
# GROUP BY 1,2; | |
INSERT = f""" | |
INSERT INTO trades ( time, symbol, price, volume) | |
VALUES (%s, %s, %s, %s); | |
""" | |
conn.commit() | |
cursor.close() | |
# Message payload example | |
# {"data":[ | |
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765675,"v":8060.2}, | |
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765773,"v":298.5}, | |
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765995,"v":143.3} | |
# ], "type":"trade"} | |
def on_message(ws, message): | |
#print(message) | |
data = json.loads(message) | |
if data["type"] == "trade": | |
for t in data["data"]: | |
on_trade(t) | |
def on_trade(event): | |
time = datetime.utcfromtimestamp(event['t'] / 1e3).strftime('%Y-%m-%d %H:%M:%S') | |
trade = (time, event['s'], event['p'], event['v']) | |
print(f"\r {trade}", end='') | |
insert_trade(trade) | |
def insert_trade(event): | |
cursor = conn.cursor() | |
result = cursor.execute(INSERT, event) | |
conn.commit() | |
cursor.close() | |
def on_error(ws, error): | |
print(error) | |
ws.close() | |
start() | |
def on_close(ws): | |
print("### closed ###") | |
start() | |
def on_open(ws): | |
# ws.send('{"type":"subscribe","symbol":"AMZN"}') | |
# ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}') | |
for symbol in ["USDTBRL","BTCUSDT", "SOLUSDT"]: | |
subscription = '{"type":"subscribe","symbol": "BINANCE:'+symbol+'"}' | |
ws.send(subscription) | |
#ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}') | |
def start(): | |
print("starting...") | |
url = f"wss://ws.finnhub.io?token={config['API_KEY']}" | |
ws = websocket.WebSocketApp(url, | |
on_message = on_message, | |
on_error = on_error, | |
on_close = on_close) | |
ws.on_open = on_open | |
ws.run_forever() | |
if __name__ == "__main__": | |
start() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment