Skip to content

Instantly share code, notes, and snippets.

@wasade
Last active December 20, 2015 11:29
Show Gist options
  • Save wasade/6123807 to your computer and use it in GitHub Desktop.
Save wasade/6123807 to your computer and use it in GitHub Desktop.
Prototype delayed IO generalized containers
#!/usr/bin/env python
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2013, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso", "Doug Wendel",
"Jai Ram Rideout"]
__license__ = "GPL"
__version__ = "0.1.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "mcdonadt@colorado.edu"
from biom.parse import parse_biom_table
class ContainerError(Exception):
    """Base class for the container-related exceptions in this module."""
class CannotReadError(ContainerError):
    """Raised when a container has no object and no input path to load from."""
class CannotWriteError(ContainerError):
    """Raised when a container is asked to write but has no output path."""
class Container(object):
    """Proxy that forwards attribute access to a wrapped ``_object``.

    Names listed in ``_reserved`` belong to the container itself.  Every
    other attribute read or write is forwarded to ``self._object`` after
    ``_load_if_needed()`` has been called.  This base class does not define
    ``_object``; a subclass is expected to provide it.  When no ``_object``
    exists, forwarding fails with ``AttributeError`` instead of recursing
    through ``__getattr__`` until the interpreter's recursion limit is hit.

    Keyword arguments:
        Info -- optional value stored on the instance as ``Info``.
    """

    _reserved = set(['_reserved', 'TypeName', 'Info'])
    TypeName = "Container"
    Info = None

    def __init__(self, *args, **kwargs):
        """Store ``Info`` when given; all other arguments are ignored."""
        if 'Info' in kwargs:
            # Bypass our own __setattr__ so nothing is forwarded or loaded.
            super(Container, self).__setattr__('Info', kwargs['Info'])

    def _load_if_needed(self):
        """Hook called before every forwarded access; a no-op here."""
        pass

    def __getattr__(self, attr):
        """Resolve ``attr`` on the wrapped object.

        Only invoked by Python after normal attribute lookup has failed.

        Raises:
            AttributeError -- if ``attr`` is reserved but unset, if there is
            no wrapped object, or if the wrapped object lacks ``attr``.
        """
        lookup = super(Container, self).__getattribute__
        if attr in lookup('_reserved'):
            return lookup(attr)
        self._load_if_needed()
        try:
            # Plain object lookup: never re-enters __getattr__, so a missing
            # ``_object`` cannot cause unbounded recursion.
            wrapped = lookup('_object')
        except AttributeError:
            raise AttributeError("%r object has no attribute %r"
                                 % (type(self).__name__, attr))
        return getattr(wrapped, attr)

    def __setattr__(self, attr, val):
        """Set reserved names on the container, anything else on the object.

        Raises:
            AttributeError -- if ``attr`` is not reserved and there is no
            wrapped object to receive it.
        """
        lookup = super(Container, self).__getattribute__
        if attr in lookup('_reserved'):
            return super(Container, self).__setattr__(attr, val)
        self._load_if_needed()
        try:
            wrapped = lookup('_object')
        except AttributeError:
            raise AttributeError("%r object has no wrapped object to set %r on"
                                 % (type(self).__name__, attr))
        setattr(wrapped, attr, val)

    def __hasattr__(self, attr):
        """Return True if ``attr`` is reserved or the wrapped object has it.

        NOTE: ``__hasattr__`` is not a special method; the builtin
        ``hasattr()`` goes through attribute lookup instead, so this must be
        called explicitly.
        """
        lookup = super(Container, self).__getattribute__
        if attr in lookup('_reserved'):
            return True
        self._load_if_needed()
        try:
            wrapped = lookup('_object')
        except AttributeError:
            # Nothing is wrapped, so nothing can provide the attribute.
            return False
        return hasattr(wrapped, attr)
class IOContainer(Container):
    """Container whose wrapped object is read from ``InPath`` on demand.

    The wrapped object lives in ``_object``.  When it is ``None`` and an
    attribute is forwarded, ``_load_if_needed()`` populates it by calling
    ``self._reader(self.InPath)``.  ``write()`` calls
    ``self._writer(self.OutPath)``.  This class defines neither ``_reader``
    nor ``_writer``; both are looked up on ``self``, so a subclass must
    supply them.

    Keyword arguments:
        Object  -- an already-constructed object to wrap.
        InPath  -- path handed to ``_reader`` when loading.
        OutPath -- path handed to ``_writer`` when writing.
        Info    -- stored by ``Container.__init__``.
    """

    _reserved = set(['_reserved', 'TypeName', '_reader', '_writer',
                     '_object', 'InPath', 'OutPath', 'read', 'write',
                     '_load_if_needed', 'Info'])
    TypeName = "IOContainer"
    _object = None
    InPath = None
    OutPath = None
    Info = None

    def __init__(self, *args, **kwargs):
        """Record whichever of Object/InPath/OutPath were passed."""
        if 'Object' in kwargs:
            super(IOContainer, self).__setattr__('_object', kwargs['Object'])
        if 'InPath' in kwargs:
            super(IOContainer, self).__setattr__('InPath', kwargs['InPath'])
        if 'OutPath' in kwargs:
            super(IOContainer, self).__setattr__('OutPath', kwargs['OutPath'])
        super(IOContainer, self).__init__(*args, **kwargs)

    def _load_if_needed(self):
        """Load the wrapped object from ``InPath`` if none is held yet.

        Raises:
            CannotReadError -- if there is no object and ``InPath`` is None.
        """
        if self._object is None:
            if self.InPath is not None:
                self._object = self._reader(self.InPath)
            else:
                raise CannotReadError("No object and InPath is None!")

    def write(self):
        """Write via ``_writer`` to ``OutPath``; do nothing if no object is held.

        Raises:
            CannotWriteError -- if an object is held but ``OutPath`` is None.
        """
        if self._object is not None:
            if self.OutPath is None:
                raise CannotWriteError("OutPath is None!")
            self._writer(self.OutPath)

    def read(self):
        """Explicitly load the wrapped object; do nothing if one is held.

        Raises:
            CannotReadError -- if no object is held and ``InPath`` is None.
        """
        if self._object is None:
            if self.InPath is None:
                raise CannotReadError("InPath is None!")
            self._object = self._reader(self.InPath)
class QIIMEInt(Container, int):
    """An ``int`` that also carries the Container ``TypeName`` and ``Info``."""

    TypeName = "Int"

    def __new__(cls, *args, **kwargs):
        """Build the int value, ignoring the Container-only ``Info`` keyword.

        ``int.__new__`` rejects unknown keywords, so without this
        ``QIIMEInt(5, Info=...)`` raises ``TypeError`` before
        ``Container.__init__`` gets the chance to store ``Info``.
        """
        kwargs.pop('Info', None)
        return int.__new__(cls, *args, **kwargs)
class QIIMEFloat(Container, float):
    """A ``float`` that also carries the Container ``TypeName`` and ``Info``."""

    TypeName = "Float"

    def __new__(cls, *args, **kwargs):
        """Build the float value, ignoring the Container-only ``Info`` keyword.

        ``float.__new__`` rejects keyword arguments, so without this
        ``QIIMEFloat(1.5, Info=...)`` raises ``TypeError`` before
        ``Container.__init__`` gets the chance to store ``Info``.
        """
        kwargs.pop('Info', None)
        return float.__new__(cls, *args, **kwargs)
class QIIMEStr(Container, str):
    """A ``str`` that also carries the Container ``TypeName`` and ``Info``."""

    TypeName = "String"

    def __new__(cls, *args, **kwargs):
        """Build the str value, ignoring the Container-only ``Info`` keyword.

        ``str.__new__`` rejects unknown keywords, so without this
        ``QIIMEStr('x', Info=...)`` raises ``TypeError`` before
        ``Container.__init__`` gets the chance to store ``Info``.
        """
        kwargs.pop('Info', None)
        return str.__new__(cls, *args, **kwargs)
def write_biom_table(path, s):
    """Write the string ``s`` to ``path``, replacing any existing file.

    Arguments:
        path -- filesystem path to open for writing.
        s    -- text to write.
    """
    # The context manager closes the handle even if write() raises.
    with open(path, 'w') as out:
        out.write(s)
class BIOMTable(IOContainer):
    """IOContainer that reads with ``parse_biom_table`` and writes JSON text."""

    TypeName = "BIOMTable"

    def _reader(self, x):
        """Parse and return the table stored at path ``x``."""
        # Close the handle once parsing is done instead of leaking it.
        # NOTE(review): assumes parse_biom_table consumes the file before
        # returning -- confirm against the biom version in use.
        with open(x) as biom_file:
            return parse_biom_table(biom_file)

    def _writer(self, x):
        """Write the wrapped table's BIOM JSON string to path ``x``."""
        # getBiomFormatJsonString is not defined on the container; it is
        # forwarded to the wrapped object via __getattr__.
        # NOTE(review): 'asd' looks like a placeholder argument -- confirm
        # the intended value.
        write_biom_table(x, self.getBiomFormatJsonString('asd'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment