@vividvilla
Created November 10, 2017 09:31
Kite Connect Python ticker: save ticks to a database (example)

Instructions

CREATE DATABASE ticks;
CREATE TABLE ticks (
    token integer NOT NULL,
    date timestamp without time zone,
    price double precision
);
# db.py (the module name used by `celery -A db` and `from db import insert_ticks`)
# Run celery workers
# celery -A db worker --loglevel=info
import sys
import json
import psycopg2
import logging
from celery import Celery
from datetime import datetime

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Configure with your own broker
app = Celery("tasks", broker="redis://localhost:6379/4")

# Initialize db
db = psycopg2.connect(database="ticks", user="username", password="password", host="127.0.0.1", port="5432")

# Db insert statement
insert_tick_statement = "INSERT INTO ticks (date, token, price) VALUES (%(date)s, %(token)s, %(price)s)"


# Task to insert ticks into the PostgreSQL db
@app.task
def insert_ticks(ticks):
    logging.info("Inserting ticks to db : {}".format(json.dumps(ticks)))
    try:
        # Execute inside the try block so that a failed insert is rolled back
        # instead of leaving the shared connection in an aborted transaction.
        with db.cursor() as c:
            for tick in ticks:
                c.execute(insert_tick_statement, {
                    "date": datetime.now(),
                    "token": tick["instrument_token"],
                    "price": tick["last_price"]})
        db.commit()
    except Exception:
        db.rollback()
        logging.exception("Couldn't write ticks to db: ")

import sys
import json
import time
import logging

from db import insert_ticks
from kiteconnect import WebSocket

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Initialise.
kws = WebSocket("api_key", "public_token", "zerodha_user_id")

# RELIANCE BSE, RELIANCE NSE, NIFTY 50, SENSEX
tokens = [128083204, 73856, 256265, 265]


# Callback for tick reception.
def on_tick(ticks, ws):
    logging.info("on tick - {}".format(json.dumps(ticks)))
    # Queue the insert as a Celery task so this callback returns quickly.
    insert_ticks.delay(ticks)


# Callback for successful connection.
def on_connect(ws):
    logging.info("Successfully connected to WebSocket")


# Callback for connection close.
# *args accepts whatever arguments the installed kiteconnect version passes.
def on_close(*args):
    logging.info("WebSocket connection closed")


# Callback for connection errors.
def on_error(*args):
    logging.error("WebSocket connection threw an error")


# Assign the callbacks.
kws.on_tick = on_tick
kws.on_connect = on_connect
kws.on_close = on_close
kws.on_error = on_error

# Connect in a separate thread (threaded=True) so that the main thread
# stays free to run the subscription loop below.
kws.connect(threaded=True)
# kws.connect(disable_ssl_verification=True) # for ubuntu

count = 0
while True:
    logging.info("This is main thread. Will subscribe to each token in tokens list with 5s delay")

    if count < len(tokens):
        if kws.is_connected():
            logging.info("Subscribing to: {}".format(tokens[count]))
            kws.subscribe([tokens[count]])
            kws.set_mode(kws.MODE_LTP, [tokens[count]])
            count += 1
        else:
            logging.info("Connecting to WebSocket...")

    time.sleep(5)

@akshaythakker

Hi vividvilla. Firstly, thanks for such eloquent code. How can we write the tick data to a CSV file instead of an SQL database? Can you please help?

Thanks

@hemachandar519

Hi Vividvilla, thanks for posting this. Looking forward to more content 👍

@RajasthaniRudraksh

In insert_ticks.delay(), what is the delay() function? Doesn't insert_ticks(ticks) work?

@vividvilla
Author

> In insert_ticks.delay(), what is the delay() function? Doesn't insert_ticks(ticks) work?

You have to run this file as a Celery job. task.delay makes the insertion asynchronous.
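
To illustrate the difference between calling a function directly and queuing it, here is a minimal standard-library analogy. It is not Celery: `ThreadPoolExecutor.submit` stands in for `insert_ticks.delay`, a background thread stands in for the Celery worker process, and `slow_insert` and the sample tick (with a made-up price) are hypothetical names and values used only for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_insert(ticks):
    """Hypothetical stand-in for a slow database write."""
    time.sleep(0.5)
    return len(ticks)


# Made-up sample tick; the token is the NIFTY 50 token from the script above.
sample_ticks = [{"instrument_token": 256265, "last_price": 100.5}]

with ThreadPoolExecutor(max_workers=1) as pool:
    started = time.monotonic()
    # submit() hands the work to the background thread and returns at once,
    # much as .delay() queues a task for a worker and returns at once.
    future = pool.submit(slow_insert, sample_ticks)
    submit_seconds = time.monotonic() - started

    # result() blocks until the background work has finished.
    inserted = future.result()

print("submit took {:.3f}s, inserted {} tick(s)".format(submit_seconds, inserted))
```

Calling `slow_insert(sample_ticks)` directly would also work, but it would block the caller for the full duration of the write. In the gist that caller is the `on_tick` callback, which is presumably why the insert is queued instead.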

@vividvilla
Author

> Hi vividvilla. Firstly, thanks for such eloquent code. How can we write the tick data to a CSV file instead of an SQL database? Can you please help?

Just change the Celery job so that it writes the data to a CSV file on disk instead.
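
A minimal sketch of what such a CSV writer could look like, using only the standard library. The function name `append_ticks_to_csv`, the column names, the file name and the sample prices are assumptions made for this example; the body of the function could be moved into the `@app.task` function in place of the database insert.

```python
import csv
import os
import tempfile
from datetime import datetime

# Column names mirror the columns of the ticks table (an assumption for this sketch).
FIELDNAMES = ["date", "token", "price"]


def append_ticks_to_csv(ticks, path):
    """Append one row per tick; write the header only if the file is new or empty."""
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        if write_header:
            writer.writeheader()
        for tick in ticks:
            writer.writerow({
                "date": datetime.now().isoformat(),
                "token": tick["instrument_token"],
                "price": tick["last_price"],
            })


# Demo with made-up prices, written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "ticks.csv")
    append_ticks_to_csv([{"instrument_token": 256265, "last_price": 100.5}], csv_path)
    append_ticks_to_csv([{"instrument_token": 265, "last_price": 200.25}], csv_path)
    with open(csv_path, newline="") as f:
        rows = list(csv.DictReader(f))

print(rows)
```

Opening the file in append mode on every call keeps the function stateless, which suits a task that may run in several worker processes, although concurrent writers to one file would need extra care.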

@alternativehypothesis

Hi vividvilla, how do we do this with SQLite3 in Python? When I try with SQLite3, it only shows the data being dumped into the Redis RDB file, and no data is inserted into the SQLite3 table.
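
One possible SQLite version of the insert logic, as a sketch under stated assumptions. The function name `insert_ticks_sqlite`, the file name and the choice to open a connection per call are made up for this example and do not come from the gist. Note that the standard `sqlite3` module uses `?` or `:name` placeholders rather than psycopg2's `%(name)s` style. If tasks only pile up in Redis, it may also be worth checking that a Celery worker is actually running (the `celery -A db worker` command), since queued tasks are executed only by a worker.

```python
import os
import sqlite3
import tempfile
from datetime import datetime

CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS ticks (
    token INTEGER NOT NULL,
    date TEXT,
    price REAL
)
"""

# sqlite3 uses named (:name) or positional (?) placeholders.
INSERT_TICK = "INSERT INTO ticks (date, token, price) VALUES (:date, :token, :price)"


def insert_ticks_sqlite(ticks, db_path):
    """Insert a batch of ticks into a SQLite file, creating the table if needed."""
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" commits on success and rolls back if an exception is raised.
        with conn:
            conn.execute(CREATE_TABLE)
            conn.executemany(INSERT_TICK, [
                {
                    "date": datetime.now().isoformat(),
                    "token": tick["instrument_token"],
                    "price": tick["last_price"],
                }
                for tick in ticks
            ])
    finally:
        conn.close()


# Demo with made-up prices, written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = os.path.join(tmp_dir, "ticks.sqlite3")
    insert_ticks_sqlite([
        {"instrument_token": 256265, "last_price": 100.5},
        {"instrument_token": 265, "last_price": 200.25},
    ], db_path)

    conn = sqlite3.connect(db_path)
    stored = conn.execute("SELECT token, price FROM ticks ORDER BY token").fetchall()
    conn.close()

print(stored)
```

A file-backed database is used because an in-memory SQLite database would not be shared between the script and a separate worker process.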

@sachinglife

Hi Vividvilla,

I tried applying it, but when I run it I get:

2023-08-11 08:07:06.094807 WS Started
2023-08-11 08:07:06,658 - celery.utils.functional - DEBUG -
def insert_ticks(ticks):
    return 1

Can you please suggest why it is not working? I am trying to save the tick data as a Redis dictionary.

@vividvilla
Author

I am not sure whether this script works with the latest Celery; you might need to check that part. Also, to debug, start logging inside insert_ticks to see whether you are receiving ticks before writing them to Redis.
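
A minimal sketch of that kind of logging, using only the standard library. The `store` argument is a hypothetical callable standing in for whatever writes a tick to Redis, and the function name and sample tick (with a made-up price) are assumptions for this example. The DEBUG line quoted in the question comes from the `celery.utils.functional` logger, so it appears to be Celery's own debug output rather than an error raised by the script.

```python
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log = logging.getLogger("ticks")


def insert_ticks_logged(ticks, store):
    """Log what was received before handing each tick to the store callable."""
    log.debug("insert_ticks received %d tick(s): %r", len(ticks), ticks)
    if not ticks:
        log.warning("insert_ticks was called with an empty batch")
        return 0
    for tick in ticks:
        store(tick)
    log.debug("insert_ticks stored %d tick(s)", len(ticks))
    return len(ticks)


# Demo: a plain list stands in for the real storage backend.
stored = []
written = insert_ticks_logged(
    [{"instrument_token": 73856, "last_price": 1.0}], stored.append)
```

If the first log line never appears in the worker output, the task is probably not being executed at all, which points at the worker or broker setup rather than at the storage code.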
