Skip to content

Instantly share code, notes, and snippets.

@sc1f
Forked from robcarver17/IBAPIpythonexample1.py
Created April 23, 2018 14:17
Show Gist options
  • Save sc1f/b03d981611d113d42ac681b7329f6d03 to your computer and use it in GitHub Desktop.
Save sc1f/b03d981611d113d42ac681b7329f6d03 to your computer and use it in GitHub Desktop.
# Gist example of IB wrapper ...
#
# Download API from http://interactivebrokers.github.io/#
#
# Install python API code /IBJts/source/pythonclient $ python3 setup.py install
#
# Note: The test cases, and the documentation refer to a python package called IBApi,
# but the actual package is called ibapi. Go figure.
#
# Get the latest version of the gateway:
# https://www.interactivebrokers.com/en/?f=%2Fen%2Fcontrol%2Fsystemstandalone-ibGateway.php%3Fos%3Dunix
# (for unix: windows and mac users please find your own version)
#
# Run the gateway
#
# user: edemo
# pwd: demo123
#
# Now I'll try and replicate the time telling example
from ibapi.wrapper import EWrapper
from ibapi.client import EClient
from threading import Thread
import queue
class TestWrapper(EWrapper):
"""
The wrapper deals with the action coming back from the IB gateway or TWS instance
We override methods in EWrapper that will get called when this action happens, like currentTime
"""
## error handling code
def init_error(self):
error_queue=queue.Queue()
self._my_errors = error_queue
def get_error(self, timeout=5):
if self.is_error():
try:
return self._my_errors.get(timeout=timeout)
except queue.Empty:
return None
return None
def is_error(self):
an_error_if=not self._my_errors.empty()
return an_error_if
def error(self, id, errorCode, errorString):
## Overriden method
errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
self._my_errors.put(errormsg)
## Time telling code
def init_time(self):
time_queue=queue.Queue()
self._time_queue = time_queue
return time_queue
def currentTime(self, time_from_server):
## Overriden method
self._time_queue.put(time_from_server)
class TestClient(EClient):
"""
The client method
We don't override native methods, but instead call them from our own wrappers
"""
def __init__(self, wrapper):
## Set up with a wrapper inside
EClient.__init__(self, wrapper)
def speaking_clock(self):
"""
Basic example to tell the time
:return: unix time, as an int
"""
print("Getting the time from the server... ")
## Make a place to store the time we're going to return
## This is a queue
time_storage=self.wrapper.init_time()
## This is the native method in EClient, asks the server to send us the time please
self.reqCurrentTime()
## Try and get a valid time
MAX_WAIT_SECONDS = 10
try:
current_time = time_storage.get(timeout=MAX_WAIT_SECONDS)
except queue.Empty:
print("Exceeded maximum wait for wrapper to respond")
current_time = None
while self.wrapper.is_error():
print(self.get_error())
return current_time
class TestApp(TestWrapper, TestClient):
def __init__(self, ipaddress, portid, clientid):
TestWrapper.__init__(self)
TestClient.__init__(self, wrapper=self)
self.connect(ipaddress, portid, clientid)
thread = Thread(target = self.run)
thread.start()
setattr(self, "_thread", thread)
self.init_error()
if __name__ == '__main__':
##
## Check that the port is the same as on the Gateway
## ipaddress is 127.0.0.1 if one same machine, clientid is arbitrary
app = TestApp("127.0.0.1", 4001, 10)
current_time = app.speaking_clock()
print(current_time)
app.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment