Simple implementation of a Rasch model, following farshadl123
import os
import pandas as pd
import pymc3 as pm
import theano.tensor as tt
import matplotlib.pyplot as plt
from datetime import datetime

# 23.11.27 (Mon)
# ref: https://towardsdatascience.com/a-bayesian-approach-to-rasch-models-item-response-theory-cc08805cbb37
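
# The model below is the dichotomous Rasch model: the probability that person i
# answers question j correctly is sigmoid(alpha_i - gamma_j), with person ability
# alpha and question difficulty gamma (centred around its mean inside the
# likelihood). Both parameters get independent Normal(0, 3) priors.
# Note: the script targets the PyMC3/Theano stack (pymc3, theano.tensor), not PyMC >= 4.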
App_dir = os.getcwd()
class Rasch:
    def __init__(self):
        self.data = self.create_data()
        self.start = datetime.now()

    def create_data(self):
        r"""toy dataset: 16 respondents x 5 items, coded 1 = correct, 0 = incorrect"""
        data = pd.DataFrame.from_dict({
            "Question A": "1 1 1 1 1 1 1 1 1 1 1 0 0 0 1 1".split(),
            "Question B": "1 1 1 1 1 1 1 1 0 0 1 0 0 1 0 0".split(),
            "Question C": "1 1 1 1 1 1 1 0 1 1 0 0 0 1 0 0".split(),
            "Question D": "1 1 1 0 0 0 0 0 0 0 1 1 0 0 0 0".split(),
            "Question E": "1 0 0 1 0 0 0 0 0 0 0 1 0 0 0 0".split()
        })
        data.index = [f"Person {idx}" for idx in range(16)]
        data = data.apply(pd.to_numeric)
        return data
    def exploration(self):
        r"""plot the score distribution and per-question difficulty"""
        data_c = self.data.copy(deep=True)
        fig, axs = plt.subplots(1, 2)

        # score distribution
        score_ls = list()
        for idx in range(data_c.shape[0]):
            score = data_c.iloc[idx, :].sum()
            score_ls.append(score)
        data_c['score'] = score_ls
        score_vc = data_c['score'].value_counts()
        score_vc = score_vc.sort_index()
        score_vc.index = [f"{ele*20}%" for ele in list(score_vc.index)]  # 5 items, so each point is 20%
        axs[0].bar(x=score_vc.index, height=score_vc.values)
        axs[0].set_title('Score Distribution')

        # question difficulty (total number of correct answers per question)
        difficulty_ls = list()
        for col in self.data.columns:
            total_correct = data_c.loc[:, col].sum()
            difficulty_ls.append(total_correct)
        difficulty_series = pd.Series(difficulty_ls, index="A B C D E".split())
        difficulty_series.plot(kind='bar', title="Question Difficulty", ax=axs[1])
        plt.savefig(os.path.join(App_dir, 'exploration.png'))
        return data_c
    def build_model(self):
        r"""fit the Rasch model with PyMC3 and save the trace"""
        plt.figure(figsize=(16, 7))
        with pm.Model() as model:
            ## Independent priors
            alpha = pm.Normal('Person', mu=0, sigma=3, shape=(1, len(self.data)))
            gamma = pm.Normal('Question', mu=0, sigma=3, shape=(self.data.shape[1], 1))

            ## Log-Likelihood
            def logp(d):
                # transpose puts d into questions x persons, matching the (5, 16) broadcast of alpha and gamma
                v1 = tt.transpose(d) * tt.log(tt.nnet.sigmoid(alpha - (gamma - gamma.mean(0))))
                v2 = tt.transpose((1 - d)) * tt.log(1 - tt.nnet.sigmoid(alpha - (gamma - gamma.mean(0))))
                return v1 + v2

            ll = pm.DensityDist('ll', logp, observed={'d': self.data.values})
            trace = pm.sample(1500, cores=-1, step=pm.NUTS())
            trace = trace[250:]  # drop burn-in draws
            pm.traceplot(trace)
            plt.savefig(os.path.join(App_dir, 'Figure1.png'))
            plt.close()
            trace = pm.trace_to_dataframe(trace)
            self.trace = trace
            trace.to_excel(os.path.join(App_dir, 'trace.xlsx'), index=False)
        return trace
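
    # pm.trace_to_dataframe flattens each variable into one column per index,
    # e.g. 'Person__0_3' for the fourth person and 'Question__2_0' for the third
    # question; visualize() reads the saved trace back under those column names.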
    def visualize(self, **kwargs):
        r"""visualize ability and difficulty posteriors"""
        try:
            self.trace
        except AttributeError:
            self.trace = pd.read_excel(os.path.join(App_dir, 'trace.xlsx'))
        fig, axs = plt.subplots(16, 2, sharex="row", figsize=(16, 10))
        for idx in range(self.data.shape[0]):
            axs[idx, 0].hist(x=self.trace[f'Person__0_{idx}'].values)
            axs[idx, 0].set_ylabel(f'Person_{idx}')
        for idx in range(self.data.shape[1]):
            axs[idx, 1].hist(x=self.trace[f'Question__{idx}_0'].values)
            axs[idx, 1].set_ylabel(f'Question_{idx}')
        plt.savefig(os.path.join(App_dir, 'result.png'))
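

# Optional helper, not part of the original gist: a minimal sketch of how the saved
# trace could be reduced to point estimates. It only assumes the column names written
# by pm.trace_to_dataframe above ('Person__0_*' and 'Question__*_0').
def summarize_trace(path=os.path.join(App_dir, 'trace.xlsx')):
    trace_df = pd.read_excel(path)
    ability = trace_df.filter(like='Person__').mean()        # posterior mean ability per person
    difficulty = trace_df.filter(like='Question__').mean()   # posterior mean difficulty per question
    return ability, difficulty
# e.g. ability, difficulty = summarize_trace()
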
if __name__ == '__main__':
    rch = Rasch()
    print('>> start: ', rch.start)
    rch.exploration()
    trace = rch.build_model()
    rch.visualize()
    print('>> End, computing time: ', datetime.now() - rch.start)