Skip to content

Instantly share code, notes, and snippets.

@LuizArmesto
Created April 13, 2015 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LuizArmesto/025aeba8f5c6d6f058ee to your computer and use it in GitHub Desktop.
Save LuizArmesto/025aeba8f5c6d6f058ee to your computer and use it in GitHub Desktop.
Create SQLAlchemy models and populate using DataPackage
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unicodedata
import sqlalchemy
import sqlalchemy.orm
from datapackage import DataPackage
TYPES = {
'string': {
'default': sqlalchemy.types.String,
'binary': sqlalchemy.types.LargeBinary
},
'number': {
'default': sqlalchemy.types.Numeric
},
'integer': {
'default': sqlalchemy.types.Integer
},
'boolean': {
'default': sqlalchemy.types.Boolean
},
'null': {
'default': sqlalchemy.types.NullType
},
'object': {
'default': None
},
'datetime': {
'default': sqlalchemy.types.DateTime
},
'date': {
'default': sqlalchemy.types.Date
},
'time': {
'default': sqlalchemy.types.Time
},
'geopoint': {
'default': None
},
'geojson': {
'default': None
},
'any': {
'default': None
}
}
def normalize_name(value):
value = unicodedata.normalize('NFKD', value).encode('ascii','ignore')
return ''.join(c for c in value if c.isalnum() or c.isspace() or c in ['_'])
def to_camelcase(value):
return ''.join(c for c in value.title().strip() if not c.isspace())
def to_underscore(value):
return value.lower().strip().replace(' ', '_')
def get_column_type(field):
type_ = field.get('type')
format_ = field.get('format', 'default')
formats = TYPES.get(type_, {})
return formats.get(format_) or formats.get('default')
def _create_table(resource, metadata, tablename):
schema = resource.schema
columns = [
sqlalchemy.Column('_uid', # Internal id
sqlalchemy.types.Integer,
primary_key=True,
autoincrement=True)
]
for field in schema.get('fields', []):
column_type = get_column_type(field)
column_name = to_underscore(normalize_name(field.get('name')))
column = sqlalchemy.Column(column_name, column_type)
columns.append(column)
return sqlalchemy.Table(tablename, metadata, *columns)
def populate(session, model):
engine = session.get_bind()
table = getattr(model, '__table__')
table.create(engine, checkfirst=True)
datapackage = getattr(model, '__datapackage_instance__')
resource = getattr(model, '__resource_instance__')
data = datapackage.get_data(resource)
engine.execute(model.__table__.insert(),
[{to_underscore(normalize_name(name)): value for
(name, value) in item.iteritems()} for item in data]
)
metadata = sqlalchemy.MetaData()
def mapper(cls, datapackage, resource_name):
resource = next((r for r in datapackage.resources if r.name == resource_name))
tablename = getattr(cls, '__tablename__',
to_underscore(normalize_name(resource.name)))
table = _create_table(resource, metadata, tablename)
cls.__table__ = table
cls.__datapackage_instance__ = datapackage
cls.__resource__ = resource.name
cls.__resource_instance__ = resource
sqlalchemy.orm.mapper(cls, table)
class BaseMeta(type):
def __new__(mcls, name, bases, attrs):
cls = super(BaseMeta, mcls).__new__(mcls, name, bases, attrs)
datapackage = attrs.get('__datapackage__')
if datapackage:
resource_name = unicode(attrs.get('__resource__'))
if isinstance(datapackage, basestring):
datapackage = DataPackage(unicode(datapackage))
tablename = '_'.join([ datapackage.name, resource_name])
attrs.update({
'__tablename__': to_underscore(normalize_name(tablename))
})
mapper(cls, datapackage, resource_name)
return cls
class Base(object):
__metaclass__ = BaseMeta
class ModelsMaker(object):
def __init__(self, datapackage, session=None, table_prefix=None):
if isinstance(datapackage, basestring):
datapackage = DataPackage(unicode(datapackage))
self.datapackage = datapackage
self.session = session
self.table_prefix = table_prefix or self.datapackage.name
self._models = {}
@property
def models(self):
if not self._models:
self.create_models()
return self._models.values()
def get_model(self, name):
if not self._models:
self.create_models()
return self._models[name]
def create_models(self):
self._models = {}
for resource in self.datapackage.resources:
class_ = self._create_class(resource)
mapper(class_, self.datapackage, resource.name)
self._models[resource.name] = class_
return self._models
def populate(self, models=None, session=None):
session = session or self.session
models = models or self.models
for model in models:
populate(session, model)
def _create_class(self, resource):
classname = to_camelcase(normalize_name(resource.name))
tablename = '_'.join([self.table_prefix, resource.name])
def __init__(self, **kwargs):
for (name, value) in kwargs.iteritems():
attr_name = to_underscore(normalize_name(name))
if hasattr(self, attr_name):
setattr(self, attr_name, value)
return type(classname, (object, ), {
'__init__': __init__,
'__tablename__': to_underscore(normalize_name(tablename))
})
def example():
''' Exemplo usando ModelsMaker, que cria um model para cada resource do datapackage '''
print 'Iniciando...'
engine = sqlalchemy.create_engine('sqlite:///:memory:')
# Cria sessão do sqlalchemy
Session = sqlalchemy.orm.sessionmaker(bind=engine)
session = Session()
# Cria todos os models a partir dos resources do datapackage
print 'Criando modelos...'
modelsMaker = ModelsMaker('http://data.okfn.org/data/cpi/')
print 'Carregando CSV e populando modelos...'
modelsMaker.populate(session=session)
print 'Executando uma query...'
Cpi = modelsMaker.get_model('cpi')
print [(i.cpi, i.country_code, i.year) for i in session.query(Cpi).filter(Cpi.country_code == 'BRA').all()]
# Devolve uma tupla com as classes dos Models e a sessão da base de dados
return (modelsMaker.models, session)
def example2():
''' Exemplo declarando um model explicitamente para um resource de um datapackage '''
print 'Iniciando...'
engine = sqlalchemy.create_engine('sqlite:///:memory:')
# Cria sessão do sqlalchemy
Session = sqlalchemy.orm.sessionmaker(bind=engine)
session = Session()
print 'Declarando modelo...'
class Cpi(Base):
__datapackage__ = 'http://data.okfn.org/data/cpi/'
__resource__ = 'cpi'
print 'Carregando CSV e populando modelo...'
populate(session, Cpi)
print 'Executando uma query...'
print [(i.cpi, i.country_code, i.year) for i in session.query(Cpi).filter(Cpi.country_code == 'BRA').all()]
if __name__ == '__main__':
example()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment