Skip to content

Instantly share code, notes, and snippets.

@umrysh
Forked from idler921/_balancer.py
Created November 23, 2013 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save umrysh/7620954 to your computer and use it in GitHub Desktop.
Save umrysh/7620954 to your computer and use it in GitHub Desktop.
"""
The portfolio rebalancing bot will buy and sell to maintain a
constant asset allocation ratio of exactly 20/80 = fiat/BTC
"""
import inspect
import logging
import os
import threading
import traceback
import weakref

import strategy
# Tunable parameters and fixed constants of the rebalancing bot.

# Distance, in percent of the center price, at which the next buy and
# sell rebalancing orders are placed.
DISTANCE = 5 # percent price distance of next rebalancing orders
# Off-exchange holdings; they are added to the wallet balances before any
# balance calculation is made.
FIAT_COLD = 0 # Amount of Fiat stored at home but included in calculations
COIN_COLD = 0 # Amount of Coin stored at home but included in calculations
# The last decimal digit of an (integer) order price is overwritten with
# this value so the bot can recognize its own orders later.
MARKER = 7 # lowest digit of price to identify bot's own orders
COIN = 1E8 # number of satoshi per coin, this is a constant.
# Relative weights of fiat and BTC in the target allocation
# (2.0 : 8.0 corresponds to the 20/80 split named in the module docstring).
FIATLV = 2.0
BTCLV = 8.0
# Target ratio of BTC value to fiat value (8.0 / 2.0 == 4.0).
bfr = BTCLV / FIATLV
def add_marker(price, marker):
    """Encode a marker in the price value to find the bot's own orders.

    The lowest decimal digit of ``price`` is replaced by ``marker``.

    price  -- order price in the exchange's integer format
    marker -- single digit (0..9) to store in the lowest decimal place

    Returns the price with its last digit replaced by ``marker``.
    """
    # Floor division is required here: with true division (Python 3, or a
    # float price) ``price / 10 * 10`` is just ``price`` again, so the marker
    # would be added to the price instead of replacing its last digit.
    return price // 10 * 10 + marker
def has_marker(price, marker):
    """Tell whether the lowest decimal digit of price equals marker."""
    last_digit = price % 10
    return last_digit == marker
def mark_own(price):
    """Embed the bot's own MARKER digit into price and return the result."""
    marked_price = add_marker(price, MARKER)
    return marked_price
def is_own(price):
    """Tell whether price carries the bot's own MARKER digit."""
    carries_marker = has_marker(price, MARKER)
    return carries_marker
class Signal(object):
    """Callback functions (so called slots) can be connected to a signal and
    will be called when the signal is called (Signal implements __call__).
    The slots receive two arguments: the sender of the signal and a custom
    data object. Two different threads won't be allowed to send signals at the
    same time application-wide, concurrent threads will have to wait until
    the lock is released again. The lock allows recursive reentry of the same
    thread to avoid deadlocks when a slot wants to send a signal itself.

    Slots are only weakly referenced: a slot that is not kept alive by
    somebody else silently disappears from the signal."""

    # one re-entrant lock shared by every Signal instance
    _lock = threading.RLock()
    # Signal that receives the tracebacks of failing slots; it is created
    # lazily by the first Signal that gets instantiated
    signal_error = None

    def __init__(self):
        self._functions = weakref.WeakSet()
        self._methods = weakref.WeakKeyDictionary()

        # the Signal class itself has a static member signal_error where it
        # will send tracebacks of exceptions that might happen. Here we
        # initialize it if it does not exist already
        if not Signal.signal_error:
            # truthy placeholder: the nested Signal() below runs this very
            # constructor again and must not recurse endlessly
            Signal.signal_error = 1
            Signal.signal_error = Signal()

    def connect(self, slot):
        """Connect a slot to this signal. The parameter slot can be a function
        that takes exactly 2 arguments or a method that takes self plus 2 more
        arguments, or it can even be another signal. The first argument
        is a reference to the sender of the signal and the second argument is
        the payload. The payload can be anything, it totally depends on the
        sender and type of the signal."""
        if inspect.ismethod(slot):
            # bound methods are stored as (instance -> set of functions) so
            # that only the instance is weakly referenced
            if slot.__self__ not in self._methods:
                self._methods[slot.__self__] = set()
            self._methods[slot.__self__].add(slot.__func__)
        else:
            self._functions.add(slot)

    def __call__(self, sender, data, error_signal_on_error=True):
        """Dispatch signal to all connected slots. This is a synchronous
        operation, it will not return before all slots have been called.
        Also only exactly one thread is allowed to emit signals at any time,
        all other threads that try to emit *any* signal anywhere in the
        application at the same time will be blocked until the lock is released
        again. The lock will allow recursive reentry of the same thread, this
        means a slot can itself emit other signals before it returns (or
        signals can be directly connected to other signals) without problems.

        If a slot raises an exception a traceback will be sent to the static
        Signal.signal_error(); if error_signal_on_error is false, or if no
        slot of signal_error accepted the traceback, it goes to
        logging.critical() instead so that it is never lost silently.

        Returns True if at least one slot was called without raising."""
        with self._lock:
            sent = False
            errors = []

            # Iterate over snapshots: a slot may connect further slots (or
            # objects may be garbage collected) while we are dispatching.
            for func in list(self._functions):
                try:
                    func(sender, data)
                    sent = True
                except Exception:  # pylint: disable=W0703
                    errors.append(traceback.format_exc())

            for obj, funcs in list(self._methods.items()):
                for func in list(funcs):
                    try:
                        func(obj, sender, data)
                        sent = True
                    except Exception:  # pylint: disable=W0703
                        errors.append(traceback.format_exc())

            for error in errors:
                delivered = False
                if error_signal_on_error:
                    # False: errors inside the error slots are only logged,
                    # they must not be fed back into signal_error again
                    delivered = Signal.signal_error(self, error, False)
                if not delivered:
                    logging.critical(error)

            return sent
class TradesTimer(Signal):
    """A simple timer (used for stuff like keepalive).

    The timer is itself a Signal: every time the interval elapses it emits
    itself with the timer as sender and None as payload. Unless one_shot
    is set it re-arms itself after every emission until cancel() is called."""

    def __init__(self, interval, one_shot=False):
        """Create and immediately start a new timer, interval is in seconds."""
        Signal.__init__(self)
        self._one_shot = one_shot
        self._canceled = False
        self._interval = interval
        self._timer = None
        self._start()

    def _fire(self):
        """Fire the signal and restart the timer."""
        if not self._canceled:
            self.__call__(self, None)
            # a slot may have canceled the timer while it was being called
            if not (self._canceled or self._one_shot):
                self._start()

    def _start(self):
        """Start the timer."""
        self._timer = threading.Timer(self._interval, self._fire)
        # daemon thread: a pending timer must not keep the process alive
        self._timer.daemon = True
        self._timer.start()

    def cancel(self):
        """Cancel the timer. Calling it more than once is harmless."""
        self._canceled = True
        timer = self._timer
        self._timer = None
        # guard against a repeated cancel(): the underlying timer object is
        # already gone after the first call
        if timer is not None:
            timer.cancel()
class Strategy(strategy.Strategy):
    """A portfolio rebalancing bot.

    It keeps one buy and one sell order (recognizable by the MARKER digit in
    their price) DISTANCE percent below and above the price at which the
    portfolio is balanced. Whenever one of them is filled the other one is
    canceled and a fresh pair is placed around the new center."""

    def __init__(self, gox):
        strategy.Strategy.__init__(self, gox)
        # True while trading is suspended ("c" and "b" keys), "p" resumes
        self.temp_halt = False
        # watchdog timer started by place_orders(), None while not running
        self._timer = None

    def slot_keypress(self, gox, key):
        """A key has been pressed.

        c -- cancel the rebalancing orders and suspend trading
        p -- place the initial rebalancing orders and (re)start trading
        u -- force an update of own orders and wallet
        i -- log how far the portfolio currently is out of balance
        b -- rebalance right now with a market order (suspends trading)"""
        if key == ord("c"):
            # cancel existing rebalancing orders and suspend trading
            self.debug("canceling all rebalancing orders")
            self.temp_halt = True
            self.cancel_orders()

        if key == ord("p"):
            # create the initial two rebalancing orders and start trading.
            # Before you do this the portfolio should already be balanced.
            # use "i" to show current status and "b" to rebalance with a
            # market order at current price.
            self.debug("adding new initial rebalancing orders")
            self.temp_halt = False
            self.place_orders()

        if key == ord("u"):
            # update the own order list and wallet by forcing what
            # normally happens only after reconnect
            gox.client.channel_subscribe(False)

        if key == ord("i"):
            # print some information into the log file about
            # current status (how much currently out of balance)
            # (floor division keeps the price in integer format)
            price = (gox.orderbook.bid + gox.orderbook.ask) // 2
            vol_buy = self.get_buy_at_price(price)
            price_balanced = self.get_price_where_it_was_balanced()
            self.debug("BTC difference at current price:",
                gox.base2float(vol_buy))
            self.debug("Price where it would be balanced:",
                gox.quote2float(price_balanced))

        if key == ord("b"):
            # manually rebalance with market order at current price
            price = (gox.orderbook.bid + gox.orderbook.ask) // 2
            vol_buy = self.get_buy_at_price(price)
            # differences below 0.01 coin are not worth an order
            if abs(vol_buy) > 0.01 * COIN:
                self.temp_halt = True
                self.cancel_orders()
                if vol_buy > 0:
                    self.debug("buy %f at market" %
                        gox.base2float(vol_buy))
                    gox.buy(0, vol_buy)
                else:
                    self.debug("sell %f at market" %
                        gox.base2float(-vol_buy))
                    gox.sell(0, -vol_buy)

    def cancel_orders(self):
        """Cancel all rebalancing orders, we identify
        them through the marker in the price value."""
        # collect first, cancel afterwards: the list of own orders is not
        # touched while we are still iterating over it
        must_cancel = [order for order in self.gox.orderbook.owns
                       if is_own(order.price)]
        for order in must_cancel:
            self.gox.cancel(order.oid)

    def get_price_where_it_was_balanced(self):
        """Get the price at which it was perfectly balanced, given the current
        BTC and Fiat account balances. Immediately after a rebalancing order was
        filled this should be pretty much exactly the price where the order was
        filled (because by definition it should be quite exactly balanced then),
        so even after missing the trade message due to disconnect it should be
        possible to place the next 2 orders precisely around the new center.

        The return value is in the integer price format of the exchange."""
        gox = self.gox
        fiat_have = gox.quote2float(gox.wallet[gox.curr_quote]) + FIAT_COLD
        btc_have = gox.base2float(gox.wallet[gox.curr_base]) + COIN_COLD
        # balanced means: btc_have * price == bfr * fiat_have
        return gox.quote2int((fiat_have * bfr) / btc_have)

    def get_buy_at_price(self, price_int):
        """Calculate amount of BTC needed to buy at price to achieve
        rebalancing. price and return value are in mtgox integer format.
        A negative return value means that BTC must be sold."""
        gox = self.gox
        fiat_have = gox.quote2float(gox.wallet[gox.curr_quote]) + FIAT_COLD
        btc_have = gox.base2float(gox.wallet[gox.curr_base]) + COIN_COLD
        price_then = gox.quote2float(price_int)
        btc_value_then = btc_have * price_then

        # fiat surplus compared to the fiat that would match the BTC value
        diff = (fiat_have * bfr - btc_value_then) / bfr
        diff_btc = diff / price_then
        # Buying x BTC moves x * price from the fiat side to the BTC side, so
        # solving (btc_have + x) * price == bfr * (fiat_have - x * price)
        # for x gives diff_btc * bfr / (bfr + 1), and bfr / (bfr + 1) is the
        # BTC share BTCLV / (BTCLV + FIATLV). Scaling by the fiat share
        # instead would trade too little to end up balanced.
        must_buy = diff_btc * BTCLV / (BTCLV + FIATLV)
        return self.gox.base2int(must_buy)

    def place_orders(self):
        """Place two new rebalancing orders above and below center price
        and (re)start the watchdog timer that verifies them later."""
        center = self.get_price_where_it_was_balanced()
        self.debug(
            "center is %f" % self.gox.quote2float(center))
        step = int(center * DISTANCE / 100.0)
        next_sell = mark_own(center + step)
        next_buy = mark_own(center - step)

        sell_amount = -self.get_buy_at_price(next_sell)
        buy_amount = self.get_buy_at_price(next_buy)

        if sell_amount < 0.01 * COIN:
            sell_amount = int(0.01 * COIN)
            self.debug("WARNING! minimal sell amount adjusted to 0.01")

        if buy_amount < 0.01 * COIN:
            buy_amount = int(0.01 * COIN)
            self.debug("WARNING! minimal buy amount adjusted to 0.01")

        self.debug("new buy order %f at %f" % (
            self.gox.base2float(buy_amount),
            self.gox.quote2float(next_buy)
        ))
        self.gox.buy(next_buy, buy_amount)

        self.debug("new sell order %f at %f" % (
            self.gox.base2float(sell_amount),
            self.gox.quote2float(next_sell)
        ))
        self.gox.sell(next_sell, sell_amount)

        # Set timer to make sure trades went through
        self.debug("Set Timer.")
        # stop a timer left over from an earlier call, otherwise it would
        # keep running (and firing) forever next to the new one
        self._cancel_timer()
        self._timer = TradesTimer(60)
        self._timer.connect(self.trade_timer)

    def slot_trade(self, gox, data):
        """A trade message has been received.

        data is the tuple (date, price, volume, typ, own)."""
        (_date, price, volume, typ, own) = data

        # not interested in other people's trades
        if not own:
            return

        # not interested in manually entered (not bot) trades
        if not is_own(price):
            return

        text = {"bid": "sold", "ask": "bought"}[typ]
        self.debug("*** %s %f at %f" % (
            text,
            gox.base2float(volume),
            gox.quote2float(price)
        ))
        self.check_trades()

    def slot_owns_changed(self, orderbook, _dummy):
        """Status or amount of own open orders has changed."""
        self.check_trades()

    def check_trades(self):
        """Find out if we need to place new orders and do it if necessary."""
        self._check_orders(False)

    def trade_timer(self, _sender, _data):
        """Slot of the watchdog timer started by place_orders().

        Does the same check as check_trades() and additionally submits the
        orders again if none of them shows up in the order book."""
        if _sender is not self._timer:
            # a timer that has been replaced or canceled in the meantime
            # still managed to fire: ignore it
            return
        self._cancel_timer()
        self.debug("Trade_Timer Called.")
        self._check_orders(True)

    def _cancel_timer(self):
        """Stop the watchdog timer if there is one and forget about it."""
        timer = self._timer
        self._timer = None
        if timer is not None:
            timer.cancel()

    def _count_own_orders(self):
        """Return (open, pending) counts of the bot's own orders."""
        count = 0
        count_pending = 0
        for order in self.gox.orderbook.owns:
            if is_own(order.price):
                if order.status == "open":
                    count += 1
                else:
                    count_pending += 1
        return count, count_pending

    def _check_orders(self, resubmit_missing):
        """Place fresh orders if one of the bot's orders has been filled;
        with resubmit_missing also if none of them exists at all."""
        # bot temporarily disabled
        if self.temp_halt:
            return

        # still waiting for submitted orders,
        # can wait for next signal
        if self.gox.count_submitted:
            return

        # we count the open and pending orders
        count, count_pending = self._count_own_orders()

        # as long as there are ANY pending orders around we
        # just do nothing and wait for the next signal
        if count_pending:
            return

        # if count is exactly 1 then one of the orders must have been filled,
        # now we cancel the other one and place two fresh orders in the
        # distance of DISTANCE around center price.
        if count == 1:
            self.cancel_orders()
            self.place_orders()
        elif count == 0 and resubmit_missing:
            self.debug("Orders Did Not Go Through. Re-Submitting.")
            self.cancel_orders()
            self.place_orders()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment