Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Fix a crash that occurs if your server is not running or not accepting connections: initialize error handling before trying to open a connection (the connection attempt can fail, generating errors you need to handle).
# Gist example of IB wrapper ...
# Download API from http://interactivebrokers.github.io/
# Install python API code: /IBJts/source/pythonclient $ python3 setup.py install
# Note: The test cases, and the documentation refer to a python package called IBApi,
# but the actual package is called ibapi. Go figure.
# Get the latest version of the gateway:
# (for unix: windows and mac users please find your own version)
# Run the gateway
# user: edemo
# pwd: demo123
# Now I'll try and replicate the time telling example
from ibapi.wrapper import EWrapper
from ibapi.client import EClient
from threading import Thread
import queue
class TestWrapper(EWrapper):
    """
    The wrapper deals with the action coming back from the IB gateway or TWS instance

    We override methods in EWrapper that will get called when this action happens, like currentTime
    """

    ## error handling code
    def init_error(self):
        """Create the queue that collects error messages passed to error()."""
        error_queue = queue.Queue()
        self._my_errors = error_queue

    def get_error(self, timeout=5):
        """
        Pop the oldest queued error message.

        :param timeout: seconds to wait on the queue before giving up
        :return: the error message string, or None if nothing is queued
        """
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                # Another consumer emptied the queue between the check and the get
                return None

        return None

    def is_error(self):
        """Return True if at least one error message is waiting in the queue."""
        an_error_if = not self._my_errors.empty()
        return an_error_if

    def error(self, id, errorCode, errorString, advancedOrderRejectJson=""):
        """
        Store an error reported through the wrapper.

        The extra trailing argument is accepted (and ignored) because newer
        ibapi releases pass it; older releases call with three arguments.
        """
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    ## Time telling code
    def init_time(self):
        """
        Create a fresh queue to receive the server time.

        :return: the queue that currentTime() will put the time into
        """
        time_queue = queue.Queue()
        self._time_queue = time_queue

        return time_queue

    def currentTime(self, time_from_server):
        """Put the time delivered by the server on the queue made by init_time()."""
        ## Overriden method
        self._time_queue.put(time_from_server)
class TestClient(EClient):
    """
    The client method

    We don't override native methods, but instead call them from our own wrappers
    """

    def __init__(self, wrapper):
        ## Set up with a wrapper inside
        EClient.__init__(self, wrapper)

    def speaking_clock(self):
        """
        Basic example to tell the time

        :return: unix time, as an int, or None if the wrapper did not respond in time
        """

        print("Getting the time from the server... ")

        ## Make a place to store the time we're going to return
        ## This is a queue
        time_storage = self.wrapper.init_time()

        ## This is the native method in EClient, asks the server to send us the time please
        self.reqCurrentTime()

        ## Try and get a valid time
        MAX_WAIT_SECONDS = 10

        try:
            current_time = time_storage.get(timeout=MAX_WAIT_SECONDS)
        except queue.Empty:
            print("Exceeded maximum wait for wrapper to respond")
            current_time = None

        # Drain and report every queued error; go through the wrapper so this
        # also works when the wrapper is a separate object from the client.
        while self.wrapper.is_error():
            print(self.wrapper.get_error())

        return current_time
class TestApp(TestWrapper, TestClient):
    """Combined wrapper and client that connects and starts the message loop."""

    def __init__(self, ipaddress, portid, clientid):
        """
        Connect to the gateway/TWS and start processing messages in a thread.

        :param ipaddress: host of the gateway or TWS instance
        :param portid: port the gateway or TWS instance listens on
        :param clientid: client id passed to connect()
        """
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        # The error queue must exist before connect(): a failed connection
        # reports through error(), which writes to that queue.
        self.init_error()
        self.connect(ipaddress, portid, clientid)

        # run() is the EClient message loop; it needs its own thread so the
        # caller can keep issuing requests.
        thread = Thread(target=self.run)
        thread.start()

        setattr(self, "_thread", thread)
if __name__ == '__main__':
    ## Check that the port is the same as on the Gateway
    ## ipaddress is 127.0.0.1 if on the same machine, clientid is arbitrary
    app = TestApp("127.0.0.1", 4001, 10)

    try:
        current_time = app.speaking_clock()
        print(current_time)
    finally:
        # Always disconnect: otherwise the message-loop thread keeps running
        # and the program never exits.
        app.disconnect()

This comment has been minimized.

Copy link

@xappppp xappppp commented Apr 17, 2020

I tried to run the code, but it seems it's not responding and I have to force-quit the program. Can anyone help me understand why? Below is the message after quitting:


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.