Skip to content

Instantly share code, notes, and snippets.

@jonatas
Created December 3, 2021 13:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jonatas/0f1baa51e7e0ac194aadd11923dffee2 to your computer and use it in GitHub Desktop.
Save jonatas/0f1baa51e7e0ac194aadd11923dffee2 to your computer and use it in GitHub Desktop.
websocket-client
psycopg2
#https://pypi.org/project/websocket_client/
from datetime import datetime
import websocket
import json
import os
import psycopg2
config = {'DB_USER': os.environ['DB_USER'],
'DB_PASS': os.environ['DB_PASS'],
'DB_HOST': os.environ['DB_HOST'],
'DB_PORT': os.environ['DB_PORT'],
'DB_NAME': os.environ['DB_NAME'],
'API_KEY': os.environ['API_KEY']}
conn = psycopg2.connect(database=config['DB_NAME'],
host=config['DB_HOST'],
user=config['DB_USER'],
password=config['DB_PASS'],
port=config['DB_PORT'])
conn.autocommit = True
cursor = conn.cursor()
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS trades (
time TIMESTAMPTZ NOT NULL,
symbol TEXT,
price FLOAT4,
volume FLOAT4
);
SELECT create_hypertable('trades', 'time', if_not_exists => true);
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlc_1m
WITH (timescaledb.continuous) AS
SELECT time_bucket('1m', time) AS bucket,
symbol,
FIRST(price, time) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST(price, time) AS close,
COUNT(1) as events,
SUM(volume) AS volume FROM trades
GROUP BY 1, 2
WITH DATA;
SELECT add_continuous_aggregate_policy('ohlc_1m',
start_offset => INTERVAL '1 month',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute',
if_not_exists => true);
""")
# CREATE MATERIALIZED VIEW trades_per_second
# WITH (timescaledb.continuous)
# AS SELECT symbol,
# time_bucket('1s', time) as bucket,
# counter_agg(time, price, time_bucket_range('1s'::interval, time))
# FROM trades
# GROUP BY 1, 2 WITH DATA;
#
# SELECT symbol, time_bucket('1m', bucket),
# delta( rollup(counter_agg))
# FROM trades_per_second
# GROUP BY 1,2;
INSERT = f"""
INSERT INTO trades ( time, symbol, price, volume)
VALUES (%s, %s, %s, %s);
"""
conn.commit()
cursor.close()
# Message payload example
# {"data":[
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765675,"v":8060.2},
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765773,"v":298.5},
# {"c":null,"p":5.583,"s":"BINANCE:USDTBRL","t":1636631765995,"v":143.3}
# ], "type":"trade"}
def on_message(ws, message):
#print(message)
data = json.loads(message)
if data["type"] == "trade":
for t in data["data"]:
on_trade(t)
def on_trade(event):
time = datetime.utcfromtimestamp(event['t'] / 1e3).strftime('%Y-%m-%d %H:%M:%S')
trade = (time, event['s'], event['p'], event['v'])
print(f"\r {trade}", end='')
insert_trade(trade)
def insert_trade(event):
cursor = conn.cursor()
result = cursor.execute(INSERT, event)
conn.commit()
cursor.close()
def on_error(ws, error):
print(error)
ws.close()
start()
def on_close(ws):
print("### closed ###")
start()
def on_open(ws):
# ws.send('{"type":"subscribe","symbol":"AMZN"}')
# ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
for symbol in ["USDTBRL","BTCUSDT", "SOLUSDT"]:
subscription = '{"type":"subscribe","symbol": "BINANCE:'+symbol+'"}'
ws.send(subscription)
#ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')
def start():
print("starting...")
url = f"wss://ws.finnhub.io?token={config['API_KEY']}"
ws = websocket.WebSocketApp(url,
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
if __name__ == "__main__":
start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment