Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save oldmonkABA/286c4f3080610b2d5ff48480d13579bf to your computer and use it in GitHub Desktop.
Save oldmonkABA/286c4f3080610b2d5ff48480d13579bf to your computer and use it in GitHub Desktop.
Automatic login to kiteconnect using only requests library. Due credits to https://gist.github.com/GannyS/619e29dc999ffd014e80ec69ae190b93
# -*- coding: utf-8 -*-
import json
import os
import logmatic
import logging
from logging.config import dictConfig
import requests
from http import HTTPStatus
import re
from kiteconnect import KiteConnect
import os
import datetime
__author__ = "oldmonkABA"
# TODO: These can be passed in as command line arguments. Make this change later!
# Name of the JSON file containing the configuration for the logging framework;
# init_logging() looks for it in the directory that contains this script
LOG_FILE_NAME = "logging.json"
# Name of the JSON file containing the KITE connection information (API key,
# user credentials and endpoint URLs); load_kite_config() looks for it in the
# directory that contains this script
CONNECTION_INFO_FILE_NAME = "connection_info.json"
# Logger for this module, named after the module itself
LOGGER = logging.getLogger(__name__)
def init_logging():
    """
    Configure the logging framework from the JSON file named by LOG_FILE_NAME

    The file is looked up in the directory that contains this script.

    :return: Nothing
    :rtype: None
    :raises FileNotFoundError: If the logging configuration file does not exist
    """
    config_path = os.path.join(os.path.dirname(__file__), LOG_FILE_NAME)
    if not os.path.exists(config_path):
        raise FileNotFoundError("Logging configuration file not found!")
    with open(config_path, mode="r") as handle:
        settings = json.load(handle)
    # Hand the parsed dictionary to the standard dictConfig machinery
    dictConfig(config=settings)
    LOGGER.info("Logging framework initialized!")
def load_kite_config():
    """
    Read the kite connection settings from the JSON file named by
    CONNECTION_INFO_FILE_NAME

    The file is looked up in the directory that contains this script.

    :return: The parsed content of the connection information file
    :rtype: dict
    :raises FileNotFoundError: If the connection information file does not exist
    """
    script_dir = os.path.dirname(__file__)
    info_path = os.path.join(script_dir, CONNECTION_INFO_FILE_NAME)
    if not os.path.exists(info_path):
        raise FileNotFoundError("Connection information file not found!")
    with open(info_path, mode="r") as handle:
        settings = json.load(handle)
    LOGGER.info("Kite connection information loaded successfully from file!")
    return settings
def need_to_generate_token(file):
    """
    Decide whether a new access token has to be generated

    The stored credentials file is looked up in the directory that contains
    this script. A token counts as fresh only when its recorded login time is
    later than 07:00:00 of the current day; a stale or unreadable credentials
    file is deleted so that a new login can recreate it.

    :param file: Name of the stored credentials file
    :type file: str
    :return: A tuple ``(flag, login_time)``. ``flag`` is True when a new token
        must be generated. ``login_time`` is the previous login time as a
        datetime, or an empty string when no usable login time was found
    :rtype: tuple
    """
    here = os.path.dirname(__file__)
    fp = os.path.join(here, file)
    if not os.path.isfile(fp):
        LOGGER.info(file+" does not exist in pwd")
        return True, ''
    # Reported through the logger (not print) so it follows the configured handlers
    LOGGER.info(fp+" present in pwd")
    try:
        # The file is closed before any removal below; deleting a file that is
        # still open fails on some platforms
        with open(fp, 'r') as f:
            data = json.load(f)
        LOGGER.info("Previous login time for "+data["user_name"]+" is "+str(data["login_time"]))
        login_time = datetime.datetime.strptime(data["login_time"], "%Y-%m-%d %H:%M:%S")
    except (ValueError, KeyError, TypeError):
        # Malformed JSON, missing keys or an unexpected login_time format:
        # the stored credentials are unusable, so fall back to a fresh login
        # instead of aborting the whole script
        LOGGER.warning(file+" could not be parsed, a new token will be generated")
        os.remove(fp)
        LOGGER.info(file+" has been deleted.")
        return True, ''
    today = datetime.datetime.now()
    cut_off_time = datetime.datetime(today.year, today.month, today.day, 7, 0, 0)
    # NOTE(review): the cut-off is always today's 07:00, so every run before
    # 07:00 treats the stored token as stale - confirm this is intended
    if login_time > cut_off_time:
        LOGGER.info("Acces token is fresh")
        return False, login_time
    os.remove(fp)
    LOGGER.info(file+" has been deleted.")
    return True, login_time
