Created
June 28, 2018 10:51
-
-
Save yakneens/72c30f40b95f0c217386beba75fb6b0e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import random | |
import asyncio | |
from ib_insync import * | |
from collections import namedtuple | |
LiveFuturesSettings = namedtuple('LiveFuturesSettings', | |
'tickers, exchange, bar_size, whatToShow, influx_measurement, log_filename,') | |
class LiveFuturesGetter: | |
def __init__(self, ib: IB, futures_settings: LiveFuturesSettings): | |
self.settings = futures_settings | |
self.ib = ib | |
#These help keep track of which event handler applies | |
self.requestIds = [] | |
if not self.onBarUpdate in self.ib.barUpdateEvent: | |
self.ib.barUpdateEvent += self.onBarUpdate | |
def onBarUpdate(self, bars: BarDataList, hasNewBar: bool): | |
if hasNewBar and bars.reqId in self.requestIds: | |
new_bar = bars[-1] | |
print(f'{str(new_bar.time)} {bars.contract.symbol}') | |
del bars[:] | |
async def prepare_contracts(self): | |
# Get the first contract in all the continuous futures of interest | |
cont_fut = [el[0] for el in | |
await asyncio.gather(*[self.ib.qualifyContractsAsync(ContFuture(i, exchange=self.settings.exchange)) for i in | |
self.settings.tickers])] | |
# Get actual future contracts based on the continuous contracts | |
fut = [el[0] for el in await asyncio.gather(*[self.ib.qualifyContractsAsync(Future(conId=c.conId)) for c in | |
cont_fut])] | |
return fut | |
async def get_live_futures(self): | |
fut = await self.prepare_contracts() | |
for contract in fut: | |
self.requestIds.append(self.ib.client._reqIdSeq) | |
self.ib.reqRealTimeBars(contract, self.settings.bar_size, whatToShow=self.settings.whatToShow, useRTH=False) | |
def init_ib(): | |
ib = IB() | |
IB_PORT = 4002 | |
ib.connect('127.0.0.1', IB_PORT, clientId=int(random.random() * 100)) | |
return ib | |
async def my_main(ib, ): | |
try: | |
tasks = [ | |
LiveFuturesGetter(ib, LiveFuturesSettings(tickers=["ES", ], | |
exchange="GLOBEX", | |
bar_size=5, | |
whatToShow="TRADES", | |
influx_measurement="futures_5_sec_bars", | |
log_filename="get_live_futures")).get_live_futures(), | |
LiveFuturesGetter(ib, LiveFuturesSettings(tickers=["VIX"], | |
exchange="CFE", | |
bar_size=5, | |
whatToShow="TRADES", | |
influx_measurement="futures_5_sec_bars", | |
log_filename="get_live_futures")).get_live_futures()] | |
await asyncio.gather(*tasks) | |
except ValueError as e: | |
print(f"Arrived here {e}") | |
if __name__ == '__main__': | |
start_time = time.time() | |
ib = init_ib() | |
asyncio.ensure_future(my_main(ib, )) | |
IB.run() | |
print("Execution time was: {}".format(str(time.time() - start_time))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment