Created
January 19, 2022 03:47
-
-
Save parodyBit/7cfa98068cf8338f85012d3d53691929 to your computer and use it in GitHub Desktop.
DataRequestOutput Protobuf encoder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
from dataclasses import dataclass | |
from enum import Enum | |
from typing import Union, List | |
def sha256(x):
    """Return the raw 32-byte SHA-256 digest of the bytes-like object ``x``."""
    return hashlib.sha256(x).digest()


# A field key packs the field number and the wire type into one integer:
# the low TAG_TYPE_BITS bits carry the wire type, the remaining bits the
# field number (see make_tag / get_tag_field_number / get_tag_wire_type).
TAG_TYPE_BITS = 3
TAG_TYPE_MASK = (1 << TAG_TYPE_BITS) - 1

# Wire-type identifiers stored in the low bits of a field key.
VARINT = 0
FIXED64 = 1
LENGTH_DELIMITED = 2
START_GROUP = 3
END_GROUP = 4
FIXED32 = 5
def str_to_bytes(s: str) -> bytes:
    """Encode the text ``s`` as UTF-8 and return the resulting bytes."""
    utf8_encoded = str.encode(s, encoding='utf-8')
    return utf8_encoded
def bytes_to_hex(b: bytes):
    """Return the lowercase hexadecimal text representation of ``b``."""
    hex_text = b.hex()
    return hex_text
def concat(values: Union[List[str], List[bytes]]) -> Union[str, bytes]:
    """Join a list of strings, or a list of bytes objects, into one value.

    ``None`` and empty-list entries are treated as "nothing to add" and
    skipped: callers in this module use ``[]`` as a placeholder for an
    omitted field, and such a placeholder in first position used to make
    this function return ``None`` and silently drop the whole payload.

    Returns:
        ``str`` when every remaining entry is a string, ``bytes`` when every
        remaining entry is bytes-like. When nothing remains, ``''`` if the
        input started with a string, otherwise ``b''`` (this includes an
        empty input list, which used to raise ``IndexError``).

    Raises:
        TypeError: if the entries mix text and bytes or contain any other type.
    """
    # Keep genuine empty strings/bytes so that the result type is preserved.
    parts = [v for v in values if v is not None and v != []]
    if not parts:
        return '' if values and isinstance(values[0], str) else b''
    if all(isinstance(p, str) for p in parts):
        return ''.join(parts)
    if all(isinstance(p, (bytes, bytearray)) for p in parts):
        return b''.join(parts)
    raise TypeError('concat() expects a list of only str or only bytes values')
def concat_string(values: List[str]) -> str:
    """Concatenate the strings in ``values``, in order, with no separator."""
    no_separator = ''
    return no_separator.join(values)
def concat_bytes(values: List[bytes]) -> bytes:
    """Concatenate the bytes objects in ``values``, in order, with no separator."""
    no_separator = b''
    return no_separator.join(values)
def var_int(value: int):
    """Encode a non-negative integer as a base-128 varint and return the bytes.

    The value is emitted seven bits at a time, least-significant group first;
    every byte except the last has its high bit (0x80) set to signal that
    more bytes follow.

    Args:
        value: the integer to encode. A ``str`` is accepted and converted
            with ``int()`` first.

    Returns:
        The encoded varint as ``bytes`` (always at least one byte).

    Raises:
        ValueError: if ``value`` is negative (or a string that is not an
            integer literal).
    """
    if isinstance(value, str):
        value = int(value)
    if value < 0:
        # Fail with a clear message instead of the opaque
        # "bytes must be in range(0, 256)" raised further down.
        raise ValueError('var_int() cannot encode a negative value: %r' % (value,))
    encoded = bytearray()
    while value > 0x7F:
        # Low seven bits plus the continuation flag.
        encoded.append((value & 0x7F) | 0x80)
        value >>= 7
    encoded.append(value)
    return bytes(encoded)
def var_int_serializer(value: int):
    """Serialize ``value`` as a varint; a thin wrapper around ``var_int``."""
    encoded = var_int(value)
    return encoded
def bytes_serializer(value: bytes):
    """Return ``value`` preceded by its length encoded with ``var_int``.

    A falsy ``value`` (such as ``b''``) produces ``b''``: no length prefix
    is written at all in that case.
    """
    if not value:
        return b''
    length_prefix = var_int(len(value))
    return concat([length_prefix, value])
def get_tag_field_number(tag: int):
    """Extract the field number from a field key by dropping its low TAG_TYPE_BITS bits."""
    field_number = tag >> TAG_TYPE_BITS
    return field_number
def get_tag_wire_type(tag: int):
    """Extract the wire type from a field key by keeping only the bits in TAG_TYPE_MASK."""
    wire_type = tag & TAG_TYPE_MASK
    return wire_type
def make_tag(field_number: int, tag: int):
    """Build a field key: ``field_number`` in the high bits, ``tag`` (the wire type) in the low bits."""
    shifted_number = field_number << TAG_TYPE_BITS
    return shifted_number | tag
def make_tag_bytes(field_number: int, tag: int):
    """Return the field key for ``field_number`` and wire type ``tag``, varint-encoded."""
    key = make_tag(field_number, tag)
    return var_int_serializer(key)
def pb_field(field_number: int, tag: int, value):
    """Encode a single field as its key followed by its payload.

    Args:
        field_number: the field's number.
        tag: the wire type of the field (``VARINT`` or ``LENGTH_DELIMITED``);
            despite its name this is not the complete key.
        value: an integer for ``VARINT``, bytes for ``LENGTH_DELIMITED``.

    Returns:
        The encoded field as ``bytes``. An empty (falsy) length-delimited
        value yields ``b''``, i.e. the field is left out entirely.

    Raises:
        NotImplementedError: for any wire type other than ``VARINT`` and
            ``LENGTH_DELIMITED``.
    """
    if tag == VARINT:
        payload = var_int_serializer(value=value)
    elif tag == LENGTH_DELIMITED:
        if not value:
            # bytes_serializer writes nothing -- not even a zero length -- for
            # an empty value, so emitting the key here would leave a dangling
            # key that a decoder cannot parse. Omit the whole field instead,
            # which is also how proto3 treats default-valued scalar fields.
            return b''
        payload = bytes_serializer(value=value)
    else:
        # Previously this fell through with a list payload and failed inside
        # bytes.join with an unrelated-looking TypeError.
        raise NotImplementedError('unsupported wire type: %r' % (tag,))
    return concat([make_tag_bytes(field_number=field_number, tag=tag), payload])
class RADType(Enum):
    """Kind of a retrieval source, with the integer value used on the wire.

    NOTE: this enum is intentionally *not* decorated with ``@dataclass``.
    Applying ``@dataclass`` to an ``Enum`` is unsupported: the generated
    field-based ``__eq__`` (there are no fields) makes every member compare
    equal to every other member and makes the members unhashable.
    """
    Unknown = 0
    HttpGet = 1
    Rng = 2

    @classmethod
    def from_json(cls, data):
        """Return the member named by the JSON string ``data``.

        Recognized strings are ``'Unknown'``, ``'HTTP-GET'`` and ``'RNG'``.

        Raises:
            ValueError: if ``data`` is not one of the recognized strings
                (previously ``None`` was returned silently, which only
                failed later when the kind was serialized).
        """
        # Built locally: a dict assigned in an Enum body would become a member.
        by_json_name = {
            'Unknown': RADType.Unknown,
            'HTTP-GET': RADType.HttpGet,
            'RNG': RADType.Rng,
        }
        try:
            return by_json_name[data]
        except (KeyError, TypeError):
            raise ValueError('unknown RADType: %r' % (data,)) from None
@dataclass
class RADRetrieve:
    """One retrieval source: its kind, its URL and its script bytes."""
    kind: RADType
    url: str
    script: bytes

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict with ``kind``, ``url`` and ``script`` keys.

        ``script`` is converted with ``bytes(...)``.
        """
        source_kind = RADType.from_json(data['kind'])
        source_url = data['url']
        source_script = bytes(data['script'])
        return RADRetrieve(kind=source_kind, url=source_url, script=source_script)

    def to_pb_bytes(self):
        """Serialize as field 1 (kind, varint), field 2 (url) and field 3 (script).

        The kind field is left out when its value is 0.
        """
        kind_field = pb_field(field_number=1, tag=VARINT, value=self.kind.value)
        url_field = pb_field(field_number=2, tag=LENGTH_DELIMITED, value=str_to_bytes(self.url))
        script_field = pb_field(field_number=3, tag=LENGTH_DELIMITED, value=self.script)
        pieces = [url_field, script_field]
        if self.kind.value != 0:
            # A non-zero kind goes first, ahead of url and script.
            pieces.insert(0, kind_field)
        return concat(pieces)
@dataclass
class RADFilter:
    """A filter: an integer operator code plus its raw argument bytes."""
    op: int
    args: bytes

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict with ``op`` and ``args`` keys.

        ``args`` is converted with ``bytes(...)``, exactly as
        ``RADRetrieve.from_json`` does for ``script``: a JSON document can
        only carry the bytes as a list of integers, and an unconverted list
        cannot be serialized as a length-delimited field.
        """
        return RADFilter(op=data['op'], args=bytes(data['args']))

    def to_pb_bytes(self):
        """Serialize as field 1 (op, varint) followed by field 2 (args, length-delimited)."""
        op_bytes = pb_field(field_number=1, tag=VARINT, value=self.op)
        args_bytes = pb_field(field_number=2, tag=LENGTH_DELIMITED, value=self.args)
        return concat([op_bytes, args_bytes])
@dataclass
class RADAggregate:
    """Aggregation stage: a list of filters followed by a reducer code."""
    filters: List[RADFilter]
    reducer: int

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict with ``filters`` (a list of filter dicts) and ``reducer``."""
        return RADAggregate(
            filters=[RADFilter.from_json(data=x) for x in data["filters"]],
            reducer=data["reducer"]
        )

    def to_pb_bytes(self):
        """Serialize as one field-1 record per filter, then field 2 (reducer, varint).

        Each filter gets its own length-delimited record, the same way
        ``RADRequest`` encodes its ``retrieve`` list; packing several
        encoded filters into a single record would make a decoder merge
        them into one filter. With no filters, nothing is written for
        field 1 (an empty ``bytes``, not a list placeholder, so the
        reducer is not lost when the parts are joined).
        """
        filter_bytes = b''.join(
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.filters
        )
        reducer_bytes = pb_field(field_number=2, tag=VARINT, value=self.reducer)
        return b''.join([filter_bytes, reducer_bytes])
@dataclass
class RADTally:
    """Tally stage: a list of filters followed by a reducer code."""
    filters: List[RADFilter]
    reducer: int

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict with ``filters`` (a list of filter dicts) and ``reducer``."""
        return RADTally(
            filters=[RADFilter.from_json(data=x) for x in data["filters"]],
            reducer=data["reducer"]
        )

    def to_pb_bytes(self):
        """Serialize as one field-1 record per filter, then field 2 (reducer, varint).

        Each filter gets its own length-delimited record, the same way
        ``RADRequest`` encodes its ``retrieve`` list; packing several
        encoded filters into a single record would make a decoder merge
        them into one filter. With no filters, nothing is written for
        field 1 (an empty ``bytes``, not a list placeholder, so the
        reducer is not lost when the parts are joined).
        """
        filter_bytes = b''.join(
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.filters
        )
        reducer_bytes = pb_field(field_number=2, tag=VARINT, value=self.reducer)
        return b''.join([filter_bytes, reducer_bytes])
@dataclass
class RADRequest:
    """A request: a time lock, the retrieval sources, and the aggregate and tally stages."""
    time_lock: int
    retrieve: List[RADRetrieve]
    aggregate: RADAggregate
    tally: RADTally

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict with ``time_lock``, ``retrieve``, ``aggregate`` and ``tally`` keys."""
        return RADRequest(
            time_lock=data["time_lock"],
            retrieve=[RADRetrieve.from_json(x) for x in data['retrieve']],
            aggregate=RADAggregate.from_json(data=data["aggregate"]),
            tally=RADTally.from_json(data=data["tally"])
        )

    def to_pb_bytes(self):
        """Serialize as field 1 (time_lock), field 2 per source, field 3 (aggregate) and field 4 (tally).

        ``time_lock`` is only written when it is greater than zero. The
        omitted case contributes empty ``bytes`` rather than a list
        placeholder, so the remaining fields are not lost when the parts
        are joined, and an empty ``retrieve`` list simply contributes
        nothing instead of raising.
        """
        timelock_bytes = (
            pb_field(field_number=1, tag=VARINT, value=self.time_lock)
            if self.time_lock > 0 else b''
        )
        # One length-delimited record per retrieval source.
        retrieve_bytes = b''.join(
            pb_field(field_number=2, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.retrieve
        )
        aggregate_bytes = pb_field(field_number=3, tag=LENGTH_DELIMITED, value=self.aggregate.to_pb_bytes())
        tally_bytes = pb_field(field_number=4, tag=LENGTH_DELIMITED, value=self.tally.to_pb_bytes())
        return b''.join([timelock_bytes, retrieve_bytes, aggregate_bytes, tally_bytes])
@dataclass
class DataRequestOutput:
    """A data request together with its witness, fee, consensus and collateral parameters."""
    data_request: RADRequest
    witness_reward: int
    witnesses: int
    commit_and_reveal_fee: int
    min_consensus_percentage: int
    collateral: int

    @classmethod
    def from_json(cls, data):
        """Build an instance from a dict keyed by the attribute names.

        The nested ``data_request`` dict is parsed with ``RADRequest.from_json``.
        """
        collateral = data["collateral"]
        fee = data["commit_and_reveal_fee"]
        request = RADRequest.from_json(data=data["data_request"])
        consensus = data["min_consensus_percentage"]
        reward = data["witness_reward"]
        witness_count = data["witnesses"]
        return DataRequestOutput(
            data_request=request,
            witness_reward=reward,
            witnesses=witness_count,
            commit_and_reveal_fee=fee,
            min_consensus_percentage=consensus,
            collateral=collateral,
        )

    def to_pb_bytes(self):
        """Serialize as field 1 (the request, length-delimited) followed by varint fields 2 to 6."""
        encoded = [
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=self.data_request.to_pb_bytes()),
        ]
        # (field number, value) pairs for the integer fields, in wire order.
        varint_fields = (
            (2, self.witness_reward),
            (3, self.witnesses),
            (4, self.commit_and_reveal_fee),
            (5, self.min_consensus_percentage),
            (6, self.collateral),
        )
        for number, amount in varint_fields:
            encoded.append(pb_field(field_number=number, tag=VARINT, value=amount))
        return concat(encoded)
def test_protobuf():
    """Encode a sample data request output and print its bytes, then their SHA-256 digest, as hex."""
    sources = [
        {
            "kind": "HTTP-GET",
            "script": [],
            "url": "https://www.bitstamp.net/api/ticker/",
        },
        {
            "kind": "HTTP-GET",
            "script": [],
            "url": "https://api.coindesk.com/v1/bpi/currentprice.json",
        },
    ]
    request = {
        "aggregate": {"filters": [], "reducer": 3},
        "retrieve": sources,
        "tally": {"filters": [], "reducer": 3},
        "time_lock": 1574703683,
    }
    sample = {
        "dr_output": {
            "collateral": 2500000000,
            "commit_and_reveal_fee": 1,
            "data_request": request,
            "min_consensus_percentage": 51,
            "witness_reward": 1,
            "witnesses": 100,
        },
    }
    output = DataRequestOutput.from_json(data=sample["dr_output"])
    # First line: the serialized bytes; second line: their SHA-256 digest.
    print(bytes_to_hex(output.to_pb_bytes()))
    print(bytes_to_hex(sha256(output.to_pb_bytes())))
# Run the sample only when this file is executed as a script, so that
# importing the module does not print anything as a side effect.
if __name__ == "__main__":
    test_protobuf()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment