Skip to content

Instantly share code, notes, and snippets.

@whosaysni
Last active August 29, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save whosaysni/11361218 to your computer and use it in GitHub Desktop.
Save whosaysni/11361218 to your computer and use it in GitHub Desktop.
Modelizing auxiliary DBMS using Django's multi-db framework
# coding: utf-8
"""外部DBMS
"""
import os
from functools import wraps
from json import loads
from logging import getLogger
from threading import local
import django.db # connections のオーバライドのため
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.core.management.color import no_style
from django.core.management.sql import sql_all, sql_delete, sql_destroy_indexes, sql_flush
from django.core.signals import request_started
from django.db import connections as db_connections, models, IntegrityError, OperationalError
from django.db.models.signals import pre_delete
from django.db.transaction import atomic
from django.db.utils import ConnectionHandler, load_backend
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
"""
ストラテジ
-----------
Django のマルチDB は、ほんらいモデルごとの DB 分割を想定して
いるため、データベースルータは、モデルとインスタンス程度の
情報しか扱えない。
従って、共有アプリケーション以外のモデルを操作するときには、
using を使って明示的に DB を切り分けていく。
特に、フォーム操作やクエリセットの取り扱いのときに、
using の指定を忘れてはならないので注意すること。
また、settings からはデータベースルータを明示的に除去しておき、
デフォルト DB のテーブルにエントリが書き込まれないように注意
すること。
"""
# ロガーを作成しておく
logger = getLogger('django')
# 共有アプリケーションのスキーマは、外部DBには投入しない。
SHARED_APPLICATIONS = [
# django 関連は外部DBには投入しない
'dbms', # 忘れちゃダメ :)
]
# 外部DBに保存するアプリケーションのテーブル
AUX_DB_APPLICATIONS = settings.AUX_DB_APPLICATIONS
# データベースエンジン名と、モジュールのマップ。
DATABASE_ENGINE_MAP = (
('PostgreSQL', 'django.db.backends.postgresql_psycopg2'),
('MySQL', 'django.db.backends.mysql'),
('SQLite', 'django.db.backends.sqlite3'),
('Oracle', 'django.db.backends.oracle'))
# Dbms.db_engine フィールド用の選択肢。
DATABASE_ENGINE_CHOICES = [(v, k) for k, v in DATABASE_ENGINE_MAP]
# Style used by sql_* functions
NO_STYLE = no_style()
# ファイルベースのデータベースエンジンは、DATABASE_NAME の扱いが特別
SQLITE3_ENGINE = 'django.db.backends.sqlite3'
FILE_BASED_DB_ENGINES = (SQLITE3_ENGINE,)
# ファイルベースの外部 DB の置き場
DATABASE_FILE_DIR = os.path.join(settings.AUX_DB_ROOT)
# db_connection.databases は settings.DATABASES そのものになるケースがあるので、
# 辞書のコピーを取って明示的に置き換える
db_connections.databases = dict(db_connections.databases)
class AUX_DB:
"""外部 DB 名保持用のコンテナ
"""
aliases = []
builtin_aliases = settings.DATABASES.keys()
class Dbms(models.Model):
"""外部DBMS
"""
name = models.SlugField(
_('Name'), max_length=64, unique=True,
help_text=_('Unique name to identify this DBMS connection.'))
db_engine = models.CharField(
_('Database Engine'), max_length=1024, choices=DATABASE_ENGINE_CHOICES,
default=SQLITE3_ENGINE, blank=False, null=False,
help_text=_('Database Engine Type.'))
db_name = models.CharField(
_('Database Name'), max_length=1024,
help_text=_('Name of the database. On SQLite, this value is '
'interpreted as relative path from intnernal database directory.'))
db_user = models.CharField(
_('Database User'), max_length=256, blank=True,
help_text=_('Database user name. It may be blank on SQLite.'))
db_password = models.CharField(
_('Database Password'), max_length=256, blank=True,
help_text=_('Password for database access. It may be blank on SQLite.'))
db_host = models.CharField(
_('Database Host'), max_length=256, blank=True,
help_text=(_('IP address or FQDN of database server. '
'It may be blank on SQLite or those backends '
'supporting domain socket connection.')))
db_port = models.CharField(
_('Database Port'), max_length=16, blank=True,
help_text=(_('Port number of database server. '
'It may be blank on SQLite or those backends '
'supporting domain socket connection.')))
db_options = models.TextField(
_('Connection options (JSON)'), blank=True,
help_text=(_('Extra options given to database backend, in JSON '
'dictionary format. Supported options varies for backends.')))
enabled = models.BooleanField(
_('Enabled'), default=True,
help_text=(_('True if connection is enabled.')))
is_admin = models.BooleanField(
_('Admin role'), default=True,
help_text=(_('Mark True if connection is under the administrative role. '
'Workspace cannot use connection with Admin role.')))
# さしあたって使わないが、将来の拡張のために用意しておく。
management_connection = models.ForeignKey(
'Dbms', verbose_name=_('Management Connection'), null=True,
related_name='managed_connections',
help_text=(_('True if this connection is used for db administration. '
'Management connection cannot be used from ordinal users.')))
def __unicode__(self):
"""Adminなどで識別可能な文字列表現を返す
"""
return self.name
@classmethod
def do_remove_connection(cls, alias):
"""特定のデータベース接続を除去する
"""
# コネクションがキャッシュされていたら除去する
try:
# conn は local() のアトリビュートとして保持されている
conn = getattr(db_connections._connections, alias, None)
if conn:
conn.close()
delattr(db_connections._connections, alias)
except Exception as e:
# エラーが起きてもログに記録して処理を継続する
logger.warning('Exception raised during Dbms.do_remove_connection.', e)
# db_connections.databases から alias を除去する
if db_connections.databases.has_key(alias):
del db_connections.databases[alias]
# AUX_DB の追跡対象からも外す
if alias in AUX_DB.aliases:
AUX_DB.remove(alias)
@classmethod
def do_remove_connections(cls):
"""全てのデータベース接続を除去する
"""
# AUX_DB で追跡している全ての接続を除去する
# 通常はこの処理だけで除去すべきコネクションをカバーできる
for alias in AUX_DB.aliases:
cls.do_remove_connection(alias)
# ... が、テストケースの失敗などで、たまに AUX_DB に乗らないものがある。
# 何らかの理由で AUX_DB.aliases のトラッキングから外れてしまったものを削除
for alias in db_connections.databases.keys():
if alias not in AUX_DB.builtin_aliases:
# ログに警告を残して除去する
logger.warning(
'Dbms.do_remove_connections found untracked '
'alias, removing it.', alias)
cls.do_remove_connection(alias)
@classmethod
def rebuild_connections(cls):
"""DbmsBackedConnectionHandler をリセットして、コネクションキャッシュを消去する。
DBMSの設定を変更したら、このメソッドを明に呼び出してDB接続を再構築すること。
"""
# 全ての外部DBのコネクションを閉じて除去する
cls.do_remove_connections()
# enabled=True であるような Dbms オブジェクトからデータベース情報を構成する
for dbms in cls.objects.filter(enabled=True):
# データベース情報を、一旦 ConnectionHandler に登録する
db_connections.databases[dbms.name] = dbms.params
db_connections.ensure_defaults(dbms.name) # ensure_defaults 忘れないこと
# コネクションを作ってみて、うまくいかないものは除去する
try:
cur = db_connections[dbms.name].cursor()
except Exception as e:
cls.do_remove_connection(dbms.name)
def remove_connection(self):
"""DB への接続を除去する
"""
self.__class__.do_remove_connection(self.name)
@property
def params(self):
"""データベースパラメタ辞書を構築して返す
"""
db_name, db_engine = self.db_name, self.db_engine
# SQLite の場合で、名前が ':memory:' なら、絶対パスにしない
if (db_name, db_engine) == (':memory:', SQLITE3_ENGINE):
pass
# それ以外のファイルベースのものは、全てプレフィクスを付加して
# 絶対パスにする。(任意の場所にDBを作れないようにするための対策)
elif self.db_engine in FILE_BASED_DB_ENGINES:
db_name = os.path.join(DATABASE_FILE_DIR, db_name)
# パラメタ辞書を構築
params = dict(ENGINE=db_engine, NAME=db_name)
# user, password, host, port, options の追加
for key, attr_name, data_type in (
('USER', 'db_user', str), ('PASSWORD', 'db_password', str),
('HOST', 'db_host', str), ('PORT', 'db_port', int),
('OPTIONS', 'db_options', loads)): # loads は JSON のパージング
value = getattr(self, attr_name)
# 適切な型に変換してパラメタ辞書に追加する
if value:
try:
value = data_type(value)
params[key] = value
except ValueError:
# 変換がうまく行かなければ追加しない
pass
return params
def save(self):
"""Dbms インスタンスを DB に保存する
"""
# 外部DBの組み込み前からコネクションハンドラ内にある名前は使わせない
if self.name in AUX_DB.builtin_aliases:
raise IntegrityError('Database aliases predefined in settings.DATABASES is not allowed.', {'alias': self.name})
# db_port が空文字列でなく、かつ integer でない場合は ValueError
# 不正な db_port が設定されないよう、フォームで防ぐこと。
if self.db_port:
try:
int(self.db_port)
except ValueError:
raise ValueError('db_port should be an empty string or digits of integer.')
# db_options が空文字列でなく、不正な JSON の時は ValueError
# 不正な JSON が設定されないよう、フォームで防ぐこと。
if self.db_options:
try:
loads(self.db_options)
except ValueError:
raise ValueError('db_options should be an empty string or valid JSON-formatted data.')
saved_object = super(Dbms, self).save()
# 名前が変更されている可能性もあるので、常に db_connections をリビルドする
self.__class__.rebuild_connections()
return saved_object
@property
def connection(self):
"""データベースへの接続を返す。接続に失敗したら None を返す。
"""
try:
return db_connections[self.name]
except Exception as e:
logger.debug('Dbms.connection failed.', self.name)
return None
@property
def connection_error(self):
"""データベースへの接続を試み、エラーのときは例外オブジェクトを返す。
"""
ret = None
try:
db_connections[self.name]
except Exception as e:
ret = e
return ret
# *_tables_sql は、 django.core.management.sql の機能を使って
# テーブルの構築・削除用の SQL を生成する。
@property
def cursor(self):
"""データベースのカーソルを返す。
"""
if self.connection:
return self.connection.cursor()
return None
def execute(self, sql_statement):
"""カーソルを生成して、SQL を実行する。
"""
ret = None
if self.cursor:
res = self.cursor.execute(sql_statement)
ret = res.fetchall()
return ret
def execute_list(self, sql_statements):
"""カーソルを生成して、SQL を実行する。
"""
if self.cursor:
ret = []
for sql_statement in sql_statements:
res = self.cursor.execute(sql_statement)
ret.extend(res.fetchall())
return ret
# カーソルが取得できないときは None を返す
return None
def get_app(self, app_package_name):
"""パッケージパスから app を返す。
app の名前が同じでも、パッケージパスが異なる場合は None を返す。
"""
app_name = app_package_name.split('.')[-1]
try:
app = models.get_app(app_name)
if app.__package__==app_package_name:
return app
else:
# 別のパッケージの app が見つかってしまったら、ログで警告する
logger.warning(
'Dbms.get_app detected app listed in '
'AUX_DB_APPLICATIONS but models.get_app() returns '
'another app in different package.', app.__pacage__)
except ImproperlyConfigured as e:
# パッケージが見つからないと ImproperlyConfigured が送出される
# ログで警告する
logger.warning(
'Dbms.get_app detected invalid app declaration in '
'AUX_DB_APPLICATIONS', app_package_name)
# うまくいかない時は None を返す
return None
def get_apps(self):
"""AUX_DB_APPLICATIONS に登録されている app を返す。
"""
return filter(
None, (self.get_app(app_package_name)
for app_package_name in AUX_DB_APPLICATIONS))
def create_tables_sql(self):
"""テーブルとインデクスを構築するための SQL を返す
"""
sql_statements = []
if self.connection:
for app in self.get_apps():
sql_statements.extend(sql_all(app, NO_STYLE, self.connection))
return sql_statements
def delete_tables_sql(self):
"""テーブルを破棄するための SQL を返す
"""
sql_statements = []
if self.connection:
for app in self.get_apps():
sql_statements.extend(sql_delete(app, NO_STYLE, self.connection))
return sql_statements
def destroy_indexes_sql(self):
"""インデクスを破棄するための SQL を返す
"""
sql_statements = []
if self.connection:
for app in self.get_apps():
sql_statements.extend(sql_destroy_indexes(app, NO_STYLE, self.connection))
return sql_statements
def flush_tables_sql(self, reset_sequences=True, allow_cascade=False):
"""テーブル上のデータを消去するための SQL を返す
"""
sql_statements = []
if self.connection:
for app in self.get_apps():
sql_statements.extend(sql_flush(
NO_STYLE, self.connection,
reset_sequences=reset_sequences,
allow_cascade=allow_cascade))
return sql_statements
# 後述の *_tables メソッドは、 Admin role でない接続が
# 管理用の SQL を実行しないように、
# ensure_admin_role_connection デコレータで排除する。
# また、これらのメソッドは複数の SQL 文を実行するため、
# メソッドを django.db.transaction.atomic で修飾しておく。
def ensure_admin_role_connection(func):
"""Admin 接続でなければ ValueError とするためのデコレータ
このデコレータをインスタンスメソッドとして呼び出さないように!
"""
@wraps(func)
def wrapped(self, *args, **kwargs):
# Admin 接続でなければ ValueError
if not (self.is_admin==True):
raise ValueError('Not allowed for non-admin connection.')
# そうでなければラップ対象の関数を呼ぶ
return func(self, *args, **kwargs)
# ... という関数を作成して返す
return atomic(wrapped)
@ensure_admin_role_connection
def create_tables(self):
"""テーブルを構築する
"""
return self.execute_list(self.create_tables_sql())
@ensure_admin_role_connection
def delete_tables(self, delete_index=True, raise_delete_index_error=False):
"""テーブルを破棄する。
"""
ret = []
delete_result = self.execute_list(self.delete_tables_sql())
if delete_result is None: # カーソルを取得できない
return None
# delete_index が true なら、 delete_index も実行する
if delete_index:
try:
destroy_indexes_result = self.execute_list(self.destroy_indexes_sql())
if destroy_indexes_result is None: # 何故か途中でカーソルを取得できなくなった場合
return ret
ret.extend(destroy_indexes_result)
except OperationalError as e:
if raise_delete_index_error:
raise e
return ret
@ensure_admin_role_connection
def flush_tables(self):
"""テーブル上のデータを破棄する。
"""
return self.execute_list(self.flush_tables_sql())
# このメソッドの実装は複雑なのであとまわしとし、将来のために用意だけしておく
@ensure_admin_role_connection
def install_fixtures(self, fixtures):
"""Install fixtures into database.
"""
pass
# このメソッドの実装は複雑なので、将来のために用意だけしておく
@ensure_admin_role_connection
def migrate(self):
"""データベーススキーマを移行する
"""
pass
# シグナルハンドラ
# インスタンスの delete() や QuerySet の delete() でレコードが削除されるときに、
# コネクションを除去しておく。
# モデルの delete() だけでは、Dbms.objects.all().delete() の時にうまくいかないので
# シグナルで実装する。詳しくは Django ドキュメントのシグナルと pre_delete を参照
# すること。
@receiver(pre_delete, sender=Dbms, dispatch_uid='remove_connection_on_delete_dbms')
def pre_delete_dbms(sender, instance, **kwargs):
alias = instance.name
Dbms.do_remove_connection(alias)
def load_dbms_connections(*args, **kwargs):
"""外部DBへの接続をロードする
"""
logger.debug('Triggered load_dbms_connections.')
Dbms.rebuild_connections()
# request_started シグナルレシーバで、初期化時に自動的にDB接続を構築する
@receiver(request_started)
def on_first_request(*args, **kwargs):
# 最初の一度だけで、このレシーバを disconnect する
request_started.disconnect(on_first_request)
load_dbms_connections(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment