2024.1.12 Add new CAT Strategy #10

Merged Jan 15, 2024 (12 commits)

Commit: Add BECAT
Hhhhhhand committed Oct 24, 2023
commit 52a1e6fb788f3aae8abf0f99467845e7442072dd
91 changes: 89 additions & 2 deletions CAT/model/IRT.py
@@ -6,14 +6,15 @@
import torch
import torch.nn as nn
import numpy as np
import math
import torch.utils.data as data
from math import exp as exp
from sklearn.metrics import roc_auc_score
from scipy import integrate

import time
from CAT.model.abstract_model import AbstractModel
from CAT.dataset import AdapTestDataset, TrainDataset, Dataset

from sklearn.metrics import accuracy_score

class IRT(nn.Module):
def __init__(self, num_students, num_questions, num_dim):
@@ -157,10 +158,16 @@ def evaluate(self, adaptest_data: AdapTestDataset):
real = np.array(real)
pred = np.array(pred)
auc = roc_auc_score(real, pred)

# Calculate accuracy
threshold = 0.5 # You may adjust the threshold based on your use case
binary_pred = (pred >= threshold).astype(int)
acc = accuracy_score(real, binary_pred)

return {
'auc': auc,
'cov': cov,
'acc': acc
}
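For reference, a minimal self-contained sketch of the metric computation added here, on toy arrays (the names real and pred mirror the variables above, and 0.5 is the same default threshold):

    import numpy as np
    from sklearn.metrics import roc_auc_score, accuracy_score

    real = np.array([1, 0, 1, 1, 0])             # observed correctness
    pred = np.array([0.9, 0.3, 0.6, 0.4, 0.2])   # predicted probabilities
    auc = roc_auc_score(real, pred)              # rank-based, threshold-free
    binary_pred = (pred >= 0.5).astype(int)      # harden to 0/1 labels
    acc = accuracy_score(real, binary_pred)      # fraction matched exactly
    print({'auc': auc, 'acc': acc})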

def get_pred(self, adaptest_data: AdapTestDataset):
@@ -277,6 +284,86 @@ def get_fisher(self, student_id, question_id, pred_all):
q = 1 - pred
fisher_info = (q*pred*(alpha * alpha.T)).numpy()
return fisher_info
    def IRT_derivate(self, pred_all):
        """Return p * (1 - p), the derivative factor of the IRT response probability."""
        new_predictions = {}
        for sid, qid_dict in pred_all.items():
            new_predictions[sid] = {}
            for qid, pred in qid_dict.items():
                new_predictions[sid][qid] = pred * (1 - pred)
        return new_predictions

    def bce_loss_derivative(self, pred, target):
        """Closed-form derivative of BCE w.r.t. pred: (pred - target) / (pred * (1 - pred))."""
        return (pred - target) / (pred * (1 - pred))
    def get_BE_weights(self, pred_all):
        """Compute the pairwise BECAT weights w_ij = d - E[|grad_i - grad_j|].

        Args:
            pred_all: dict, qid -> predicted probability for one student
        Returns:
            w_ij_matrix: dict[qid][qid] -> float, pairwise question weights
        """
        d = 100
        Pre_true = {}
        Pre_false = {}
        Der = {}
        for qid, pred in pred_all.items():
            Pre_true[qid] = pred
            Pre_false[qid] = 1 - pred
            # d pred / d theta for the IRT model: p * (1 - p) * alpha
            Der[qid] = pred * (1 - pred) * self.get_alpha(qid)
        w_ij_matrix = {}
        for i in pred_all:
            w_ij_matrix[i] = {}
            for j in pred_all:
                w_ij_matrix[i][j] = 0
        for i in pred_all:
            for j in pred_all:
                # gradient of the BCE loss w.r.t. theta under each possible response
                gradients_theta1 = self.bce_loss_derivative(Pre_true[i], 1.0) * Der[i]
                gradients_theta2 = self.bce_loss_derivative(Pre_true[i], 0.0) * Der[i]
                gradients_theta3 = self.bce_loss_derivative(Pre_true[j], 1.0) * Der[j]
                gradients_theta4 = self.bce_loss_derivative(Pre_true[j], 0.0) * Der[j]
                diff_norm_00 = math.fabs(gradients_theta1 - gradients_theta3)
                diff_norm_01 = math.fabs(gradients_theta1 - gradients_theta4)
                diff_norm_10 = math.fabs(gradients_theta2 - gradients_theta3)
                diff_norm_11 = math.fabs(gradients_theta2 - gradients_theta4)
                # expected gradient distance over the four joint response outcomes
                Expect = (Pre_false[i] * Pre_false[j] * diff_norm_00
                          + Pre_false[i] * Pre_true[j] * diff_norm_01
                          + Pre_true[i] * Pre_false[j] * diff_norm_10
                          + Pre_true[i] * Pre_true[j] * diff_norm_11)
                w_ij_matrix[i][j] = d - Expect
        return w_ij_matrix
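Written out, the quantity each inner iteration computes is (a sketch of the intended weight; g_q^a denotes the gradient of the BCE loss w.r.t. the ability when question q receives response a):

    w_{ij} = d - \sum_{a \in \{0,1\}} \sum_{b \in \{0,1\}} P(r_i = a)\, P(r_j = b)\, \left| g_i^{a} - g_j^{b} \right|

with d = 100 a constant offset, so a larger w_ij means the two questions pull the ability estimate in more similar directions.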

    def F_s_func(self, S_set, w_ij_matrix):
        """Facility-location objective F(S): each unselected question is scored by
        its best (largest-weight) representative inside the selected set S."""
        res = 0.0
        for w_i in w_ij_matrix:
            if w_i not in S_set:
                mx = float('-inf')
                for j in S_set:
                    if w_ij_matrix[w_i][j] > mx:
                        mx = w_ij_matrix[w_i][j]
                res += mx
        return res

    def delta_q_S_t(self, question_id, pred_all, S_set, sampled_elements):
        """Marginal gain of adding one question to the selected set (BECAT).

        Args:
            question_id: int, candidate question id
            pred_all: dict, qid -> predicted probability for this student
            S_set: iterable, question ids already selected for this student
            sampled_elements: np.ndarray, sampled candidate question ids
        Returns:
            v: float, F(S + {question_id}) - F(S)
        """
        Sp_set = list(S_set)
        b_array = np.array(Sp_set)
        sampled_elements = np.concatenate((sampled_elements, b_array), axis=0)
        if question_id not in sampled_elements:
            sampled_elements = np.append(sampled_elements, question_id)
        # restrict the pairwise weights to the sampled questions for tractability
        sampled_dict = {key: value for key, value in pred_all.items() if key in sampled_elements}
        w_ij_matrix = self.get_BE_weights(sampled_dict)
        F_s = self.F_s_func(Sp_set, w_ij_matrix)
        Sp_set.append(question_id)
        F_sp = self.F_s_func(Sp_set, w_ij_matrix)
        return F_sp - F_s
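Together, F_s_func and delta_q_S_t give the marginal gain that drives BECAT's greedy selection. A minimal sketch with hand-built (hypothetical) weights for three questions shows the mechanics; note the gains are typically negative, since growing S removes terms from the sum, and the argmax still ranks candidates correctly:

    # hypothetical pairwise weights, mirroring the structure of w_ij_matrix
    w = {1: {1: 0, 2: 95.0, 3: 90.0},
         2: {1: 95.0, 2: 0, 3: 92.0},
         3: {1: 90.0, 2: 92.0, 3: 0}}

    def F(S):
        # same rule as F_s_func: best selected representative per unselected question
        return sum(max(w[i][j] for j in S) for i in w if i not in S)

    S = [1]
    gains = {q: F(S + [q]) - F(S) for q in (2, 3)}  # marginal gains
    best = max(gains, key=gains.get)                # greedy pick: question 3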


def expected_model_change(self, sid: int, qid: int, adaptest_data: AdapTestDataset, pred_all: dict):
""" get expected model change
97 changes: 94 additions & 3 deletions CAT/model/NCD.py
@@ -3,8 +3,8 @@
import numpy as np
import torch.nn as nn
import torch.utils.data as data
from sklearn.metrics import roc_auc_score

from sklearn.metrics import roc_auc_score, accuracy_score
import math
from CAT.model.abstract_model import AbstractModel
from CAT.dataset import AdapTestDataset, TrainDataset, Dataset

@@ -216,12 +216,19 @@ def evaluate(self, adaptest_data: AdapTestDataset):
cov = sum(coverages) / len(coverages)

real = np.array(real)
real = np.where(real < 0.5, 0.0, 1.0)
pred = np.array(pred)
auc = roc_auc_score(real, pred)

# Calculate accuracy
threshold = 0.5 # You may adjust the threshold based on your use case
binary_pred = (pred >= threshold).astype(int)
acc = accuracy_score(real, binary_pred)

return {
'auc': auc,
'cov': cov,
'acc': acc
}

def get_pred(self, adaptest_data: AdapTestDataset):
@@ -307,4 +314,88 @@ def expected_model_change(self, sid: int, qid: int, adaptest_data: AdapTestDataset, pred_all: dict):
# pred = self.model(student_id, question_id, concepts_emb).item()
pred = pred_all[sid][qid]
return pred * torch.norm(pos_weights - original_weights).item() + \
(1 - pred) * torch.norm(neg_weights - original_weights).item()

    def get_BE_weights(self, pred_all):
        """Compute the pairwise BECAT weights w_ij = d - E[|grad_i - grad_j|].

        Args:
            pred_all: dict, qid -> predicted probability for one student
        Returns:
            w_ij_matrix: dict[qid][qid] -> float, pairwise question weights
        """
        d = 100
        Pre_true = {}
        Pre_false = {}
        for qid, pred in pred_all.items():
            Pre_true[qid] = pred
            Pre_false[qid] = 1 - pred
        w_ij_matrix = {}
        for i in pred_all:
            w_ij_matrix[i] = {}
            for j in pred_all:
                w_ij_matrix[i][j] = 0
        criterion = nn.BCELoss()  # stateless, so one instance serves all four loss terms
        for i in pred_all:
            for j in pred_all:
                # BCE gradients w.r.t. each prediction, obtained via autograd,
                # once per possible response (target 1.0 and target 0.0)
                tensor_11 = torch.tensor(Pre_true[i], requires_grad=True)
                tensor_12 = torch.tensor(Pre_true[j], requires_grad=True)
                loss_true_1 = criterion(tensor_11, torch.tensor(1.0))
                loss_false_1 = criterion(tensor_11, torch.tensor(0.0))
                loss_true_0 = criterion(tensor_12, torch.tensor(1.0))
                loss_false_0 = criterion(tensor_12, torch.tensor(0.0))
                loss_true_1.backward()
                grad_true_1 = tensor_11.grad.clone()
                tensor_11.grad.zero_()
                loss_false_1.backward()
                grad_false_1 = tensor_11.grad.clone()
                tensor_11.grad.zero_()
                loss_true_0.backward()
                grad_true_0 = tensor_12.grad.clone()
                tensor_12.grad.zero_()
                loss_false_0.backward()
                grad_false_0 = tensor_12.grad.clone()
                tensor_12.grad.zero_()
                diff_norm_00 = math.fabs(grad_true_1 - grad_true_0)
                diff_norm_01 = math.fabs(grad_true_1 - grad_false_0)
                diff_norm_10 = math.fabs(grad_false_1 - grad_true_0)
                diff_norm_11 = math.fabs(grad_false_1 - grad_false_0)
                # expected gradient distance over the four joint response outcomes
                Expect = (Pre_false[i] * Pre_false[j] * diff_norm_00
                          + Pre_false[i] * Pre_true[j] * diff_norm_01
                          + Pre_true[i] * Pre_false[j] * diff_norm_10
                          + Pre_true[i] * Pre_true[j] * diff_norm_11)
                w_ij_matrix[i][j] = d - Expect
        return w_ij_matrix
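Unlike the IRT version above, which uses the closed-form bce_loss_derivative, this variant recovers the same gradients through autograd: for BCE(p, t) the derivative w.r.t. p is (p - t) / (p (1 - p)), i.e. -1/p when t = 1 and 1/(1 - p) when t = 0. A quick standalone check (a sketch, not part of the PR):

    import torch
    import torch.nn as nn

    p = torch.tensor(0.7, requires_grad=True)
    nn.BCELoss()(p, torch.tensor(1.0)).backward()
    # autograd gradient vs closed form (p - t) / (p * (1 - p)) with t = 1
    print(p.grad.item(), (0.7 - 1.0) / (0.7 * (1 - 0.7)))  # both equal -1/0.7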

    def F_s_func(self, S_set, w_ij_matrix):
        """Facility-location objective F(S): each unselected question is scored by
        its best (largest-weight) representative inside the selected set S."""
        res = 0.0
        for w_i in w_ij_matrix:
            if w_i not in S_set:
                mx = float('-inf')
                for j in S_set:
                    if w_ij_matrix[w_i][j] > mx:
                        mx = w_ij_matrix[w_i][j]
                res += mx
        return res

    def delta_q_S_t(self, question_id, pred_all, S_set, sampled_elements):
        """Marginal gain of adding one question to the selected set (BECAT).

        Args:
            question_id: int, candidate question id
            pred_all: dict, qid -> predicted probability for this student
            S_set: iterable, question ids already selected for this student
            sampled_elements: np.ndarray, sampled candidate question ids
        Returns:
            v: float, F(S + {question_id}) - F(S)
        """
        Sp_set = list(S_set)
        b_array = np.array(Sp_set)
        sampled_elements = np.concatenate((sampled_elements, b_array), axis=0)
        if question_id not in sampled_elements:
            sampled_elements = np.append(sampled_elements, question_id)
        # restrict the pairwise weights to the sampled questions for tractability
        sampled_dict = {key: value for key, value in pred_all.items() if key in sampled_elements}
        w_ij_matrix = self.get_BE_weights(sampled_dict)
        F_s = self.F_s_func(Sp_set, w_ij_matrix)
        Sp_set.append(question_id)
        F_sp = self.F_s_func(Sp_set, w_ij_matrix)
        return F_sp - F_s
37 changes: 37 additions & 0 deletions CAT/strategy/BECAT_strategy.py
@@ -0,0 +1,37 @@
import numpy as np

from CAT.strategy.abstract_strategy import AbstractStrategy
from CAT.model import AbstractModel
from CAT.dataset import AdapTestDataset

class BECATstrategy(AbstractStrategy):

    def __init__(self):
        super().__init__()

    @property
    def name(self):
        return 'BECAT Strategy'

    def adaptest_select(self, model: AbstractModel, adaptest_data: AdapTestDataset, S_set):
        """
        Greedy submodular selection: for each student, pick the untested question
        with the largest marginal gain delta_q_S_t.
        """
        assert hasattr(model, 'delta_q_S_t'), \
            'the models must implement delta_q_S_t method'
        assert hasattr(model, 'get_pred'), \
            'the models must implement get_pred method for accelerating'
        pred_all = model.get_pred(adaptest_data)

        selection = {}
        for sid in range(adaptest_data.num_students):
            tmplen = len(S_set[sid])
            untested_questions = np.array(list(adaptest_data.untested[sid]))
            # sample a small candidate pool so the weight matrix stays tractable
            sampled_elements = np.random.choice(untested_questions, tmplen + 5)
            untested_deltaq = [model.delta_q_S_t(qid, pred_all[sid], S_set[sid], sampled_elements)
                               for qid in untested_questions]
            j = np.argmax(untested_deltaq)
            selection[sid] = untested_questions[j]
        return selection
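BECAT's adaptest_select takes an extra S_set argument (the per-student list of already administered questions), so the driver has to special-case it relative to the other strategies. A sketch of the expected call contract, assuming model and test_data objects as in the notebook below:

    # S_sel is keyed by student id; the notebook seeds it on the first iteration
    S_sel = {sid: [] for sid in range(test_data.num_students)}
    picked = strategy.adaptest_select(model, test_data, S_sel)
    for sid, qid in picked.items():
        S_sel[sid].append(qid)  # grow each student's selected set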

5 changes: 4 additions & 1 deletion CAT/strategy/__init__.py
@@ -4,4 +4,7 @@
from .MFI_strategy import DoptStrategy
from .KLI_strategy import KLIStrategy
from .KLI_strategy import MKLIStrategy
from .MAAT_strategy import MAATStrategy
from .BECAT_strategy import BECATstrategy
from .graph_stratgy import Grstrategy

22 changes: 19 additions & 3 deletions scripts/test.ipynb
@@ -230,7 +230,7 @@
],
"source": [
"for strategy in strategies:\n",
" model = CAT.model.IRTModel(**config)\n",
" model = CAT.model.NCDModel(**config)\n",
" model.init_model(test_data)\n",
" model.adaptest_load(ckpt_path)\n",
" test_data.reset()\n",
@@ -243,13 +243,29 @@
" results = model.evaluate(test_data)\n",
" for name, value in results.items():\n",
" logging.info(f'{name}:{value}')\n",
" \n",
" S_sel ={}\n",
" for sid in range(test_data.num_students):\n",
" key = sid\n",
" S_sel[key] = []\n",
" selected_questions={}\n",
" for it in range(1, test_length + 1):\n",
" logging.info(f'Iteration {it}')\n",
" # select question\n",
" selected_questions = strategy.adaptest_select(model, test_data)\n",
" if it == 1 and strategy.name == 'BECAT Strategy':\n",
" for sid in range(test_data.num_students):\n",
" untested_questions = np.array(list(test_data.untested[sid]))\n",
" random_index = random.randint(0, len(untested_questions)-1)\n",
" selected_questions[sid] = untested_questions[random_index]\n",
" S_sel[sid].append(untested_questions[random_index])\n",
" elif strategy.name == 'BECAT Strategy': \n",
" selected_questions = strategy.adaptest_select(model, test_data,S_sel)\n",
" for sid in range(test_data.num_students):\n",
" S_sel[sid].append(selected_questions[sid])\n",
" else:\n",
" selected_questions = strategy.adaptest_select(model, test_data)\n",
" for student, question in selected_questions.items():\n",
" test_data.apply_selection(student, question)\n",
" \n",
" # update models\n",
" model.adaptest_update(test_data)\n",
" # evaluate models\n",