Sometimes it is necessary to write SQL scripts that create database views and use those views from Django. This can be hard, because the scripts usually have to be managed outside the Django project structure.
To make this easier, I've created a simple way to run those database view scripts as part of the Django project itself.
To make this possible, we need an engine that finds and runs these scripts inside the Django apps. In your project, create a file named db.py
and put the code below inside it:
# coding: utf-8
import os
import sys
from collections import OrderedDict
from django.apps import apps
from django.db import connections, DEFAULT_DB_ALIAS
class DatabaseWrapper:
    """
    Small helper around Django's ``connections`` for running raw SQL.

    Every query method accepts ``using`` to select a database alias; when
    it is omitted (or falsy) ``default_db_alias`` is used.

    NOTE(review): ``params`` are interpolated into the SQL text with
    ``str.format`` (``{name}`` placeholders); they are NOT sent to the
    database driver as bound parameters. Never pass untrusted values
    through ``params``. Because ``str.format`` is always applied, literal
    braces inside a query must be doubled (``{{`` / ``}}``).
    """

    default_db_alias = DEFAULT_DB_ALIAS

    def get_connection(self, using=None):
        """
        Return the database connection registered under ``using``
        (falling back to ``default_db_alias``).
        """
        return connections[using or self.default_db_alias]

    def get_cursor(self, using=None):
        """
        Return a new cursor opened on the connection for ``using``.
        """
        return self.get_connection(using=using).cursor()

    @staticmethod
    def _render(query, params=None):
        """
        Fill the ``{name}`` placeholders of ``query`` from ``params``.
        """
        return query.format(**(params or {}))

    def select(self, query, params=None, as_dict=False, using=None):
        """
        Execute a statement that returns rows and fetch all of them.

        Returns the rows exactly as the cursor's ``fetchall()`` gives
        them, or, when ``as_dict`` is ``True``, a list of ``OrderedDict``
        objects keyed by column name.
        """
        # The cursor is used as a context manager so it is always closed,
        # even when executing or fetching raises.
        with self.get_cursor(using=using) as cursor:
            cursor.execute(self._render(query, params))
            rows = cursor.fetchall()
            if as_dict is True:
                columns = [col[0] for col in cursor.description]
                return [OrderedDict(zip(columns, row)) for row in rows]
            return rows

    def select_one(self, query, params=None, using=None):
        """
        Execute a statement and return the first column of its first row.

        Returns ``None`` when the statement produces no rows.
        """
        with self.get_cursor(using=using) as cursor:
            cursor.execute(self._render(query, params))
            row = cursor.fetchone()
        # fetchone() yields None when there is no row; indexing it would
        # raise TypeError.
        if row is None:
            return None
        return row[0]

    def execute(self, query, params=None, using=None):
        """
        Execute any statement and discard whatever it returns.
        """
        with self.get_cursor(using=using) as cursor:
            cursor.execute(self._render(query, params))
# Shared module-level instance; create_or_update_database_views() below
# runs each view script through it.
db = DatabaseWrapper()
def _get_database_view_scripts():
    """
    Collect the database view scripts shipped by the installed apps.

    For every installed app, the ``scripts`` directory inside the app's
    path (if there is one) is walked recursively and each file whose name
    ends with ``.view.sql`` is collected.

    Returns a list of ``(app_label, script_path)`` tuples. Within an app
    the scripts are ordered by directory and then by file name, so the
    result does not depend on filesystem listing order.
    """
    scripts = []
    for app_config in apps.get_app_configs():
        app_scripts_path = os.path.join(app_config.path, 'scripts')

        # Apps without a scripts folder contribute nothing.
        if not os.path.isdir(app_scripts_path):
            continue

        for dirpath, dirnames, filenames in os.walk(app_scripts_path):
            # Sorting in place makes os.walk descend in a stable order.
            dirnames.sort()
            # Join with dirpath (the directory currently being walked),
            # not the scripts root, so files found in sub-folders get a
            # path that actually exists.
            scripts.extend(
                (app_config.label, os.path.join(dirpath, filename))
                for filename in sorted(filenames)
                if filename.endswith('.view.sql')
            )
    return scripts
def create_or_update_database_views(verbose=True):
    """
    Run every discovered ``*.view.sql`` script against the database.

    The scripts come from ``_get_database_view_scripts()`` and each file's
    full content is handed to ``db.execute``. When ``verbose`` is truthy a
    header, one line per executed script and a closing ``Done.`` are
    printed.
    """
    line_template = '{app_label: <50}{script_name}'
    pending = _get_database_view_scripts()

    if verbose:
        print('\n\033[1mCreating database views:\033[0m')

    for label, path in pending:
        with open(path, 'r') as script_file:
            sql = script_file.read()
        db.execute(sql)

        if not verbose:
            continue
        print(line_template.format(
            app_label=label,
            script_name=os.path.basename(path)))

    if verbose:
        print('Done.')
This script finds every file named *.view.sql
inside a folder named scripts
in each installed app of your project. The structure should look like this:
[projectname]/
├── ...
├── [appname]/
│ ├── ...
│ ├── scripts
│ │ ├── myview1.view.sql
│ │ ├── myview2.view.sql
└── └── └── myview3.view.sql
After that, you can register a Django signal receiver that creates all the views once the Django migrations have run. Because it hooks into the migrations, the views are also created on the test database.
# coding: utf-8
from django.db.models.signals import post_migrate
from django.dispatch.dispatcher import receiver
@receiver(post_migrate)
def create_database_views_signal(app_config, *args, **kwargs):
    """
    ``post_migrate`` receiver that creates or updates the database views.

    It only acts when the ``app_config`` it receives has the label
    ``auth``; for any other app it returns without doing anything.
    """
    if app_config.label != 'auth':
        return

    # Imported inside the receiver rather than at module level.
    from apps.utils.db import create_or_update_database_views

    create_or_update_database_views(verbose=True)