Skip to content

Instantly share code, notes, and snippets.

@pinhopro
Last active April 1, 2020 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save pinhopro/56f1b740e8447782c9e02ee9dacfbf84 to your computer and use it in GitHub Desktop.
Save pinhopro/56f1b740e8447782c9e02ee9dacfbf84 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import websockets
import json
import time
import pprint
# Websocket URL that main() connects to.
BLINKTRADE_API_ENDPOINT = 'wss://bitcambio_api.blinktrade.com/trade/'
# Broker identifier; sent as "BrokerID" in the login and new-order messages.
BLINKTRADE_BROKER_ID = 11
# Credentials sent as "Username" / "Password" in the login message.
# Both are left empty in this file.
BLINKTRADE_API_KEY = ''
BLINKTRADE_API_PASSWORD = ''
class BlinktradeApiClient(object):
    """Request/response helper on top of an already-open websocket.

    Every method builds a JSON message, serialises it with ``json.dumps`` and
    sends it through the websocket handed to the constructor.  Methods that
    return a response read the *next* message from the socket, so they must
    not run while another coroutine is receiving from the same socket.
    """

    def __init__(self, ws):
        """Keep a reference to ``ws``, which must offer awaitable ``send``/``recv``."""
        self._ws = ws

    @staticmethod
    def _new_request_id():
        """Return a request id derived from the current time in milliseconds."""
        return int(time.time() * 1000)

    async def _send(self, msg):
        """Serialise ``msg`` to JSON and send it."""
        await self._ws.send(json.dumps(msg))

    async def _receive(self):
        """Read the next message from the socket and decode it from JSON."""
        return json.loads(await self._ws.recv())

    async def testRequestMessage(self, request_id=None):
        """Send a TestRequest (MsgType '1') and return the next message received.

        ``request_id`` defaults to a fresh millisecond timestamp per call.
        """
        # TestRequest message. # http://www.onixs.biz/fix-dictionary/4.4/msgType_1_1.html
        if request_id is None:
            # Computed at call time: a default evaluated in the signature would
            # be frozen at import time and reused by every call.
            request_id = self._new_request_id()
        await self._send({'MsgType': '1', 'TestReqID': request_id})
        # get the heartbeat response message
        return await self._receive()

    async def sendLimitedBuyOrder(self, symbol, qty, price, clientOrderId, brokerID):
        """Send a limit buy order (MsgType 'D', Side '1', OrdType '2').

        ``qty`` and ``price`` are truncated to ``int`` before being sent.
        No response is read here.

        Raises:
            ValueError: if a required argument is empty/zero or if ``qty`` or
                ``price`` is not strictly positive.
        """
        if not symbol or not qty or not price or not clientOrderId:
            raise ValueError('Invalid parameters')
        if qty <= 0 or price <= 0:
            raise ValueError('Invalid qty or price')
        msg = {
            "MsgType": 'D',
            "ClOrdID": str(clientOrderId),
            "Symbol": symbol,
            "Side": '1',
            "OrdType": '2',
            "Price": int(price),
            "OrderQty": int(qty),
            "BrokerID": brokerID,
        }
        await self._send(msg)

    async def sendCancelOrder(self, clientOrderId=None, orderId=None):
        """Send an order cancel request (MsgType 'F').

        The order is identified by ``clientOrderId`` when given, otherwise by
        ``orderId``.  No response is read here.

        Raises:
            ValueError: if neither identifier is supplied.
        """
        if clientOrderId is None and orderId is None:
            raise ValueError('Invalid parameters')
        msg = {"MsgType": 'F'}
        if clientOrderId is not None:
            msg["ClOrdID"] = str(clientOrderId)
        else:
            # Must carry the exchange order id itself, not the (None) client id.
            msg["OrderID"] = str(orderId)
        await self._send(msg)

    async def login(self, broker_id, api_key, api_password, request_id=None):
        """Send a login request (MsgType 'BE') and return the 'BF' response.

        ``request_id`` defaults to a fresh millisecond timestamp per call.

        Raises:
            AssertionError: if the next message received is not MsgType 'BF'.
        """
        if request_id is None:
            request_id = self._new_request_id()
        msg = {
            "MsgType": "BE",
            "UserReqID": request_id,
            "BrokerID": broker_id,
            "Username": api_key,
            "Password": api_password,
            "UserReqTyp": '1',
            "UserAgent": "BlinktradeApi",
            "UserAgentLanguage": "en",
            "UserAgentTimezoneOffset": 0,
            "UserAgentPlatform": "Python3",
            "FingerPrint": 8888,
        }
        await self._send(msg)
        login_response_message = await self._receive()
        assert login_response_message["MsgType"] == "BF", "unexpected login response"
        return login_response_message

    async def requestMarketData(self, symbol_list, entry_types, subscription_type='1',
                                market_depth=0, update_type='1', request_id=None):
        """Send a Market Data request (MsgType 'V') and return the 'W' response.

        ``request_id`` defaults to a fresh millisecond timestamp per call.

        Raises:
            ValueError: if ``symbol_list`` or ``entry_types`` is empty.
            AssertionError: if the next message received is not MsgType 'W'.
        """
        # Market Data request. # http://www.onixs.biz/fix-dictionary/4.4/msgType_V_86.html
        if not symbol_list or not entry_types:
            raise ValueError('Invalid parameters')
        if request_id is None:
            request_id = self._new_request_id()
        msg = {
            'MsgType': 'V',
            'MDReqID': request_id,
            'SubscriptionRequestType': subscription_type,
            'MarketDepth': market_depth,
            'MDUpdateType': update_type,
            'MDEntryTypes': entry_types,  # bid, offer, trade
            'Instruments': symbol_list,
        }
        await self._send(msg)
        market_data_response_message = await self._receive()
        assert market_data_response_message["MsgType"] == 'W', "unexpected market data response"
        return market_data_response_message

    async def subscribeSecurityStatus(self, symbol_list, requestId=None):
        """Subscribe to security status (MsgType 'e') for ``symbol_list``.

        Reads one message per requested symbol and returns them as a list.
        ``requestId`` defaults to a fresh millisecond timestamp per call.

        Raises:
            AssertionError: if any of those messages is not MsgType 'f'.
        """
        if requestId is None:
            requestId = self._new_request_id()
        msg = {
            "MsgType": 'e',
            "SecurityStatusReqID": requestId,
            "SubscriptionRequestType": '1',
            "Instruments": symbol_list,
        }
        await self._send(msg)
        security_status_list_response = []
        for _ in symbol_list:
            security_status = await self._receive()
            assert security_status["MsgType"] == 'f', "unexpected security status response"
            security_status_list_response.append(security_status)
        return security_status_list_response

    async def requestSecurityList(self, market="BLINK", requestId=None):
        """Send a Security List request (MsgType 'x') and return the 'y' response.

        ``requestId`` defaults to a fresh millisecond timestamp per call.

        Raises:
            AssertionError: if the next message received is not MsgType 'y'.
        """
        if requestId is None:
            requestId = self._new_request_id()
        msg = {
            'MsgType': 'x',
            'SecurityReqID': requestId,
            'SecurityListRequestType': 0,
            'Market': market,
            'SecurityRequestResult': 0,
        }
        await self._send(msg)
        security_list_response = await self._receive()
        # Security List Response - http://www.onixs.biz/fix-dictionary/4.4/msgType_y_121.html
        assert security_list_response["MsgType"] == 'y', "unexpected security list response"
        return security_list_response
# this class contains the order book, trades, and ticker information
class MarketData(object):
    """In-memory order books, trades and tickers built from feed messages.

    Attributes:
        market_data: maps a symbol to
            ``{'bid': [...], 'ask': [...], 'trades': [...]}``.
        security_status: maps market -> symbol -> the last Security Status
            message stored for that pair.
    """

    # MDEntryType value -> key of the matching side inside a symbol's book.
    _BOOK_SIDES = {'0': 'bid', '1': 'ask'}

    def __init__(self):
        self.market_data = {}
        self.security_status = {}

    def get_order_book(self, symbol):
        """Return the book dict stored for ``symbol``, or ``None`` if unknown."""
        return self.market_data.get(symbol)

    def get_ticker(self, symbol, market="BLINK"):
        """Return the stored security status for ``symbol``, or ``None`` if unknown."""
        tickers = self.security_status.get(market)
        if not tickers:
            return None
        # Look the symbol up in the per-market dict; the market dict itself is
        # unhashable and cannot be used as a key into security_status.
        return tickers.get(symbol)

    def process_market_data_full_refresh(self, msg):
        """Replace the book of ``msg['Symbol']`` with the snapshot in ``msg``.

        Entries of type '0'/'1' are inserted into the bid/ask side, entries of
        type '2' are appended to the trades; other types are ignored.  A
        missing ``MDFullGrp`` leaves an empty book.
        """
        # Market Data - Snapshot/Full Refresh - http://www.onixs.biz/fix-dictionary/4.4/msgType_W_87.html
        symbol = msg.get("Symbol")
        self.market_data[symbol] = {'bid': [], 'ask': [], 'trades': []}  # clear the order book
        for entry in msg.get('MDFullGrp') or []:
            entry_type = entry.get('MDEntryType')
            if entry_type in self._BOOK_SIDES:
                self._handle_market_data_on_book_new_order(symbol, entry)
            elif entry_type == '2':
                self._handle_market_data_on_trade(symbol, entry)

    def process_market_data_incremental_refresh(self, msg):
        """Apply the entries of an incremental refresh to the stored books.

        Only messages with ``MDBkTyp == '3'`` are processed.  Book entries are
        dispatched on ``MDUpdateAction`` ('0' new, '1' update, '2' delete,
        '3' delete thru); trade entries are appended to the trades list.
        """
        # Market Data - Incremental Refresh <X> message = http://www.onixs.biz/fix-dictionary/4.4/msgType_X_88.html
        if msg.get("MDBkTyp") != '3':  # only Order Depth updates are handled
            return
        book_handlers = {
            '0': self._handle_market_data_on_book_new_order,
            '1': self._handle_market_data_on_book_update_order,
            '2': self._handle_market_data_on_book_delete_order,
            '3': self._handle_market_data_on_book_delete_orders_thru,
        }
        for entry in msg.get("MDIncGrp") or []:
            entry_type = entry.get("MDEntryType")
            symbol = entry.get("Symbol")
            if entry_type in self._BOOK_SIDES:
                handler = book_handlers.get(entry.get("MDUpdateAction"))
                if handler is not None:
                    handler(symbol, entry)
            elif entry_type == '2':
                self._handle_market_data_on_trade(symbol, entry)

    def process_security_status_response(self, msg):
        """Store ``msg`` under ``security_status[msg['Market']][msg['Symbol']]``."""
        # Security Status - http://www.onixs.biz/fix-dictionary/4.4/msgType_f_102.html
        self.security_status.setdefault(msg.get('Market'), {})[msg.get('Symbol')] = msg

    @staticmethod
    def _order_from_entry(msg):
        """Build the order dict kept in the book from a market data entry."""
        return {
            'price': msg.get("MDEntryPx"),
            'qty': msg.get("MDEntrySize"),
            'user_id': msg.get("UserID"),
            'broker': msg.get("Broker"),
            'order_id': msg.get("OrderID"),
            'side': msg.get("MDEntryType"),
            'order_time': msg.get("MDEntryTime"),
            'order_date': msg.get("MDEntryDate"),
        }

    def _book_side(self, symbol, msg):
        """Return the bid or ask list the entry refers to, or ``None`` for other types."""
        side_key = self._BOOK_SIDES.get(msg.get('MDEntryType'))
        if side_key is None:
            return None
        return self.market_data[symbol][side_key]

    def _handle_market_data_on_book_new_order(self, symbol, msg):
        """Insert a new order at its (1-based) ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        side = self._book_side(symbol, msg)
        if side is not None:
            side.insert(index, self._order_from_entry(msg))

    def _handle_market_data_on_book_update_order(self, symbol, msg):
        """Replace the order at its (1-based) ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        side = self._book_side(symbol, msg)
        if side is not None:
            side[index] = self._order_from_entry(msg)

    def _handle_market_data_on_book_delete_order(self, symbol, msg):
        """Remove the order at its (1-based) ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        side = self._book_side(symbol, msg)
        if side is not None:
            del side[index]

    def _handle_market_data_on_book_delete_orders_thru(self, symbol, msg):
        """Remove every order from the top of the book through ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo')
        side = self._book_side(symbol, msg)
        if side is not None:
            # "Delete thru" drops positions 1..N (list indices 0..N-1) and
            # keeps the rest of the book below them.
            del side[:index]

    def _handle_market_data_on_trade(self, symbol, msg):
        """Append a trade record built from ``msg`` to the symbol's trades."""
        trade = {
            "id": msg.get("TradeID"),
            "price": msg.get("MDEntryPx"),
            "symbol": msg.get("Symbol"),
            "size": msg.get("MDEntrySize"),
            "trade_date": msg.get("MDEntryDate"),
            "trade_time": msg.get("MDEntryTime"),
            "order_id": msg.get("OrderID"),
            "side": msg.get("Side"),
            "counter_order_id": msg.get("SecondaryOrderID"),
            "buyer_id": msg.get("MDEntryBuyerID"),
            "seller_id": msg.get("MDEntrySellerID"),
        }
        self.market_data[symbol]['trades'].append(trade)
async def main(api_endpoint, broker_id, api_key, api_password):
    """Connect, log in, subscribe to market data and process the feed forever.

    Steps, in order:
      1. open the websocket and expect an initial heartbeat (MsgType '0');
      2. log in with the given credentials;
      3. subscribe to the security status of every listed instrument;
      4. request the full BTCBRL order book and trades;
      5. start a keep-alive task that sends a TestRequest every 30 seconds;
      6. send a limit buy order, wait 3 seconds, cancel it;
      7. loop over incoming messages, feeding 'f' and 'X' messages into
         ``MarketData`` and printing every other message except heartbeats.

    The coroutine only ends when an exception propagates (for example when
    the connection is closed).
    """
    async with websockets.connect(api_endpoint) as ws:
        market_data = MarketData()
        client = BlinktradeApiClient(ws)

        initial_heart_beat_message = json.loads(await ws.recv())
        assert (initial_heart_beat_message["MsgType"] == '0')

        # login
        login_response = await client.login(broker_id, api_key, api_password)
        if login_response["UserStatus"] == 1:
            print("success")

        # subscribe to security status (aka ticker) for every security we have available.
        security_list_response = await client.requestSecurityList()
        symbol_list = [
            instrument["Symbol"] for instrument in security_list_response['Instruments']
        ]
        for security_status in await client.subscribeSecurityStatus(symbol_list):
            market_data.process_security_status_response(security_status)

        # request market data for BTCBRL. We will receive the full order book and all trades.
        market_data.process_market_data_full_refresh(
            await client.requestMarketData(["BTCBRL"], ['0', '1', '2']))

        # keep sending a test request message every 30 seconds
        async def blinktrade_send_test_request_task(socket):
            # Only *send* here.  The receive loop below is the single reader
            # of the socket: a second concurrent recv() is rejected by the
            # websockets library and could also swallow feed messages.
            while True:
                test_request = {'MsgType': '1', 'TestReqID': int(time.time() * 1000)}
                await socket.send(json.dumps(test_request))
                await asyncio.sleep(30)

        # Keep a reference so the task is not garbage collected and can be
        # cancelled when the connection goes away.
        keep_alive_task = asyncio.ensure_future(blinktrade_send_test_request_task(ws))
        try:
            # send an order, wait 3 seconds and then cancel the order
            await client.sendLimitedBuyOrder(
                "BTCBRL", 0.001 * 1e8, 20000 * 1e8, "MyUniqueID2", broker_id)
            await asyncio.sleep(3)
            await client.sendCancelOrder("MyUniqueID2")

            while True:
                raw_message = await ws.recv()
                try:
                    msg = json.loads(raw_message)
                except ValueError as e:  # json.JSONDecodeError is a ValueError
                    print("error", str(e), "parsing message.")
                    continue

                # Tolerate payloads that are not objects or lack a MsgType;
                # they fall through to the print below.
                msg_type = msg.get("MsgType") if isinstance(msg, dict) else None
                if msg_type == 'f':
                    market_data.process_security_status_response(msg)
                    continue
                if msg_type == 'X':
                    market_data.process_market_data_incremental_refresh(msg)
                    continue
                if msg_type == '0':
                    # Heartbeat answering one of our test requests.
                    continue
                print(raw_message)
        finally:
            keep_alive_task.cancel()
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs main() on it and closes
    # the loop afterwards.  asyncio.get_event_loop() is deprecated when no
    # loop is running and fails on recent Python versions.
    asyncio.run(
        main(BLINKTRADE_API_ENDPOINT, BLINKTRADE_BROKER_ID, BLINKTRADE_API_KEY, BLINKTRADE_API_PASSWORD))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment