Skip to content

Instantly share code, notes, and snippets.

@yakneens
Created June 28, 2018 10:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yakneens/72c30f40b95f0c217386beba75fb6b0e to your computer and use it in GitHub Desktop.
Save yakneens/72c30f40b95f0c217386beba75fb6b0e to your computer and use it in GitHub Desktop.
import time
import random
import asyncio
from ib_insync import *
from collections import namedtuple
LiveFuturesSettings = namedtuple('LiveFuturesSettings',
'tickers, exchange, bar_size, whatToShow, influx_measurement, log_filename,')
class LiveFuturesGetter:
def __init__(self, ib: IB, futures_settings: LiveFuturesSettings):
self.settings = futures_settings
self.ib = ib
#These help keep track of which event handler applies
self.requestIds = []
if not self.onBarUpdate in self.ib.barUpdateEvent:
self.ib.barUpdateEvent += self.onBarUpdate
def onBarUpdate(self, bars: BarDataList, hasNewBar: bool):
if hasNewBar and bars.reqId in self.requestIds:
new_bar = bars[-1]
print(f'{str(new_bar.time)} {bars.contract.symbol}')
del bars[:]
async def prepare_contracts(self):
# Get the first contract in all the continuous futures of interest
cont_fut = [el[0] for el in
await asyncio.gather(*[self.ib.qualifyContractsAsync(ContFuture(i, exchange=self.settings.exchange)) for i in
self.settings.tickers])]
# Get actual future contracts based on the continuous contracts
fut = [el[0] for el in await asyncio.gather(*[self.ib.qualifyContractsAsync(Future(conId=c.conId)) for c in
cont_fut])]
return fut
async def get_live_futures(self):
fut = await self.prepare_contracts()
for contract in fut:
self.requestIds.append(self.ib.client._reqIdSeq)
self.ib.reqRealTimeBars(contract, self.settings.bar_size, whatToShow=self.settings.whatToShow, useRTH=False)
def init_ib():
ib = IB()
IB_PORT = 4002
ib.connect('127.0.0.1', IB_PORT, clientId=int(random.random() * 100))
return ib
async def my_main(ib, ):
try:
tasks = [
LiveFuturesGetter(ib, LiveFuturesSettings(tickers=["ES", ],
exchange="GLOBEX",
bar_size=5,
whatToShow="TRADES",
influx_measurement="futures_5_sec_bars",
log_filename="get_live_futures")).get_live_futures(),
LiveFuturesGetter(ib, LiveFuturesSettings(tickers=["VIX"],
exchange="CFE",
bar_size=5,
whatToShow="TRADES",
influx_measurement="futures_5_sec_bars",
log_filename="get_live_futures")).get_live_futures()]
await asyncio.gather(*tasks)
except ValueError as e:
print(f"Arrived here {e}")
if __name__ == '__main__':
start_time = time.time()
ib = init_ib()
asyncio.ensure_future(my_main(ib, ))
IB.run()
print("Execution time was: {}".format(str(time.time() - start_time)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment