|
''' |
|
Header implementation only slightly modified by me, otherwise copied from |
|
https://github.com/seomoz/gzippy/blob/master/gzippy/header.py, and thus |
|
licensed MIT, Copyright (c) 2016 SEOmoz |
|
|
|
GzippedModule, eval_module, and import_gzip all mine, licensed MIT |
|
Copyright 2018 Josh Klar <josh@klar.sh> |
|
|
|
Requires CircuitPython built with https://github.com/klardotsh/circuitpython/commit/2f1d594c0a1fdad64b3e5fa2b2d48c1fffe1fcdb |
|
''' |
|
|
|
import gc |
|
import struct |
|
import time |
|
import uzlib |
|
|
|
|
|
# pylint: disable=too-many-instance-attributes |
|
class Header(object):
    '''Read, write, and hold the fields of a gzip member header.

    Instances carry the fixed header fields (``flags``, ``mtime``,
    ``extra_flags``, ``os_flags``) plus the optional ``extra``, ``name`` and
    ``comment`` fields, which are ``None`` when absent.

    ``read`` parses a header from a binary file object and leaves the file
    positioned just after it; ``write`` serialises the header to a binary
    file object, recomputing ``flags`` from the optional fields present.
    '''

    # Constants
    MAGIC = b'\x1F\x8B'
    METHOD = b'\x08'  # zlib.DEFLATED

    # Flags
    FTEXT = 0x1
    FMULTI = 0x2  # read() refuses headers that have this bit set
    FEXTRA = 0x4
    FNAME = 0x8
    FCOMMENT = 0x10

    # The attributes for this class
    __slots__ = (
        'flags',
        'mtime',
        'extra_flags',
        'os_flags',
        'extra',
        'name',
        'comment',
    )

    # pylint: disable=too-many-arguments
    def __init__(self, flags=0, mtime=None, extra_flags=2, os_flags=255, extra=None,
                 name=None, comment=None):
        '''Store the header fields.

        ``mtime`` defaults to the current time only when it is ``None``; an
        explicit ``0`` (RFC 1952: "no time stamp available") is preserved so
        that headers round-trip through ``read``/``write`` unchanged.
        '''
        self.flags = flags
        self.mtime = int(time.time()) if mtime is None else mtime
        self.extra_flags = extra_flags
        self.os_flags = os_flags
        self.extra = extra
        self.name = name
        self.comment = comment

    @staticmethod
    def _read_exact(fobj, count):
        '''Read exactly ``count`` bytes or raise ``ValueError`` on a short read.'''
        data = fobj.read(count)
        if len(data) != count:
            raise ValueError('Truncated gzip header.')
        return data

    @staticmethod
    def _as_bytes(value, encoding):
        '''Return ``value`` as bytes, encoding it first if it is a ``str``.'''
        if isinstance(value, str):
            return value.encode(encoding)
        return value

    @classmethod
    def read_null_terminated(cls, fobj):
        '''Read bytes up to (not including) the next NUL byte or end of file.

        The terminating NUL, if any, is consumed. Returns ``bytes``.
        '''
        result = bytearray()
        while True:
            char = fobj.read(1)
            if not char or char == b'\x00':
                break
            result += char
        return bytes(result)

    @classmethod
    def read(cls, fobj):
        '''Read a header from a binary file object and return a ``Header``.

        Raises ``ValueError`` if the magic bytes or compression method are
        wrong, if the header is truncated, or if the FMULTI flag is set.
        '''
        if fobj.read(2) != cls.MAGIC:
            raise ValueError('Invalid magic.')
        if fobj.read(1) != cls.METHOD:
            raise ValueError('Unknown compression method')

        # flags (1 byte), mtime (4 bytes LE), extra flags (1), OS (1)
        flags, mtime, extra_flags, os_flags = struct.unpack(
            '<BLBB', cls._read_exact(fobj, 7))

        extra = None
        if flags & cls.FEXTRA:
            length = struct.unpack('<H', cls._read_exact(fobj, 2))[0]
            extra = cls._read_exact(fobj, length)

        name = None
        if flags & cls.FNAME:
            raw = cls.read_null_terminated(fobj)
            name = str(raw, 'utf-8')

        comment = None
        if flags & cls.FCOMMENT:
            comment = cls.read_null_terminated(fobj).decode('latin-1')

        if flags & cls.FMULTI:
            raise ValueError('Multipart gzips not supported.')

        return cls(flags, mtime, extra_flags, os_flags, extra, name,
                   comment)

    def write(self, fobj):
        '''Write this header to a binary file object.

        ``self.flags`` is overwritten with the FNAME/FEXTRA/FCOMMENT bits
        implied by which optional fields are not ``None``. Every field whose
        flag is set is written, even when empty, so the output always
        matches its own flags byte. ``name`` is encoded as UTF-8 and
        ``comment`` as latin-1 (mirroring ``read``); values that are already
        bytes are written unchanged.
        '''
        self.flags = (
            (self.FNAME if (self.name is not None) else 0) |
            (self.FEXTRA if (self.extra is not None) else 0) |
            (self.FCOMMENT if (self.comment is not None) else 0))

        fobj.write(self.MAGIC)   # magic header
        fobj.write(self.METHOD)  # compression method
        # flags, modification time, extra flags, operating system
        fobj.write(struct.pack(
            '<BLBB', self.flags, self.mtime, self.extra_flags, self.os_flags))

        if self.flags & self.FEXTRA:
            fobj.write(struct.pack('<H', len(self.extra)))
            fobj.write(self.extra)

        if self.flags & self.FNAME:
            fobj.write(self._as_bytes(self.name, 'utf-8'))
            fobj.write(b'\x00')

        if self.flags & self.FCOMMENT:
            fobj.write(self._as_bytes(self.comment, 'latin-1'))
            fobj.write(b'\x00')

    def __repr__(self):
        joined = ', '.join(
            '%s=%s' % (attr, getattr(self, attr)) for attr in self.__slots__)
        return '<Gzip.Header %s>' % joined

    def __eq__(self, other):
        '''Two headers are equal when every slot attribute compares equal.'''
        return all(
            getattr(self, attr) == getattr(other, attr) for attr in self.__slots__)
|
|
|
class GzippedModule:
    '''Lightweight module-like container for code loaded from a gzip file.

    ``__name__`` and ``__file__`` are set from the constructor arguments;
    every key/value pair in ``attrs`` (when given) is then attached as an
    attribute, so entries in ``attrs`` take precedence over those two.
    '''

    def __init__(self, name, filename, attrs=None):
        self.__name__ = name
        self.__file__ = filename

        if attrs is None:
            return

        for attr_name, attr_value in attrs.items():
            setattr(self, attr_name, attr_value)

    def __repr__(self):
        template = "<gzipped module '{}' from '{}'>"
        return template.format(self.__name__, self.__file__)
|
|
|
|
|
def eval_module(mstr, name=None, file=None):
    '''Execute module source ``mstr`` and return the resulting namespace dict.

    One dict serves as both the global and the local scope of the executed
    code. This imitates how a real module file is run: if the two scopes
    were separate, top-level definitions would end up in the local scope
    only, and a function defined in the source could not see a class or
    function defined next to it.

    ``name`` and ``file``, when not ``None``, are pre-seeded into the
    namespace as ``__name__`` and ``__file__`` respectively.

    NOTE: ``mstr`` is run with ``exec`` and must come from a trusted source.
    '''
    namespace = {}

    for key, value in (('__name__', name), ('__file__', file)):
        if value is not None:
            namespace[key] = value

    exec(mstr, namespace, namespace)
    return namespace
|
|
|
|
|
def import_gzip(path):
    '''Load a gzip-compressed Python source file as a ``GzippedModule``.

    The gzip header is parsed with ``Header.read``, the remainder of the
    file is decompressed with ``uzlib``, decoded as UTF-8 and executed via
    ``eval_module``. The file name stored in the gzip header is used as the
    module's ``__file__`` when present, otherwise ``path`` is used. The
    module name is that file name with a trailing ``.py`` removed.

    The name is computed before execution and handed to ``eval_module`` so
    the executed code sees the same ``__name__`` that the returned object
    reports.

    Intermediate buffers are deleted and the garbage collector is run
    between stages to keep peak memory low.

    NOTE: the decompressed source is executed; only load trusted files.
    '''
    with open(path, 'rb') as gz_file:
        header = Header.read(gz_file)
        # Header.read has consumed the gzip header, so only the compressed
        # payload onwards is handed to uzlib. NOTE(review): the -1 wbits
        # argument presumably relies on the custom CircuitPython build
        # referenced in the module docstring; confirm before changing.
        module_bytes = uzlib.decompress(gz_file.read(), -1)

    filename = header.name or path
    del header
    module_name = filename[:-3] if filename.endswith('.py') else filename

    module_str = str(module_bytes, 'utf-8')
    del module_bytes
    gc.collect()

    module = eval_module(module_str, name=module_name, file=filename)

    # forcibly clean up after ourselves
    del module_str
    gc.collect()

    return GzippedModule(module_name, filename, module)