def kite_prelogin(config, http_session):
    """
    Request the kite login page for the configured API key

    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The url attribute of the response to the pre-login request
    :rtype: str
    """
    login_page = config["LOGIN_REFERER"] + config["KITE_API_KEY"]
    return http_session.get(url=login_page).url
def login_kite(config, http_session):
    """
    Perform the primary (user id and password) login

    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary; a ``message`` key,
        when present, is renamed to ``err_message``
    :rtype: dict
    :raises ConnectionError: If the response body is not valid JSON or the
        HTTP status is not 200 OK
    """
    data = {"user_id": config["USER"], "password": config["PASSWORD"]}
    response = http_session.post(url=config["LOGIN_URL"], data=data)
    # The raw response body is intentionally not printed: it was a debugging
    # leftover that wrote the login payload (including the request id used
    # for two-factor authentication) to stdout
    try:
        # Deserialize the response content
        resp_dict = json.loads(response.content)
    except ValueError as err:
        # e.g. an HTML error page from a gateway instead of the JSON payload
        raise ConnectionError(
            "Login failed! Response (HTTP {}) was not valid JSON".format(response.status_code)
        ) from err
    if "message" in resp_dict:
        # Since logging framework already expects message as a parameter change the key
        resp_dict["err_message"] = resp_dict.pop("message")
    if response.status_code != HTTPStatus.OK:
        LOGGER.error("Login failure", extra=resp_dict)
        raise ConnectionError("Login failed!")
    return resp_dict
def kite_twofa(login_resp, config, http_session):
    """
    Perform kite-two-factor authentication

    :param login_resp: The response payload from the primary user login; its
        ``data.request_id`` value is sent along with the PIN
    :type login_resp: dict
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary; a ``message`` key,
        when present, is renamed to ``err_message``
    :rtype: dict
    :raises ConnectionError: If the response body is not valid JSON or the
        HTTP status is not 200 OK
    """
    data = {
        "user_id": config["USER"],
        "request_id": login_resp["data"]["request_id"],
        # The PIN may be stored as a number in the configuration file
        "twofa_value": str(config["PIN"]),
    }
    response = http_session.post(url=config["TWOFA_URL"], data=data)
    try:
        # Deserialize the response content
        resp_dict = json.loads(response.content)
    except ValueError as err:
        # e.g. an HTML error page from a gateway instead of the JSON payload
        raise ConnectionError(
            "Two-factor authentication failed! Response (HTTP {}) was not valid JSON".format(
                response.status_code
            )
        ) from err
    if "message" in resp_dict:
        # Since logging framework already expects message as a parameter change the key
        resp_dict["err_message"] = resp_dict.pop("message")
    if response.status_code != HTTPStatus.OK:
        LOGGER.error("Two-factor authentication failure", extra=resp_dict)
        raise ConnectionError("Two-factor authentication failed!")
    return resp_dict
def kite_post_twofa(url, http_session):
    """
    Obtain the request token after kite-two-factor authentication

    The url is requested with ``&skip_session=true`` appended and redirects
    are followed by hand. As soon as a ``Location`` header carries a
    ``request_token`` query parameter, that value is returned and the
    location itself is not requested.

    :param url: The url returned by the pre-login request; it must already
        contain a query string
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The request token
    :rtype: str
    :raises ConnectionError: If no redirect carrying a request token is found
    """
    # Function-scope import: urllib.parse is not imported at module level
    from urllib.parse import parse_qs, urljoin, urlparse

    # Upper bound on manually followed redirects, to avoid looping forever
    max_hops = 5
    next_url = url + "&skip_session=true"
    for _ in range(max_hops):
        # NOTE(review): redirects are presumably not followed automatically
        # because the final location is the application's own redirect url,
        # which need not be reachable from this machine - confirm
        response = http_session.get(url=next_url, allow_redirects=False)
        location = response.headers.get("Location")
        if not (300 <= response.status_code < 400) or not location:
            break
        # A Location header may be relative to the url that was requested
        location = urljoin(next_url, location)
        tokens = parse_qs(urlparse(location).query).get("request_token")
        if tokens:
            return tokens[0]
        # Intermediate redirect without a token: follow it and look again
        next_url = location
    raise ConnectionError("Request token not found after two-factor authentication!")
def generate_access_token(file,config,request_token):
    """
    Exchange the request token for a kite session and store it on disk

    The session data is written as JSON to the given file in the directory
    that contains this script.

    :param file: Name of the credentials file to write
    :type file: str
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param request_token: The request token obtained after two-factor authentication
    :type request_token: str
    :return: Nothing
    :rtype: None
    """
    target = os.path.join(os.path.dirname(__file__), file)
    client = KiteConnect(api_key=config["KITE_API_KEY"])
    session = client.generate_session(request_token, api_secret=config["KITE_API_SECRET"])
    # default=str converts values the json module cannot encode natively
    serialized = json.dumps(session, indent=4, sort_keys=True, default=str)
    with open(target, "w") as sink:
        sink.write(serialized)
    LOGGER.info("Automatic login for "+session["user_name"]+" is done. "+file+" has been written to disk")
if __name__ == "__main__":
    # Initialize logging framework
    init_logging()
    # Load the kite configuration information
    kite_config = load_kite_config()
    # Name of the file, next to this script, that stores the generated session
    file = 'access_credentials.json'
    generate_token,login_time = need_to_generate_token(file)
    if generate_token:
        # The context manager closes the http session (and its pooled
        # connections) even when one of the login steps raises
        with requests.Session() as sess:
            # Attempt pre-login
            ref_url = kite_prelogin(config=kite_config, http_session=sess)
            # Attempt a login and get the response as a dictionary
            user_pass_login_resp = login_kite(config=kite_config, http_session=sess)
            LOGGER.info("Login successful!")
            # Attempt two-factor auth
            two_fa_resp = kite_twofa(login_resp=user_pass_login_resp, config=kite_config, http_session=sess)
            LOGGER.info("Two-factor authentication passed!", extra=two_fa_resp)
            request_token = kite_post_twofa(url=ref_url,http_session=sess)
        LOGGER.info("Generated request token = %s",str(request_token))
        # Exchange the request token for a session and write it to disk
        generate_access_token(file,kite_config,request_token)
    else:
        LOGGER.info("Access token is valid till next day 7 am from "+str(login_time))
{
"KITE_API_KEY": "",
"USER": "",
"PASSWORD": "",
"PIN": "",
"KITE_API_SECRET":"",
"LOGIN_REFERER": "https://kite.trade/connect/login?v=3&api_key=",
"LOGIN_URL": "https://kite.zerodha.com/api/login",
"TWOFA_URL": "https://kite.zerodha.com/api/twofa"
}
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"package_formatter": {
"format": "[%(asctime)s] - [%(levelname)s] - [%(name)s] : %(message)s"
},
"json": {
"()": "logmatic.JsonFormatter"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "json"
}
},
"loggers": {
},
"root": {
"level": "INFO",
"handlers": [
"console"
]
}
}
certifi==2020.4.5.1
chardet==3.0.4
idna==2.9
logmatic-python==0.1.7
python-json-logger==0.1.11
requests==2.23.0
urllib3==1.25.8
@ankit179
Copy link

Hi, I really appreciate your reply in the other post. Your selenium code for automatic login works flawlessly but is difficult to implement on Google Cloud. I am trying to get this code working but keep getting the following error, despite saving the logging.json file in the same folder:
Traceback (most recent call last):
File "C:\Users\A\AppData\Local\Programs\Python\Python36\lib\logging\config.py", line 382, in resolve
found = getattr(found, frag)
AttributeError: module 'logmatic' has no attribute 'JsonFormatter'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Users\A\AppData\Local\Programs\Python\Python36\lib\logging\config.py", line 384, in resolve
self.importer(used)
ModuleNotFoundError: No module named 'logmatic.JsonFormatter'

Would appreciate your help,
Thanks,
Ankit.

@oldmonkABA
Copy link
Author

@rahulmr
Copy link

rahulmr commented Sep 19, 2020

@ankit179 Use the jugaad Python library (search for it on github.com); it works well on the cloud. No API access is required.

@krishnakatyal
Copy link

access_credentials.json what is that file ?

@phanendrakmv
Copy link

response = http_session.get(url=url, allow_redirects=False)
if response.status_code == 302:
reply = response.headers["Location"]

This code in kite_post_twofa will not work. There are two redirects for that URL, but because of allow_redirects=False it stops at the first one. So you will not get the location details in the response headers.

@vivekruhela
Copy link

I am not having any success with the automatic generation of the access token. I am getting a "list index out of range" error in the function kite_post_twofa because the value of response.headers['Location'] is https://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g (i.e. the URL does not contain the keyword request_token=).

I am getting the output {'status': 'success', 'data': {'profile': {}}} from the kite_twofa function, which means the script works fine up to the two-factor authentication step, but when I add &skip_session=true to ref_url, Kite terminates the session. Any idea how to solve this issue? Thanks.

@ankit179
Copy link

Use Jugaad trader....... Much simpler but you get only 1 active session.

@majaylcu
Copy link

I am not having any success with the automatic generation of the access token. I am getting a "list index out of range" error in the function kite_post_twofa because the value of response.headers['Location'] is https://kite.zerodha.com/connect/finish?api_key=0xxxxxxxxxrxxxxx&sess_id=mVW5XlgjxH5qIP6jRSU0Dmi9xqrZLl9g (i.e. the URL does not contain the keyword request_token=).

I am getting the output {'status': 'success', 'data': {'profile': {}}} from the kite_twofa function, which means the script works fine up to the two-factor authentication step, but when I add &skip_session=true to ref_url, Kite terminates the session. Any idea how to solve this issue? Thanks.

def kite_post_twofa(url, http_session):
    """
    Obtain the request token after kite-two-factor authentication

    The url is requested with ``&skip_session=true`` appended and redirects
    are followed automatically. The request token is read from the url of the
    final response or, when the request raises, from the text of the raised
    exception.

    :param url: The url returned by the pre-login request; it must already
        contain a query string
    :type url: str
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The request token
    :rtype: str
    :raises ConnectionError: If no request token can be found
    """
    # Stop at '&' as well as at whitespace, so the token is extracted
    # correctly wherever the parameter sits in the query string
    token_pattern = re.compile(r'request_token=([^&\s]+)')
    url = url + "&skip_session=true"
    cause = None
    try:
        response = http_session.get(url=url, allow_redirects=True)
    except Exception as e:
        # Deliberately broad. NOTE(review): presumably the last redirect
        # points at the application's own redirect url, which may be
        # unreachable, and the error text then still contains that url
        # together with the request token - confirm
        searched = str(e)
        cause = e
    else:
        # The redirect chain completed: the token, if any, is in the final url
        searched = response.url
    found = token_pattern.search(searched)
    if found is None:
        raise ConnectionError("Request token not found after two-factor authentication!") from cause
    return found.group(1)

This modification worked for me.

@rajivgpta
Copy link

I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.

@chsivateja
Copy link

chsivateja commented Sep 29, 2023

I just now created a simple version that works with external 2FA TOTP: https://github.com/rajivgpta/kite-api-autologin . Working as of 23 feb 2023.

Hi, is this still working for you?
I am receiving below error while using it.

request_token = parse_qs(urlparse(response).query)['request_token'][0]
KeyError: 'request_token' 

I noticed my url is as below, not sure why request_token is not part of it.
'https://kite.zerodha.com/connect/login?sess_id=*******&api_key=*****7&skip_session=true'

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment