Skip to content

Instantly share code, notes, and snippets.

@Hacktivate-TH
Last active December 1, 2022 14:13
Show Gist options
  • Save Hacktivate-TH/cd1345dfe772d8c58741313adf8872ca to your computer and use it in GitHub Desktop.
Mitmproxy extension for editing gRPC messages
#
# Author: Hacktivate Co., Ltd. (https://hacktivate.tech)
#
# Description: This is an mitmproxy extension for editing gRPC messages over HTTP/2.
# Full blog post can be found at: https://hacktivate.tech/2022/10/27/a-hackish-way-to-tamper-grpc-traffic-in-android.html
#
from concurrent.futures.process import _threads_wakeups
import logging
import struct
from dataclasses import dataclass, field
from enum import Enum
from typing import Generator, Iterable, Iterator
import ast
import os
import shlex
import subprocess
import shutil
import tempfile
from collections.abc import Sequence
from typing import Optional, cast
from mitmproxy import command
from mitmproxy import ctx, flow, http
from mitmproxy.tools.console import signals
class ProtoParser:
    """
    Best-effort parser for raw protobuf (wire format) messages.

    mitmproxy only sees the serialized bytes passing by, so the exact protobuf
    field types are unknown. Each wire type is decoded with a failover
    strategy and the chosen decoding can be overridden with parser rules.
    """

    @dataclass
    class ParserRule:
        """
        A parser rule lists Field definitions which are applied if the filter rule matches the flow.
        Matching on flow-level also means, a match applies to request AND response messages.
        To restrict a rule to a requests only use 'ParserRuleRequest', instead.
        To restrict a rule to a responses only use 'ParserRuleResponse', instead.
        """

        # List of field definitions for this rule.
        # NOTE(fix): this was a bare class attribute (`field_definitions = []`),
        # i.e. a mutable default shared by every rule instance. A dataclass
        # field with default_factory gives each instance its own list.
        field_definitions: list = field(default_factory=list)
        # Name of this rule, only used for debugging.
        name: str = ""
        # Flowfilter to select which flows to apply to ('~q' and '~s' can not
        # be used to distinguish if the rule should apply to the request or
        # response of a flow. To do so, use ParserRuleRequest or
        # ParserRuleResponse. ParserRule always applies to request and response.)
        filter: str = ""

    @dataclass
    class ParserOptions:
        # NOTE(fix): both attributes were declared twice in the original;
        # the duplicates have been removed (no behavior change).
        # output should contain wiretype of fields
        include_wiretype: bool = False
        # output should contain the fields which describe nested messages
        # (the nested messages bodies are always included, but the "header
        # fields" could add unnecessary output overhead)
        exclude_message_headers: bool = False

    class DecodedTypes(Enum):
        """All decodings a raw wire value may be rendered as."""

        # varint
        int32 = 0
        int64 = 1
        uint32 = 2
        uint64 = 3
        sint32 = 4  # ZigZag encoding
        sint64 = 5  # ZigZag encoding
        bool = 6
        enum = 7
        # bit_32
        fixed32 = 8
        sfixed32 = 9
        float = 10
        # bit_64
        fixed64 = 11
        sfixed64 = 12
        double = 13
        # len_delimited
        string = 14
        bytes = 15
        message = 16
        # helper
        unknown = 17

    @staticmethod
    def _read_base128le(data: bytes) -> tuple[int, int]:
        """Read a base128 little-endian varint from `data`.

        Returns (bytes consumed, decoded value).
        This decoder puts no limitation on the maximum value of variable
        length integers: values exceeding 64bit have to be handled externally
        (protobuf specs only allow up to 64bit, i.e. int64/uint64/sint64).

        Raises ValueError when the varint runs past the end of `data`.
        """
        res = 0
        offset = 0
        while offset < len(data):
            o = data[offset]
            res += (o & 0x7F) << (7 * offset)
            offset += 1
            if o < 0x80:
                # MSB cleared --> last group of this varint
                return offset, res
        raise ValueError("varint exceeds bounds of provided data")

    @staticmethod
    def _read_u32(data: bytes) -> tuple[int, int]:
        """Read a little-endian unsigned 32 bit value; returns (4, value)."""
        return 4, struct.unpack("<I", data[:4])[0]

    @staticmethod
    def _read_u64(data: bytes) -> tuple[int, int]:
        """Read a little-endian unsigned 64 bit value; returns (8, value)."""
        return 8, struct.unpack("<Q", data[:8])[0]

    class WireTypes(Enum):
        """The six wire types defined by the protobuf encoding specs."""

        varint = 0
        bit_64 = 1
        len_delimited = 2
        group_start = 3  # deprecated
        group_end = 4  # deprecated
        bit_32 = 5

    @staticmethod
    def read_fields(
        wire_data: bytes,
        parent_field,
        options,
        rules,
    ):
        """Parse `wire_data` into a list of ProtoParser.Field objects.

        Raises ValueError on malformed input (invalid wire type, truncated
        varint, length-delimited field exceeding the buffer, deprecated
        group fields).
        """
        res = []
        pos = 0
        while pos < len(wire_data):
            # read field key (tag and wire_type)
            offset, key = ProtoParser._read_base128le(wire_data[pos:])
            # casting raises exception for invalid WireTypes
            wt = ProtoParser.WireTypes(key & 7)
            tag = key >> 3
            pos += offset
            val = None
            preferred_decoding: ProtoParser.DecodedTypes
            if wt == ProtoParser.WireTypes.varint:
                offset, val = ProtoParser._read_base128le(wire_data[pos:])
                pos += offset
                bl = val.bit_length()
                # NOTE(fix): these branches were independent `if` statements
                # in the original, so every varint with bit_length != 1 ended
                # up as uint32 (even 64 bit values). An elif-chain selects
                # the narrowest fitting decoding instead.
                if bl > 64:
                    # value exceeds 64bit, violating the specs
                    preferred_decoding = ProtoParser.DecodedTypes.unknown
                elif bl > 32:
                    preferred_decoding = ProtoParser.DecodedTypes.uint64
                elif bl == 1:
                    preferred_decoding = ProtoParser.DecodedTypes.bool
                else:
                    preferred_decoding = ProtoParser.DecodedTypes.uint32
            elif wt == ProtoParser.WireTypes.bit_64:
                offset, val = ProtoParser._read_u64(wire_data[pos:])
                pos += offset
                preferred_decoding = ProtoParser.DecodedTypes.fixed64
            elif wt == ProtoParser.WireTypes.len_delimited:
                offset, length = ProtoParser._read_base128le(wire_data[pos:])
                pos += offset
                if length > len(wire_data[pos:]):
                    raise ValueError("length delimited field exceeds data size")
                val = wire_data[pos : pos + length]
                pos += length
                # first guess: nested message (failover to string/bytes later)
                preferred_decoding = ProtoParser.DecodedTypes.message
            elif (
                wt == ProtoParser.WireTypes.group_start
                or wt == ProtoParser.WireTypes.group_end
            ):
                raise ValueError(f"deprecated field: {wt}")
            elif wt == ProtoParser.WireTypes.bit_32:
                offset, val = ProtoParser._read_u32(wire_data[pos:])
                pos += offset
                preferred_decoding = ProtoParser.DecodedTypes.fixed32
            else:
                # not reachable as if-else statements contain all possible WireTypes
                # wrong types raise Exception during typecasting in `wt = ProtoParser.WireTypes((key & 7))`
                raise ValueError("invalid WireType for protobuf messsage field")
            parsed_field = ProtoParser.Field(
                wire_type=wt,
                preferred_decoding=preferred_decoding,
                options=options,
                rules=rules,
                tag=tag,
                wire_value=val,
                parent_field=parent_field,
            )
            res.append(parsed_field)
        return res

    class Field:
        """
        Represents a single field of a protobuf message and handles the varios encodings.
        As mitmproxy sees the data passing by as raw protobuf message, it only knows the
        WireTypes. Each of the WireTypes could represent different Protobuf field types.
        The exact Protobuf field type can not be determined from the wire format, thus different
        options for decoding have to be supported.
        In addition the parsed WireTypes are (intermediary) stored in Python types, which adds
        some additional overhead type conversions.
        WireType            represented Protobuf Types                 Python type (intermediary)
        0: varint           int32, int64, uint32, uint64, enum,        int (*)
                            sint32, sint64 (both ZigZag encoded),      int
                            bool                                       bool
                                                                       float (**)
        1: bit_64           fixed64, sfixed64,                         int (*)
                            double                                     float
        2: len_delimited    string,                                    str
                            message,                                   class 'Message'
                            bytes,                                     bytes (*)
                            packed_repeated_field                      class 'Message' (fields with same tag)
        3: group_start      unused (deprecated)                        -
        4: group_end        unused (deprecated)                        -
        5: bit_32           fixed32, sfixed32,                         int (*)
                            float                                      float
        (*) Note 1:  Conversion between WireType and intermediary python representation
                     is handled by Kaitai protobuf decoder and always uses the python
                     representation marked with (*). Converting to alternative representations
                     is handled inside this class.
        (**) Note 2: Varint is not used to represent floating point values, but some applications
                     store native floats in uint32 protobuf types (or native double in uint64).
                     Thus we allow conversion of varint to floating point values for convenience
                     (A well known APIs "hide" GPS latitude and longitude values in varint types,
                     much easier to spot such things when rendered as float)
        Ref: - https://developers.google.com/protocol-buffers/docs/proto3
             - https://developers.google.com/protocol-buffers/docs/encoding
        """

        def __init__(
            self,
            wire_type,
            preferred_decoding,
            tag: int,
            parent_field,
            wire_value,
            options,
            rules,
            is_unpacked_children: bool = False,
        ) -> None:
            self.wire_type: ProtoParser.WireTypes = wire_type
            self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding
            self.wire_value = wire_value
            self.tag: int = tag
            self.options: ProtoParser.ParserOptions = options
            self.name: str = ""
            self.rules = rules
            self.parent_field = parent_field
            # marks field as being a result of unpacking
            self.is_unpacked_children: bool = is_unpacked_children
            # marks field as being parent of successfully unpacked children
            self.is_packed_parent: bool = False
            self.parent_tags: list[int] = []
            if self.parent_field is not None:
                self.parent_tags = self.parent_field.parent_tags[:]
                self.parent_tags.append(self.parent_field.tag)
            self.try_unpack = False
            # rules can overwrite self.try_unpack
            self.apply_rules()
            # do not unpack fields which are the result of unpacking
            if parent_field is not None and self.is_unpacked_children:
                self.try_unpack = False

        # no tests for only_first_hit=False, as not user-changeable
        def apply_rules(self, only_first_hit=True):
            """Apply matching field definitions from self.rules to this field.

            A definition matches when its (optionally prefixed) tag equals
            this field's dotted tag path. With only_first_hit the first match
            wins; otherwise later rules may override earlier ones.
            """
            tag_str = self._gen_tag_str()
            name = None
            decoding = None
            as_packed = False
            try:
                for rule in self.rules:
                    for fd in rule.field_definitions:
                        match = False
                        if len(fd.tag_prefixes) == 0 and fd.tag == tag_str:
                            match = True
                        else:
                            for rt in fd.tag_prefixes:
                                if rt + fd.tag == tag_str:
                                    match = True
                                    break
                        if match:
                            if only_first_hit:
                                # only first match
                                self.name = fd.name
                                self.preferred_decoding = fd.intended_decoding
                                self.try_unpack = fd.as_packed
                                return
                            else:
                                # overwrite matches till last rule was inspected
                                # (f.e. allows to define name in one rule and intended_decoding in another one)
                                name = fd.name if fd.name else name
                                decoding = (
                                    fd.intended_decoding
                                    if fd.intended_decoding
                                    else decoding
                                )
                                if fd.as_packed:
                                    as_packed = True
                if name:
                    self.name = name
                if decoding:
                    self.preferred_decoding = decoding
                self.try_unpack = as_packed
            except Exception as e:
                logging.warning(e)

        def _gen_tag_str(self):
            """Dotted tag path of this field, e.g. '2.1.3' for nested fields."""
            tags = self.parent_tags[:]
            tags.append(self.tag)
            return ".".join([str(tag) for tag in tags])

        def safe_decode_as(
            self,
            intended_decoding,
            try_as_packed: bool = False,
        ):
            """
            Tries to decode as intended, applies failover, if not possible
            Returns selected decoding and decoded value
            """
            if self.wire_type == ProtoParser.WireTypes.varint:
                try:
                    return intended_decoding, self.decode_as(
                        intended_decoding, try_as_packed
                    )
                except Exception:
                    if int(self.wire_value).bit_length() > 32:
                        # ignore the fact that varint could exceed 64bit (would violate the specs)
                        return ProtoParser.DecodedTypes.uint64, self.wire_value
                    else:
                        return ProtoParser.DecodedTypes.uint32, self.wire_value
            elif self.wire_type == ProtoParser.WireTypes.bit_64:
                try:
                    return intended_decoding, self.decode_as(
                        intended_decoding, try_as_packed
                    )
                except Exception:
                    return ProtoParser.DecodedTypes.fixed64, self.wire_value
            elif self.wire_type == ProtoParser.WireTypes.bit_32:
                try:
                    return intended_decoding, self.decode_as(
                        intended_decoding, try_as_packed
                    )
                except Exception:
                    return ProtoParser.DecodedTypes.fixed32, self.wire_value
            elif self.wire_type == ProtoParser.WireTypes.len_delimited:
                try:
                    return intended_decoding, self.decode_as(
                        intended_decoding, try_as_packed
                    )
                except Exception:
                    # failover strategy: message --> string (valid UTF-8) --> bytes
                    len_delimited_strategy = [
                        ProtoParser.DecodedTypes.message,
                        ProtoParser.DecodedTypes.string,
                        ProtoParser.DecodedTypes.bytes,  # should always work
                    ]
                    for failover_decoding in len_delimited_strategy:
                        if failover_decoding == intended_decoding and not try_as_packed:
                            # don't try same decoding twice, unless first attempt was packed
                            continue
                        try:
                            return failover_decoding, self.decode_as(
                                failover_decoding, False
                            )
                        except Exception:
                            pass
            # we should never get here (could not be added to tests)
            return ProtoParser.DecodedTypes.unknown, self.wire_value

        def decode_as(
            self, intended_decoding, as_packed: bool = False
        ):
            """Decode the raw wire value as `intended_decoding`.

            Raises TypeError when the intended decoding does not fit the
            wire type or the value exceeds the target type's range.
            """
            if as_packed is True:
                # NOTE(review): ProtoParser.read_packed_fields is not defined
                # anywhere in this file -- packed decoding will raise a
                # NameError here; confirm against the upstream mitmproxy
                # contentview this code was derived from.
                return ProtoParser.read_packed_fields(packed_field=self)
            if self.wire_type == ProtoParser.WireTypes.varint:
                assert isinstance(self.wire_value, int)
                if intended_decoding == ProtoParser.DecodedTypes.bool:
                    # clamp result to 64bit
                    return self.wire_value & 0xFFFFFFFFFFFFFFFF != 0
                elif intended_decoding == ProtoParser.DecodedTypes.int32:
                    if self.wire_value.bit_length() > 32:
                        raise TypeError("wire value too large for int32")
                    return struct.unpack("!i", struct.pack("!I", self.wire_value))[0]
                elif intended_decoding == ProtoParser.DecodedTypes.int64:
                    if self.wire_value.bit_length() > 64:
                        raise TypeError("wire value too large for int64")
                    return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0]
                elif intended_decoding == ProtoParser.DecodedTypes.uint32:
                    if self.wire_value.bit_length() > 32:
                        raise TypeError("wire value too large for uint32")
                    return self.wire_value  # already 'int' which was parsed as unsigned
                elif (
                    intended_decoding == ProtoParser.DecodedTypes.uint64
                    or intended_decoding == ProtoParser.DecodedTypes.enum
                ):
                    if self.wire_value.bit_length() > 64:
                        raise TypeError("wire value too large")
                    return self.wire_value  # already 'int' which was parsed as unsigned
                elif intended_decoding == ProtoParser.DecodedTypes.sint32:
                    if self.wire_value.bit_length() > 32:
                        raise TypeError("wire value too large for sint32")
                    return (self.wire_value >> 1) ^ -(
                        self.wire_value & 1
                    )  # zigzag_decode
                elif intended_decoding == ProtoParser.DecodedTypes.sint64:
                    if self.wire_value.bit_length() > 64:
                        raise TypeError("wire value too large for sint64")
                    # ZigZag decode
                    # Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba
                    return (self.wire_value >> 1) ^ -(self.wire_value & 1)
                elif (
                    intended_decoding == ProtoParser.DecodedTypes.float
                    or intended_decoding == ProtoParser.DecodedTypes.double
                ):
                    # special case, not complying to protobuf specs
                    return self._wire_value_as_float()
            elif self.wire_type == ProtoParser.WireTypes.bit_64:
                if intended_decoding == ProtoParser.DecodedTypes.fixed64:
                    return self.wire_value
                elif intended_decoding == ProtoParser.DecodedTypes.sfixed64:
                    return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0]
                elif intended_decoding == ProtoParser.DecodedTypes.double:
                    return self._wire_value_as_float()
            elif self.wire_type == ProtoParser.WireTypes.bit_32:
                if intended_decoding == ProtoParser.DecodedTypes.fixed32:
                    return self.wire_value
                elif intended_decoding == ProtoParser.DecodedTypes.sfixed32:
                    return struct.unpack("!i", struct.pack("!I", self.wire_value))[0]
                elif intended_decoding == ProtoParser.DecodedTypes.float:
                    return self._wire_value_as_float()
            elif self.wire_type == ProtoParser.WireTypes.len_delimited:
                assert isinstance(self.wire_value, bytes)
                if intended_decoding == ProtoParser.DecodedTypes.string:
                    # According to specs, a protobuf string HAS TO be UTF-8 parsable
                    # throw exception on invalid UTF-8 chars, but escape linebreaks
                    return self.wire_value_as_utf8(escape_newline=True)
                elif intended_decoding == ProtoParser.DecodedTypes.bytes:
                    # always works, assure to hand back a copy
                    return self.wire_value[:]
                elif intended_decoding == ProtoParser.DecodedTypes.message:
                    return ProtoParser.read_fields(
                        wire_data=self.wire_value,
                        parent_field=self,
                        options=self.options,
                        rules=self.rules,
                    )
            # if here, there is no valid decoding
            raise TypeError("intended decoding mismatches wire type")

        # NOTE(review): kept without `self` as in the original; it is a
        # placeholder for future re-encoding support and always raises.
        def encode_from(inputval, intended_encoding):
            raise NotImplementedError(
                "Future work, needed to manipulate and re-encode protobuf message, with respect to given wire types"
            )

        def _wire_value_as_float(self) -> float:
            """
            Handles double (64bit) and float (32bit).
            Assumes Network Byte Order (big endian).
            Usable for:
            WireType --> Protobuf Type):
            ----------------------------
            varint        --> double/float (not intended by ProtoBuf, but used in the wild)
            bit_32        --> float
            bit_64        --> double
            len_delimited --> 4 bytes: float / 8 bytes: double / other sizes raise
            """
            v = self._value_as_bytes()
            if len(v) == 4:
                return struct.unpack("!f", v)[0]
            elif len(v) == 8:
                return struct.unpack("!d", v)[0]
            raise TypeError("can not be converted to floatingpoint representation")

        def _value_as_bytes(self) -> bytes:
            """Return the wire value as big-endian bytes (4 or 8 bytes for ints)."""
            if isinstance(self.wire_value, bytes):
                return self.wire_value
            elif isinstance(self.wire_value, int):
                if self.wire_value.bit_length() > 64:
                    # source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit
                    # currently avoided by kaitai decoder (can not be added to tests)
                    raise ValueError("value exceeds 64bit, violating protobuf specs")
                elif self.wire_value.bit_length() > 32:
                    # packing uses network byte order (to assure consistent results across architectures)
                    return struct.pack("!Q", self.wire_value)
                else:
                    # packing uses network byte order (to assure consistent results across architectures)
                    return struct.pack("!I", self.wire_value)
            else:
                # should never happen, no tests
                raise ValueError("can not be converted to bytes")

        def _wire_type_str(self):
            """Short name of the wire type, e.g. 'varint'."""
            return str(self.wire_type).split(".")[-1]

        def _decoding_str(self, decoding):
            """Short name of a DecodedTypes member, e.g. 'uint32'."""
            return str(decoding).split(".")[-1]

        def wire_value_as_utf8(self, escape_newline=True) -> str:
            """Decode the wire value as UTF-8 (raises on invalid UTF-8)."""
            if isinstance(self.wire_value, bytes):
                res = self.wire_value.decode("utf-8")
                return res.replace("\n", "\\n") if escape_newline else res
            return str(self.wire_value)

        def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]:
            """
            Returns a generator which passes the field as a dict.
            In order to return the field value it gets decoded (based on a failover strategy and
            provided ParserRules).
            If the field holds a nested message, the fields contained in the message are appended.
            Ultimately this flattens all fields recursively.
            """
            selected_decoding, decoded_val = self.safe_decode_as(
                self.preferred_decoding, self.try_unpack
            )
            field_desc_dict = {
                "tag": self._gen_tag_str(),
                "wireType": self._wire_type_str(),
                "decoding": self._decoding_str(selected_decoding),
                "name": self.name,
            }
            if isinstance(decoded_val, list):
                if (
                    selected_decoding
                    == ProtoParser.DecodedTypes.message  # field is a message with subfields
                    and not self.is_packed_parent  # field is a message, but replaced by packed fields
                ):
                    # Field is a message, not packed, thus include it as message header
                    field_desc_dict["val"] = ""
                    yield field_desc_dict
                # add sub-fields of messages or packed fields
                for f in decoded_val:
                    yield from f.gen_flat_decoded_field_dicts()
            else:
                field_desc_dict["val"] = decoded_val
                yield field_desc_dict

    def __init__(
        self,
        data: bytes,
        rules=None,
        parser_options: Optional[ParserOptions] = None,
    ) -> None:
        """Parse `data` as a protobuf message.

        Raises ValueError when `data` is not a valid protobuf message.
        """
        self.data: bytes = data
        if parser_options is None:
            parser_options = ProtoParser.ParserOptions()
        self.options = parser_options
        if rules is None:
            rules = []
        self.rules = rules
        try:
            self.root_fields = ProtoParser.read_fields(
                wire_data=self.data,
                options=self.options,
                parent_field=None,
                rules=self.rules,
            )
        except Exception as e:
            raise ValueError("not a valid protobuf message") from e

    def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]:
        """Yield all fields (recursively flattened) as dicts."""
        for f in self.root_fields:
            yield from f.gen_flat_decoded_field_dicts()

    def gen_str_rows(self) -> Generator[tuple[str, ...], None, None]:
        """Yield one (decoding, name, tag, value) string tuple per field."""
        for field_dict in self.gen_flat_decoded_field_dicts():
            if (
                self.options.exclude_message_headers
                and field_dict["decoding"] == "message"
            ):
                continue
            if self.options.include_wiretype:
                col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"])
            else:
                col1 = "[{}]".format(field_dict["decoding"])
            col2 = field_dict["name"]  # empty string if not set (consumes no space)
            col3 = field_dict["tag"]
            col4 = str(field_dict["val"])
            yield col1, col2, col3, col4

    @staticmethod
    def parse_grpc_messages(
        data, compression_scheme
    ) -> Generator[tuple[bool, bytes], None, None]:
        """Split a gRPC body into (is_compressed, protobuf_bytes) messages.

        Each gRPC message carries a 5 byte prefix: 1 byte compressed-flag
        followed by a 4 byte big-endian length.

        NOTE(fix): the original called an undefined `decode(...)` for
        compressed messages (see the NameError reported in the gist
        comments); gzip/deflate decompression is implemented here.
        """
        import gzip
        import zlib

        while data:
            try:
                msg_is_compressed, length = struct.unpack("!?i", data[:5])
                decoded_message = struct.unpack("!%is" % length, data[5 : 5 + length])[0]
            except Exception as e:
                raise ValueError("invalid gRPC message") from e
            if msg_is_compressed:
                try:
                    if compression_scheme == "deflate":
                        decoded_message = zlib.decompress(decoded_message)
                    else:
                        # gRPC's default message encoding is gzip
                        decoded_message = gzip.decompress(decoded_message)
                except Exception as e:
                    raise ValueError("Failed to decompress gRPC message with gzip") from e
            yield msg_is_compressed, decoded_message
            data = data[5 + length :]
class ProtoSerializer:
    """
    Serializes the textual tuple rows produced by ProtoParser.gen_str_rows()
    back into protobuf wire format / framed gRPC messages.
    Each row is a 4-tuple of strings: (decoded type, name, tag, value).
    """

    @staticmethod
    def _field_number(tag: str) -> int:
        """Last component of a dotted tag path ('1.2.3' -> 3)."""
        return int(tag.split(".")[-1])

    @staticmethod
    def _int_to_base128le(val: int) -> bytes:
        """Encode a non-negative int as a base128 little-endian varint.

        NOTE(fix): the original crashed on val == 0 (it indexed res[-1] on
        an empty buffer) and emitted nothing for value 0 elsewhere; a
        canonical varint encoder always emits at least one byte.
        """
        res = bytearray()
        while True:
            group = val & 0x7F
            val >>= 7
            if val:
                res.append(group | 0x80)  # more groups follow
            else:
                res.append(group)  # last group, MSB cleared
                return bytes(res)

    @staticmethod
    def _write_base128le(proto_tuple: tuple[str, ...]) -> bytes:
        """Serialize a varint-wire-type field (key + varint payload).

        NOTE(fix): the original emitted no payload byte for value 0, never
        handled sint32/sint64/enum (only the key byte was produced) and
        could not encode negative int32/int64 values or field numbers >= 16
        (the key is itself a varint).
        """
        # 0 = decoded type, 1 = name, 2 = tag, 3 = value
        decoded_type = ProtoParser.DecodedTypes[proto_tuple[0][1:-1]]
        # message key is (field_number << 3) | wire_type (varint = 0x00)
        key = ProtoSerializer._int_to_base128le(
            ProtoSerializer._field_number(proto_tuple[2]) << 3 | 0x00
        )
        if decoded_type == ProtoParser.DecodedTypes.bool:
            return key + (b"\x01" if proto_tuple[3] == "True" else b"\x00")
        val = int(proto_tuple[3])
        if decoded_type == ProtoParser.DecodedTypes.sint32:
            val = (val << 1) ^ (val >> 31)  # ZigZag encode
        elif decoded_type == ProtoParser.DecodedTypes.sint64:
            val = (val << 1) ^ (val >> 63)  # ZigZag encode
        elif val < 0:
            # negative int32/int64 are encoded as 64bit two's complement
            val &= 0xFFFFFFFFFFFFFFFF
        return key + ProtoSerializer._int_to_base128le(val)

    @staticmethod
    def _write_u32(proto_tuple: tuple[str, ...]) -> bytes:
        """Serialize a 32bit-wire-type field (fixed32/sfixed32)."""
        # message key is (field_number << 3) | wire_type (32-bit = 0x05)
        key = ProtoSerializer._int_to_base128le(
            ProtoSerializer._field_number(proto_tuple[2]) << 3 | 0x05
        )
        return key + int(proto_tuple[3]).to_bytes(4, "little")

    @staticmethod
    def _write_u64(proto_tuple: tuple[str, ...]) -> bytes:
        """Serialize a 64bit-wire-type field (fixed64/sfixed64)."""
        # message key is (field_number << 3) | wire_type (64-bit = 0x01)
        key = ProtoSerializer._int_to_base128le(
            ProtoSerializer._field_number(proto_tuple[2]) << 3 | 0x01
        )
        return key + int(proto_tuple[3]).to_bytes(8, "little")

    @staticmethod
    def _write_length_delimited(proto_tuple: tuple[str, ...]) -> bytes:
        """Serialize a length-delimited field (key + varint length + payload).

        The value string is encoded latin1 so arbitrary byte values survive
        the str round-trip.
        """
        # message key is (field_number << 3) | wire_type (len-delimited = 0x02)
        key = ProtoSerializer._int_to_base128le(
            ProtoSerializer._field_number(proto_tuple[2]) << 3 | 0x02
        )
        payload = bytes(proto_tuple[3], "latin1")
        return key + ProtoSerializer._int_to_base128le(len(payload)) + payload

    @staticmethod
    def serializeProto(proto_tuples: list[tuple[str, ...]]) -> bytes:
        """Serialize a flat list of (type, name, tag, value) rows.

        Rows whose tag contains a dot belong to the embedded message opened
        by the preceding 'message' row; they are collected and serialized
        recursively, prefixed with their varint length.
        NOTE(review): as in the original, only one embedded-message run is
        tracked at a time -- deeply nested sibling messages are not fully
        supported; confirm against real traffic before relying on it.
        """
        res = b""
        in_embedded_message = False
        embedded_rows: list[tuple[str, ...]] = []
        for i, proto_tuple in enumerate(proto_tuples):
            # 0 = decoded type, 1 = name, 2 = tag, 3 = value
            decoded_type = ProtoParser.DecodedTypes[proto_tuple[0][1:-1]]
            if in_embedded_message:
                if "." in proto_tuple[2]:
                    embedded_rows.append(proto_tuple)
                    if i == len(proto_tuples) - 1:
                        # last row overall: flush the embedded message now
                        body = ProtoSerializer.serializeProto(embedded_rows)
                        res += ProtoSerializer._int_to_base128le(len(body)) + body
                    continue
                # first row without a dotted tag closes the embedded message
                body = ProtoSerializer.serializeProto(embedded_rows)
                res += ProtoSerializer._int_to_base128le(len(body)) + body
                in_embedded_message = False
                # NOTE(fix): the original never cleared this list, so a later
                # embedded message re-serialized the previous one's rows
                embedded_rows = []
            if decoded_type in (
                ProtoParser.DecodedTypes.int32,
                ProtoParser.DecodedTypes.int64,
                ProtoParser.DecodedTypes.uint32,
                ProtoParser.DecodedTypes.uint64,
                ProtoParser.DecodedTypes.sint32,
                ProtoParser.DecodedTypes.sint64,
                ProtoParser.DecodedTypes.bool,
                ProtoParser.DecodedTypes.enum,
            ):
                res += ProtoSerializer._write_base128le(proto_tuple)
            elif decoded_type == ProtoParser.DecodedTypes.fixed64:
                res += ProtoSerializer._write_u64(proto_tuple)
            elif decoded_type == ProtoParser.DecodedTypes.fixed32:
                res += ProtoSerializer._write_u32(proto_tuple)
            elif decoded_type == ProtoParser.DecodedTypes.string:
                res += ProtoSerializer._write_length_delimited(proto_tuple)
            elif decoded_type == ProtoParser.DecodedTypes.bytes:
                row = list(proto_tuple)
                # strip the repr prefix b' and suffix ', then undo escaping
                row[3] = proto_tuple[3][2:-1].encode("utf-8").decode("unicode_escape")
                res += ProtoSerializer._write_length_delimited(tuple(row))
            elif decoded_type == ProtoParser.DecodedTypes.message:
                # emit the key now; the length-prefixed body follows once the
                # dotted child rows have been collected
                res += ProtoSerializer._int_to_base128le(
                    ProtoSerializer._field_number(proto_tuple[2]) << 3 | 0x02
                )
                in_embedded_message = True
                embedded_rows = []
        return res

    @staticmethod
    def serializeGrpc(proto: bytes) -> bytes:
        """Frame a serialized protobuf message as a single gRPC message:
        1 byte compressed-flag (0) + 4 byte big-endian length + payload.

        NOTE(fix): the original logged via ctx.log.info with an int argument;
        stdlib logging is used instead to decouple from the mitmproxy UI.
        """
        logging.debug("serialized gRPC payload length: %d", len(proto))
        return b"\x00" + len(proto).to_bytes(4, "big") + proto
def string_to_tuples_list(text: str):
    """Parse editor text (one tuple repr per line) into a list of tuples.

    Joins the lines with commas, wraps them in brackets and evaluates the
    result with ast.literal_eval (safe: accepts Python literals only).

    NOTE(fix): the original looped over the string character by character
    (the loop variable was misleadingly named `each_line`), doing a per-char
    replace with quadratic string concatenation; one str.replace over the
    whole text yields the identical result.
    """
    joined = text.strip().replace("\n", ",")
    return ast.literal_eval("[" + joined.strip() + "]")
class MyAddon:
    """mitmproxy addon exposing a `grpc.edit` command: decodes a flow's gRPC
    body to text, opens it in an external editor and re-serializes the edited
    text back into the flow."""

    def __init__(self):
        # unused counter kept for backward compatibility
        self.num = 0

    def get_editor(self) -> str:
        """Pick an editor: $MITMPROXY_EDITOR, $EDITOR, then common fallbacks.

        based upon https://github.com/pallets/click/blob/main/src/click/_termui_impl.py
        """
        if m := os.environ.get("MITMPROXY_EDITOR"):
            return m
        if m := os.environ.get("EDITOR"):
            return m
        for editor in "sensible-editor", "nano", "vim":
            if shutil.which(editor):
                return editor
        if os.name == "nt":
            return "notepad"
        return "vi"

    @command.command("grpc.edit")
    def edit(self, flow: flow.Flow) -> None:
        """Edit the gRPC messages of `flow` in an external editor."""
        http_flow = cast(http.HTTPFlow, flow)
        # prefer the response body once a response exists
        if http_flow.response is not None:
            content = http_flow.response.content
        else:
            content = http_flow.request.content
        # render every gRPC message as one "=== Protobuf message #i ===" section
        # NOTE(review): "compression_alg" is a placeholder -- compressed
        # messages are decoded as gzip; confirm against the grpc-encoding
        # header of real traffic.
        data = ""
        for i, (compressed, pb_message) in enumerate(
            ProtoParser.parse_grpc_messages(content, "compression_alg")
        ):
            pb_text = ""
            for field in ProtoParser(
                data=pb_message, parser_options=None, rules=None
            ).gen_str_rows():
                pb_text = pb_text + str(field) + "\n"
            data += "=== Protobuf message #" + str(i) + " ===\n" + pb_text
        text = not isinstance(data, bytes)
        fd, name = tempfile.mkstemp("", "mitmproxy", text=text)
        with open(fd, "w" if text else "wb") as f:
            f.write(data)
        c = self.get_editor()
        cmd = shlex.split(c)
        cmd.append(name)
        modified_data = None
        with ctx.master.uistopped():
            try:
                subprocess.call(cmd)
            except Exception:
                # NOTE(fix): was a bare `except:`
                signals.status_message.send(message="Can't start editor: %s" % c)
            else:
                with open(name, "r" if text else "rb") as f:
                    modified_data = f.read()
        os.unlink(name)
        if modified_data is None:
            # NOTE(fix): the original crashed with AttributeError on
            # modified_data.split when the editor could not be started
            return
        modified_body = b""
        # sections alternate: "", header, body, header, body, ... -> take bodies
        for pb_message in modified_data.split("===")[2::2]:
            modified_body += ProtoSerializer.serializeGrpc(
                ProtoSerializer.serializeProto(string_to_tuples_list(pb_message))
            )
        if http_flow.response is not None:
            http_flow.response.content = modified_body
        else:
            http_flow.request.content = modified_body
        ctx.master.window.focus_changed()


addons = [MyAddon()]
@wtfiwtz
Copy link

wtfiwtz commented Nov 29, 2022

Great tool... however, I get this:

error: [15:03:01.492] Traceback (most recent call last):

  File "grpc-edit.py", line 629, in parse_grpc_messages
    decoded_message = decode(
                      ^^^^^^

NameError: name 'decode' is not defined


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

Looks like I have to disable the compression somehow.

@Hacktivate-TH
Copy link
Author

Hi @wtfiwtz,

Unfortunately, we haven't implemented support for compressed gRPC messages yet. It seems that you are dealing with compressed gRPC messages, which is why the error occurs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment