Last active
August 29, 2015 14:00
-
-
Save whosaysni/11361218 to your computer and use it in GitHub Desktop.
Modelizing auxiliary DBMS using Django's multi-db framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
"""外部DBMS | |
""" | |
import os | |
from functools import wraps | |
from json import loads | |
from logging import getLogger | |
from threading import local | |
import django.db # connections のオーバライドのため | |
from django.conf import settings | |
from django.core.exceptions import ImproperlyConfigured | |
from django.core.management.color import no_style | |
from django.core.management.sql import sql_all, sql_delete, sql_destroy_indexes, sql_flush | |
from django.core.signals import request_started | |
from django.db import connections as db_connections, models, IntegrityError, OperationalError | |
from django.db.models.signals import pre_delete | |
from django.db.transaction import atomic | |
from django.db.utils import ConnectionHandler, load_backend | |
from django.dispatch import receiver | |
from django.utils.translation import ugettext_lazy as _ | |
""" | |
ストラテジ | |
----------- | |
Django のマルチDB は、ほんらいモデルごとの DB 分割を想定して | |
いるため、データベースルータは、モデルとインスタンス程度の | |
情報しか扱えない。 | |
従って、共有アプリケーション以外のモデルを操作するときには、 | |
using を使って明示的に DB を切り分けていく。 | |
特に、フォーム操作やクエリセットの取り扱いのときに、 | |
using の指定を忘れてはならないので注意すること。 | |
また、settings からはデータベースルータを明示的に除去しておき、 | |
デフォルト DB のテーブルにエントリが書き込まれないように注意 | |
すること。 | |
""" | |
# ロガーを作成しておく | |
logger = getLogger('django') | |
# 共有アプリケーションのスキーマは、外部DBには投入しない。 | |
SHARED_APPLICATIONS = [ | |
# django 関連は外部DBには投入しない | |
'dbms', # 忘れちゃダメ :) | |
] | |
# 外部DBに保存するアプリケーションのテーブル | |
AUX_DB_APPLICATIONS = settings.AUX_DB_APPLICATIONS | |
# データベースエンジン名と、モジュールのマップ。 | |
DATABASE_ENGINE_MAP = ( | |
('PostgreSQL', 'django.db.backends.postgresql_psycopg2'), | |
('MySQL', 'django.db.backends.mysql'), | |
('SQLite', 'django.db.backends.sqlite3'), | |
('Oracle', 'django.db.backends.oracle')) | |
# Dbms.db_engine フィールド用の選択肢。 | |
DATABASE_ENGINE_CHOICES = [(v, k) for k, v in DATABASE_ENGINE_MAP] | |
# Style used by sql_* functions | |
NO_STYLE = no_style() | |
# ファイルベースのデータベースエンジンは、DATABASE_NAME の扱いが特別 | |
SQLITE3_ENGINE = 'django.db.backends.sqlite3' | |
FILE_BASED_DB_ENGINES = (SQLITE3_ENGINE,) | |
# ファイルベースの外部 DB の置き場 | |
DATABASE_FILE_DIR = os.path.join(settings.AUX_DB_ROOT) | |
# db_connection.databases は settings.DATABASES そのものになるケースがあるので、 | |
# 辞書のコピーを取って明示的に置き換える | |
db_connections.databases = dict(db_connections.databases) | |
class AUX_DB: | |
"""外部 DB 名保持用のコンテナ | |
""" | |
aliases = [] | |
builtin_aliases = settings.DATABASES.keys() | |
class Dbms(models.Model): | |
"""外部DBMS | |
""" | |
name = models.SlugField( | |
_('Name'), max_length=64, unique=True, | |
help_text=_('Unique name to identify this DBMS connection.')) | |
db_engine = models.CharField( | |
_('Database Engine'), max_length=1024, choices=DATABASE_ENGINE_CHOICES, | |
default=SQLITE3_ENGINE, blank=False, null=False, | |
help_text=_('Database Engine Type.')) | |
db_name = models.CharField( | |
_('Database Name'), max_length=1024, | |
help_text=_('Name of the database. On SQLite, this value is ' | |
'interpreted as relative path from intnernal database directory.')) | |
db_user = models.CharField( | |
_('Database User'), max_length=256, blank=True, | |
help_text=_('Database user name. It may be blank on SQLite.')) | |
db_password = models.CharField( | |
_('Database Password'), max_length=256, blank=True, | |
help_text=_('Password for database access. It may be blank on SQLite.')) | |
db_host = models.CharField( | |
_('Database Host'), max_length=256, blank=True, | |
help_text=(_('IP address or FQDN of database server. ' | |
'It may be blank on SQLite or those backends ' | |
'supporting domain socket connection.'))) | |
db_port = models.CharField( | |
_('Database Port'), max_length=16, blank=True, | |
help_text=(_('Port number of database server. ' | |
'It may be blank on SQLite or those backends ' | |
'supporting domain socket connection.'))) | |
db_options = models.TextField( | |
_('Connection options (JSON)'), blank=True, | |
help_text=(_('Extra options given to database backend, in JSON ' | |
'dictionary format. Supported options varies for backends.'))) | |
enabled = models.BooleanField( | |
_('Enabled'), default=True, | |
help_text=(_('True if connection is enabled.'))) | |
is_admin = models.BooleanField( | |
_('Admin role'), default=True, | |
help_text=(_('Mark True if connection is under the administrative role. ' | |
'Workspace cannot use connection with Admin role.'))) | |
# さしあたって使わないが、将来の拡張のために用意しておく。 | |
management_connection = models.ForeignKey( | |
'Dbms', verbose_name=_('Management Connection'), null=True, | |
related_name='managed_connections', | |
help_text=(_('True if this connection is used for db administration. ' | |
'Management connection cannot be used from ordinal users.'))) | |
def __unicode__(self): | |
"""Adminなどで識別可能な文字列表現を返す | |
""" | |
return self.name | |
@classmethod | |
def do_remove_connection(cls, alias): | |
"""特定のデータベース接続を除去する | |
""" | |
# コネクションがキャッシュされていたら除去する | |
try: | |
# conn は local() のアトリビュートとして保持されている | |
conn = getattr(db_connections._connections, alias, None) | |
if conn: | |
conn.close() | |
delattr(db_connections._connections, alias) | |
except Exception as e: | |
# エラーが起きてもログに記録して処理を継続する | |
logger.warning('Exception raised during Dbms.do_remove_connection.', e) | |
# db_connections.databases から alias を除去する | |
if db_connections.databases.has_key(alias): | |
del db_connections.databases[alias] | |
# AUX_DB の追跡対象からも外す | |
if alias in AUX_DB.aliases: | |
AUX_DB.remove(alias) | |
@classmethod | |
def do_remove_connections(cls): | |
"""全てのデータベース接続を除去する | |
""" | |
# AUX_DB で追跡している全ての接続を除去する | |
# 通常はこの処理だけで除去すべきコネクションをカバーできる | |
for alias in AUX_DB.aliases: | |
cls.do_remove_connection(alias) | |
# ... が、テストケースの失敗などで、たまに AUX_DB に乗らないものがある。 | |
# 何らかの理由で AUX_DB.aliases のトラッキングから外れてしまったものを削除 | |
for alias in db_connections.databases.keys(): | |
if alias not in AUX_DB.builtin_aliases: | |
# ログに警告を残して除去する | |
logger.warning( | |
'Dbms.do_remove_connections found untracked ' | |
'alias, removing it.', alias) | |
cls.do_remove_connection(alias) | |
@classmethod | |
def rebuild_connections(cls): | |
"""DbmsBackedConnectionHandler をリセットして、コネクションキャッシュを消去する。 | |
DBMSの設定を変更したら、このメソッドを明に呼び出してDB接続を再構築すること。 | |
""" | |
# 全ての外部DBのコネクションを閉じて除去する | |
cls.do_remove_connections() | |
# enabled=True であるような Dbms オブジェクトからデータベース情報を構成する | |
for dbms in cls.objects.filter(enabled=True): | |
# データベース情報を、一旦 ConnectionHandler に登録する | |
db_connections.databases[dbms.name] = dbms.params | |
db_connections.ensure_defaults(dbms.name) # ensure_defaults 忘れないこと | |
# コネクションを作ってみて、うまくいかないものは除去する | |
try: | |
cur = db_connections[dbms.name].cursor() | |
except Exception as e: | |
cls.do_remove_connection(dbms.name) | |
def remove_connection(self): | |
"""DB への接続を除去する | |
""" | |
self.__class__.do_remove_connection(self.name) | |
@property | |
def params(self): | |
"""データベースパラメタ辞書を構築して返す | |
""" | |
db_name, db_engine = self.db_name, self.db_engine | |
# SQLite の場合で、名前が ':memory:' なら、絶対パスにしない | |
if (db_name, db_engine) == (':memory:', SQLITE3_ENGINE): | |
pass | |
# それ以外のファイルベースのものは、全てプレフィクスを付加して | |
# 絶対パスにする。(任意の場所にDBを作れないようにするための対策) | |
elif self.db_engine in FILE_BASED_DB_ENGINES: | |
db_name = os.path.join(DATABASE_FILE_DIR, db_name) | |
# パラメタ辞書を構築 | |
params = dict(ENGINE=db_engine, NAME=db_name) | |
# user, password, host, port, options の追加 | |
for key, attr_name, data_type in ( | |
('USER', 'db_user', str), ('PASSWORD', 'db_password', str), | |
('HOST', 'db_host', str), ('PORT', 'db_port', int), | |
('OPTIONS', 'db_options', loads)): # loads は JSON のパージング | |
value = getattr(self, attr_name) | |
# 適切な型に変換してパラメタ辞書に追加する | |
if value: | |
try: | |
value = data_type(value) | |
params[key] = value | |
except ValueError: | |
# 変換がうまく行かなければ追加しない | |
pass | |
return params | |
def save(self): | |
"""Dbms インスタンスを DB に保存する | |
""" | |
# 外部DBの組み込み前からコネクションハンドラ内にある名前は使わせない | |
if self.name in AUX_DB.builtin_aliases: | |
raise IntegrityError('Database aliases predefined in settings.DATABASES is not allowed.', {'alias': self.name}) | |
# db_port が空文字列でなく、かつ integer でない場合は ValueError | |
# 不正な db_port が設定されないよう、フォームで防ぐこと。 | |
if self.db_port: | |
try: | |
int(self.db_port) | |
except ValueError: | |
raise ValueError('db_port should be an empty string or digits of integer.') | |
# db_options が空文字列でなく、不正な JSON の時は ValueError | |
# 不正な JSON が設定されないよう、フォームで防ぐこと。 | |
if self.db_options: | |
try: | |
loads(self.db_options) | |
except ValueError: | |
raise ValueError('db_options should be an empty string or valid JSON-formatted data.') | |
saved_object = super(Dbms, self).save() | |
# 名前が変更されている可能性もあるので、常に db_connections をリビルドする | |
self.__class__.rebuild_connections() | |
return saved_object | |
@property | |
def connection(self): | |
"""データベースへの接続を返す。接続に失敗したら None を返す。 | |
""" | |
try: | |
return db_connections[self.name] | |
except Exception as e: | |
logger.debug('Dbms.connection failed.', self.name) | |
return None | |
@property | |
def connection_error(self): | |
"""データベースへの接続を試み、エラーのときは例外オブジェクトを返す。 | |
""" | |
ret = None | |
try: | |
db_connections[self.name] | |
except Exception as e: | |
ret = e | |
return ret | |
# *_tables_sql は、 django.core.management.sql の機能を使って | |
# テーブルの構築・削除用の SQL を生成する。 | |
@property | |
def cursor(self): | |
"""データベースのカーソルを返す。 | |
""" | |
if self.connection: | |
return self.connection.cursor() | |
return None | |
def execute(self, sql_statement): | |
"""カーソルを生成して、SQL を実行する。 | |
""" | |
ret = None | |
if self.cursor: | |
res = self.cursor.execute(sql_statement) | |
ret = res.fetchall() | |
return ret | |
def execute_list(self, sql_statements): | |
"""カーソルを生成して、SQL を実行する。 | |
""" | |
if self.cursor: | |
ret = [] | |
for sql_statement in sql_statements: | |
res = self.cursor.execute(sql_statement) | |
ret.extend(res.fetchall()) | |
return ret | |
# カーソルが取得できないときは None を返す | |
return None | |
def get_app(self, app_package_name): | |
"""パッケージパスから app を返す。 | |
app の名前が同じでも、パッケージパスが異なる場合は None を返す。 | |
""" | |
app_name = app_package_name.split('.')[-1] | |
try: | |
app = models.get_app(app_name) | |
if app.__package__==app_package_name: | |
return app | |
else: | |
# 別のパッケージの app が見つかってしまったら、ログで警告する | |
logger.warning( | |
'Dbms.get_app detected app listed in ' | |
'AUX_DB_APPLICATIONS but models.get_app() returns ' | |
'another app in different package.', app.__pacage__) | |
except ImproperlyConfigured as e: | |
# パッケージが見つからないと ImproperlyConfigured が送出される | |
# ログで警告する | |
logger.warning( | |
'Dbms.get_app detected invalid app declaration in ' | |
'AUX_DB_APPLICATIONS', app_package_name) | |
# うまくいかない時は None を返す | |
return None | |
def get_apps(self): | |
"""AUX_DB_APPLICATIONS に登録されている app を返す。 | |
""" | |
return filter( | |
None, (self.get_app(app_package_name) | |
for app_package_name in AUX_DB_APPLICATIONS)) | |
def create_tables_sql(self): | |
"""テーブルとインデクスを構築するための SQL を返す | |
""" | |
sql_statements = [] | |
if self.connection: | |
for app in self.get_apps(): | |
sql_statements.extend(sql_all(app, NO_STYLE, self.connection)) | |
return sql_statements | |
def delete_tables_sql(self): | |
"""テーブルを破棄するための SQL を返す | |
""" | |
sql_statements = [] | |
if self.connection: | |
for app in self.get_apps(): | |
sql_statements.extend(sql_delete(app, NO_STYLE, self.connection)) | |
return sql_statements | |
def destroy_indexes_sql(self): | |
"""インデクスを破棄するための SQL を返す | |
""" | |
sql_statements = [] | |
if self.connection: | |
for app in self.get_apps(): | |
sql_statements.extend(sql_destroy_indexes(app, NO_STYLE, self.connection)) | |
return sql_statements | |
def flush_tables_sql(self, reset_sequences=True, allow_cascade=False): | |
"""テーブル上のデータを消去するための SQL を返す | |
""" | |
sql_statements = [] | |
if self.connection: | |
for app in self.get_apps(): | |
sql_statements.extend(sql_flush( | |
NO_STYLE, self.connection, | |
reset_sequences=reset_sequences, | |
allow_cascade=allow_cascade)) | |
return sql_statements | |
# 後述の *_tables メソッドは、 Admin role でない接続が | |
# 管理用の SQL を実行しないように、 | |
# ensure_admin_role_connection デコレータで排除する。 | |
# また、これらのメソッドは複数の SQL 文を実行するため、 | |
# メソッドを django.db.transaction.atomic で修飾しておく。 | |
def ensure_admin_role_connection(func): | |
"""Admin 接続でなければ ValueError とするためのデコレータ | |
このデコレータをインスタンスメソッドとして呼び出さないように! | |
""" | |
@wraps(func) | |
def wrapped(self, *args, **kwargs): | |
# Admin 接続でなければ ValueError | |
if not (self.is_admin==True): | |
raise ValueError('Not allowed for non-admin connection.') | |
# そうでなければラップ対象の関数を呼ぶ | |
return func(self, *args, **kwargs) | |
# ... という関数を作成して返す | |
return atomic(wrapped) | |
@ensure_admin_role_connection | |
def create_tables(self): | |
"""テーブルを構築する | |
""" | |
return self.execute_list(self.create_tables_sql()) | |
@ensure_admin_role_connection | |
def delete_tables(self, delete_index=True, raise_delete_index_error=False): | |
"""テーブルを破棄する。 | |
""" | |
ret = [] | |
delete_result = self.execute_list(self.delete_tables_sql()) | |
if delete_result is None: # カーソルを取得できない | |
return None | |
# delete_index が true なら、 delete_index も実行する | |
if delete_index: | |
try: | |
destroy_indexes_result = self.execute_list(self.destroy_indexes_sql()) | |
if destroy_indexes_result is None: # 何故か途中でカーソルを取得できなくなった場合 | |
return ret | |
ret.extend(destroy_indexes_result) | |
except OperationalError as e: | |
if raise_delete_index_error: | |
raise e | |
return ret | |
@ensure_admin_role_connection | |
def flush_tables(self): | |
"""テーブル上のデータを破棄する。 | |
""" | |
return self.execute_list(self.flush_tables_sql()) | |
# このメソッドの実装は複雑なのであとまわしとし、将来のために用意だけしておく | |
@ensure_admin_role_connection | |
def install_fixtures(self, fixtures): | |
"""Install fixtures into database. | |
""" | |
pass | |
# このメソッドの実装は複雑なので、将来のために用意だけしておく | |
@ensure_admin_role_connection | |
def migrate(self): | |
"""データベーススキーマを移行する | |
""" | |
pass | |
# シグナルハンドラ | |
# インスタンスの delete() や QuerySet の delete() でレコードが削除されるときに、 | |
# コネクションを除去しておく。 | |
# モデルの delete() だけでは、Dbms.objects.all().delete() の時にうまくいかないので | |
# シグナルで実装する。詳しくは Django ドキュメントのシグナルと pre_delete を参照 | |
# すること。 | |
@receiver(pre_delete, sender=Dbms, dispatch_uid='remove_connection_on_delete_dbms') | |
def pre_delete_dbms(sender, instance, **kwargs): | |
alias = instance.name | |
Dbms.do_remove_connection(alias) | |
def load_dbms_connections(*args, **kwargs): | |
"""外部DBへの接続をロードする | |
""" | |
logger.debug('Triggered load_dbms_connections.') | |
Dbms.rebuild_connections() | |
# request_started シグナルレシーバで、初期化時に自動的にDB接続を構築する | |
@receiver(request_started) | |
def on_first_request(*args, **kwargs): | |
# 最初の一度だけで、このレシーバを disconnect する | |
request_started.disconnect(on_first_request) | |
load_dbms_connections(*args, **kwargs) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment