Last active
May 7, 2022 02:35
-
-
Save Cphilo/8f0fc05af5551322345472730227341d to your computer and use it in GitHub Desktop.
Test proto read.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import gzip | |
import os | |
from libs.walter_protobuf.python import message_pb2 | |
import delimited_protobuf | |
from google.protobuf.json_format import Parse | |
def wrap_into_message(trade_or_orderbook)->message_pb2.Message:
    """Wrap a payload in a fresh ``message_pb2.Message`` envelope.

    The payload is copied into exactly one field of the envelope:

    * ``message_pb2.Trade``       -> ``message.trade``
    * ``message_pb2.Liquidation`` -> ``message.liquidation``
    * anything else               -> ``message.orderbook``

    Args:
        trade_or_orderbook: The payload to wrap.

    Returns:
        A new ``message_pb2.Message`` holding a copy of the payload.
    """
    message = message_pb2.Message()
    # The branches must be mutually exclusive.  Previously the Trade check was
    # a standalone ``if``, so a Trade also fell through to the ``else`` of the
    # Liquidation check and was passed to ``orderbook.CopyFrom`` as well.
    if isinstance(trade_or_orderbook, message_pb2.Trade):
        message.trade.CopyFrom(trade_or_orderbook)
    elif isinstance(trade_or_orderbook, message_pb2.Liquidation):
        message.liquidation.CopyFrom(trade_or_orderbook)
    else:
        # NOTE(review): presumably an order-book payload; any other type is
        # expected to be rejected by CopyFrom -- confirm against the schema.
        message.orderbook.CopyFrom(trade_or_orderbook)
    return message
class ProtobufParser:
    """Read and write length-delimited protobuf files of market messages.

    A file consists of one ``message_pb2.Market`` record followed by zero or
    more ``message_pb2.Message`` records.
    """

    def get_messages(self, proto_file):
        """Load the market header and every message from ``proto_file``.

        Args:
            proto_file: Path of the delimited protobuf file to read.

        Returns:
            A ``(market, msgs)`` tuple: the leading ``message_pb2.Market``
            record and a list of the ``message_pb2.Message`` records after it.
        """
        msgs = []
        # ``with`` guarantees the handle is closed even if a read raises.
        with open(proto_file, "rb") as f_in:
            market = delimited_protobuf.read(f_in, message_pb2.Market)
            while True:
                msg = delimited_protobuf.read(f_in, message_pb2.Message)
                # The reader hands back None once no further record is read.
                if msg is None:
                    break
                msgs.append(msg)
        return market, msgs

    @staticmethod
    def _liquidation_json(item):
        """Build the JSON dict for a ``Liquidation`` from one input record."""
        price, quantity = item["o"][0], item["o"][1]
        return {
            "price": price,
            "quantity_base": quantity,
            "quantity_quote": price * quantity,
            # ``side`` is False only for records whose "s" field is "B".
            "side": item["s"] != "B",
            "timestamp": item["t"],
        }

    def write_messages(self, market_json, input_file, proto_file):
        """Convert a gzipped JSON-lines file into a delimited protobuf file.

        Each non-blank line of ``input_file`` is parsed as JSON.  Lines that
        fail to parse are reported on stdout and skipped.  If no line parses,
        nothing is written and ``proto_file`` is not created.  Otherwise the
        output holds the ``Market`` built from ``market_json`` followed by one
        ``Message`` wrapping a ``Liquidation`` per parsed record.

        Args:
            market_json: JSON-serialisable object describing the market.
            input_file: Path of the gzip-compressed, UTF-8 JSON-lines input.
            proto_file: Path of the protobuf file to create.

        Returns:
            None.
        """
        with gzip.open(input_file, 'rb') as f_in:
            file_content = f_in.read().decode("utf-8")

        json_items = []
        for line in file_content.split("\n"):
            # Blank lines (notably the empty string after the final newline)
            # are not records, so they must not be reported as parse errors.
            if not line.strip():
                continue
            try:
                json_items.append(json.loads(line))
            except json.decoder.JSONDecodeError:
                print(input_file)
                print(line)
                print("Process error")
        if not json_items:
            return

        # Parse the header before opening the output so that an invalid
        # ``market_json`` does not leave an empty file behind.
        market = Parse(json.dumps(market_json), message_pb2.Market())
        with open(proto_file, "wb") as f_out:
            delimited_protobuf.write(f_out, market)
            for item in json_items:
                liquidation_json = self._liquidation_json(item)
                trade_obj = Parse(json.dumps(liquidation_json), message_pb2.Liquidation())
                delimited_protobuf.write(f_out, wrap_into_message(trade_obj))
def test_protobuf_parser():
    """Smoke-check ``ProtobufParser.get_messages`` against a fixed data file.

    Prints the first ten messages, the market header and the message count.
    """
    market, msgs = ProtobufParser().get_messages(
        "/tt_data/lcheng/hft-data-new/protobuf/binance/PERP_ETH_USDT/LIQ/2022-03-17.proto"
    )
    first_ten = msgs[:10]
    total = len(msgs)
    print(first_ten)
    print(market)
    print("Total %s msgs" % total)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment