Skip to content

Instantly share code, notes, and snippets.

@rolangom
Created October 28, 2021 11:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rolangom/ca1815d063fe95cbc2530523f18e56c1 to your computer and use it in GitHub Desktop.
Save rolangom/ca1815d063fe95cbc2530523f18e56c1 to your computer and use it in GitHub Desktop.
rx_fetch_ib_data
from ibapi.wrapper import EWrapper
from ibapi import utils
from ibapi.client import EClient, Contract
from ibapi.common import TickerId, BarData, TagValueList
from ibapi.utils import iswrapper
import rx
import rx.operators as rx_op
from rx.subject import AsyncSubject, Subject, BehaviorSubject, ReplaySubject
from rx.core.observable import Observable
import time
from typing import List, Optional, Union, Dict, Tuple, Callable, Any
import datetime as dt
import psycopg2
import numpy as np
import pandas as pd
import sys
import logging
def make_contract(symbol: str, sec_type: str = "STK", currency: str = 'USD', exchange: str = 'SMART',
                  primaryExchange: str = "ISLAND", localsymbol: str = None) -> Contract:
    """Build an IB ``Contract`` for a single instrument.

    :param symbol: ticker symbol, e.g. ``"AAPL"``.
    :param sec_type: IB security type, ``"STK"`` by default.
    :param currency: contract currency.
    :param exchange: exchange the request is routed to.
    :param primaryExchange: primary exchange of the instrument.
    :param localsymbol: optional local symbol; only set on the contract when truthy.
    :return: the populated ``Contract``.
    """
    result = Contract()
    # Fill the always-present identification fields in one pass.
    for attr_name, attr_value in (
        ("symbol", symbol),
        ("secType", sec_type),
        ("currency", currency),
        ("exchange", exchange),
        ("primaryExchange", primaryExchange),
    ):
        setattr(result, attr_name, attr_value)
    # Leave localSymbol at the Contract default unless a non-empty value was given.
    if localsymbol:
        result.localSymbol = localsymbol
    return result
class AlphaClientWrapper(EClient, EWrapper):
    """IB API client that exposes historical-data requests as Rx streams.

    Every ``historicalDataRequest`` gets a fresh request id and its own
    ``Subject``. The IB callbacks route bars (``historicalData``), completion
    (``historicalDataEnd``) and per-request errors (``error``) to that subject.
    Once a request completes or errors, its bookkeeping is dropped, so a
    long-running backfill does not keep one contract and one subject alive per
    request for the lifetime of the client.
    """

    def __init__(self):
        EClient.__init__(self, wrapper=self)
        EWrapper.__init__(self)
        self.request_id = 0
        self.started = False
        self.next_valid_order_id = None
        # request id -> contract of a request that is still in flight
        self.requests: Dict[int, Contract] = {}
        # request id -> subject emitting (contract, bar) tuples for that request
        self._subjects: Dict[int, Subject] = {}
        # False until connectAck, then True; False again (and completed) on connectionClosed
        self.connected: BehaviorSubject = BehaviorSubject(False)

    def next_request_id(self, contract: Contract) -> int:
        """Allocate the next request id (1, 2, ...) and remember ``contract`` under it."""
        self.request_id += 1
        self.requests[self.request_id] = contract
        return self.request_id

    def _release_request(self, req_id: int) -> Optional[Subject]:
        """Forget ``req_id`` and return its subject, or ``None`` if it is unknown."""
        self.requests.pop(req_id, None)
        return self._subjects.pop(req_id, None)

    def historicalDataRequest(self, contract: Contract, endDateTime: str,
                              durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                              useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False) -> Observable:
        """Send ``reqHistoricalData`` and return a stream of ``(contract, bar)`` tuples.

        The subject is registered under the new request id before the request
        is sent. It completes on ``historicalDataEnd`` for that id and errors on
        an ``error`` callback for that id.
        """
        print('hist from contract', contract)
        print('historicalDataRequest %s, %s, %s, %s, %s, %s, %s' % (
            endDateTime, durationStr, barSizeSetting, whatToShow, useRTH, formatDate, keepUpToDate))
        cid = self.next_request_id(contract)
        subject = Subject()
        self._subjects[cid] = subject
        self.reqHistoricalData(
            cid,             # tickerId, used to identify incoming data
            contract,
            endDateTime,     # end of the requested window ("" = now)
            durationStr,     # amount of time to go back
            barSizeSetting,  # bar size
            whatToShow,      # historical data type
            useRTH,          # useRTH (regular trading hours)
            formatDate,      # format the date in yyyyMMdd HH:mm:ss
            keepUpToDate,    # keep up to date after snapshot
            [],              # chart options
        )
        return subject

    @iswrapper
    def historicalData(self, reqId: int, bar: BarData) -> None:
        """Forward one bar to the subject of ``reqId``; bars for unknown ids are ignored."""
        logging.info('historicalData %s, %s', reqId, bar)
        contract = self.requests.get(reqId)
        subject = self._subjects.get(reqId)
        if contract is not None and subject is not None:
            subject.on_next((contract, bar))

    @iswrapper
    def historicalDataEnd(self, reqId: int, start: str, end: str) -> None:
        """Complete the subject of ``reqId`` and drop its bookkeeping."""
        super().historicalDataEnd(reqId, start, end)
        logging.info('historicalDataEnd %s, %s, %s', reqId, start, end)
        print('historicalDataEnd %s, %s, %s' % (reqId, start, end))
        subject = self._release_request(reqId)
        if subject is not None:
            subject.on_completed()

    def do_connect(self, host: str = "127.0.0.1", port: int = 4001, clientId: int = 0) -> rx.Observable:
        """Connect to ``host:port`` as ``clientId`` and return the connection-state stream."""
        self.connect(host, port, clientId)
        return self.connected

    @iswrapper
    def connectAck(self):
        print('conected 0')
        logging.info("Connected")
        self.connected.on_next(True)

    @iswrapper
    def connectionClosed(self):
        logging.info("Disconnected")
        self.connected.on_next(False)
        self.connected.on_completed()

    @iswrapper
    def error(self, req_id: TickerId, error_code: int, error: str):
        """Log an IB error and, for a known request id, fail that request's stream."""
        super().error(req_id, error_code, error)
        print('error', error)
        if req_id < 0:
            # Request ids allocated here start at 1, so negative ids are not
            # tied to any historical request; just log them.
            logging.debug("Error. Id: %s Code %s Msg: %s", req_id, error_code, error)
            return
        logging.error("Error. Id: %s Code %s Msg: %s", req_id, error_code, error)
        # .pop instead of [] so errors for ids without a subject (or already
        # finished requests) no longer raise KeyError inside the callback.
        subject = self._release_request(req_id)
        if subject is not None:
            subject.on_error(Exception(f"Error. Id: {req_id} Code {error_code} Msg: {error}"))

    def say_bye(self):
        """Disconnect from IB."""
        print('bye!')
        self.disconnect()
def make_ib_contract_req(client: AlphaClientWrapper, endDateTime: str,
                         durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                         useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False):
    """Return a mapper from ``(is_connected, symbol)`` to a historical-data stream.

    The request parameters are captured once here. Each call of the returned
    function builds a default contract for the symbol and issues one request
    on ``client``.
    """
    request_args = (endDateTime, durationStr, barSizeSetting, whatToShow,
                    useRTH, formatDate, keepUpToDate)

    def handle_conn_symbol(con_symbol: Tuple[bool, str]) -> Observable:
        connected_flag, ticker = con_symbol
        print('handle_conn_symbol', connected_flag, ticker)
        return client.historicalDataRequest(make_contract(ticker), *request_args)

    return handle_conn_symbol
def make_ib_contract_req_plain(client: AlphaClientWrapper, symbol: str, endDateTime: str,
                               durationStr: str, barSizeSetting: str, whatToShow: str = "TRADES",
                               useRTH: int = 0, formatDate: int = 1, keepUpToDate: bool = False) -> Observable:
    """Issue one historical-data request for ``symbol`` using a default contract.

    :return: the ``(contract, bar)`` stream returned by
        ``client.historicalDataRequest``.
    """
    print('make_ib_contract_req_plain', symbol)
    return client.historicalDataRequest(
        contract=make_contract(symbol),
        endDateTime=endDateTime,
        durationStr=durationStr,
        barSizeSetting=barSizeSetting,
        whatToShow=whatToShow,
        useRTH=useRTH,
        formatDate=formatDate,
        keepUpToDate=keepUpToDate,
    )
def prepare_insert_ticker(conn: psycopg2._psycopg.connection):
    """Return a function that (re)inserts one symbol into the ``tickers`` table.

    Existing rows for the symbol are deleted before the insert, so repeated
    calls do not accumulate duplicates. Each call commits on success; on any
    error it rolls back and re-raises. The cursor is always closed.
    """
    def insert_symbol(symbol: str):
        with conn.cursor() as cur:
            try:
                print('insert_symbol', symbol)
                params = (symbol,)
                # delete first so the insert never duplicates an existing ticker
                cur.execute('DELETE FROM tickers WHERE symbol = %s', params)
                cur.execute('INSERT INTO tickers (symbol) VALUES (%s)', params)
                conn.commit()
            except Exception:
                conn.rollback()
                raise

    return insert_symbol
def parse_ib_bar_date(raw: str) -> dt.datetime:
    """Parse an IB bar date string (``formatDate=1``) into a naive datetime.

    Accepts ``"yyyyMMdd"`` (daily bars) and ``"yyyyMMdd HH:mm:ss"`` (intraday
    bars, with any amount of whitespace between the two parts). Any further
    token after the time, such as a time-zone name, is ignored so stored values
    stay comparable with rows written from bare ``yyyyMMdd HH:mm:ss`` dates.

    :raises ValueError: if ``raw`` is empty or not in one of those formats.
    """
    parts = raw.split()
    if not parts:
        raise ValueError(f"empty IB bar date: {raw!r}")
    if len(parts) == 1:
        return dt.datetime.strptime(parts[0], "%Y%m%d")
    return dt.datetime.strptime(f"{parts[0]} {parts[1]}", "%Y%m%d %H:%M:%S")


def handle_ib_resp(table_name: str, conn: "psycopg2._psycopg.connection"):
    """Return a mapper that stores one ``(contract, bar)`` tuple in ``table_name``.

    The mapper replaces any existing row with the same bar date and symbol,
    commits, and returns ``(contract, bar_datetime)``. On any error it rolls
    back and re-raises. The cursor is always closed.
    """
    # table_name is interpolated into the SQL text, so it must be a trusted,
    # code-supplied identifier (callers in this file pass literals). All values
    # go through %s parameters.
    delete_sql = f'DELETE FROM {table_name} WHERE date = %s AND symbol = %s'
    insert_sql = (
        f'INSERT INTO {table_name} (date, symbol, exchange, open, high, low, close, barCount, volume, average) '
        'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
    )

    def handle_ib_response(con_data: "Tuple[Contract, BarData]") -> Any:
        cursor = conn.cursor()
        try:
            print('handle_ib_response', con_data)
            con, data = con_data
            date = parse_ib_bar_date(data.date)
            # delete data if exists to keep fresh data and clean workflow
            cursor.execute(delete_sql, (date, con.symbol))
            cursor.execute(insert_sql, (date, con.symbol, con.exchange, data.open, data.high, data.low,
                                        data.close, data.barCount, data.volume, data.average))
            conn.commit()
            return con, date
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    return handle_ib_response
def handle_conn_status(is_connected) -> Observable:
    """Map a connection flag to a stream: truthy emits ``True``, falsy errors out."""
    if not is_connected:
        return rx.throw(Exception("Client's not connected"))
    return rx.just(True)
def read_symbols_file(observer: "rx.core.Observer", scheduler=None,
                      path: str = 'symbols.txt') -> "rx.core.Observer":
    """Subscribe function for ``rx.create`` that emits each symbol listed in ``path``.

    The file holds one symbol per line. Surrounding whitespace, including the
    line ending, is stripped and blank lines are skipped, so emitted values can
    be passed straight to ``make_contract``. Completes after the last symbol.

    :param observer: receives the symbols.
    :param scheduler: unused; accepted to match the ``rx.create`` subscribe signature.
    :param path: file to read; ``symbols.txt`` in the working directory by default.
    :return: ``observer`` (kept for backward compatibility).
    """
    with open(path, 'r') as f:
        symbols = [line.strip() for line in f]
    for symbol in symbols:
        if symbol:
            observer.on_next(symbol)
    observer.on_completed()
    return observer
def read_symbols_arr(path: str = 'symbols.txt') -> List[str]:
    """Return the symbols listed one per line in ``path``, in file order.

    Surrounding whitespace (line endings including Windows-style ones, stray
    spaces) is stripped and blank lines are skipped. This way a trailing empty
    line does not produce an empty symbol. Inner spaces (e.g. ``"BRK B"``)
    are kept.

    :param path: file to read; ``symbols.txt`` in the working directory by default.
    """
    with open(path, 'r') as f:
        return [symbol for symbol in (line.strip() for line in f) if symbol]
def run_v1():
    """Legacy entry point: fetch 10 years of daily bars for a fixed symbol list.

    Once the IB connection is up, each symbol gets one historical-data request,
    with at most two requests in flight. Every bar is written to
    ``ib_bardata_1day`` and reported as ``(contract, date)``. Blocks in
    ``client.run()``; the DB connection is closed when that returns or raises.
    """
    def on_final_results(con_date: Tuple[Contract, dt.datetime]):
        con, date = con_date
        print('on_final_results', con.symbol, type(date))
        print(con, date)

    client = AlphaClientWrapper()
    connection = client.do_connect(clientId=5)
    symbols = rx.of('AAPL', 'TSLA', 'AMZN', 'MSFT', 'ABNB')
    conn = psycopg2.connect('dbname=alpha user=postgres')
    request_symbol = make_ib_contract_req(client, endDateTime="", durationStr="10 Y", barSizeSetting="1 day")

    try:
        connection.pipe(
            rx_op.flat_map(handle_conn_status),
            rx_op.combine_latest(symbols),
            # map each (connected, symbol) to a *deferred* request stream: the
            # request is only sent when merge() subscribes, so max_concurrent
            # really throttles requests and no bars reach the hot Subject before
            # anyone listens. (flat_map here flattened the bars, and merge() then
            # tried to subscribe to the bar tuples themselves.)
            rx_op.map(lambda con_symbol: rx.defer(lambda _: request_symbol(con_symbol))),
            rx_op.merge(max_concurrent=2),
            # map keeps each stored (contract, date) result as one emission;
            # flat_map would split the tuple into two separate values.
            rx_op.map(handle_ib_resp('ib_bardata_1day', conn)),
        ).subscribe(
            on_next=on_final_results,
            on_error=lambda s: print('err', s),
        )
        client.run()
    finally:
        conn.close()
def init_daily_reqs(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection, symbols: List[str], duration: int):
    """Build the daily-bar ingestion flow for ``symbols``.

    After the client reports a connection, each symbol gets one deferred
    request for ``duration`` days of 1-day bars ending now, with at most 8
    requests in flight. A failing request is replaced by an empty stream, so it
    does not abort the others. Each bar is stored in ``ib_bardata_1day`` and
    emitted as a ``(contract, date)`` tuple.
    """
    def run_flow(_: Any):
        def build_ibreq(symbol: str):
            def run(_):
                return make_ib_contract_req_plain(client, symbol, endDateTime="", durationStr=f"{duration} D",
                                                  barSizeSetting="1 day") \
                    .pipe(rx_op.catch(rx.empty()))
            return run

        # rx.defer delays each request until merge() subscribes to it, so
        # max_concurrent limits how many requests are outstanding at once.
        obs_list = [rx.defer(build_ibreq(symbol)) for symbol in symbols]
        return rx.from_iterable(obs_list).pipe(
            rx_op.merge(max_concurrent=8),
            # map (as in run_xbar_process) keeps each (contract, date) result as
            # one emission; flat_map would split the tuple into two values.
            rx_op.map(handle_ib_resp('ib_bardata_1day', conn)),
        )

    return client.do_connect(clientId=1).pipe(
        rx_op.flat_map(handle_conn_status),
        rx_op.flat_map(run_flow),
    )
def get_all_1daybar_bd_symbols(conn: psycopg2._psycopg.connection) -> List[str]:
    """Return every distinct symbol stored in ``ib_bardata_1day``, sorted ascending."""
    print('get_all_1daybar_bd_symbols')
    frame = pd.read_sql("SELECT DISTINCT symbol FROM ib_bardata_1day order by 1", conn)
    symbol_column = frame['symbol']
    return symbol_column.tolist()
def get_symbol_all_days(conn: psycopg2._psycopg.connection, symbol: str, dst_table: str,
                        do_orderby1_asc=True) -> List[dt.datetime]:
    """List the daily-bar dates of ``symbol`` that have no rows in ``dst_table`` yet.

    A date counts as covered when some ``dst_table`` row for the symbol has a
    ``date`` whose ``date_trunc('day', ...)`` equals that daily bar's date.

    :param dst_table: destination bar table. It is interpolated into the SQL,
        so it must be a trusted identifier.
    :param do_orderby1_asc: sort the dates ascending (default) or descending.
    :return: the distinct uncovered dates, in the requested order.
    """
    print('get_symbol_all_days', symbol)
    direction = 'desc' if not do_orderby1_asc else 'asc'
    query = (
        "SELECT DISTINCT a.date\n"
        "FROM ib_bardata_1day a left join\n"
        f"{dst_table} b on a.symbol = b.symbol and a.date = date_trunc('day', b.date)\n"
        "WHERE a.symbol = %(symbol)s\n"
        "and b.symbol is null\n"
        f"order by 1 {direction}"
    )
    frame = pd.read_sql(query, conn, params={'symbol': symbol})
    return frame['date'].tolist()
def fetch_symbol_xbar(client: AlphaClientWrapper, barSizeSetting) -> Callable[[str, dt.datetime], rx.Observable]:
    """Return a function that requests one day of ``barSizeSetting`` bars for a symbol.

    The returned function requests a ``"1 D"`` window ending at 23:59:59 of the
    given date. If that request errors, it yields an empty stream instead.
    """
    def handle_symbol(symbol: str, date: dt.datetime) -> rx.Observable:
        print('handle_symbol', symbol, date)
        end_of_day = date.strftime("%Y%m%d 23:59:59")
        request = make_ib_contract_req_plain(client, symbol, endDateTime=end_of_day, durationStr="1 D",
                                             barSizeSetting=barSizeSetting)
        return request.pipe(rx_op.catch(rx.empty()))

    return handle_symbol
def run_xbar_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection, table_name: str,
                     barSizeSetting: str, do_orderby1_asc=True):
    """Backfill ``table_name`` with ``barSizeSetting`` bars for days it is missing.

    Symbols are read from ``ib_bardata_1day`` up front. Once the client reports
    a connection, each symbol's uncovered days (see ``get_symbol_all_days``) are
    requested one day at a time. Up to 4 symbols are in progress at once, each
    with up to 8 day-requests in flight. Every bar is stored through
    ``handle_ib_resp``.
    """
    symbols = get_all_1daybar_bd_symbols(conn)
    handle_symbol = fetch_symbol_xbar(client, barSizeSetting)

    def per_day(symbol, date):
        # rx.defer factory: the IB request is only sent once merge() subscribes.
        return lambda _scheduler: handle_symbol(symbol, date)

    def per_symbol(symbol: str):
        def build(_scheduler):
            missing_days = get_symbol_all_days(conn, symbol, table_name, do_orderby1_asc)
            day_streams = [rx.defer(per_day(symbol, day)) for day in missing_days]
            return rx.from_iterable(day_streams).pipe(
                rx_op.merge(max_concurrent=8),
                rx_op.map(handle_ib_resp(table_name, conn)),
            )
        return build

    def run_flow(_: Any):
        symbol_streams = [rx.defer(per_symbol(symbol)) for symbol in symbols]
        return rx.from_iterable(symbol_streams).pipe(rx_op.merge(max_concurrent=4))

    return client.do_connect(clientId=1).pipe(
        rx_op.flat_map(handle_conn_status),
        rx_op.flat_map(run_flow),
    )
def run_1hour_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection):
    """Backfill ``ib_bardata_1hour`` with 1-hour bars, days in ascending date order."""
    return run_xbar_process(client, conn, table_name="ib_bardata_1hour", barSizeSetting="1 hour")
def run_5min_process(client: AlphaClientWrapper, conn: psycopg2._psycopg.connection):
    """Backfill ``ib_bardata_5min`` with 5-minute bars, days in descending date order."""
    return run_xbar_process(client, conn, table_name="ib_bardata_5min", barSizeSetting="5 mins",
                            do_orderby1_asc=False)
def get_db_dsn() -> str:
    """Return the libpq connection string for the bar-data database.

    Taken from the ``ALPHA_DB_DSN`` environment variable, falling back to the
    local ``deust`` database as user ``postgres``. No password is embedded in
    source. Supply it in the DSN, via ``PGPASSWORD``, or in ``~/.pgpass``.
    """
    import os
    return os.environ.get("ALPHA_DB_DSN", "dbname=deust user=postgres")


def get_db_conn():
    """Open a new psycopg2 connection using the DSN from ``get_db_dsn``."""
    # NOTE(review): a database password used to be hard-coded here, in a file
    # published as a public gist. Treat it as leaked and rotate it.
    return psycopg2.connect(get_db_dsn())
def main(mode: str, durationInDays: int):
    """Run one ingestion flow selected by ``mode`` and block in the IB event loop.

    :param mode: ``'d'`` daily bars for the symbols in ``symbols.txt``,
        ``'h'`` 1-hour backfill, ``'5m'`` 5-minute backfill. Any other value
        makes the flow error immediately with "Invalid input".
    :param durationInDays: history depth in days, used by mode ``'d'`` only.

    The database connection is closed when ``client.run()`` returns or anything
    in here raises, so repeated calls (e.g. retries) do not leak connections.
    """
    print('Inicio')
    client = AlphaClientWrapper()
    conn = get_db_conn()

    def on_success(res):
        print('on success', type(res), res)

    def on_error(err):
        print('on error', type(err), err)
        # Re-raised on purpose; presumably meant to escape client.run() so the
        # caller's retry loop kicks in. Confirm against the RxPY version in use.
        raise err

    def on_complete():
        print('on complete')

    def get_initial_process():
        if mode == 'd':
            all_symbols = read_symbols_arr()
            return init_daily_reqs(client, conn, all_symbols, durationInDays)
        elif mode == 'h':
            return run_1hour_process(client, conn)
        elif mode == '5m':
            return run_5min_process(client, conn)
        else:
            return rx.throw(Exception("Invalid input"))

    try:
        get_initial_process().subscribe(
            on_next=on_success,
            on_error=on_error,
            on_completed=on_complete,
        )
        print('Lets run')
        client.run()
    finally:
        conn.close()
def get_input_mode() -> Tuple[str, int]:
    """Read ``(mode, duration_days)`` from the command line.

    ``argv[1]`` is the mode (default ``'d'``). ``argv[2]`` is the duration in
    days, converted with ``int`` (default ``1``).
    """
    args = sys.argv[1:]
    mode = args[0] if args else 'd'
    duration = int(args[1]) if len(args) >= 2 else 1
    return mode, duration
retry_limit = 60


def run_main(retry_i=0):
    """Run ``main`` with the command-line mode, retrying failures with a growing back-off.

    Arguments are parsed once, before the loop, so a malformed argument fails
    immediately instead of being retried for about an hour. Each failure is
    printed. After ``retry_limit`` retries the last exception is re-raised,
    rather than the process silently exiting as if it had succeeded.

    :param retry_i: retry counter to start from. After a failure at counter
        ``i`` the wait before the next attempt is ``2 * i`` seconds.
    """
    mode, duration = get_input_mode()
    while True:
        try:
            main(mode, duration)
            return
        except Exception as err:
            print(err)
            if retry_i >= retry_limit:
                raise
            print('Trying again', retry_i)
            time.sleep(retry_i * 2)
            retry_i += 1
def run_insert_tickers():
    """Refresh the ``tickers`` table from ``symbols.txt``, one transaction per symbol.

    The database connection is closed afterwards, even if an insert fails.
    """
    all_symbols = read_symbols_arr()
    conn = get_db_conn()
    try:
        insert_ticker = prepare_insert_ticker(conn)
        for symbol in all_symbols:
            insert_ticker(symbol)
    finally:
        conn.close()
if __name__ == "__main__":
    # Script entry point: fetch bars for the mode given on the command line.
    # Call run_insert_tickers() instead to (re)load the tickers table from
    # symbols.txt.
    run_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment