Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save starius/53a7355fc196965106d5a38dc20da7ef to your computer and use it in GitHub Desktop.
Save starius/53a7355fc196965106d5a38dc20da7ef to your computer and use it in GitHub Desktop.
"""
Order copier
"""
# API key основного аккаунта
API_KEY = '69eQkHl0R0vgPod21c3XjQsT'
API_SECRET = 'EFAHxtJiOVYHSWa_HPtLAW50GpV31G_2GVSdm1gBZKH85Ltv'
# Список API ключей куда нужно транслировать ордера
API_KYES = ['hLlWHxkqxs42g-WeEt1_wC20']
API_SECRETS = ['RUpMAQoEQUM6CMqOkRGRak4h1HQUFgZfDuZCQKe_Cms3oKvx']
# URL API
#BASE_URL = "https://www.bitmex.com/api/v1/"
BASE_URL = "https://testnet.bitmex.com/api/v1/"
# Import the required libraries
import time
from urllib.parse import urlparse, urlunparse
import hmac
import hashlib
import os
import json
import requests
from requests.auth import AuthBase
import uuid
import base64
import datetime
import ssl
import websocket
import threading
import traceback
import decimal
# Get a nonce
def generate_nonce():
    """Return the current Unix time in milliseconds, rounded to an int."""
    millis = time.time() * 1000
    return int(round(millis))
def generate_signature(secret, verb, url, nonce, data):
    """Return the hex HMAC-SHA256 signature for a request.

    The signed message is ``verb + path[?query] + str(nonce) + data``, keyed
    with *secret*. Only the path and query of *url* are used; scheme and host
    are dropped. *data* may be ``str`` or ``bytes``/``bytearray`` (decoded as
    UTF-8 before signing).
    """
    parts = urlparse(url)
    # Keep the query string in the signed target when one is present.
    target = (parts.path + '?' + parts.query) if parts.query else parts.path
    body = data.decode('utf8') if isinstance(data, (bytes, bytearray)) else data
    payload = ''.join((verb, target, str(nonce), body))
    mac = hmac.new(bytes(secret, 'utf8'), bytes(payload, 'utf8'), digestmod=hashlib.sha256)
    return mac.hexdigest()
def findItemByKeys(keys, table, matchData):
    """Return the first row of *table* equal to *matchData* on every key in *keys*.

    Returns ``None`` when no row matches. With an empty *keys* the first row
    (if any) matches trivially.
    """
    for row in table:
        # Every key is compared for every row (no short-circuit), so a missing
        # key raises KeyError regardless of earlier mismatches.
        differing = [name for name in keys if row[name] != matchData[name]]
        if not differing:
            return row
    return None
class APIKeyAuthWithExpires(AuthBase):
    """Auth hook that signs each request with an API key, secret and expiry time."""

    def __init__(self, apiKey, apiSecret):
        """Remember the key pair used to sign requests."""
        self.apiKey, self.apiSecret = apiKey, apiSecret

    def __call__(self, r):
        """Add the api-expires, api-key and api-signature headers to *r* and return it."""
        # Valid until 5 seconds from now: grace period in case of clock skew.
        deadline = int(round(time.time()) + 5)
        headers = r.headers
        headers['api-expires'] = str(deadline)
        headers['api-key'] = self.apiKey
        payload = r.body or ''
        headers['api-signature'] = generate_signature(self.apiSecret, r.method, r.url, deadline, payload)
        return r
class AccessTokenAuth(AuthBase):
    """Auth hook that sends a stored access token in the ``access-token`` header."""

    def __init__(self, accessToken):
        """Remember the token; a falsy token means no header is added."""
        self.token = accessToken

    def __call__(self, r):
        """Attach the token header to *r* when a token is set, then return *r*."""
        if not self.token:
            return r
        r.headers['access-token'] = self.token
        return r
class BitMEXWebsocket():
    """Websocket client that keeps an in-memory copy of the subscribed tables.

    Incoming ``partial``/``insert``/``update``/``delete`` messages are applied
    to ``self.data[table]`` (a list of row dicts); ``self.keys[table]`` holds
    the key names used to identify a row when updating or deleting it.
    """

    # Once a non-order table grows past this many rows, its oldest half is dropped.
    MAX_TABLE_LEN = 200

    def __init__(self, api_key, api_secret):
        """Store the credentials and reset the table state; no connection is opened."""
        self.api_key = api_key
        self.api_secret = api_secret
        self.__reset()

    def __del__(self):
        self.exit()

    def connect(self, endpoint):
        """Connect to the realtime feed derived from *endpoint* and wait for account data.

        The URL scheme is switched from http(s) to ws(s) and the path is
        replaced by ``/realtime`` with the subscription list.
        """
        subscriptions = ["instrument", "margin", "position", "order"]
        urlParts = list(urlparse(endpoint))
        urlParts[0] = urlParts[0].replace('http', 'ws')
        urlParts[2] = "/realtime?subscribe=" + ",".join(subscriptions)
        wsURL = urlunparse(urlParts)
        self.__connect(wsURL)
        # Connected. Wait for partials
        self.__wait_for_account()

    def get_instrument(self, symbol):
        """Return the instrument row for *symbol*, with a ``tickLog`` entry added.

        ``tickLog`` is the number of decimal places of the instrument's
        ``tickSize``. Raises ``Exception`` if the symbol is not in the table.
        """
        instruments = self.data['instrument']
        matchingInstruments = [i for i in instruments if i['symbol'] == symbol]
        if not matchingInstruments:
            raise Exception("Unable to find instrument or index with symbol: " + symbol)
        instrument = matchingInstruments[0]
        instrument['tickLog'] = decimal.Decimal(str(instrument['tickSize'])).as_tuple().exponent * -1
        return instrument

    def get_ticker(self, symbol):
        """Return a dict with ``last``, ``buy``, ``sell`` and ``mid`` prices for *symbol*.

        Values are rounded to the instrument's ``tickLog`` decimals; missing
        values become ``0.0``.
        """
        instrument = self.get_instrument(symbol)
        # If this is an index, we have to get the data from the last trade.
        if instrument['symbol'][0] == '.':
            ticker = {}
            ticker['mid'] = ticker['buy'] = ticker['sell'] = ticker['last'] = instrument['markPrice']
        # Normal instrument
        else:
            # Fall back to the last price when one side of the book is empty.
            bid = instrument['bidPrice'] or instrument['lastPrice']
            ask = instrument['askPrice'] or instrument['lastPrice']
            ticker = {
                "last": instrument['lastPrice'],
                "buy": bid,
                "sell": ask,
                "mid": (bid + ask) / 2,
            }
        return {k: round(float(v or 0), instrument['tickLog']) for k, v in ticker.items()}

    def funds(self):
        """Return the first row of the ``margin`` table."""
        return self.data['margin'][0]

    def market_depth(self, symbol):
        """Not available: the order book table is not subscribed."""
        raise NotImplementedError('orderBook is not subscribed; use askPrice and bidPrice on instrument')

    def open_orders(self):
        """Return Limit and Stop orders whose ``orderQty`` is non-zero."""
        orders = self.data['order']
        return [o for o in orders if o['orderQty'] != 0 and o['ordType'] in ('Limit', 'Stop')]

    def position(self):
        """Return the position rows whose ``isOpen`` flag is set."""
        position = self.data['position']
        return [o for o in position if o['isOpen'] == True]

    def recent_trades(self):
        """Return the ``trade`` table, or an empty list when no trade data has arrived.

        ``connect`` does not subscribe to ``trade``, so the table is normally
        absent; a plain lookup would raise ``KeyError``.
        """
        return self.data.get('trade', [])

    def error(self, err):
        """Record *err*, print it and close the socket."""
        self._error = err
        print(err)
        self.exit()

    def exit(self):
        """Mark the client as exited and close the socket if one was created."""
        self.exited = True
        # self.ws only exists after connect(); exit() is also reached from
        # __del__ and error(), so it must tolerate a never-connected client.
        ws = getattr(self, 'ws', None)
        if ws is not None:
            ws.close()

    def __connect(self, wsURL):
        """Open the socket on a daemon thread and wait up to ~5 s for it to connect.

        Raises ``SystemExit(1)`` when the connection cannot be established.
        """
        ssl_defaults = ssl.get_default_verify_paths()
        sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
        self.ws = websocket.WebSocketApp(wsURL,
                                         on_message=self.__on_message,
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,
                                         on_error=self.__on_error,
                                         header=self.__get_auth()
                                         )
        self.wst = threading.Thread(target=lambda: self.ws.run_forever(sslopt=sslopt_ca_certs))
        self.wst.daemon = True
        self.wst.start()

        def is_connected():
            return bool(self.ws.sock and self.ws.sock.connected)

        # Wait for connect before continuing
        conn_timeout = 5
        while not is_connected() and conn_timeout and not self._error:
            time.sleep(1)
            conn_timeout -= 1
        # Test the socket itself rather than the counter: a connection that
        # succeeds during the last poll must not be treated as a failure.
        if not is_connected() or self._error:
            print("Couldn't connect to WS! Exiting.")
            self.exit()
            # Same effect as exit(1), without relying on the site-provided builtin.
            raise SystemExit(1)

    def __get_auth(self):
        """Return the auth header lines for the websocket handshake."""
        # To auth to the WS using an API key, we generate a signature of a nonce and
        # the WS API endpoint.
        nonce = generate_nonce()
        return [
            "api-nonce: " + str(nonce),
            "api-signature: " + generate_signature(self.api_secret, 'GET', '/realtime', nonce, ''),
            "api-key:" + self.api_key
        ]

    def __wait_for_account(self):
        """Block until the margin, position and order tables have appeared.

        Raises ``Exception`` if the client exits (error or close) first,
        instead of polling forever.
        """
        required = {'margin', 'position', 'order'}
        # Wait for the keys to show up from the ws
        while not required <= set(self.data):
            if self.exited:
                raise Exception("Websocket closed before account data arrived: %s" % self._error)
            time.sleep(0.1)

    def __send_command(self, command, args=None):
        """Send a raw ``{"op": command, "args": args}`` command over the socket."""
        self.ws.send(json.dumps({"op": command, "args": [] if args is None else args}))

    def __on_message(self, ws, message):
        """Apply one incoming websocket message to the local tables."""
        message = json.loads(message)
        table = message['table'] if 'table' in message else None
        action = message['action'] if 'action' in message else None
        try:
            if 'subscribe' in message:
                if not message['success']:
                    self.error("Unable to subscribe to %s. Error: \"%s\" Please check and restart." %
                               (message['request']['args'][0], message['error']))
            elif 'status' in message:
                if message['status'] == 400:
                    self.error(message['error'])
                if message['status'] == 401:
                    self.error("API Key incorrect, please check and restart.")
            elif action:
                rows = self.data.setdefault(table, [])
                self.keys.setdefault(table, [])
                # There are four possible actions from the WS:
                # 'partial' - full table image
                # 'insert' - new row
                # 'update' - update row
                # 'delete' - delete row
                if action == 'partial':
                    rows += message['data']
                    # Keys are communicated on partials to let you know how to uniquely identify
                    # an item. We use it for updates.
                    self.keys[table] = message['keys']
                elif action == 'insert':
                    rows += message['data']
                    # Limit the max length of the table to avoid excessive memory usage.
                    # Don't trim orders because we'll lose valuable state if we do.
                    if table != 'order' and len(rows) > BitMEXWebsocket.MAX_TABLE_LEN:
                        self.data[table] = rows[(BitMEXWebsocket.MAX_TABLE_LEN // 2):]
                elif action == 'update':
                    # Locate the item in the collection and update it.
                    for updateData in message['data']:
                        item = findItemByKeys(self.keys[table], rows, updateData)
                        if not item:
                            continue  # No item found to update. Could happen before push
                        item.update(updateData)
                        # Fully filled or cancelled orders are dropped from the table.
                        if table == 'order' and item['leavesQty'] <= 0:
                            rows.remove(item)
                elif action == 'delete':
                    # Locate the item in the collection and remove it.
                    for deleteData in message['data']:
                        item = findItemByKeys(self.keys[table], rows, deleteData)
                        if item is None:
                            continue  # Unknown row: nothing to remove
                        rows.remove(item)
                else:
                    raise Exception("Unknown action: %s" % action)
        except Exception:
            # Keep the socket thread alive; report the failure and carry on.
            print(traceback.format_exc())

    def __on_open(self, ws):
        print(time.strftime("%d %b %Y %H:%M:%S", time.gmtime()), "Websocket Opened.")

    def __on_close(self, ws, *args):
        # *args absorbs the close status code and message that newer
        # websocket-client releases pass to on_close.
        print(time.strftime("%d %b %Y %H:%M:%S", time.gmtime()), 'Websocket Closed')
        self.exit()

    def __on_error(self, ws, error):
        if not self.exited:
            self.error(error)

    def __reset(self):
        """Clear table data and keys, and reset the exited/error flags."""
        self.data = {}
        self.keys = {}
        self.exited = False
        self._error = None
class BitMEX(object):
    """BitMEX API Connector.

    Reads account state (funds, positions, open orders) from a websocket
    client and sends order requests over an authenticated HTTPS session.
    """

    def __init__(self, base_url=None, symbol=None, login=None, password=None, otpToken=None,
                 apiKey=None, apiSecret=None, shouldWSAuth=True):
        """Init connector.

        Creates the HTTPS session, then opens the websocket against *base_url*
        and blocks until it has connected.

        Raises:
            Exception: if *login*, *password* or *otpToken* is given (no longer
                supported), or if *apiKey* is ``None``.
        """
        self.base_url = base_url
        self.symbol = symbol
        self.token = None
        # User/pass auth is no longer supported
        if (login or password or otpToken):
            raise Exception("User/password authentication is no longer supported via the API. Please use " +
                            "an API key. You can generate one at https://www.bitmex.com/app/apiKeys")
        if (apiKey is None):
            raise Exception("Please set an API key and Secret to get started. See " +
                            "https://github.com/BitMEX/sample-market-maker/#getting-started for more information."
                            )
        self.apiKey = apiKey
        self.apiSecret = apiSecret
        # Prepare HTTPS session
        self.session = requests.Session()
        # These headers are always sent
        self.session.headers.update({'user-agent': 'liquidbot-' + 'v1.1'})
        self.session.headers.update({'content-type': 'application/json'})
        self.session.headers.update({'accept': 'application/json'})
        # Create websocket for streaming data
        self.ws = BitMEXWebsocket(self.apiKey, self.apiSecret)
        self.ws.connect(base_url)

    def __del__(self):
        self.exit()

    def exit(self):
        """Close the websocket, if one was created."""
        # __init__ can raise before self.ws is assigned, and __del__ still
        # runs on the partially built object.
        ws = getattr(self, 'ws', None)
        if ws is not None:
            ws.exit()

    @staticmethod
    def _stamp():
        """Return the current UTC time formatted for log lines."""
        return time.strftime("%d %b %Y %H:%M:%S", time.gmtime())

    @staticmethod
    def _new_clordid():
        """Return a fresh client order ID: a base64-encoded UUID4 without padding."""
        return base64.b64encode(uuid.uuid4().bytes).decode('utf-8').rstrip('=\n')

    def authentication_required(function):
        """Annotation for methods that require auth.

        Without an API key the wrapped method prints a message and returns
        ``None`` instead of running.
        """
        import functools

        @functools.wraps(function)
        def wrapped(self, *args, **kwargs):
            if not (self.apiKey):
                msg = "You must be authenticated to use this method"
                print(self._stamp(), msg)
            else:
                return function(self, *args, **kwargs)
        return wrapped

    @authentication_required
    def funds(self):
        """Get the account's margin row from the websocket."""
        return self.ws.funds()

    @authentication_required
    def position(self):
        """Get the open positions from the websocket."""
        return self.ws.position()

    @authentication_required
    def close(self, symbol):
        """Submit a Market order with ``execInst: Close`` for *symbol*."""
        endpoint = "order"
        postdict = {
            'symbol': symbol,
            'ordType': "Market",
            'execInst': "Close",
            'clOrdID': self._new_clordid()
        }
        return self._curl_bitmex(api=endpoint, postdict=postdict, verb="POST")

    @authentication_required
    def place_order_limit(self, quantity, price, symbol):
        """Place an order of *quantity* at limit *price*; raises if *price* is negative."""
        if price < 0:
            raise Exception("Price must be positive.")
        endpoint = "order"
        postdict = {
            'symbol': symbol,
            'orderQty': quantity,
            'price': price,
            'clOrdID': self._new_clordid()
        }
        return self._curl_bitmex(api=endpoint, postdict=postdict, verb="POST")

    @authentication_required
    def place_order_stop(self, quantity, price, symbol):
        """Place an order of *quantity* with *price* sent as ``stopPx``; raises if negative."""
        if price < 0:
            raise Exception("Price must be positive.")
        endpoint = "order"
        postdict = {
            'symbol': symbol,
            'orderQty': quantity,
            'stopPx': price,
            'clOrdID': self._new_clordid()
        }
        return self._curl_bitmex(api=endpoint, postdict=postdict, verb="POST")

    @authentication_required
    def open_orders(self):
        """Get open orders."""
        return self.ws.open_orders()

    @authentication_required
    def cancel(self, orderID):
        """Cancel an existing order."""
        api = "order"
        postdict = {
            'orderID': orderID,
        }
        return self._curl_bitmex(api=api, postdict=postdict, verb="DELETE")

    def _curl_bitmex(self, api, query=None, postdict=None, timeout=3, verb=None):
        """Send a request to BitMEX Servers.

        Args:
            api: path appended to ``base_url`` (no leading slash, e.g. ``"order"``).
            query: optional dict of URL query parameters.
            postdict: optional dict sent as the JSON body.
            timeout: per-request timeout in seconds.
            verb: HTTP method; defaults to POST when *postdict* is given, else GET.

        Returns:
            The decoded JSON response, or ``None`` for a DELETE that got a 404.

        Raises:
            requests.exceptions.HTTPError: for HTTP errors not handled below.
            Exception: when the account is out of funds, or a duplicate
                clOrdID refers to an order that does not match the request.

        Rate limits (429), 503s, timeouts and connection errors are retried
        by calling this method again, with no upper bound on attempts.
        """
        # Handle URL
        url = self.base_url + api
        # Default to POST if data is attached, GET otherwise
        if not verb:
            verb = 'POST' if postdict else 'GET'
        # Auth: Use Access Token by default, API Key/Secret if provided
        auth = AccessTokenAuth(self.token)
        if self.apiKey:
            auth = APIKeyAuthWithExpires(self.apiKey, self.apiSecret)
        # Make the request
        try:
            req = requests.Request(verb, url, json=postdict, auth=auth, params=query)
            prepped = self.session.prepare_request(req)
            response = self.session.send(prepped, timeout=timeout)
            # Make non-200s throw
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # 401 - Auth error. This is fatal with API keys.
            if response.status_code == 401:
                print(self._stamp(), "Login information or API Key incorrect, please check and restart.")
            # 404, can be thrown if order canceled does not exist.
            elif response.status_code == 404:
                if verb == 'DELETE':
                    print(self._stamp(), "Order not found: %s" % postdict['orderID'])
                    return
                print("Unable to contact the BitMEX API (404). ")
            # 429, ratelimit; cancel orders & wait until X-Ratelimit-Reset
            elif response.status_code == 429:
                print(self._stamp(), "Ratelimited on current request. Sleeping, then trying again. Try fewer " +
                      "order pairs or contact support@bitmex.com to raise your limits. " +
                      "Request: %s \n %s" % (url, json.dumps(postdict)))
                # Figure out how long we need to wait.
                ratelimit_reset = response.headers['X-Ratelimit-Reset']
                # Clamp at zero: a reset time already in the past would make
                # time.sleep() raise ValueError.
                to_sleep = max(0, int(ratelimit_reset) - int(time.time()))
                reset_str = datetime.datetime.fromtimestamp(int(ratelimit_reset)).strftime('%X')
                # We're ratelimited, and we may be waiting for a long time. Cancel orders.
                print(self._stamp(), "Canceling all known orders in the meantime.")
                self.cancel([o['orderID'] for o in self.open_orders()])
                print(self._stamp(), "Your ratelimit will reset at %s. Sleeping for %d seconds." % (reset_str, to_sleep))
                time.sleep(to_sleep)
                # Retry the request.
                return self._curl_bitmex(api, query, postdict, timeout, verb)
            # 503 - BitMEX temporary downtime, likely due to a deploy. Try again
            elif response.status_code == 503:
                print(self._stamp(), "Unable to contact the BitMEX API (503), retrying. " +
                      "Request: %s \n %s" % (url, json.dumps(postdict)))
                time.sleep(3)
                return self._curl_bitmex(api, query, postdict, timeout, verb)
            elif response.status_code == 400:
                error = response.json()['error']
                message = error['message'].lower()
                # Duplicate clOrdID: that's fine, probably a deploy, go get the order and return it
                if 'duplicate clordid' in message:
                    # No leading slash: base_url already ends with one.
                    order = self._curl_bitmex('order',
                                              query={'filter': json.dumps({'clOrdID': postdict['clOrdID']})},
                                              verb='GET')[0]
                    # Compare only the fields this request actually sent: close
                    # orders carry no orderQty/price and stop orders carry
                    # stopPx instead of price.
                    qty = postdict.get('orderQty')
                    mismatch = order['symbol'] != postdict['symbol']
                    if qty is not None:
                        mismatch = mismatch or order['orderQty'] != abs(qty)
                        mismatch = mismatch or order['side'] != ('Buy' if qty > 0 else 'Sell')
                    if 'price' in postdict:
                        mismatch = mismatch or order['price'] != postdict['price']
                    if 'stopPx' in postdict:
                        mismatch = mismatch or order.get('stopPx') != postdict['stopPx']
                    if mismatch:
                        raise Exception('Attempted to recover from duplicate clOrdID, but order returned from API ' +
                                        'did not match POST.\nPOST data: %s\nReturned order: %s' % (
                                            json.dumps(postdict), json.dumps(order)))
                    # All good
                    return order
                elif 'insufficient available balance' in message:
                    raise Exception('Account out of funds. The message: %s' % error['message'])
            # If we haven't returned or re-raised yet, we get here.
            print("Error: %s: %s" % (e, response.text))
            print(self._stamp(), "Endpoint was: %s %s: %s" % (verb, api, json.dumps(postdict)))
            raise e
        except requests.exceptions.Timeout as e:
            # Timeout, re-run this request
            print(self._stamp(), "Timed out, retrying...")
            return self._curl_bitmex(api, query, postdict, timeout, verb)
        except requests.exceptions.ConnectionError as e:
            print(self._stamp(), "Unable to contact the BitMEX API (ConnectionError). Please check the URL. Retrying. " +
                  "Request: %s \n %s" % (url, json.dumps(postdict)))
            time.sleep(1)
            return self._curl_bitmex(api, query, postdict, timeout, verb)
        return response.json()
def cls():
    """Clear the terminal: runs ``cls`` when ``os.name`` is ``'nt'``, ``clear`` otherwise."""
    command = 'cls' if os.name == 'nt' else 'clear'
    os.system(command)
# Refuse to start unless there is at least one follower key and every key has a secret.
if len(API_KYES) == 0 or len(API_KYES) != len(API_SECRETS):
    print(time.strftime("%d %b %Y %H:%M:%S",time.gmtime()),"Некорректный список API!")
    time.sleep(5)
    exit(1)

# Connector for the main account
main = BitMEX(base_url=BASE_URL, apiKey=API_KEY, apiSecret=API_SECRET)

# Initialise a connector for every API key that orders are relayed to
accounts = [
    BitMEX(base_url=BASE_URL, apiKey=follower_key, apiSecret=follower_secret)
    for follower_key, follower_secret in zip(API_KYES, API_SECRETS)
]
print(time.strftime("%d %b %Y %H:%M:%S",time.gmtime()),"START")

# Main loop
while True:
    time.sleep(0.5)
    # Fetch the main account's orders, positions and funds
    orders = main.open_orders()
    positions = main.position()
    funds = main.funds()
    # Walk over every account that orders are duplicated to
    for account in accounts:
        account_orders = account.open_orders()
        account_positions = account.position()
        account_founds = account.funds()
        # Ratio of the follower's 'amount' to the main account's 'amount'.
        # NOTE(review): nothing in the visible source uses this value after
        # it is computed -- confirm whether the rest of the loop is missing.
        coefficient = account_founds['amount'] / funds['amount']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment