Created
April 13, 2015 20:26
-
-
Save LuizArmesto/025aeba8f5c6d6f058ee to your computer and use it in GitHub Desktop.
Create SQLAlchemy models and populate them from a DataPackage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import unicodedata | |
import sqlalchemy | |
import sqlalchemy.orm | |
from datapackage import DataPackage | |
# Lookup table used by get_column_type(): the outer key is a field's ``type``,
# the inner key is its ``format``. The 'default' entry is the fallback when a
# field has no format or a format that is not listed for its type.
# A value of None means no SQLAlchemy type is associated with that field type
# here; get_column_type() returns None for those.
TYPES = {
    # Textual / binary data
    'string': {
        'default': sqlalchemy.types.String,
        'binary': sqlalchemy.types.LargeBinary,
    },
    # Numeric data
    'number': {'default': sqlalchemy.types.Numeric},
    'integer': {'default': sqlalchemy.types.Integer},
    'boolean': {'default': sqlalchemy.types.Boolean},
    'null': {'default': sqlalchemy.types.NullType},
    # Temporal data
    'datetime': {'default': sqlalchemy.types.DateTime},
    'date': {'default': sqlalchemy.types.Date},
    'time': {'default': sqlalchemy.types.Time},
    # Field types without a dedicated column type in this table
    'object': {'default': None},
    'geopoint': {'default': None},
    'geojson': {'default': None},
    'any': {'default': None},
}
def normalize_name(value):
    """Reduce *value* to ASCII letters, digits, whitespace and underscores.

    The text is NFKD-decomposed and encoded to ASCII with ``'ignore'``, which
    drops every character that has no ASCII representation (for example the
    combining accent left over after decomposing an accented letter). Any
    remaining character that is not alphanumeric, whitespace or ``_`` is
    removed as well.

    Args:
        value: Unicode text, e.g. a resource or field name.

    Returns:
        A native ``str`` holding only the characters that were kept.
    """
    ascii_value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
    # On Python 2 the encoded value already is a native ``str``. On Python 3 it
    # is ``bytes``, whose items are ints without ``isalnum``/``isspace``, so it
    # has to be decoded back to text before it can be filtered.
    if not isinstance(ascii_value, str):
        ascii_value = ascii_value.decode('ascii')
    return ''.join(
        c for c in ascii_value if c.isalnum() or c.isspace() or c == '_')
def to_camelcase(value):
    """Return *value* title-cased with every whitespace character removed.

    Example: ``'consumer price index'`` becomes ``'ConsumerPriceIndex'``.
    """
    # Splitting on whitespace and re-joining drops leading, trailing and
    # inner whitespace in one step.
    words = value.title().split()
    return ''.join(words)
def to_underscore(value):
    """Return *value* lower-cased and trimmed, with each space turned into ``_``.

    Only the space character is replaced; other whitespace inside the text is
    left untouched. Example: ``' Country Code '`` becomes ``'country_code'``.
    """
    trimmed = value.lower().strip()
    return '_'.join(trimmed.split(' '))
def get_column_type(field):
    """Look up the SQLAlchemy column type for a schema field descriptor.

    Args:
        field: Mapping describing the field; its ``type`` and optional
            ``format`` entries are used as keys into ``TYPES``.

    Returns:
        The entry registered for the field's type and format, falling back to
        the type's ``'default'`` entry. ``None`` when the type is unknown or
        has no column type registered.
    """
    by_format = TYPES.get(field.get('type'), {})
    specific = by_format.get(field.get('format', 'default'))
    if specific:
        return specific
    return by_format.get('default')
def _create_table(resource, metadata, tablename):
    """Build a ``sqlalchemy.Table`` from the fields of a resource schema.

    Args:
        resource: Object exposing a ``schema`` mapping with a ``fields`` list.
        metadata: ``sqlalchemy.MetaData`` the table is registered on.
        tablename: Name given to the table.

    Returns:
        A table with an auto-incrementing ``_uid`` primary key followed by one
        column per schema field, named after the normalized field name.
    """
    # Internal id: the schema itself is not consulted for a primary key.
    uid_column = sqlalchemy.Column(
        '_uid',
        sqlalchemy.types.Integer,
        primary_key=True,
        autoincrement=True)
    field_columns = [
        sqlalchemy.Column(
            to_underscore(normalize_name(field.get('name'))),
            get_column_type(field))
        for field in resource.schema.get('fields', [])
    ]
    return sqlalchemy.Table(tablename, metadata, uid_column, *field_columns)
def populate(session, model):
    """Create the model's table if needed and bulk-insert the resource data.

    Args:
        session: SQLAlchemy session; its bound engine is used for both the
            table creation and the insert.
        model: Class previously processed by ``mapper()``, which attaches the
            ``__table__``, ``__datapackage_instance__`` and
            ``__resource_instance__`` attributes read here.
    """
    engine = session.get_bind()
    table = model.__table__
    table.create(engine, checkfirst=True)
    datapackage = model.__datapackage_instance__
    resource = model.__resource_instance__
    # Row keys go through the same normalization as the column names built by
    # _create_table(), so they line up with the table's columns.
    # ``items()`` is used instead of ``iteritems()`` so the code also runs on
    # Python 3 -- assumes each row is a dict-like mapping.
    rows = [
        {to_underscore(normalize_name(name)): value
         for (name, value) in item.items()}
        for item in datapackage.get_data(resource)
    ]
    if not rows:
        # NOTE(review): an empty parameter list is not a no-op on every
        # SQLAlchemy version (it may run a single parameterless INSERT), so
        # skip the statement entirely when there is nothing to insert.
        return
    engine.execute(table.insert(), rows)
# Module-wide MetaData instance; mapper() registers every generated table on it.
metadata = sqlalchemy.MetaData()
def mapper(cls, datapackage, resource_name):
    """Map *cls* onto a table generated from one resource of a datapackage.

    The table is registered on the module-level ``metadata`` and the class is
    annotated with ``__table__``, ``__datapackage_instance__``,
    ``__resource__`` and ``__resource_instance__`` before being handed to
    ``sqlalchemy.orm.mapper``.

    Args:
        cls: Class to map. Its ``__tablename__`` attribute, when present,
            names the table; otherwise the normalized resource name is used.
        datapackage: Object exposing ``resources``, each with a ``name``.
        resource_name: Name of the resource to map.

    Raises:
        ValueError: If the datapackage has no resource called *resource_name*.
    """
    resource = next(
        (r for r in datapackage.resources if r.name == resource_name), None)
    if resource is None:
        # Without the default above, next() would leak a bare StopIteration,
        # which says nothing about what went wrong.
        raise ValueError(
            'Resource %r not found in datapackage' % (resource_name,))
    tablename = getattr(cls, '__tablename__',
                        to_underscore(normalize_name(resource.name)))
    table = _create_table(resource, metadata, tablename)
    cls.__table__ = table
    cls.__datapackage_instance__ = datapackage
    cls.__resource__ = resource.name
    cls.__resource_instance__ = resource
    sqlalchemy.orm.mapper(cls, table)
class BaseMeta(type):
    """Metaclass that maps a class to a datapackage resource when it is defined.

    A class body that sets ``__datapackage__`` (a DataPackage instance, or a
    string passed to ``DataPackage``) and ``__resource__`` (the resource name)
    is mapped through ``mapper()`` as soon as the class is created. Classes
    without ``__datapackage__`` are created unchanged.
    """

    def __new__(mcls, name, bases, attrs):
        datapackage = attrs.get('__datapackage__')
        if not datapackage:
            return super(BaseMeta, mcls).__new__(mcls, name, bases, attrs)

        resource_name = unicode(attrs.get('__resource__'))
        if isinstance(datapackage, basestring):
            datapackage = DataPackage(unicode(datapackage))
        tablename = '_'.join([datapackage.name, resource_name])
        # The class namespace is copied when the class object is built, so the
        # table name has to be placed in ``attrs`` *before* that happens;
        # adding it afterwards never reaches the class and mapper() would fall
        # back to the bare resource name. ``setdefault`` keeps a
        # ``__tablename__`` declared explicitly in the class body.
        attrs.setdefault('__tablename__',
                         to_underscore(normalize_name(tablename)))
        cls = super(BaseMeta, mcls).__new__(mcls, name, bases, attrs)
        mapper(cls, datapackage, resource_name)
        return cls
class Base(object):
    """Base class for models declared explicitly against a datapackage.

    Subclasses set ``__datapackage__`` and ``__resource__`` in their body and
    are mapped by ``BaseMeta`` when the class is defined.
    """
    # Python 2 metaclass hook: BaseMeta.__new__ runs for this class and for
    # every subclass. (Python 3 ignores this attribute.)
    __metaclass__ = BaseMeta
class ModelsMaker(object):
    """Generate one mapped model class per resource of a datapackage.

    Args:
        datapackage: A DataPackage instance, or a string handed to
            ``DataPackage`` to build one.
        session: Optional default SQLAlchemy session used by ``populate``.
        table_prefix: Prefix for generated table names; defaults to the
            datapackage name.
    """

    def __init__(self, datapackage, session=None, table_prefix=None):
        self.datapackage = (
            DataPackage(unicode(datapackage))
            if isinstance(datapackage, basestring)
            else datapackage
        )
        self.session = session
        self.table_prefix = table_prefix or self.datapackage.name
        self._models = {}

    def _ensure_models(self):
        """Build the models on first use and return the name -> class dict."""
        if not self._models:
            self.create_models()
        return self._models

    @property
    def models(self):
        """All generated model classes, created lazily on first access."""
        return self._ensure_models().values()

    def get_model(self, name):
        """Return the model generated for the resource called *name*."""
        return self._ensure_models()[name]

    def create_models(self):
        """(Re)build and map a model class for every resource.

        Returns:
            Dict mapping each resource name to its model class.
        """
        built = {}
        self._models = built
        for resource in self.datapackage.resources:
            model = self._create_class(resource)
            mapper(model, self.datapackage, resource.name)
            built[resource.name] = model
        return built

    def populate(self, models=None, session=None):
        """Create tables and insert data for *models* (default: all models).

        *session* falls back to the session given to the constructor.
        """
        target_session = session or self.session
        for model in (models or self.models):
            # Inside a method body this name resolves to the module-level
            # populate() function, not to this method.
            populate(target_session, model)

    def _create_class(self, resource):
        """Create the (not yet mapped) model class for *resource*."""

        def __init__(self, **kwargs):
            # Only keyword arguments whose normalized name matches an
            # existing attribute are assigned; the rest are ignored.
            for (name, value) in kwargs.items():
                attr_name = to_underscore(normalize_name(name))
                if hasattr(self, attr_name):
                    setattr(self, attr_name, value)

        classname = to_camelcase(normalize_name(resource.name))
        prefixed_name = '_'.join([self.table_prefix, resource.name])
        namespace = {
            '__init__': __init__,
            '__tablename__': to_underscore(normalize_name(prefixed_name)),
        }
        return type(classname, (object, ), namespace)
def example():
    """Example using ModelsMaker, which creates one model per datapackage resource.

    Returns:
        A tuple with the model classes and the database session.
    """
    print('Iniciando...')
    engine = sqlalchemy.create_engine('sqlite:///:memory:')

    # Create the SQLAlchemy session.
    session = sqlalchemy.orm.sessionmaker(bind=engine)()

    # Create all models from the resources of the datapackage.
    print('Criando modelos...')
    maker = ModelsMaker('http://data.okfn.org/data/cpi/')

    print('Carregando CSV e populando modelos...')
    maker.populate(session=session)

    print('Executando uma query...')
    Cpi = maker.get_model('cpi')
    brazil_rows = session.query(Cpi).filter(Cpi.country_code == 'BRA').all()
    print([(row.cpi, row.country_code, row.year) for row in brazil_rows])

    # Hand back the model classes together with the database session.
    return (maker.models, session)
def example2():
    """Example declaring a model explicitly for one resource of a datapackage."""
    print('Iniciando...')
    engine = sqlalchemy.create_engine('sqlite:///:memory:')

    # Create the SQLAlchemy session.
    session = sqlalchemy.orm.sessionmaker(bind=engine)()

    print('Declarando modelo...')

    class Cpi(Base):
        __datapackage__ = 'http://data.okfn.org/data/cpi/'
        __resource__ = 'cpi'

    print('Carregando CSV e populando modelo...')
    populate(session, Cpi)

    print('Executando uma query...')
    brazil_rows = session.query(Cpi).filter(Cpi.country_code == 'BRA').all()
    print([(row.cpi, row.country_code, row.year) for row in brazil_rows])
# Run the ModelsMaker example when the file is executed as a script.
if __name__ == '__main__':
    example()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment