Skip to content

Instantly share code, notes, and snippets.

@Cphilo
Last active May 7, 2022 02:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Cphilo/8f0fc05af5551322345472730227341d to your computer and use it in GitHub Desktop.
Save Cphilo/8f0fc05af5551322345472730227341d to your computer and use it in GitHub Desktop.
Test proto read.
import json
import gzip
import os
from libs.walter_protobuf.python import message_pb2
import delimited_protobuf
from google.protobuf.json_format import Parse
def wrap_into_message(trade_or_orderbook) -> message_pb2.Message:
    """Wrap a single payload in a fresh ``message_pb2.Message`` envelope.

    Args:
        trade_or_orderbook: The payload to wrap. A ``message_pb2.Trade`` is
            copied into the ``trade`` field, a ``message_pb2.Liquidation``
            into the ``liquidation`` field, and anything else into the
            ``orderbook`` field.

    Returns:
        A new ``message_pb2.Message`` with exactly one of the three fields
        populated from the payload.
    """
    message = message_pb2.Message()
    if isinstance(trade_or_orderbook, message_pb2.Trade):
        message.trade.CopyFrom(trade_or_orderbook)
    # ``elif`` (not a second ``if``): otherwise a Trade would also reach the
    # ``else`` branch below and be copied into the orderbook field as well.
    elif isinstance(trade_or_orderbook, message_pb2.Liquidation):
        message.liquidation.CopyFrom(trade_or_orderbook)
    else:
        message.orderbook.CopyFrom(trade_or_orderbook)
    return message
class ProtobufParser:
    """Read and write files of length-delimited protobuf market messages."""

    def get_messages(self, proto_file):
        """Load the market header and every message stored in ``proto_file``.

        Args:
            proto_file: Path of a binary file that starts with one delimited
                ``message_pb2.Market`` record followed by delimited
                ``message_pb2.Message`` records.

        Returns:
            A ``(market, msgs)`` tuple: the first record read as a ``Market``
            and a list of all subsequent ``Message`` records, in file order.
        """
        msgs = []
        # ``with`` guarantees the handle is closed even if a read raises.
        with open(proto_file, "rb") as f_in:
            market = delimited_protobuf.read(f_in, message_pb2.Market)
            while True:
                msg = delimited_protobuf.read(f_in, message_pb2.Message)
                # A ``None`` result is treated as the end of the stream.
                if msg is None:
                    break
                msgs.append(msg)
        return market, msgs

    def write_messages(self, market_json, input_file, proto_file):
        """Convert a gzipped JSON-lines file into a delimited protobuf file.

        Each non-blank line of ``input_file`` is parsed as JSON. Lines that
        fail to parse are reported on stdout and skipped. If no line parses
        successfully, nothing is written and ``proto_file`` is not created.

        Every parsed item is read through the keys ``"s"``, ``"o"`` (indexed
        at 0 for price and 1 for base quantity) and ``"t"``, turned into a
        ``message_pb2.Liquidation`` and written wrapped in a ``Message``.

        Args:
            market_json: JSON-serialisable object describing the market; it
                is parsed into a ``message_pb2.Market`` and written first.
            input_file: Path of the gzip-compressed, UTF-8, one-JSON-per-line
                input file.
            proto_file: Path of the output file to create or overwrite.

        Returns:
            None.
        """
        with gzip.open(input_file, "rb") as f_in:
            file_content = f_in.read().decode("utf-8")

        json_items = []
        for line in file_content.split("\n"):
            # A newline-terminated file yields an empty final chunk; blank
            # lines are not malformed records, so skip them without noise.
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.decoder.JSONDecodeError:
                print(input_file)
                print(line)
                print("Process error")
                continue
            json_items.append(item)

        if not json_items:
            return

        # Parse the header before opening the output so that an invalid
        # ``market_json`` does not leave an empty file behind.
        market = Parse(json.dumps(market_json), message_pb2.Market())
        with open(proto_file, "wb") as f_out:
            delimited_protobuf.write(f_out, market)
            for item in json_items:
                price = item["o"][0]
                quantity_base = item["o"][1]
                liquidation_json = {
                    "price": price,
                    "quantity_base": quantity_base,
                    "quantity_quote": price * quantity_base,
                    # False when "s" is "B", True for any other value.
                    # NOTE(review): presumably "B" means buy -- confirm.
                    "side": item["s"] != "B",
                    "timestamp": item["t"],
                }
                trade_obj = Parse(
                    json.dumps(liquidation_json), message_pb2.Liquidation()
                )
                msg = wrap_into_message(trade_obj)
                delimited_protobuf.write(f_out, msg)
def test_protobuf_parser():
    """Smoke-test ``ProtobufParser.get_messages`` against a fixed data file.

    Reads the hard-coded liquidation file, then prints the first ten
    messages, the market header and the total number of messages read.
    """
    sample_path = "/tt_data/lcheng/hft-data-new/protobuf/binance/PERP_ETH_USDT/LIQ/2022-03-17.proto"
    parser = ProtobufParser()
    header, records = parser.get_messages(sample_path)

    preview = records[:10]
    print(preview)
    print(header)

    total = len(records)
    print("Total %s msgs" % total)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment