Skip to content

Instantly share code, notes, and snippets.

@klardotsh
Last active October 14, 2018 04:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save klardotsh/cfe3038c59abbc6bb15be4cc6127497c to your computer and use it in GitHub Desktop.
Save klardotsh/cfe3038c59abbc6bb15be4cc6127497c to your computer and use it in GitHub Desktop.
'''
Copy this to the CIRCUITPY MSC drive on a CircuitPython board,
then compress it:
gzip -k example_module.py
Then you can do something like the following:
>>> from import_gzip import import_gzip
>>> module = import_gzip('example_module.py.gz')
>>> module.make_a_thing()
MyThing()
Requires CircuitPython built with https://github.com/klardotsh/circuitpython/commit/2f1d594c0a1fdad64b3e5fa2b2d48c1fffe1fcdb
This file is public domain
'''
class MyThing:
    '''Minimal example class used to demonstrate importing a gzipped module.'''

    def __repr__(self):
        # Fixed textual form, matching the usage example in the module docstring.
        text = 'MyThing()'
        return text
def make_a_thing():
    '''Create and return a new MyThing instance.'''
    thing = MyThing()
    return thing
'''
Header implementation only slightly modified by me, otherwise copied from
https://github.com/seomoz/gzippy/blob/master/gzippy/header.py, and thus
licensed MIT, Copyright (c) 2016 SEOmoz
GzippedModule, eval_module, and import_gzip all mine, licensed MIT
Copyright 2018 Josh Klar <josh@klar.sh>
Requires CircuitPython built with https://github.com/klardotsh/circuitpython/commit/2f1d594c0a1fdad64b3e5fa2b2d48c1fffe1fcdb
'''
import gc
import struct
import time
import uzlib
# pylint: disable=too-many-instance-attributes
class Header(object):
    '''Read, write, and hold the fields of a gzip file header.

    Instances carry the header fields listed in ``__slots__``. ``read`` parses
    a header from a binary file object and leaves the stream positioned just
    after it; ``write`` serialises the header back to a binary file object.
    '''

    # Constants
    MAGIC = b'\x1F\x8B'
    METHOD = b'\x08'  # zlib.DEFLATED

    # Flags
    FTEXT = 0x1
    # NOTE(review): RFC 1952 calls this bit FHCRC (a CRC16 of the header
    # follows); this class treats it as "multipart" and refuses to read it.
    FMULTI = 0x2
    FEXTRA = 0x4
    FNAME = 0x8
    FCOMMENT = 0x10

    # The attributes for this class
    __slots__ = (
        'flags',
        'mtime',
        'extra_flags',
        'os_flags',
        'extra',
        'name',
        'comment',
    )

    # pylint: disable=too-many-arguments
    def __init__(self, flags=0, mtime=None, extra_flags=2, os_flags=255, extra=None,
                 name=None, comment=None):
        '''Store the header fields.

        ``mtime`` defaults to the current time only when it is omitted (None);
        an explicit 0, which ``read`` can legitimately return, is preserved.
        '''
        self.flags = flags
        self.mtime = int(time.time()) if mtime is None else mtime
        self.extra_flags = extra_flags
        self.os_flags = os_flags
        self.extra = extra
        self.name = name
        self.comment = comment

    @classmethod
    def _read_exact(cls, fobj, count):
        '''Read exactly ``count`` bytes or raise ValueError on a short read.'''
        data = fobj.read(count)
        if data is None or len(data) != count:
            raise ValueError('Truncated gzip header.')
        return data

    @staticmethod
    def _to_bytes(text, encoding):
        '''Return ``text`` as bytes, encoding it first if it is a str.'''
        if isinstance(text, (bytes, bytearray)):
            return bytes(text)
        return text.encode(encoding)

    @classmethod
    def read_null_terminated(cls, fobj):
        '''Read a null-terminated string.

        Bytes are consumed one at a time so nothing past the terminator is
        read. The terminator itself is consumed but not returned. Reading
        also stops, without error, if the stream ends first.
        '''
        result = bytearray()
        while True:
            char = fobj.read(1)
            if not char or char == b'\x00':
                break
            result += char
        return bytes(result)

    @classmethod
    def read(cls, fobj):
        '''Read in headers from a file object.

        ``fobj`` must be a binary file object positioned at the start of the
        gzip data. Returns a new ``Header``; the stream is left positioned
        immediately after the header.

        Raises ValueError if the magic bytes or compression method are wrong,
        if the header is truncated, or if the FMULTI flag bit is set.
        '''
        if fobj.read(2) != cls.MAGIC:
            raise ValueError('Invalid magic.')

        if fobj.read(1) != cls.METHOD:
            raise ValueError('Unknown compression method')

        # flags (1 byte), mtime (4 bytes LE), extra flags (1), OS (1)
        flags, mtime, extra_flags, os_flags = struct.unpack(
            '<BLBB', cls._read_exact(fobj, 7))

        extra = None
        if flags & cls.FEXTRA:
            length = struct.unpack('<H', cls._read_exact(fobj, 2))[0]
            extra = cls._read_exact(fobj, length)

        name = None
        if flags & cls.FNAME:
            raw = cls.read_null_terminated(fobj)
            name = str(raw, 'utf-8')

        comment = None
        if flags & cls.FCOMMENT:
            comment = cls.read_null_terminated(fobj).decode('latin-1')

        if flags & cls.FMULTI:
            raise ValueError('Multipart gzips not supported.')

        return cls(flags, mtime, extra_flags, os_flags, extra, name,
                   comment)

    def write(self, fobj):
        '''Write this header to a file object.

        ``fobj`` must accept bytes. ``self.flags`` is recomputed from which of
        ``name``, ``extra`` and ``comment`` are not None, and every field whose
        flag is set is written (even when empty) so the flags always describe
        the bytes that follow.
        '''
        self.flags = (
            (self.FNAME if (self.name is not None) else 0) |
            (self.FEXTRA if (self.extra is not None) else 0) |
            (self.FCOMMENT if (self.comment is not None) else 0))

        fobj.write(self.MAGIC)   # magic header
        fobj.write(self.METHOD)  # compression method
        # flags, modification time, eXtra FLags, operating system
        fobj.write(struct.pack(
            '<BLBB', self.flags, self.mtime, self.extra_flags, self.os_flags))

        if self.flags & self.FEXTRA:
            fobj.write(struct.pack('<H', len(self.extra)))
            fobj.write(self.extra)

        # Encodings mirror the ones read() uses to decode each field.
        if self.flags & self.FNAME:
            fobj.write(self._to_bytes(self.name, 'utf-8'))
            fobj.write(b'\x00')

        if self.flags & self.FCOMMENT:
            fobj.write(self._to_bytes(self.comment, 'latin-1'))
            fobj.write(b'\x00')

    def __repr__(self):
        joined = ', '.join(
            '%s=%s' % (attr, getattr(self, attr)) for attr in self.__slots__)
        return '<Gzip.Header %s>' % joined

    def __eq__(self, other):
        '''Headers are equal when every slot attribute compares equal.'''
        if not isinstance(other, Header):
            return NotImplemented
        return all(
            getattr(self, attr) == getattr(other, attr) for attr in self.__slots__)
class GzippedModule:
    '''Module-like container for the names defined by a gzipped source file.

    ``name`` and ``filename`` are stored as ``__name__`` and ``__file__``.
    Every key/value pair in the optional ``attrs`` mapping is then set as an
    attribute on the instance.
    '''

    def __init__(self, name, filename, attrs=None):
        self.__name__ = name
        self.__file__ = filename

        pairs = () if attrs is None else attrs.items()
        for attr_name, attr_value in pairs:
            setattr(self, attr_name, attr_value)

    def __repr__(self):
        template = "<gzipped module '{}' from '{}'>"
        return template.format(self.__name__, self.__file__)
def eval_module(mstr, name=None, file=None):
    '''Execute module source ``mstr`` and return the namespace it populated.

    ``name`` and ``file``, when given, are pre-seeded into the namespace as
    ``__name__`` and ``__file__`` before the source runs.
    '''
    # One dict serves as both the globals and the locals of the exec'd code.
    # With separate scopes, definitions made at the top level of the source
    # are not visible from inside functions defined in that same source
    # (e.g. a function could not reference a class defined alongside it).
    namespace = {}
    for key, value in (('__name__', name), ('__file__', file)):
        if value is not None:
            namespace[key] = value

    exec(mstr, namespace, namespace)
    return namespace
def import_gzip(path):
    '''Load a gzip-compressed Python source file and return it as a module-like object.

    The gzip header is parsed with ``Header.read``, the remaining bytes are
    decompressed with ``uzlib.decompress`` and decoded as UTF-8, and the
    source is executed with ``eval_module``. The returned ``GzippedModule``
    exposes everything the source defined as attributes.

    ``__file__`` is the file name stored in the gzip header, or ``path`` when
    the header has none. ``__name__`` is that file name with a trailing
    ``.gz`` and then a trailing ``.py`` removed.

    NOTE: the file's contents are executed via ``exec``; only load files you
    would be willing to ``import``.
    '''
    with open(path, 'rb') as gz_file:
        header = Header.read(gz_file)
        # NOTE(review): the wbits value of -1 is kept from the original; its
        # meaning depends on the uzlib build this file targets.
        module_bytes = uzlib.decompress(gz_file.read(), -1)

    filename = header.name or path
    del header

    # When the header carries no name we fall back to the path, which still
    # has its '.gz' suffix; strip that before stripping '.py'.
    module_name = filename
    for suffix in ('.gz', '.py'):
        if module_name.endswith(suffix):
            module_name = module_name[:-len(suffix)]

    module_str = str(module_bytes, 'utf-8')

    # Free each intermediate copy as soon as it is no longer needed.
    del module_bytes
    gc.collect()

    # Pass the name through so the executed source sees __name__ as well as
    # __file__.
    module = eval_module(module_str, name=module_name, file=filename)

    # forcibly clean up after ourselves
    del module_str
    gc.collect()

    return GzippedModule(module_name, filename, module)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment