Last active
August 29, 2015 14:15
-
-
Save nakagami/755bc066c5e9ee8354dd to your computer and use it in GitHub Desktop.
PostgreSQL 統計処理用の集約関数の python テストコード
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding:utf-8
# Tests for PostgreSQL's statistical aggregate functions (SQL:2006)
# https://www.postgresql.jp/document/9.3/html/functions-aggregate.html
import unittest | |
import minipg | |
import decimal | |
# (x, y) fixture rows in which one row has a NULL y and another a NULL x.
# Only 8 of the 10 rows have both columns set; x alone has 9 non-NULL values.
values_with_null = [
    (1, 1),
    (2, 2),
    (3, 4),
    (4, None),   # y is NULL
    (5, 16),
    (None, 32),  # x is NULL
    (7, 64),
    (8, 128),
    (9, 256),
    (10, 512),
]
# (x, y) fixture rows without any NULL: x runs from 1 to 10 and y = 2 ** (x - 1).
values_without_null = [
    (1, 1),
    (2, 2),
    (3, 4),
    (4, 8),
    (5, 16),
    (6, 32),
    (7, 64),
    (8, 128),
    (9, 256),
    (10, 512),
]
class TestAggregate(unittest.TestCase):
    """Check PostgreSQL's statistical aggregate functions through minipg.

    Each test runs on a fresh connection holding four temporary tables:
    ``test_int_*`` (integer columns) and ``test_double_*`` (double precision
    columns), each loaded from ``values_with_null`` and
    ``values_without_null``.
    """

    # Connection settings; override them in a subclass to target another server.
    host = 'localhost'
    user = 'postgres'
    password = ''
    database = 'postgres'

    # Number of decimal places that must agree for non-integer results.
    # The expected double precision values below are written with 15
    # significant digits, while the number of digits a server sends back
    # depends on its version and extra_float_digits setting, so exact
    # equality is too strict.
    float_places = 9

    # (table name infix, SQL column type) for the fixture tables.
    _column_types = (
        ("int", "integer"),
        ("double", "double precision"),
    )

    def setUp(self):
        """Open a connection and load the temporary fixture tables."""
        self.connection = minipg.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database)
        try:
            self._load_fixtures()
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so the
            # connection has to be released here.
            self.connection.close()
            raise

    def tearDown(self):
        """Close the per-test connection."""
        self.connection.close()

    def _load_fixtures(self):
        """Create the four temporary tables and insert the fixture rows."""
        datasets = (
            ("with_null", values_with_null),
            ("without_null", values_without_null),
        )
        cur = self.connection.cursor()
        for type_tag, column_type in self._column_types:
            for data_tag, rows in datasets:
                # Table and type names are constants defined in this class,
                # never external input, so formatting them into SQL is safe.
                table = "test_{}_{}".format(type_tag, data_tag)
                cur.execute(
                    "create temporary table {} (x {}, y {})".format(
                        table, column_type, column_type))
                insert = "insert into {} (x, y) values (%s, %s)".format(table)
                for x, y in rows:
                    cur.execute(insert, (x, y))
        self.connection.commit()

    def _assert_aggregate(self, expression, expected_by_table):
        """Run ``select <expression> from <table>`` and check each result.

        ``expected_by_table`` is a sequence of ``(table, expected)`` pairs.
        A float is compared approximately; a ``Decimal`` additionally
        requires the result to be a ``Decimal``; anything else (the integer
        row counts) must match exactly.  The SQL text is attached to every
        failure message so the failing table is obvious.
        """
        cur = self.connection.cursor()
        for table, expected in expected_by_table:
            sql = "select {} from {}".format(expression, table)
            cur.execute(sql)
            actual = cur.fetchone()[0]
            if isinstance(expected, decimal.Decimal):
                self.assertIsInstance(actual, decimal.Decimal, sql)
                self.assertAlmostEqual(
                    actual, expected, places=self.float_places, msg=sql)
            elif isinstance(expected, float):
                self.assertAlmostEqual(
                    actual, expected, places=self.float_places, msg=sql)
            else:
                self.assertEqual(actual, expected, sql)

    def test_corr(self):
        # corr(Y, X): correlation coefficient
        self._assert_aggregate("corr(y, x)", (
            ("test_double_with_null", 0.812251960260244),
            ("test_double_without_null", 0.798837242618123),
            ("test_int_with_null", 0.812251960260244),
            ("test_int_without_null", 0.798837242618123),
        ))

    def test_covar_pop(self):
        # covar_pop(Y, X): population covariance
        self._assert_aggregate("covar_pop(y, x)", (
            ("test_double_with_null", 432.953125),
            ("test_double_without_null", 359.05),
            ("test_int_with_null", 432.953125),
            ("test_int_without_null", 359.05),
        ))

    def test_covar_samp(self):
        # covar_samp(Y, X): sample covariance
        self._assert_aggregate("covar_samp(y, x)", (
            ("test_double_with_null", 494.803571428571),
            ("test_double_without_null", 398.944444444444),
            ("test_int_with_null", 494.803571428571),
            ("test_int_without_null", 398.944444444444),
        ))

    def test_regr_avgx(self):
        # regr_avgx(Y, X): average of the independent variable (sum(X)/N)
        self._assert_aggregate("regr_avgx(y, x)", (
            ("test_double_with_null", 5.625),
            ("test_double_without_null", 5.5),
            ("test_int_with_null", 5.625),
            ("test_int_without_null", 5.5),
        ))

    def test_regr_avgy(self):
        # regr_avgy(Y, X): average of the dependent variable (sum(Y)/N)
        self._assert_aggregate("regr_avgy(y, x)", (
            ("test_double_with_null", 122.875),
            ("test_double_without_null", 102.3),
            ("test_int_with_null", 122.875),
            ("test_int_without_null", 102.3),
        ))

    def test_regr_count(self):
        # regr_count(Y, X): number of input rows in which both expressions
        # are non-NULL
        self._assert_aggregate("regr_count(y, x)", (
            ("test_int_with_null", 8),
            ("test_int_without_null", 10),
        ))

    def test_regr_intercept(self):
        # regr_intercept(Y, X): y-intercept of the least-squares-fit linear
        # equation determined by the (X, Y) pairs
        self._assert_aggregate("regr_intercept(y, x)", (
            ("test_double_with_null", -121.042253521127),
            ("test_double_without_null", -137.066666666667),
            ("test_int_with_null", -121.042253521127),
            ("test_int_without_null", -137.066666666667),
        ))

    def test_regr_r2(self):
        # regr_r2(Y, X): square of the correlation coefficient
        self._assert_aggregate("regr_r2(y, x)", (
            ("test_double_with_null", 0.659753246946609),
            ("test_double_without_null", 0.638140940193726),
            ("test_int_with_null", 0.659753246946609),
            ("test_int_without_null", 0.638140940193726),
        ))

    def test_regr_slope(self):
        # regr_slope(Y, X): slope of the least-squares-fit linear equation
        # determined by the (X, Y) pairs
        self._assert_aggregate("regr_slope(y, x)", (
            ("test_double_with_null", 43.3630672926448),
            ("test_double_without_null", 43.5212121212121),
            ("test_int_with_null", 43.3630672926448),
            ("test_int_without_null", 43.5212121212121),
        ))

    def test_regr_sxx(self):
        # regr_sxx(Y, X): sum(X^2) - sum(X)^2/N
        # ("sum of squares" of the independent variable)
        self._assert_aggregate("regr_sxx(y, x)", (
            ("test_double_with_null", 79.875),
            ("test_double_without_null", 82.5),
            ("test_int_with_null", 79.875),
            ("test_int_without_null", 82.5),
        ))

    def test_regr_sxy(self):
        # regr_sxy(Y, X): sum(X*Y) - sum(X) * sum(Y)/N
        # ("sum of products" of the independent and dependent variables)
        self._assert_aggregate("regr_sxy(y, x)", (
            ("test_double_with_null", 3463.625),
            ("test_double_without_null", 3590.5),
            ("test_int_with_null", 3463.625),
            ("test_int_without_null", 3590.5),
        ))

    def test_regr_syy(self):
        # regr_syy(Y, X): sum(Y^2) - sum(Y)^2/N
        # ("sum of squares" of the dependent variable)
        self._assert_aggregate("regr_syy(y, x)", (
            ("test_double_with_null", 227650.875),
            ("test_double_without_null", 244872.1),
            ("test_int_with_null", 227650.875),
            ("test_int_without_null", 244872.1),
        ))

    def test_stddev_pop(self):
        # stddev_pop(expression): population standard deviation
        # (the integer tables are expected to yield Decimal results)
        self._assert_aggregate("stddev_pop(x)", (
            ("test_double_with_null", 3.02254900194121),
            ("test_double_without_null", 2.87228132326901),
            ("test_int_with_null", decimal.Decimal("3.0225490019412096")),
            ("test_int_without_null", decimal.Decimal("2.8722813232690143")),
        ))

    def test_stddev_samp(self):
        # stddev_samp(expression): sample standard deviation
        self._assert_aggregate("stddev_samp(x)", (
            ("test_double_with_null", 3.20589734361189),
            ("test_double_without_null", 3.02765035409749),
            ("test_int_with_null", decimal.Decimal("3.2058973436118908")),
            ("test_int_without_null", decimal.Decimal("3.0276503540974917")),
        ))

    def test_var_pop(self):
        # var_pop(expression): population variance
        # (square of the population standard deviation)
        self._assert_aggregate("var_pop(x)", (
            ("test_double_with_null", 9.1358024691358),
            ("test_double_without_null", 8.25),
            ("test_int_with_null", decimal.Decimal("9.1358024691358025")),
            ("test_int_without_null", decimal.Decimal("8.25")),
        ))

    def test_var_samp(self):
        # var_samp(expression): sample variance
        # (square of the sample standard deviation)
        self._assert_aggregate("var_samp(x)", (
            ("test_double_with_null", 10.2777777777778),
            ("test_double_without_null", 9.16666666666667),
            ("test_int_with_null", decimal.Decimal("10.2777777777777778")),
            ("test_int_without_null", decimal.Decimal("9.1666666666666667")),
        ))
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment