Skip to content

Instantly share code, notes, and snippets.

@miguelvm
Last active July 4, 2017 15:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save miguelvm/d40ea5a9735533f582cd80a63c51b5cf to your computer and use it in GitHub Desktop.
Save miguelvm/d40ea5a9735533f582cd80a63c51b5cf to your computer and use it in GitHub Desktop.
# Download API from http://interactivebrokers.github.io/#
#
# Install the Python API code: from /IBJts/source/pythonclient run `python3 setup.py install`
#
# Note: The test cases, and the documentation refer to a python package called IBApi,
# but the actual package is called ibapi. Go figure.
#
# Get the latest version of the gateway:
# https://www.interactivebrokers.com/en/?f=%2Fen%2Fcontrol%2Fsystemstandalone-ibGateway.php%3Fos%3Dunix
# (for unix: windows and mac users please find your own version)
#
# Run the gateway
#
# user: edemo
# pwd: demo123
#
# Now I'll try and replicate the time telling example
import time
from ibapi.wrapper import EWrapper
from ibapi.client import EClient
class TestWrapper(EWrapper):
    """Wrapper that records the callbacks needed by the time-telling example.

    Values delivered through the overridden callbacks are kept in the
    ``_my_data`` dictionary under the keys ``'error'`` and ``'time'``.
    """

    def __init__(self):
        EWrapper.__init__(self)
        self._my_data = {}
        # Seed both slots so iserror() / isValidTime() can be called safely
        # even if init_error() / init_time() have not been called yet.
        self.init_error()
        self.init_time()

    ## error handling code
    def init_error(self):
        """Forget any previously recorded error."""
        self._my_data['error'] = None

    def iserror(self):
        """Return True if an error has been recorded since the last reset."""
        return self._my_data['error'] is not None

    def error(self, id, errorCode, errorString):
        """Overridden method: remember the most recent error as a tuple.

        The stored value is ``(id, errorCode, errorString)``.

        NOTE(review): every message delivered to this callback is recorded
        as an error; presumably the API also sends informational notices
        through it -- confirm against the IB message-code documentation.
        """
        self._my_data['error'] = (id, errorCode, errorString)

    ## Time telling code
    def init_time(self):
        """Reset the stored time to 0, which means "no time received"."""
        self._my_data['time'] = 0

    def currentTime(self, time):
        """Overridden method: store the reported time as an int."""
        self._my_data['time'] = int(time)

    def isValidTime(self):
        """Return True once a non-zero time has been stored."""
        return self._my_data['time'] != 0
class TestClient(EClient):
    """Client that adds a blocking "what time is it?" request."""

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def speaking_clock(self, max_wait=10, poll_interval=0.01):
        """Request the current time and wait for the wrapper to receive it.

        Args:
            max_wait: Maximum number of seconds to wait for an answer.
            poll_interval: Seconds to sleep between checks of the wrapper.

        Returns:
            The value stored by the wrapper under ``'time'``; this is still
            0 (the value set by ``init_time``) if the wait ended because of
            an error or a timeout.
        """
        wrapper = self.wrapper
        start_time = time.time()

        wrapper.init_error()
        wrapper.init_time()

        print("Getting the time... ")
        self.reqCurrentTime()

        has_error = False
        while not wrapper.isValidTime():
            has_error = wrapper.iserror()
            if has_error:
                break

            if (time.time() - start_time) > max_wait:
                print("Exceeded maximum wait for wrapper to respond")
                break

            # Yield instead of spinning so the thread that delivers the
            # callbacks is not starved while we wait.
            time.sleep(poll_interval)

        if has_error:
            print("Error happened")
            # The error is stored on the wrapper, which need not be `self`.
            print(wrapper._my_data['error'])

        # Read the value only after the wait has ended, so a time that
        # arrived during the final check is not missed.
        return wrapper._my_data['time']
class TestApp(TestWrapper, TestClient):
    """Single object that acts as both the wrapper and the client."""

    def __init__(self):
        # Initialise the wrapper half first, then hand this same object to
        # the client half as the wrapper it should use.
        TestWrapper.__init__(self)
        TestClient.__init__(self, self)
from threading import Thread

app = TestApp()
app.connect("127.0.0.1", 4001, 10)

## It isn't documented that we need to do this... but probably can't hurt
app.startApi()

# Run the client's message loop on a background thread so that this main
# thread is free to issue requests and wait for the answers.
thread = Thread(target=app.run)
thread.start()

try:
    ## this works, but doesn't call the wrapper
    print("serverVersion:%s connectionTime:%s" % (app.serverVersion(), app.twsConnectionTime()))

    ## this never returns a valid time - HELP?!
    app.speaking_clock()
finally:
    # Always close the connection; otherwise the non-daemon thread running
    # app.run keeps the interpreter alive after the script is done
    # (assumes run() returns once the client is disconnected -- TODO confirm).
    app.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment