Skip to content

Instantly share code, notes, and snippets.

@pushfoo
Last active May 26, 2020 08:21
Show Gist options
  • Save pushfoo/c2e75b807fdfedc7112421c7aba79754 to your computer and use it in GitHub Desktop.
Save pushfoo/c2e75b807fdfedc7112421c7aba79754 to your computer and use it in GitHub Desktop.
Part of a rejected refactor for prayer, a Creatures PRAY library
"""
Part of a refactor of prayer (https://github.com/Creatures-Developer-Network/prayer)
Initially rejected for being too radical a change.
This approach might not be the best for final use. The main goals were:
1. always know the size it would take up when written so we can write directly to streams instead of caching & copying
2. try to provide a path toward a nested object structure
"""
class TagTable(MutableMapping):
    """
    Dictionary-like tag table that keeps track of its size when written as bytes.

    The table maps string tag names to values of a single type (``int`` or
    ``str``, chosen at construction). Every time an entry is added, replaced
    or removed, ``self.size`` is updated so it always equals the number of
    bytes held in the encoded form: a 4-byte header plus, per entry, the
    encoded key and the encoded value.

    Encoded forms used by this class:

    * strings (keys and ``str`` values): Latin-1 bytes prefixed with their
      length as a 4-byte little-endian unsigned integer
    * ``int`` values: 4 bytes, little-endian, unsigned

    This implementation is rough. Changes to consider :

    * provide a read-only view object for encoded values & keys
    * support for byte types in __getitem__ to avoid unneeded type conversion
    * avoid full value deletion for integers; their length is constant
    * refactor to use a general serializable objects approach
    """

    def __init__(
            self,
            values_type: type,
            # Project-level aliases are quoted so that defining this class
            # does not require them to be resolvable at definition time.
            source: "Union[Mapping[str, IntOrStr], ByteSource]" = None,
    ):
        """
        Build a tag table that maps strings to a value type.

        :param values_type: the type of value this will hold, ``int`` or
            ``str``.
        :param source: optional initial content. A mapping is copied entry
            by entry; reading from a binary stream is not implemented yet.
        :raises TypeError: if ``values_type`` is not ``int`` or ``str``, or
            if ``source`` is neither a mapping nor a binary stream.
        :raises NotImplementedError: if ``source`` is a binary stream.
        """
        if values_type not in (int, str):
            # Report the offending argument itself; type(values_type) would
            # always be <class 'type'> for any class passed in.
            raise TypeError(
                f"Tag tables can only hold ints"
                f" or strings, not {values_type!r}")

        self.values_type: type = values_type
        self._original_pairs: Dict[str, IntOrStr] = {}

        # table always starts with a 32-bit length header
        self.size: int = 4

        # store bytes and lengths as we add them so we can
        # always track the size of the table in bytes.
        self._encoded_keys: Dict[str, bytes] = {}
        self._encoded_key_lengths: Dict[str, int] = {}
        self._encoded_values: Dict[str, bytes] = {}
        self._encoded_value_lengths: Dict[str, int] = {}

        if source:
            if isinstance(source, Mapping):
                for key, value in source.items():
                    self[key] = value
            elif isinstance(source, BufferedIOBase):
                raise NotImplementedError("Reading from stream not yet added")
            else:
                raise TypeError(
                    "Source for TagTables must be either"
                    " a dict or a a binary stream")

    def _subtract_length_and_purge(
            self,
            size_dict: Dict[str, int],
            key: str) -> None:
        """
        Remove an entry from the passed dict and subtract its size from the total.

        :param size_dict: the length dict to remove the entry from
        :param key: the key to purge from the dict
        :return: None
        """
        self.size -= size_dict.pop(key)

    def __delitem__(self, key: str):
        """
        Clear the key & its value from all internal dicts, update table size.

        :param key: the key to remove
        :raises TypeError: if ``key`` is not a string
        :raises KeyError: if ``key`` is not in the table
        """
        if not isinstance(key, str):
            raise TypeError("Key must be a string")
        if key not in self._original_pairs:
            raise KeyError(key)

        self._subtract_length_and_purge(self._encoded_key_lengths, key)
        self._subtract_length_and_purge(self._encoded_value_lengths, key)
        del self._encoded_keys[key]
        del self._encoded_values[key]
        del self._original_pairs[key]

    @staticmethod
    def _encode_len_prefixed_string(to_encode: str) -> bytes:
        """
        Encode a string as Latin-1 prefixed with its 32-bit length.

        :param to_encode: the string to encode
        :return: 4-byte little-endian unsigned length followed by the
            Latin-1 encoded text
        :raises UnicodeEncodeError: if the string has characters outside
            Latin-1
        """
        encoded_partial = to_encode.encode("latin-1")
        return (
            len(encoded_partial).to_bytes(4, byteorder="little")
            + encoded_partial
        )

    def _encode_and_store_len_prefixed_string(
            self,
            key: str,
            to_encode: str,
            encoded_storage: Dict[str, bytes],
            length_storage: Dict[str, int]
    ) -> None:
        """
        Encode a string, store the result and its length, update self.size.

        The string is encoded to Latin-1 and prefixed with its length as an
        unsigned 32-bit integer in little endian byte order before being
        stored. Both the encoded bytes and their length are stored under
        the passed key.

        :param key: the key value for this string
        :param to_encode: the string that will be encoded under the key
        :param encoded_storage: a dict to store the encoded string under
        :param length_storage: a dict to store the length of the string under
        :return: None
        """
        # Encode first: if encoding fails nothing has been stored yet.
        final_encode = self._encode_len_prefixed_string(to_encode)
        final_length = len(final_encode)
        encoded_storage[key] = final_encode
        length_storage[key] = final_length
        self.size += final_length

    def __setitem__(self, key: str, value: "IntOrStr"):
        """
        Set the value for the key->value mapping and update the size count.

        The type of the value must match self.values_type.

        The following takes place:

        * encode the tag name and the tag value
        * delete any prior entry and decrement the byte size of the encoding
        * store the original value and both encodings
        * add the length of both encodings to the size of the block

        Encoding happens before any state is touched, so a value that cannot
        be encoded leaves the table (including any previous value stored
        under ``key``) exactly as it was.

        :param key: the key to add an entry for
        :param value: the value to add, either an int or string
        :raises TypeError: if ``key`` is not a string or ``value`` is not an
            instance of ``self.values_type``
        :raises OverflowError: if an int value does not fit in 4 unsigned
            bytes (negative, or larger than 2**32 - 1)
        :raises UnicodeEncodeError: if the key or a string value has
            characters outside Latin-1
        :return: None
        """
        if not isinstance(key, str):
            raise TypeError("Key must be a string")
        if not isinstance(value, self.values_type):
            raise TypeError(
                f"Expected a {self.values_type}, got {type(value)}({value})")

        # Everything that can fail happens here, before any mutation.
        encoded_key = self._encode_len_prefixed_string(key)
        if isinstance(value, int):
            encoded_value = value.to_bytes(4, byteorder="little")
        else:
            encoded_value = self._encode_len_prefixed_string(value)

        # purge old size info; re-inserting moves the key to the end
        if key in self._original_pairs:
            del self[key]

        self._original_pairs[key] = value
        for encoded, storage, lengths in (
            (encoded_key, self._encoded_keys, self._encoded_key_lengths),
            (encoded_value, self._encoded_values, self._encoded_value_lengths),
        ):
            storage[key] = encoded
            lengths[key] = len(encoded)
            self.size += len(encoded)

    def update(
            self,
            E: "Mapping[str, IntOrStr]" = None,
            **kwargs: "IntOrStr"
    ) -> None:
        """
        Update from the passed mapping, keyword args, or both.

        The name E breaks PEP-8, but does so intentionally to match the python
        built-in API's way of declaring update. E is a name that shouldn't come
        up as a valid keyword argument name.

        Although the python documentation for dicts says it will accept from
        either/or of these arguments, it does not seem to be exclusive when
        tested in CPython 3.7. This implementation matches that behavior:
        entries from E are applied first, then the keyword arguments.

        :param E: a mapping to get entries from
        :param kwargs: additional entries given as keyword arguments
        :return: None
        """
        if E:
            for key, value in E.items():
                self[key] = value
        for key, value in kwargs.items():
            self[key] = value

    def __getitem__(self, key):
        """
        Retrieve the original (unencoded) value stored under a key.

        :param key: the key to look up
        :raises KeyError: if the key is not in the table
        :return: the value as it was passed to __setitem__
        """
        return self._original_pairs[key]

    def __len__(self):
        """
        Return the number of entries in this mapping.

        :return: the entry count
        """
        return len(self._original_pairs)

    def __iter__(self):
        """
        Return an iterator over the keys, in insertion order.

        The mapping protocol requires __iter__ to yield keys; the inherited
        keys(), items(), values() and equality checks are all built on it.

        :return: an iterator over the keys
        """
        return iter(self._original_pairs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment