Created August 24, 2021
04. 各テーマから6問ずつの30問で、偏差値を算出する。
<a href="" target="_parent">Open In Colab</a>
"source": [
!pip install numpyro
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import numpyro\n",
"import arviz as az\n",
"import jax\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"from numpyro import distributions as dist\n",
"from numpyro.infer import NUTS,MCMC\n",
"from google.colab import drive\n",
"source": [
"import jax.numpy as jnp\n",
"import jax\n",
"import time\n",
def L2P(a, b, x):
    """Evaluate the two-parameter logistic curve ``1 / (1 + exp(-a * (x - b)))``.

    Args:
        a: Slope applied to ``x - b`` (scalar or array).
        b: Location subtracted from ``x`` (scalar or array).
        x: Point(s) at which the curve is evaluated (scalar or array).

    Returns:
        A JAX array with values in ``[0, 1]``, broadcast over the inputs.
    """
    z = a * (x - b)
    # The logistic primitive only accepts floating-point input, so promote
    # integer arguments instead of rejecting them.
    z = jnp.asarray(z, dtype=jnp.result_type(z, float))
    # Same function as the hand-written 1 / (1 + exp(-z)), but its gradient
    # stays finite when exp(-z) overflows; the explicit form produces NaN
    # gradients there (0 * inf in the chain rule).
    return jax.nn.sigmoid(z)
"source": [
# Item table for the "all" model: one row per item, indexed by item name.
# "name" duplicates the index as a column, and "theme" is the part of the
# name before the first "-". Rows are ordered by name.
item_score_all = (
    pd.read_csv(
        "/content/drive/MyDrive/01. DXクライテリアWG/IRT分析/result-item-all.csv",
        index_col=0,
    )
    .assign(name=lambda frame: frame.index)
    .assign(theme=lambda frame: [item.split("-")[0] for item in frame["name"].values])
    .sort_values("name")
)
def load_param(name):
    """Load the item-parameter CSV for model ``name`` as a nested dict.

    Args:
        name: Suffix of the result file, i.e. ``result-item-{name}.csv``.

    Returns:
        ``{row_label: {column: value, ...}, ...}`` where the row labels come
        from the first CSV column.
    """
    csv_path = f"/content/drive/MyDrive/01. DXクライテリアWG/IRT分析/result-item-{name}.csv"
    params = pd.read_csv(csv_path, index_col=0)
    return params.to_dict(orient="index")
# Item parameters per model: ITEM_PARAM[model][item_name] is the dict of that
# item's CSV columns as returned by load_param(). "all" is the model fitted over
# every item; the remaining keys are the per-theme models.
ITEM_PARAM = {
    model_name: load_param(model_name)
    for model_name in ("all", "team", "system", "data", "design", "corporate")
}
def gen_user_data(p, num=6):
    """Simulate one respondent: sample items per theme and draw random answers.

    Items come from the module-level ``item_score_all`` frame, restricted to
    rows with ``alpha > 0.2`` and ``beta > -2``. Every answer is an independent
    Bernoulli(``p``) draw, so the answers do not depend on the item parameters.

    Args:
        p: Probability that any single answer is 1.
        num: Number of items sampled (without replacement) from each theme.

    Returns:
        A list of ``[item_name, answer]`` pairs, grouped by theme in the order
        team, system, data, design, corporate.

    Raises:
        ValueError: Raised by ``DataFrame.sample`` when a theme has fewer than
            ``num`` eligible items.
    """
    # scipy.stats is not among the notebook's imports, so the original call
    # failed with NameError; import it where it is needed.
    from scipy import stats

    eligible = item_score_all[
        (item_score_all["alpha"] > 0.2) & (item_score_all["beta"] > -2)
    ]
    per_theme = [
        eligible[eligible["theme"] == theme].sample(n=num)
        for theme in ("team", "system", "data", "design", "corporate")
    ]
    item_names = pd.concat(per_theme)["name"].to_list()
    answers = stats.bernoulli.rvs(p=p, size=len(item_names))
    return [[item_name, answer] for item_name, answer in zip(item_names, answers)]
def filter_theme(theme, data):
    """Return the entries of ``data`` that belong to ``theme``.

    An entry belongs to a theme when the text of ``entry[0]`` before the first
    ``"-"`` equals ``theme``. Input order is preserved.
    """
    selected = []
    for entry in data:
        prefix = entry[0].split("-")[0]
        if prefix == theme:
            selected.append(entry)
    return selected
"source": [
# Seed NumPy's global RNG (used by both DataFrame.sample and scipy's rvs when no
# random_state is given) so the simulated respondent is identical on every run.
np.random.seed(0)
sample_data = gen_user_data(0.5)
"source": [
def skill_model_by_theme(theme, data):
    """NumPyro model of one latent skill explaining the answers in ``data``.

    The sample site ``{theme}_skill`` has a Normal(0, 1) prior. Each answer is
    observed as Bernoulli with probability ``L2P(alpha, beta, skill)``, where
    ``alpha``/``beta`` are looked up per item in ``ITEM_PARAM[theme]``.

    Args:
        theme: Key into ``ITEM_PARAM``; also used to name the sample sites.
        data: Sequence of ``[item_name, answer]`` pairs.

    Returns:
        The value of the observed site ``r_{theme}``.
    """
    item_names = np.array([entry[0] for entry in data])
    observed = np.array([entry[1] for entry in data])

    def item_column(column):
        # One value per answered item, in the same order as `observed`.
        return np.array([ITEM_PARAM[theme][item][column] for item in item_names])

    alpha = item_column("alpha")
    beta = item_column("beta")

    skill = numpyro.sample(f"{theme}_skill", dist.Normal(0, 1))
    with numpyro.plate(f"plate_of_result_{theme}", size=observed.size):
        r = numpyro.sample(
            f"r_{theme}",
            dist.Bernoulli(L2P(alpha, beta, skill)),
            obs=observed,
        )
    return r
def skill_model(data):
    """Joint model: one overall skill plus one skill per theme.

    The "all" sub-model sees every answer in ``data``; each theme sub-model
    sees only the answers whose item name belongs to that theme.

    Returns:
        The observed site value of the "all" sub-model.
    """
    overall = skill_model_by_theme("all", data)
    for theme in ("team", "system", "data", "design", "corporate"):
        theme_answers = filter_theme(theme, data)
        skill_model_by_theme(theme, theme_answers)
    return overall
def hensachi(sample):
    """Summarise ``sample`` as three deviation scores, rounded up.

    Takes the 10th percentile, the median and the 90th percentile of
    ``sample``, rescales each as ``value * 10 + 50`` and applies ``ceil``.

    Returns:
        A length-3 NumPy array ``[lower, median, higher]``.
    """
    summary = np.array(
        [
            np.percentile(sample, 10),
            np.median(sample),
            np.percentile(sample, 90),
        ]
    )
    return np.ceil(summary * 10 + 50)
def estimate_rank(data, seed=0):
    """Fit ``skill_model`` to ``data`` with NUTS and print deviation scores.

    For the overall skill and each theme skill, prints one line
    ``"{theme}: {lower} ~ {median} ~ {higher}"`` where the three numbers are
    ``hensachi`` of the posterior samples of ``{theme}_skill``.

    Args:
        data: Sequence of ``[item_name, answer]`` pairs passed to the model.
        seed: Integer used to build the JAX PRNG key for the sampler.
    """
    kernel = numpyro.infer.NUTS(skill_model)
    mcmc = numpyro.infer.MCMC(kernel, num_warmup=500, num_samples=1500)
    # The sampler has to be run before any samples exist; the original never
    # called run(), so get_samples() had nothing to return.
    mcmc.run(jax.random.PRNGKey(seed), data)
    posterior = mcmc.get_samples()
    for theme in ("all", "team", "system", "data", "design", "corporate"):
        lower, median, higher = hensachi(posterior[f"{theme}_skill"])
        print(f"{theme}: {lower} ~ {median} ~ {higher}")
# Smoke test: execute the model once outside MCMC. The seed handler supplies
# the PRNG key that numpyro.sample needs for the latent skill sites.
print(numpyro.handlers.seed(skill_model, rng_seed=0)(sample_data))
"source": [
"source": [
# Estimate scores for simulated respondents whose answer probability is
# 0.0, 0.2, ..., 1.0.
for step in range(6):
    answer_prob = step / 5
    simulated = gen_user_data(answer_prob)
    estimate_rank(simulated)
