Skip to content

Instantly share code, notes, and snippets.

@gnzsnz
Forked from Macfly/Main.py
Created December 8, 2023 21:32
Show Gist options
  • Save gnzsnz/dc58e56e16ce2c4ac16f2e9a4c302220 to your computer and use it in GitHub Desktop.
Save gnzsnz/dc58e56e16ce2c4ac16f2e9a4c302220 to your computer and use it in GitHub Desktop.
Async server with socketio and Ib_insync
import math
import asyncio
import logging, sys
import sqlite3
from asyncio import sleep
from logging.handlers import TimedRotatingFileHandler
import aiohttp_cors
import aiosqlite
import socketio
from aiohttp import web
from dateutil import tz
from ib_insync import *
from sys import platform
from aiogram import Bot
# Initialization of the different modules
# uvloop is imported and installed only when running on Linux.
if platform in ("linux", "linux2"):
    import uvloop

    uvloop.install()

# Socket.IO server attached to the aiohttp application; HTTP handlers are
# collected in ``routes`` and registered on the app in the __main__ block.
sio = socketio.AsyncServer(async_mode='aiohttp')
app = web.Application()
sio.attach(app)
routes = web.RouteTableDef()

# Telegram credentials. Both values were left blank in the published source
# and must be filled in before the bot can send anything.
API_TOKEN = ''
# NOTE(review): the original line was ``CHATROOM_ID =`` with no value (a
# syntax error). ``None`` is only a placeholder - set the real chat id.
CHATROOM_ID = None
bot = Bot(token=API_TOKEN)

# Root logger writes to a file that is rotated at midnight. The ``logs``
# directory must already exist: the handler opens the file on construction.
logname = "logs/my_app.log"
FORMAT = '%(asctime)s|%(name)s|%(levelname)s: %(message)s'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = TimedRotatingFileHandler(logname, when="midnight", interval=1)
formatter = logging.Formatter(FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)

local_tz = tz.tzlocal()
# live update setup
@sio.on('connect')
async def connect(sid, message):
    """Log that a Socket.IO client has connected.

    Args:
        sid: Session id passed in by the Socket.IO server (unused).
        message: Second positional argument passed in by the Socket.IO
            server for the ``connect`` event (unused).
    """
    logging.info('Client connected')
@sio.on('disconnect')
async def disconnect(sid):
    """Log that a Socket.IO client has disconnected.

    Args:
        sid: Session id passed in by the Socket.IO server (unused).
    """
    logging.info('Client disconnected')
# HTTP routes and helper functions for reading positions/open trades and cancelling orders
@routes.get('/position')
async def get_position(request):
    """Return the current position summary as JSON.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A JSON response wrapping the dict produced by ``positions()``.
    """
    return web.json_response(positions())
@routes.get('/get_open_trades')
async def get_open_trades(request):
    """Return every open trade known to the IB client as a JSON list.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A JSON response holding one dict per open trade with the keys
        ``orderId``, ``quantity``, ``action`` and ``limitPrice``.
    """
    open_trades = [
        {
            'orderId': trade.order.orderId,
            'quantity': trade.order.totalQuantity,
            'action': trade.order.action,
            'limitPrice': trade.order.lmtPrice,
        }
        for trade in app['ib'].openTrades()
    ]
    return web.json_response(open_trades)
@routes.get('/cancel_all')
async def login(request):
    """Cancel every open order and report success.

    NOTE(review): despite its name this handler serves ``/cancel_all``; the
    name is kept unchanged so existing references keep working.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A JSON response ``{"success": true}`` once ``cancel_all()`` returns.
    """
    logging.info('cancel_all')
    await cancel_all()
    return web.json_response({'success': True})
def round_limit_order(x, tick=0.25):
    """Round a price to the nearest multiple of ``tick``.

    Args:
        x: The price to round.
        tick: The price increment to snap to. Defaults to 0.25, which
            reproduces the original quarter-point rounding.

    Returns:
        ``x`` rounded to the nearest multiple of ``tick`` as a float. Exact
        halves follow the behaviour of the built-in ``round``.

    Raises:
        ValueError: If ``tick`` is not strictly positive.
    """
    if tick <= 0:
        raise ValueError("tick must be positive")
    return round(x / tick) * tick
def positions():
    """Summarise the first position reported by the IB client.

    Returns:
        A dict with ``avgCost`` (the position's average cost divided by the
        multiplier of the contract stored in ``app['mesInfo']``) and
        ``position`` (the size as an int). Both are zero when the client
        reports no positions.
    """
    held = app['ib'].positions()
    if not held:
        return {'avgCost': 0.0, 'position': 0}

    # NOTE(review): only the first reported position is inspected; this
    # presumably assumes the account holds a single instrument - confirm.
    first = held[0]
    multiplier = float(app['mesInfo'].contract.multiplier)
    return {
        'avgCost': first.avgCost / multiplier,
        'position': int(first.position),
    }
async def cancel_all():
    """Cancel every open order and notify Socket.IO clients.

    Requests cancellation of each trade returned by ``openTrades()``, logs
    completion, then emits an ``open_trade`` event with an empty payload.
    """
    ib = app['ib']
    for open_trade in ib.openTrades():
        ib.cancelOrder(open_trade.order)
    logging.info('cancel all done')
    await sio.emit('open_trade', {})
async def send_message_to_telegram_room(message):
    """Send ``message`` to the Telegram chat identified by ``CHATROOM_ID``.

    Args:
        message: The text to send.
    """
    await bot.send_message(CHATROOM_ID, message)
# functions that listen to live events from IB
async def ticker_listener(app):
    """Forward ticker updates from IB to Socket.IO clients as ``quote`` events.

    For each batch delivered by ``pendingTickersEvent`` one ticker is taken
    and its bid/ask/last values are emitted. ``open`` and ``close`` fall back
    to the last price while the ticker reports NaN for them.

    Args:
        app: The aiohttp application holding the IB client under ``'ib'``.
    """
    async for tickers in app['ib'].pendingTickersEvent.aiter(skip_to_last=True):
        # An empty batch would make a bare next() raise StopIteration inside
        # this coroutine, so skip it explicitly.
        ticker = next(iter(tickers), None)
        if ticker is None:
            continue
        # logging.info(ticker)
        last = ticker.last
        open_price = last if math.isnan(ticker.open) else ticker.open
        close_price = last if math.isnan(ticker.close) else ticker.close
        await sio.emit(
            'quote',
            {
                'bid': ticker.bid, 'bidSize': ticker.bidSize,
                'ask': ticker.ask, 'askSize': ticker.askSize,
                'last': last,
                'open': open_price, 'close': close_price,
            },
        )
async def pnl_listener(app):
    """Forward PnL updates from IB to Socket.IO clients as ``pnl`` events.

    Args:
        app: The aiohttp application holding the IB client under ``'ib'``.
    """
    async for pnl in app['ib'].pnlEvent.aiter(skip_to_last=True):
        # logging.info(pnl)
        payload = {
            'dailyPnL': pnl.dailyPnL,
            'unrealizedPnL': pnl.unrealizedPnL,
            'realizedPnL': pnl.realizedPnL,
        }
        await sio.emit('pnl', payload)
async def exec_listener(app):
    """React to order status changes reported by IB.

    * ``Submitted``: log the order and emit an ``open_trade`` event.
    * ``Cancelled``: log the trade.
    * ``Filled``: log the order, emit ``new_trade`` and ``position`` events
      and send a Telegram message with the new position size.

    Args:
        app: The aiohttp application holding the IB client under ``'ib'``.
    """
    async for trade in app['ib'].orderStatusEvent:
        status = trade.orderStatus.status
        if status == 'Submitted':
            logging.info('order submitted: {}'.format(trade.order))
            await sio.emit(
                'open_trade',
                {
                    'orderId': trade.order.orderId,
                    'quantity': trade.order.totalQuantity,
                    'action': trade.order.action,
                    'limitPrice': trade.order.lmtPrice,
                },
            )
        elif status == 'Cancelled':
            logging.info('order cancelled: {}'.format(trade))
        elif status == 'Filled':
            logging.info('order Filled: {}'.format(trade.order))
            position = positions()
            # NOTE(review): ``trade`` is emitted as-is; confirm that the
            # Socket.IO serializer can encode this object.
            await sio.emit('new_trade', trade)
            await sio.emit('position', position)
            # The original call was missing the closing quote and parenthesis.
            await bot.send_message(
                CHATROOM_ID,
                f"new trade: "
                f"\nposition: {position['position']}",
            )
# initialization functions
async def startup_tasks(app):
    """Open the database and start the background IB listeners.

    Stores the aiosqlite connection under ``app['db']`` and one asyncio task
    per listener under ``quote_listener``, ``pnl_listener`` and
    ``exec_listener``.

    Args:
        app: The aiohttp application being started.
    """
    app['db'] = await aiosqlite.connect('database.db')
    listeners = {
        'quote_listener': ticker_listener,
        'pnl_listener': pnl_listener,
        'exec_listener': exec_listener,
    }
    for key, listener in listeners.items():
        app[key] = asyncio.create_task(listener(app))
async def shutdown_tasks(app):
    """Cancel open orders and release what ``startup_tasks`` created.

    After cancelling all orders, the listener tasks stored on the app are
    cancelled and awaited, and the database connection is closed. The
    cleanup runs even if cancelling the orders raises.

    Args:
        app: The aiohttp application being shut down.
    """
    logging.info('shutdown_tasks cancelling all orders')
    try:
        await cancel_all()
    finally:
        # Only touch entries that were actually created during startup.
        listener_keys = ('quote_listener', 'pnl_listener', 'exec_listener')
        listeners = [app[key] for key in listener_keys if key in app]
        for task in listeners:
            task.cancel()
        # return_exceptions keeps the expected CancelledError from escaping.
        await asyncio.gather(*listeners, return_exceptions=True)

        db = app.get('db')
        if db is not None:
            await db.close()
async def start():
    """Connect to IB if needed, subscribe to market data and PnL, then launch.

    Does nothing when ``app['app_params']['started']`` is already true.

    NOTE(review): ``app['ACCOUNT_NUMBER']`` and ``send_launch_orders`` are not
    defined anywhere in the visible part of this file; they must be provided
    before this coroutine can run. The port used here (7497) also differs
    from the one used in ``init_ib`` (4003) - confirm which is intended.
    """
    if app['app_params']['started']:
        return

    logging.info('start function')
    ib = app['ib']
    if not ib.isConnected():
        logging.info('connect to IB')
        await ib.connectAsync('127.0.0.1', 7497, clientId=0)

    # NOTE(review): the contract argument was garbled in the original source
    # (``reqContractDetailsAsync(, 'GLOBEX'))``). A MES continuous future on
    # GLOBEX is assumed from the ``mesInfo`` naming - confirm the contract.
    mes_contract = await ib.reqContractDetailsAsync(ContFuture('MES', 'GLOBEX'))
    app['mesInfo'] = mes_contract[0]
    ib.reqMktData(app['mesInfo'].contract, '', False, False)
    ib.reqPnL(app['ACCOUNT_NUMBER'])
    await ib.reqAccountSummaryAsync()
    app['app_params']['started'] = True
    await send_launch_orders()
async def stop():
    """Undo ``start()``: drop the subscriptions and cancel all open orders.

    Does nothing unless ``app['app_params']['started']`` is true.

    NOTE(review): ``app['ACCOUNT_NUMBER']`` is not set anywhere in the visible
    part of this file.
    """
    if not app['app_params']['started']:
        return

    logging.info('stopping')
    ib = app['ib']
    ib.cancelMktData(contract=app['mesInfo'].contract)
    ib.cancelPnL(app['ACCOUNT_NUMBER'])
    await cancel_all()
    app['app_params']['started'] = False
def init_parameters():
    """Log that parameter initialization ran; no parameters are set here."""
    logging.info('init_parameters')
def init_ib():
    """Create the IB client, connect it and subscribe to market data.

    Stores the client under ``app['ib']``, the first contract-details result
    under ``app['mesInfo']`` and initialises ``app['app_params']`` with
    ``started`` set to False.
    """
    logging.info('init_ib')
    ib = IB()
    app['ib'] = ib
    ib.connect('127.0.0.1', 4003, clientId=0)
    # Market data type 3 - presumably delayed data per the IB API; confirm.
    ib.reqMarketDataType(3)

    # NOTE(review): the contract argument was garbled in the original source
    # (``reqContractDetails(, 'GLOBEX'))``). A MES continuous future on GLOBEX
    # is assumed from the ``mesInfo`` naming - confirm the contract.
    app['mesInfo'] = ib.reqContractDetails(ContFuture('MES', 'GLOBEX'))[0]
    ib.reqMktData(app['mesInfo'].contract, '', False, False)
    app['app_params'] = {'started': False}
# Default CORS policy applied to every origin ("*"): credentials allowed,
# all headers exposed and accepted, and only POST/PUT/GET permitted.
cors = aiohttp_cors.setup(
    app,
    defaults={
        "*": aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers="*",
            allow_headers="*",
            allow_methods=["POST", "PUT", "GET"],
        ),
    },
)
if __name__ == '__main__':
    # Register the HTTP handlers, then enable CORS on every route except the
    # Socket.IO endpoint.
    app.router.add_routes(routes)
    # Iterate over a snapshot: presumably cors.add() may register further
    # routes on the router while the loop is running.
    for route in list(app.router.routes()):
        if route.resource.canonical != '/socket.io/':
            cors.add(route)

    logging.info('starting')
    init_parameters()
    init_ib()
    app.on_startup.append(startup_tasks)
    app.on_shutdown.append(shutdown_tasks)
    web.run_app(app, port=5000, access_log=None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment