Forked from svishi/interactive_upstox_api_sample.py
Created
February 17, 2019 17:04
-
-
Save balu2903/9c2d413e0449f6df191e7f6267969fc1 to your computer and use it in GitHub Desktop.
Python SDK - Sample Code - Interactive API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from upstox_api.api import * | |
from datetime import datetime | |
from pprint import pprint | |
import os, sys | |
from tempfile import gettempdir | |
try: input = raw_input | |
except NameError: pass | |
u = None | |
s = None | |
break_symbol = '@' | |
profile = None | |
def main(): | |
global s, u | |
logged_in = False | |
print('Welcome to Upstox API!\n') | |
print('This is an interactive Python connector to help you understand how to get connected quickly') | |
print('The source code for this connector is publicly available') | |
print('To get started, please create an app on the Developer Console (developer.upstox.com)') | |
print('Once you have created an app, keep your app credentials handy\n') | |
stored_api_key = read_key_from_settings('api_key') | |
stored_access_token = read_key_from_settings('access_token') | |
if stored_access_token is not None and stored_api_key is not None: | |
print('You already have a stored access token: [%s] paired with API key [%s]' % (stored_access_token, stored_api_key)) | |
print('Do you want to use the above credentials?') | |
selection = input('Type N for no, any key for yes: ') | |
if selection.lower() != 'n': | |
try: | |
u = Upstox(stored_api_key, stored_access_token) | |
logged_in = True | |
except requests.HTTPError as e: | |
print('Sorry, there was an error [%s]. Let''s start over\n\n' % e) | |
if logged_in is False: | |
stored_api_key = read_key_from_settings('api_key') | |
if stored_api_key is not None: | |
api_key = input('What is your app''s API key [%s]: ' % stored_api_key) | |
if api_key == '': | |
api_key = stored_api_key | |
else: | |
api_key = input('What is your app''s API key: ') | |
write_key_to_settings('api_key', api_key) | |
stored_api_secret = read_key_from_settings('api_secret') | |
if stored_api_secret is not None: | |
api_secret = input('What is your app''s API secret [%s]: ' % stored_api_secret) | |
if api_secret == '': | |
api_secret = stored_api_secret | |
else: | |
api_secret = input('What is your app''s API secret: ') | |
write_key_to_settings('api_secret', api_secret) | |
stored_redirect_uri = read_key_from_settings('redirect_uri') | |
if stored_redirect_uri is not None: | |
redirect_uri = input('What is your app''s redirect_uri [%s]: ' % stored_redirect_uri) | |
if redirect_uri == '': | |
redirect_uri = stored_redirect_uri | |
else: | |
redirect_uri = input('What is your app''s redirect_uri: ') | |
write_key_to_settings('redirect_uri', redirect_uri) | |
s = Session(api_key) | |
s.set_redirect_uri(redirect_uri) | |
s.set_api_secret(api_secret) | |
print('\n') | |
print('Great! Now paste the following URL on your browser and type the code that you get in return') | |
print('URL: %s\n' % s.get_login_url()) | |
input('Press the enter key to continue\n') | |
code = input('What is the code you got from the browser: ') | |
s.set_code(code) | |
try: | |
access_token = s.retrieve_access_token() | |
except SystemError as se: | |
print('Uh oh, there seems to be something wrong. Error: [%s]' % se) | |
return | |
write_key_to_settings('access_token', access_token) | |
u = Upstox(api_key, access_token) | |
clear_screen() | |
show_home_screen() | |
def show_home_screen(): | |
global s, u | |
global profile | |
print('\n*** Welcome to Upstox API ***\n\n') | |
print('1. Get Profile\n') | |
print('2. Get Balance\n') | |
print('3. Get Positions\n') | |
print('4. Get Holdings\n') | |
print('5. Get Order History\n') | |
print('6. Get LTP Quote\n') | |
print('7. Get Full Quote\n') | |
print('8. Show socket example\n') | |
print('9. Quit\n') | |
selection = input('Select your option: \n') | |
try: | |
int(selection) | |
except: | |
clear_screen() | |
show_home_screen() | |
selection = int(selection) | |
clear_screen() | |
if selection == 1: | |
load_profile() | |
pprint(profile) | |
elif selection == 2: | |
pprint(u.get_balance()) | |
elif selection == 3: | |
pprint(u.get_positions()) | |
elif selection == 4: | |
pprint(u.get_holdings()) | |
elif selection == 5: | |
pprint(u.get_order_history()) | |
elif selection == 6: | |
product = select_product() | |
if product is not None: | |
pprint(u.get_live_feed(product, LiveFeedType.LTP)) | |
elif selection == 7: | |
product = select_product() | |
if product is not None: | |
pprint(u.get_live_feed(product, LiveFeedType.Full)) | |
elif selection == 8: | |
socket_example() | |
elif selection == 9: | |
sys.exit(0) | |
show_home_screen(); | |
def load_profile(): | |
global profile | |
# load user profile to variable | |
profile = u.get_profile() | |
def select_product(): | |
global u | |
exchange = select_exchange() | |
product = None | |
clear_screen() | |
while exchange is not None: | |
u.get_master_contract(exchange) | |
product = find_product(exchange) | |
clear_screen() | |
if product is not None: | |
break | |
exchange = select_exchange() | |
return product | |
def find_product(exchange): | |
found_product = False | |
result = None | |
while not found_product: | |
query = input('Type the symbol that you are looking for. Type %s to go back: ' % break_symbol) | |
if query.lower() == break_symbol: | |
found_product = True | |
result = None | |
break | |
results = u.search_instruments(exchange, query) | |
if len(results) == 0: | |
print('No results found for [%s] in [%s] \n\n' % (query, exchange)) | |
break | |
else: | |
for index, result in enumerate(results): | |
if index > 9: | |
break | |
print ('%d. %s' % (index, result.symbol)) | |
selection = input('Please make your selection. Type %s to go back: ' % break_symbol) | |
if query.lower() == break_symbol: | |
found_product = False | |
result = None | |
break | |
try: | |
selection = int(selection) | |
except ValueError: | |
found_product = False | |
result = None | |
break | |
if 0 <= selection <= 9 and len(results) >= selection + 1: | |
found_product = True | |
result = results[selection] | |
break | |
found_product = False | |
return result | |
def select_exchange(): | |
global profile | |
if profile is None: | |
load_profile() | |
back_to_home_screen = False | |
valid_exchange_selected = False | |
while not valid_exchange_selected: | |
print('** Live quote streaming example **\n') | |
for index, item in enumerate(profile[u'exchanges_enabled']): | |
print ('%d. %s' % (index + 1, item)) | |
print ('9. Back') | |
print('\n') | |
selection = input('Select exchange: ') | |
try: | |
selection = int(selection) | |
except ValueError: | |
break | |
if selection == 9: | |
valid_exchange_selected = True | |
back_to_home_screen = True | |
break | |
selected_index = selection - 1 | |
if 0 <= selected_index < len(profile[u'exchanges_enabled']): | |
valid_exchange_selected = True | |
break | |
if back_to_home_screen: | |
return None | |
return profile[u'exchanges_enabled'][selected_index] | |
def socket_example(): | |
print('Press Ctrl+C to return to the main screen\n') | |
u.set_on_quote_update(event_handler_quote_update) | |
u.get_master_contract('NSE_EQ') | |
try: | |
u.subscribe(u.get_instrument_by_symbol('NSE_EQ', 'TATASTEEL'), LiveFeedType.Full) | |
except: | |
pass | |
try: | |
u.subscribe(u.get_instrument_by_symbol('NSE_EQ', 'RELIANCE'), LiveFeedType.LTP) | |
except: | |
pass | |
u.start_websocket(False) | |
def event_handler_quote_update(message): | |
pprint("Quote Update: %s" % str(message)) | |
def clear_screen(): | |
os.system('cls' if os.name == 'nt' else 'clear') | |
def write_key_to_settings(key, value): | |
filename = os.path.join(gettempdir(), 'interactive_api.json') | |
try: | |
file = open(filename, 'r') | |
except IOError: | |
data = {"api_key" : "", "api_secret" : "", "redirect_uri" : "", "access_token" : ""} | |
with open(filename, 'w') as output_file: | |
json.dump(data, output_file) | |
file = open(filename, 'r') | |
try: | |
data = json.load(file) | |
except: | |
data = {} | |
data[key] = value | |
with open(filename, 'w') as output_file: | |
json.dump(data, output_file) | |
def read_key_from_settings(key): | |
filename = os.path.join(gettempdir(), 'interactive_api.json') | |
try: | |
file = open(filename, 'r') | |
except IOError: | |
file = open(filename, 'w') | |
file = open(filename, 'r') | |
try: | |
data = json.load(file) | |
return data[key] | |
except: | |
pass | |
return None | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment