Last active
October 14, 2018 04:02
-
-
Save klardotsh/cfe3038c59abbc6bb15be4cc6127497c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Copy this to the CIRCUITPY MSC drive on a CircuitPython board, | |
then compress it: | |
gzip -k example_module.py | |
Then you can do something like the following: | |
>>> from import_gzip import import_gzip | |
>>> module = import_gzip('example_module.py.gz') | |
>>> module.make_a_thing() | |
MyThing() | |
Requires CircuitPython built with https://github.com/klardotsh/circuitpython/commit/2f1d594c0a1fdad64b3e5fa2b2d48c1fffe1fcdb | |
This file is public domain | |
''' | |
class MyThing: | |
def __repr__(self): | |
return 'MyThing()' | |
def make_a_thing(): | |
return MyThing() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Header implementation only slightly modified by me, otherwise copied from | |
https://github.com/seomoz/gzippy/blob/master/gzippy/header.py, and thus | |
licensed MIT, Copyright (c) 2016 SEOmoz | |
GzippedModule, eval_module, and import_gzip all mine, licensed MIT | |
Copyright 2018 Josh Klar <josh@klar.sh> | |
Requires CircuitPython built with https://github.com/klardotsh/circuitpython/commit/2f1d594c0a1fdad64b3e5fa2b2d48c1fffe1fcdb | |
''' | |
import gc | |
import struct | |
import time | |
import uzlib | |
# pylint: disable=too-many-instance-attributes | |
class Header(object): | |
'''Read, write, hold Gzip file headers.''' | |
# Constatnts | |
MAGIC = b'\x1F\x8B' | |
METHOD = b'\x08' # zlib.DEFLATED | |
# Flags | |
FTEXT = 0x1 | |
FMULTI = 0x2 | |
FEXTRA = 0x4 | |
FNAME = 0x8 | |
FCOMMENT = 0x10 | |
# The attributes for this class | |
__slots__ = ( | |
'flags', | |
'mtime', | |
'extra_flags', | |
'os_flags', | |
'extra', | |
'name', | |
'comment', | |
) | |
# pylint: disable=too-many-arguments | |
def __init__(self, flags=0, mtime=None, extra_flags=2, os_flags=255, extra=None, | |
name=None, comment=None): | |
self.flags = flags | |
self.mtime = mtime or int(time.time()) | |
self.extra_flags = extra_flags | |
self.os_flags = os_flags | |
self.extra = extra | |
self.name = name | |
self.comment = comment | |
@classmethod | |
def read_null_terminated(cls, fobj): | |
'''Read a null-terminated string.''' | |
result = b'' | |
while True: | |
char = fobj.read(1) | |
if not char or char == b'\x00': | |
break | |
result += char | |
return result | |
@classmethod | |
def read(cls, fobj): | |
'''Read in headers from a file object.''' | |
if fobj.read(2) != cls.MAGIC: | |
raise Error('Invalid magic.') | |
if fobj.read(1) != cls.METHOD: | |
raise Error('Unknown compression method') | |
flags = ord(fobj.read(1)) | |
mtime = struct.unpack('<L', fobj.read(4))[0] | |
extra_flags = ord(fobj.read(1)) | |
os_flags = ord(fobj.read(1)) | |
extra = None | |
if flags & cls.FEXTRA: | |
length = struct.unpack('<H', fobj.read(2))[0] | |
extra = fobj.read(length) | |
name = None | |
if flags & cls.FNAME: | |
raw = cls.read_null_terminated(fobj) | |
name = str(raw, 'utf-8') | |
comment = None | |
if flags & cls.FCOMMENT: | |
comment = cls.read_null_terminated(fobj).decode('latin-1') | |
if flags & cls.FMULTI: | |
raise ValueError('Multipart gzips not supported.') | |
return cls(flags, mtime, extra_flags, os_flags, extra, name, | |
comment) | |
def write(self, fobj): | |
'''Write this header to a file object.''' | |
self.flags = ( | |
(self.FNAME if (self.name is not None) else 0) | | |
(self.FEXTRA if (self.extra is not None) else 0) | | |
(self.FCOMMENT if (self.comment is not None) else 0)) | |
fobj.write(self.MAGIC) # magic header | |
fobj.write(self.METHOD) # compression method | |
fobj.write(chr(self.flags)) # flags | |
fobj.write(struct.pack('<L', self.mtime)) # Modification time | |
fobj.write(chr(self.extra_flags)) # eXtra FLags (max compression) | |
fobj.write(chr(self.os_flags)) # Operating system (unknown) | |
if (self.flags & self.FEXTRA) and self.extra: | |
fobj.write(struct.pack('<H', len(self.extra))) | |
fobj.write(self.extra) | |
if (self.flags & self.FNAME) and self.name: | |
fobj.write(self.name) | |
fobj.write('\000') | |
if (self.flags & self.FCOMMENT) and self.comment: | |
fobj.write(self.comment) | |
fobj.write('\000') | |
def __repr__(self): | |
joined = ', '.join( | |
'%s=%s' % (attr, getattr(self, attr)) for attr in self.__slots__) | |
return '<Gzip.Header %s>' % joined | |
def __eq__(self, other): | |
return all( | |
getattr(self, attr) == getattr(other, attr) for attr in self.__slots__) | |
class GzippedModule: | |
def __init__(self, name, filename, attrs=None): | |
self.__name__ = name | |
self.__file__ = filename | |
if attrs is not None: | |
for k, v in attrs.items(): | |
setattr(self, k, v) | |
def __repr__(self): | |
return "<gzipped module '{}' from '{}'>".format( | |
self.__name__, | |
self.__file__, | |
) | |
def eval_module(mstr, name=None, file=None): | |
# generate a new local scope to avoid weird "undefined" errors | |
# this simulates the bizarre file scope that Python creates, which I | |
# don't fully understand, however if global and local scope are not | |
# synced within the exec'd file, things get _bizarre_ - class | |
# definitions will work, and are even usable from the `module` object | |
# we return below, but will be inaccessible from anywhere else within | |
# the file being exec'd - meaning a function within module wouldn't be | |
# able to access a class also within module | |
scope = dict() | |
if name is not None: | |
scope['__name__'] = name | |
if file is not None: | |
scope['__file__'] = file | |
exec(mstr, scope, scope) | |
return scope | |
def import_gzip(path): | |
with open(path, 'rb') as gz_file: | |
header = Header.read(gz_file) | |
module_bytes = uzlib.decompress(gz_file.read(), -1) | |
filename = header.name or path | |
module_str = str(module_bytes, 'utf-8') | |
del module_bytes | |
gc.collect() | |
module = eval_module(module_str, file=filename) | |
# forcibly clean up after ourselves | |
del header | |
del module_str | |
gc.collect() | |
return GzippedModule( | |
filename[:-3] if filename.endswith('.py') else filename, | |
filename, | |
module, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment