Last active
May 26, 2020 08:21
-
-
Save pushfoo/c2e75b807fdfedc7112421c7aba79754 to your computer and use it in GitHub Desktop.
Part of a rejected refactor for prayer, a creatures PRAY library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Part of a refactor of prayer (https://github.com/Creatures-Developer-Network/prayer).

Initially rejected for being too radical a change.

This approach might not be the best for final use. The main goals were:

1. Always know the size the table will take up when written, so we can write
   directly to streams instead of caching & copying.
2. Try to provide a path toward a nested object structure.
"""
class TagTable(MutableMapping):
    """
    Dictionary-like tag table that tracks its size when written as bytes.

    Keys are always strings; values are either all ints or all strings,
    as chosen by ``values_type`` at construction time. Every assignment
    encodes the key and the value immediately so that ``size`` always
    holds the number of bytes the table occupies when serialized:

    * 4 bytes for the table's length header
    * per entry, the key as a length-prefixed latin-1 string
    * per entry, the value: 4 bytes for an int, or a length-prefixed
      latin-1 string for a str

    Length prefixes and ints are encoded as unsigned 32-bit little endian.

    Annotations that mention ``IntOrStr`` or ``ByteSource`` are written as
    strings because those aliases are declared outside this class.

    This implementation is rough. Changes to consider:

    * provide a read-only view object for encoded values & keys
    * support for byte types in __getitem__ to avoid unneeded conversion
    * avoid full value deletion for integers; their length is constant
    * refactor to use a general serializable objects approach
    """

    def __init__(
            self,
            values_type: type,
            source: "Union[Mapping[str, IntOrStr], ByteSource]" = None,
    ):
        """
        Build a tag table that maps strings to a single value type.

        :param values_type: the type of value this table holds, either
            ``int`` or ``str``.
        :param source: optional initial contents. A mapping is copied in
            entry by entry; a binary stream is recognised but reading
            from it is not implemented yet.
        :raises TypeError: if ``values_type`` is not ``int`` or ``str``,
            or if ``source`` is neither a mapping nor a binary stream.
        :raises NotImplementedError: if ``source`` is a binary stream.
        """
        # Tuple membership compares with ==, so an unhashable argument
        # still reaches the TypeError below instead of failing to hash.
        if values_type not in (int, str):
            raise TypeError(
                f"Tag tables can only hold ints"
                f" or strings, not {values_type!r}")

        self.values_type: type = values_type
        self._original_pairs: Dict[str, "IntOrStr"] = {}

        # table always starts with a 32-bit length header
        self.size: int = 4

        # store bytes and lengths as we add them so we can
        # always track the size of the table in bytes.
        self._encoded_keys: Dict[str, bytes] = {}
        self._encoded_key_lengths: Dict[str, int] = {}
        self._encoded_values: Dict[str, bytes] = {}
        self._encoded_value_lengths: Dict[str, int] = {}

        if source:
            if isinstance(source, Mapping):
                for key, value in source.items():
                    self[key] = value
            elif isinstance(source, BufferedIOBase):
                raise NotImplementedError("Reading from stream not yet added")
            else:
                raise TypeError(
                    "Source for TagTables must be either"
                    " a dict or a a binary stream")

    def __repr__(self) -> str:
        """Return a debugging representation with value type and contents."""
        return (
            f"{type(self).__name__}"
            f"({self.values_type.__name__}, {self._original_pairs!r})"
        )

    @staticmethod
    def _encode_len_prefixed(to_encode: str) -> bytes:
        """
        Encode a string to latin-1, prefixed with its encoded length.

        The prefix is an unsigned 32-bit little endian integer. This
        helper has no side effects, so callers can encode before they
        commit anything to the table.

        :param to_encode: the string to encode
        :return: the 4-byte length prefix followed by the encoded string
        :raises UnicodeEncodeError: if the string is not valid latin-1
        """
        encoded = to_encode.encode("latin-1")
        return len(encoded).to_bytes(4, byteorder="little") + encoded

    def _subtract_length_and_purge(
            self,
            size_dict: Dict[str, int],
            key: str) -> None:
        """
        Remove an entry from passed dict, subtract its size from total size

        :param size_dict: a reference to the length dict to remove from
        :param key: the key to purge from the dict
        :return: None
        :raises KeyError: if the key is not in ``size_dict``
        """
        self.size -= size_dict.pop(key)

    def __delitem__(self, key: str):
        """
        Clear the key & its value from all internal dicts, update table size

        :param key: the key to remove
        :raises TypeError: if the key is not a string
        :raises KeyError: if the key is not in the table
        """
        if not isinstance(key, str):
            raise TypeError("Key must be a string")

        if key not in self._original_pairs:
            raise KeyError(key)

        self._subtract_length_and_purge(self._encoded_key_lengths, key)
        self._subtract_length_and_purge(self._encoded_value_lengths, key)

        del self._encoded_keys[key]
        del self._encoded_values[key]
        del self._original_pairs[key]

    def _encode_and_store_len_prefixed_string(
            self,
            key: str,
            to_encode: str,
            encoded_storage: Dict[str, bytes],
            length_storage: Dict[str, int]
    ) -> None:
        """
        Encode a string, store the result and its length, update self.size

        The string is encoded to latin-1 and prefixed with its length as
        an unsigned 32-bit integer in little endian byte order before
        being stored. Both the encoded bytes and their length are stored
        under the passed key.

        :param key: the key value for this string
        :param to_encode: the string that will be encoded under the key
        :param encoded_storage: a dict to store the encoded string under
        :param length_storage: a dict to store the encoded length under
        :return: None
        :raises UnicodeEncodeError: if the string is not valid latin-1;
            nothing is stored in that case.
        """
        final_encode = self._encode_len_prefixed(to_encode)
        final_length = len(final_encode)

        encoded_storage[key] = final_encode
        length_storage[key] = final_length
        self.size += final_length

    def __setitem__(self, key: str, value: "IntOrStr"):
        """
        Set the value for the key->value mapping and update the size count

        The type of the value must match self.values_type.

        The following takes place:

        * encode the tag name and the tag value
        * delete any prior entry and decrement the byte size accordingly
        * store the original value and both encodings
        * add the length of both encodings to the size of the table

        Encoding happens before any state is modified, so a value that
        cannot be encoded leaves the table exactly as it was.

        :param key: the key to add an entry for
        :param value: the value to add, either an int or string
        :raises TypeError: if the key is not a string or the value is not
            an instance of self.values_type
        :raises UnicodeEncodeError: if the key or a string value cannot
            be encoded as latin-1
        :raises OverflowError: if an int value does not fit in an
            unsigned 32-bit integer
        """
        if not isinstance(key, str):
            raise TypeError("Key must be a string")

        if not isinstance(value, self.values_type):
            raise TypeError(
                f"Expected a {self.values_type}, got {type(value)}({value})")

        # Encode first: both steps can raise, and failing after the old
        # entry was purged would lose data and desynchronise self.size.
        encoded_key = self._encode_len_prefixed(key)
        if isinstance(value, int):
            encoded_value = value.to_bytes(4, byteorder="little")
        else:
            encoded_value = self._encode_len_prefixed(value)

        # purge old size info
        if key in self._original_pairs:
            del self[key]

        # commit the original value and both encodings together
        self._original_pairs[key] = value
        self._encoded_keys[key] = encoded_key
        self._encoded_key_lengths[key] = len(encoded_key)
        self._encoded_values[key] = encoded_value
        self._encoded_value_lengths[key] = len(encoded_value)
        self.size += len(encoded_key) + len(encoded_value)

    def update(
            self,
            E: "Mapping[str, IntOrStr]" = None,
            **kwargs: "IntOrStr"
    ) -> None:
        """
        Update from the passed mapping, keyword args, or both.

        The name E breaks PEP-8, but does so intentionally to match the
        python built-in API's way of declaring update. E is a name that
        shouldn't come up as a valid keyword argument name.

        As with the built-in dict, E and keyword arguments are not
        mutually exclusive: entries from E are applied first, then the
        keyword arguments. E may be a mapping, any object exposing
        ``keys()`` and ``__getitem__``, or an iterable of key/value pairs.

        :param E: a mapping or iterable of pairs to get entries from
        :param kwargs: the keyword args to the function
        :return: None
        """
        if E:
            if isinstance(E, Mapping):
                for key, value in E.items():
                    self[key] = value
            elif hasattr(E, "keys"):
                for key in E.keys():
                    self[key] = E[key]
            else:
                for key, value in E:
                    self[key] = value

        for key, value in kwargs.items():
            self[key] = value

    def __getitem__(self, key):
        """
        Retrieve the original (unencoded) value stored under a key

        :param key: the key to look up
        :return: the value as it was passed to __setitem__
        :raises KeyError: if the key is not in the table
        """
        return self._original_pairs[key]

    def __len__(self):
        """
        Return the number of entries in this mapping

        :return: the entry count
        """
        return len(self._original_pairs)

    def __iter__(self):
        """
        Return an iterator over the keys, in insertion order

        The mapping protocol requires __iter__ to return an iterator over
        keys; the inherited keys(), items(), values() and equality all
        build on it.

        :return: an iterator over the table's keys
        """
        return iter(self._original_pairs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment