Instructions
Created
November 10, 2017 09:31
-
-
Save vividvilla/5b548e42176edccb4eb1f274bfc59146 to your computer and use it in GitHub Desktop.
Kite connect Python ticker save to database example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Storage for ticks written by the Celery insert task.
-- NOTE: CREATE DATABASE does not switch the session; connect to the new
-- "ticks" database (e.g. `\c ticks` in psql) before creating the table.
CREATE DATABASE ticks;

CREATE TABLE ticks (
    token INTEGER NOT NULL,            -- filled from the tick's instrument_token
    date  TIMESTAMP WITHOUT TIME ZONE, -- filled with the insert-time clock value
    price DOUBLE PRECISION             -- filled from the tick's last_price
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run celery workers | |
# celery -A db worker --loglevel=info | |
import sys | |
import json | |
import psycopg2 | |
import logging | |
from celery import Celery | |
from datetime import datetime | |
# Send DEBUG-and-above records to stdout in a timestamped format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)

# Celery application; point the broker URL at your own broker.
app = Celery("tasks", broker="redis://localhost:6379/4")

# Module-level PostgreSQL connection used by the insert task.
# The credentials are placeholders; substitute your own.
db = psycopg2.connect(
    database="ticks",
    user="username",
    password="password",
    host="127.0.0.1",
    port="5432",
)

# Parameterised insert; the named placeholders are filled from a dict per tick.
insert_tick_statement = (
    "INSERT INTO ticks (date, token, price) "
    "VALUES (%(date)s, %(token)s, %(price)s)"
)
# Task to insert ticks into the PostgreSQL db
@app.task
def insert_ticks(ticks):
    """Insert a batch of ticks into the ``ticks`` table as one transaction.

    Args:
        ticks: Iterable of dicts, each providing ``instrument_token`` and
            ``last_price``. The whole batch is also JSON-encoded for the
            log line.

    Each row's ``date`` is the worker's ``datetime.now()`` at insert time,
    not a value taken from the tick.

    Raises:
        Exception: Whatever building or executing an insert raises (for
            example a missing key or a database error). The transaction is
            rolled back before the exception propagates. A failure in
            ``commit`` itself is logged and rolled back without being
            re-raised.
    """
    try:
        # The context manager closes the cursor even when an insert fails.
        with db.cursor() as c:
            for tick in ticks:
                c.execute(insert_tick_statement, {
                    "date": datetime.now(),
                    "token": tick["instrument_token"],
                    "price": tick["last_price"]})
        logging.info("Inserting ticks to db : {}".format(json.dumps(ticks)))
    except Exception:
        # The connection is shared at module level: without a rollback the
        # partial batch would stay pending (and could be committed by a
        # later task), or the transaction would stay aborted.
        db.rollback()
        raise

    try:
        db.commit()
    except Exception:
        db.rollback()
        logging.exception("Couldn't write ticks to db: ")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import json | |
import logging | |
# Send DEBUG-and-above records to stdout in a timestamped format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)
import time | |
from db import insert_ticks | |
from kiteconnect import WebSocket | |
# Create the WebSocket client; the three arguments are placeholders for
# your own API key, public token and user id.
kws = WebSocket("api_key", "public_token", "zerodha_user_id")

# Instrument tokens to subscribe to, in subscription order.
# NOTE(review): 73856 may be a truncated token -- confirm against the
# instruments list before relying on it.
tokens = [
    128083204,  # RELIANCE BSE
    73856,  # RELIANCE NSE
    256265,  # NIFTY 50
    265,  # SENSEX
]
# Tick handler: log the batch, then hand it to the Celery task.
def on_tick(ticks, ws):
    """Log the received ticks and queue them for insertion.

    Args:
        ticks: Tick payload from the WebSocket; JSON-encoded for the log
            line and passed unchanged to ``insert_ticks.delay``.
        ws: Second callback argument; not used here.
    """
    encoded = json.dumps(ticks)
    logging.info("on tick - {}".format(encoded))
    insert_ticks.delay(ticks)
# Connection handler: only records that the socket is up.
def on_connect(ws):
    """Log that the WebSocket connection has been established.

    Args:
        ws: Callback argument; not used here.
    """
    message = "Successfully connected to WebSocket"
    logging.info(message)
def on_close(*args):
    """Log that the WebSocket connection was closed.

    Args:
        *args: Any positional arguments supplied by the caller; ignored.
            They are accepted so the callback does not raise ``TypeError``
            if it is invoked with arguments, as the sibling callbacks
            (which receive ``ws``) are.
    """
    logging.info("WebSocket connection closed")
def on_error(*args):
    """Log a WebSocket error, including any details passed by the caller.

    Args:
        *args: Any positional arguments supplied by the caller (presumably
            the error and/or the socket -- confirm against the library
            version in use). They are accepted so the callback does not
            raise ``TypeError`` when invoked with arguments, and are
            included in the log record.
    """
    if args:
        logging.error("WebSocket connection thrown error: %r", args)
    else:
        logging.error("WebSocket connection thrown error")
# Hook the handlers defined above onto the client.
kws.on_connect = on_connect
kws.on_tick = on_tick
kws.on_error = on_error
kws.on_close = on_close

# Connect with threaded=True. The statements that follow are written to run
# on the main thread, so this call is presumably expected to return rather
# than block -- confirm against the library version in use.
kws.connect(threaded=True)
# Alternative noted for Ubuntu:
# kws.connect(disable_ssl_verification=True)
# Index of the next entry in ``tokens`` that still has to be subscribed.
count = 0

# Runs forever on the main thread: every 5 seconds subscribe to at most one
# more token (in LTP mode) once the socket reports it is connected.
while True:
    logging.info("This is main thread. Will subscribe to each token in tokens list with 5s delay")

    if count < len(tokens) and not kws.is_connected():
        # Tokens are still pending but the socket is not up yet.
        logging.info("Connecting to WebSocket...")
    elif count < len(tokens):
        logging.info("Subscribing to: {}".format(tokens[count]))
        kws.subscribe([tokens[count]])
        kws.set_mode(kws.MODE_LTP, [tokens[count]])
        count += 1

    time.sleep(5)
Hi vividvilla, how do we do this in SQLite3 as part of Python? When I try with SQLite3, it only shows the data being dumped into redis rdb and no data is being inserted into the sqlite3 table.
Hi Vividvilla,
I tried applying it but when I run it I get
2023-08-11 08:07:06.094807 WS Started
2023-08-11 08:07:06,658 - celery.utils.functional - DEBUG -
def insert_ticks(ticks):
return 1
Can you please suggest why it is not working? I am trying to save tick data as redis dictionary
I am not sure whether this script works with the latest Celery; you may need to check that part. Also, to debug, start logging inside insert_ticks
to see whether you are receiving ticks before writing them to Redis.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You just replace the Celery job to write the CSV data to the disk.