Skip to content

Instantly share code, notes, and snippets.

@parodyBit
Created January 19, 2022 03:47
Show Gist options
  • Save parodyBit/7cfa98068cf8338f85012d3d53691929 to your computer and use it in GitHub Desktop.
Save parodyBit/7cfa98068cf8338f85012d3d53691929 to your computer and use it in GitHub Desktop.
DataRequestOutput Protobuf encoder
import hashlib
from dataclasses import dataclass
from enum import Enum
from typing import Union, List
def sha256(x: bytes) -> bytes:
    """Return the raw (binary, not hex) SHA-256 digest of ``x``."""
    return hashlib.sha256(x).digest()
# Tag layout: the low TAG_TYPE_BITS bits of a tag carry the wire type, the
# remaining high bits carry the field number (see make_tag).
TAG_TYPE_BITS = 3
TAG_TYPE_MASK = 2 ** TAG_TYPE_BITS - 1

# Wire type identifiers. Only VARINT and LENGTH_DELIMITED are handled by
# pb_field; the others are declared for completeness.
VARINT = 0
FIXED64 = 1
LENGTH_DELIMITED = 2
START_GROUP = 3
END_GROUP = 4
FIXED32 = 5
def str_to_bytes(s: str) -> bytes:
    """Encode the text ``s`` as UTF-8 and return the resulting bytes."""
    encoded = str.encode(s, encoding='utf-8')
    return encoded
def bytes_to_hex(b: bytes):
    """Return the lowercase hexadecimal text representation of ``b``."""
    hex_text = b.hex()
    return hex_text
def concat(values: Union[List[str], List[bytes]]) -> Union[str, bytes]:
    """
    Join a list of ``str`` fragments or a list of ``bytes`` fragments.

    Empty placeholders that are neither ``str`` nor ``bytes`` (``None``,
    ``[]``, ...) are skipped, so an optional fragment can be passed as an
    empty placeholder without changing the result type. An empty input, or
    one made only of placeholders, yields ``b''``.

    :param values: the fragments to join, all of the same kind.
    :return: ``str`` when the first real fragment is a ``str``, ``bytes``
        when it is ``bytes``.
    :raises TypeError: if the first real fragment is neither ``str`` nor
        ``bytes``, or if the fragments are of mixed kinds.
    """
    # Keep every str/bytes fragment (even empty ones) and every truthy item;
    # truthy items of the wrong type are rejected below instead of being
    # silently turned into a None result.
    parts = [item for item in values if item or isinstance(item, (str, bytes))]
    if not parts:
        return b''
    first = parts[0]
    if isinstance(first, str):
        return ''.join(parts)
    if isinstance(first, bytes):
        return b''.join(parts)
    raise TypeError(
        'concat() expects str or bytes fragments, got {}'.format(type(first).__name__)
    )
def concat_string(values: List[str]) -> str:
    """Concatenate the given strings, in order, into a single string."""
    separator = ''
    return separator.join(values)
def concat_bytes(values: List[bytes]) -> bytes:
    """Concatenate the given byte strings, in order, into a single one."""
    separator = b''
    return separator.join(values)
def var_int(value: int):
    """
    Encode a non-negative integer as a base-128 varint and return the bytes.

    The value is split into 7-bit groups, least significant group first;
    every byte except the last has its high bit (0x80) set to signal that
    more bytes follow.

    :param value: the integer to encode; a decimal string is converted with
        ``int()`` first.
    :return: the encoded ``bytes`` (always at least one byte).
    :raises ValueError: if the value is negative.
    """
    if isinstance(value, str):
        value = int(value)
    if value < 0:
        # Fail with a meaningful message instead of the generic
        # "bytes must be in range(0, 256)" raised further down.
        raise ValueError('var_int cannot encode a negative value: {}'.format(value))
    encoded = bytearray()
    while value > 0x7F:
        encoded.append((value & 0x7F) | 0x80)
        value >>= 7
    encoded.append(value)
    return bytes(encoded)
def var_int_serializer(value: int):
    """Serialize ``value`` as a varint by delegating to ``var_int``."""
    encoded = var_int(value)
    return encoded
def bytes_serializer(value: bytes):
    """
    Serialize a byte string as a length-delimited payload.

    The result is the varint-encoded length followed by the bytes themselves.
    An empty (or missing) value is serialized as an explicit zero length, so
    that a field tag placed in front of the result is never left without its
    length byte.

    :param value: the payload bytes; any falsy value is treated as empty.
    :return: the length prefix followed by the payload.
    """
    if not value:
        return var_int(0)
    return concat([var_int(len(value)), value])
def get_tag_field_number(tag: int):
    """Extract the field number, i.e. the bits of ``tag`` above the wire type."""
    field_number = tag >> TAG_TYPE_BITS
    return field_number
def get_tag_wire_type(tag: int):
    """Extract the wire type, i.e. the low ``TAG_TYPE_BITS`` bits of ``tag``."""
    wire_type = tag & TAG_TYPE_MASK
    return wire_type
def make_tag(field_number: int, tag: int):
    """
    Build the integer tag of a field.

    The field number is placed above the low ``TAG_TYPE_BITS`` bits and the
    ``tag`` argument (the wire type) is OR-ed into those low bits.
    """
    shifted_number = field_number << TAG_TYPE_BITS
    return shifted_number | tag
def make_tag_bytes(field_number: int, tag: int):
    """Build the tag for ``field_number``/``tag`` and return it varint-encoded."""
    numeric_tag = make_tag(field_number, tag)
    return var_int_serializer(numeric_tag)
def pb_field(field_number: int, tag: int, value):
    """
    Encode a single field: its tag followed by its encoded value.

    :param field_number: the number of the field inside its message.
    :param tag: the wire type of the field; only ``VARINT`` and
        ``LENGTH_DELIMITED`` are supported.
    :param value: an integer for ``VARINT``, a byte string for
        ``LENGTH_DELIMITED``.
    :return: the encoded field as ``bytes``.
    :raises NotImplementedError: for any other wire type, instead of failing
        later with an unrelated error while joining the pieces.
    """
    if tag == VARINT:
        payload = var_int_serializer(value=value)
    elif tag == LENGTH_DELIMITED:
        # A length-delimited field must always carry a length: guarantee the
        # zero length byte even if the serializer yields nothing for an
        # empty payload, otherwise the tag would swallow the next field.
        payload = bytes_serializer(value=value) or b'\x00'
    else:
        raise NotImplementedError(
            'pb_field does not support wire type {!r}'.format(tag)
        )
    return make_tag_bytes(field_number=field_number, tag=tag) + payload
class RADType(Enum):
    """
    Kind of a retrieval source, with the integer written on the wire as value.

    This is a plain ``Enum``: decorating an ``Enum`` with ``@dataclass`` is
    not supported and makes distinct members compare equal to each other.
    """
    Unknown = 0
    HttpGet = 1
    Rng = 2

    @classmethod
    def from_json(cls, data):
        """
        Map the JSON spelling of a kind to its enum member.

        :param data: one of ``'Unknown'``, ``'HTTP-GET'`` or ``'RNG'``.
        :return: the matching ``RADType`` member.
        :raises ValueError: if ``data`` is not a known kind (previously this
            silently produced ``None``).
        """
        by_json_name = {
            'Unknown': cls.Unknown,
            'HTTP-GET': cls.HttpGet,
            'RNG': cls.Rng,
        }
        try:
            return by_json_name[data]
        except (KeyError, TypeError):
            raise ValueError('unknown RADType kind: {!r}'.format(data)) from None
@dataclass
class RADRetrieve:
    """One retrieval source: its kind, its URL and its script bytes."""
    kind: RADType
    url: str
    script: bytes

    @classmethod
    def from_json(cls, data):
        """
        Build a ``RADRetrieve`` from its JSON dictionary.

        Reads the ``'kind'``, ``'url'`` and ``'script'`` keys; ``'script'``
        is converted with ``bytes()`` (e.g. from a list of integers).
        """
        return RADRetrieve(
            kind=RADType.from_json(data['kind']),
            url=data['url'],
            script=bytes(data['script']),
        )

    def to_pb_bytes(self):
        """
        Encode the message body: kind (field 1), url (2) and script (3).

        A field holding its default value (kind 0, empty url, empty script)
        is left out entirely, the same way kind 0 already was; this avoids
        writing a field tag that is not followed by any data.
        """
        parts = []
        if self.kind.value != 0:
            parts.append(pb_field(field_number=1, tag=VARINT, value=self.kind.value))
        if self.url:
            parts.append(pb_field(field_number=2, tag=LENGTH_DELIMITED, value=str_to_bytes(self.url)))
        if self.script:
            parts.append(pb_field(field_number=3, tag=LENGTH_DELIMITED, value=self.script))
        return b''.join(parts)
@dataclass
class RADFilter:
    """A filter: an integer operator code and its argument bytes."""
    op: int
    args: bytes

    @classmethod
    def from_json(cls, data):
        """
        Build a ``RADFilter`` from its JSON dictionary.

        Reads the ``'op'`` and ``'args'`` keys. ``'args'`` is converted with
        ``bytes()`` (the same treatment ``RADRetrieve`` gives ``'script'``),
        so the declared ``bytes`` type actually holds.
        """
        return RADFilter(op=data['op'], args=bytes(data['args']))

    def to_pb_bytes(self):
        """
        Encode the message body: op (field 1) and args (field 2).

        Empty args are left out so that no field tag is written without data.
        """
        # NOTE(review): op is written even when it is 0; proto3 encoders
        # normally omit zero scalars - confirm against the reference encoder.
        parts = [pb_field(field_number=1, tag=VARINT, value=self.op)]
        if self.args:
            parts.append(pb_field(field_number=2, tag=LENGTH_DELIMITED, value=self.args))
        return b''.join(parts)
@dataclass
class RADAggregate:
    """Aggregation stage: a list of filters followed by a reducer code."""
    filters: List[RADFilter]
    reducer: int

    @classmethod
    def from_json(cls, data):
        """Build a ``RADAggregate`` from the ``'filters'`` and ``'reducer'`` keys."""
        return RADAggregate(
            filters=[RADFilter.from_json(data=x) for x in data["filters"]],
            reducer=data["reducer"]
        )

    def to_pb_bytes(self):
        """
        Encode the message body: filters (repeated field 1), reducer (field 2).

        Each filter is written as its own length-delimited field, as is done
        for the repeated ``retrieve`` entries of ``RADRequest``; merging all
        filter bodies into a single field would be decoded as one filter.
        With no filters nothing is written for field 1, and the result is
        always ``bytes``.
        """
        parts = [
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.filters
        ]
        parts.append(pb_field(field_number=2, tag=VARINT, value=self.reducer))
        return b''.join(parts)
@dataclass
class RADTally:
    """Tally stage: a list of filters followed by a reducer code."""
    filters: List[RADFilter]
    reducer: int

    @classmethod
    def from_json(cls, data):
        """Build a ``RADTally`` from the ``'filters'`` and ``'reducer'`` keys."""
        return RADTally(
            filters=[RADFilter.from_json(data=x) for x in data["filters"]],
            reducer=data["reducer"]
        )

    def to_pb_bytes(self):
        """
        Encode the message body: filters (repeated field 1), reducer (field 2).

        Each filter is written as its own length-delimited field, as is done
        for the repeated ``retrieve`` entries of ``RADRequest``; merging all
        filter bodies into a single field would be decoded as one filter.
        With no filters nothing is written for field 1, and the result is
        always ``bytes``.
        """
        parts = [
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.filters
        ]
        parts.append(pb_field(field_number=2, tag=VARINT, value=self.reducer))
        return b''.join(parts)
@dataclass
class RADRequest:
    """A request: time lock, retrieval sources, aggregation and tally stages."""
    time_lock: int
    retrieve: List[RADRetrieve]
    aggregate: RADAggregate
    tally: RADTally

    @classmethod
    def from_json(cls, data):
        """
        Build a ``RADRequest`` from its JSON dictionary.

        Reads the ``'time_lock'``, ``'retrieve'``, ``'aggregate'`` and
        ``'tally'`` keys, delegating the nested objects to their own
        ``from_json`` constructors.
        """
        return RADRequest(
            time_lock=data["time_lock"],
            retrieve=[RADRetrieve.from_json(x) for x in data['retrieve']],
            aggregate=RADAggregate.from_json(data=data["aggregate"]),
            tally=RADTally.from_json(data=data["tally"])
        )

    def to_pb_bytes(self):
        """
        Encode the message body.

        Field 1 is the time lock (written only when greater than zero),
        field 2 is repeated once per retrieval source, field 3 is the
        aggregate and field 4 the tally. The pieces are collected in a list
        and joined as bytes, so a missing time lock or an empty ``retrieve``
        list simply contributes nothing.
        """
        parts = []
        if self.time_lock > 0:
            parts.append(pb_field(field_number=1, tag=VARINT, value=self.time_lock))
        parts.extend(
            pb_field(field_number=2, tag=LENGTH_DELIMITED, value=x.to_pb_bytes())
            for x in self.retrieve
        )
        parts.append(pb_field(field_number=3, tag=LENGTH_DELIMITED, value=self.aggregate.to_pb_bytes()))
        parts.append(pb_field(field_number=4, tag=LENGTH_DELIMITED, value=self.tally.to_pb_bytes()))
        return b''.join(parts)
@dataclass
class DataRequestOutput:
    """Top-level output: the request itself plus its integer parameters."""
    data_request: RADRequest
    witness_reward: int
    witnesses: int
    commit_and_reveal_fee: int
    min_consensus_percentage: int
    collateral: int

    @classmethod
    def from_json(cls, data):
        """
        Build a ``DataRequestOutput`` from its JSON dictionary.

        The nested ``'data_request'`` object is parsed by
        ``RADRequest.from_json``; every other key is copied as is.
        """
        collateral = data["collateral"]
        fee = data["commit_and_reveal_fee"]
        request = RADRequest.from_json(data=data["data_request"])
        consensus = data["min_consensus_percentage"]
        reward = data["witness_reward"]
        witness_count = data["witnesses"]
        return DataRequestOutput(
            data_request=request,
            witness_reward=reward,
            witnesses=witness_count,
            commit_and_reveal_fee=fee,
            min_consensus_percentage=consensus,
            collateral=collateral,
        )

    def to_pb_bytes(self):
        """
        Encode the message body.

        Field 1 is the length-delimited request; fields 2 to 6 are the
        varint parameters, in the order listed below.
        """
        encoded = [
            pb_field(field_number=1, tag=LENGTH_DELIMITED, value=self.data_request.to_pb_bytes()),
        ]
        varint_attributes = (
            'witness_reward',
            'witnesses',
            'commit_and_reveal_fee',
            'min_consensus_percentage',
            'collateral',
        )
        # The varint fields are numbered consecutively starting at 2.
        for number, attribute in enumerate(varint_attributes, start=2):
            encoded.append(pb_field(field_number=number, tag=VARINT, value=getattr(self, attribute)))
        return concat(encoded)
def test_protobuf():
    """
    Encode a sample data request output and print the result.

    Prints two lines: the hex of the encoded bytes, then the hex of their
    SHA-256 digest.
    """
    bitstamp_source = {
        "kind": "HTTP-GET",
        "script": [],
        "url": "https://www.bitstamp.net/api/ticker/",
    }
    coindesk_source = {
        "kind": "HTTP-GET",
        "script": [],
        "url": "https://api.coindesk.com/v1/bpi/currentprice.json",
    }
    data_request_output = {
        "dr_output": {
            "collateral": 2500000000,
            "commit_and_reveal_fee": 1,
            "data_request": {
                "aggregate": {"filters": [], "reducer": 3},
                "retrieve": [bitstamp_source, coindesk_source],
                "tally": {"filters": [], "reducer": 3},
                "time_lock": 1574703683,
            },
            "min_consensus_percentage": 51,
            "witness_reward": 1,
            "witnesses": 100,
        },
    }
    output = DataRequestOutput.from_json(data=data_request_output["dr_output"])
    # The encoding is deterministic, so it is computed once and reused.
    encoded = output.to_pb_bytes()
    print(bytes_to_hex(encoded))
    print(bytes_to_hex(sha256(encoded)))
# Run the demo whenever this module is executed or imported.
test_protobuf()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment