Created December 21, 2020 12:55
import numpy as np
import matplotlib.pyplot as plt
import time
from sklearn.datasets import make_blobs,make_circles,make_moons
from sklearn.preprocessing import StandardScaler
class SMOStruct:
""" 按照John Platt的论文构造SMO的数据结构"""
def __init__(self, X, y, C, kernel, alphas, b, errors, tol, eps, user_linear_optim, loop_threshold=2000):
self.X = X # 训练样本
self.y = y # 类别 label
self.C = C # 正则化常量,用于调整(过)拟合的程度
self.kernel = kernel # 核函数,实现了两个核函数,线性和高斯(RBF)
self.alphas = alphas # 拉格朗日乘子,与样本一一相对
self.b = b # 标量,偏移量
self.errors = errors # 用于存储alpha值实际与预测值得差值,与样本数量一一相对
self.tol = tol # error tolerance
self.eps = eps # alpha tolerance
self.m, self.n = np.shape(self.X) # 训练样本的个数m 和 每个样本的features数量n
self.user_linear_optim = user_linear_optim # 判断模型是否使用线性核函数
self.w = np.zeros(self.n) # 初始化权重w的值,主要用于线性核函数
#self.b = 0
self.loop_threshold = loop_threshold
# 差值矩阵
initial_error = SMOStruct.decision_function(self.alphas, self.y, self.kernel, self.X, self.X, self.b) - self.y
self.errors = initial_error
# 判别函数一: f(x)=WX-b, 用于单一样本
def decision_function_output(cls, model, i):
if model.user_linear_optim:
# Equation (J1)
# 返回在平面中的位置(f(x)=WX-b)
return float(model.w.T @ model.X[i]) - model.b
# Equation (J10)
return np.sum([model.alphas[j] * model.y[j] * model.kernel(model.X[j], model.X[i]) for j in range(model.m)]) - model.b
# 判别函数二,用于多个样本
def decision_function(alphas, target, kernel, X_train, x_test, b):
""" Applies the SVM decision function to the input feature vectors in 'x_test'. """
result = (alphas * target) @ kernel(X_train, x_test) - b # *,@ 两个Operators的区别?
return result
# 选择了alpha2, alpha1后开始第一步优化,然后迭代
# 主要的优化步骤在这里发生
def take_step(self, i1, i2):
# 如果两个 alphas 相同,跳过
if i1 == i2:
return 0
# alpha1, alpha2, y1, y2, E1, E2, s 都是论文中出现的变量,含义与论文一致
alpha1 = self.alphas[i1]
alpha2 = self.alphas[i2]
y1 = self.y[i1]
y2 = self.y[i2]
E1 = get_error(self, i1)
E2 = get_error(self, i2)
s = y1 * y2
# 计算alpha的边界,L, H
if(y1 != y2):
# y1,y2 异号,使用 Equation (J13)
L = max(0, alpha2 - alpha1)
H = min(self.C, self.C + alpha2 - alpha1)
elif (y1 == y2):
# y1,y2 同号,使用 Equation (J14)
L = max(0, alpha1 + alpha2 - self.C)
H = min(self.C, alpha1 + alpha2)
if L == H:
return 0
# 分别计算样本1, 2对应的核函数组合,目的在于计算eta 也就是求一阶导数后的值,目的在于计算a2_new
k11 = self.kernel(self.X[i1], self.X[i1])
k12 = self.kernel(self.X[i1], self.X[i2])
k22 = self.kernel(self.X[i2], self.X[i2])
# 计算 eta,equation (J15)
eta = k11 + k22 - 2*k12
# 如论文中所述,分两种情况根据 eta 为正还是为负或0来计算计算 a2_new
# equation (J16) 计算alpha2
a2 = alpha2 + y2 * (E1 - E2)/eta
# 把a2夹到限定区间 equation (J17)
if L < a2 < H:
a2 = a2
elif (a2 <= L):
a2 = L
elif (a2 >= H):
a2 = H
# TODO: 这里还没有搞懂
# if eta is non-positive, move new a2 to bound with greater objective function value
# Equation (J19)
# 在特殊情况下,eta可能不为正
f1 = y1*(E1 + self.b) - alpha1*k11 - s*alpha2*k12
f2 = y2*(E2 + self.b) - s*alpha1*k12 - alpha2*k22
L1 = alpha1 + s*(alpha2 - L)
H1 = alpha1 + s*(alpha2 - H)
Lobj = L1 * f1 + L * f2 + 0.5 * (L1 ** 2) * k11 + 0.5 * (L**2) * k22 + s * L * L1 * k12
Hobj = H1 * f1 + H * f2 + 0.5 * (H1**2) * k11 + 0.5 * (H**2) * k22 + s * H * H1 * k12
if Lobj < Hobj - self.eps:
a2 = L
elif Lobj > Hobj + self.eps:
a2 = H
a2 = alpha2
# 当 new_a2 接近C或0时候,就让它等于C或0
if a2 <1e-8:
a2 = 0.0
elif a2 > (self.C - 1e-8):
a2 = self.C
#If examples can't be optimized within epsilon(eps), skip this pair
if (np.abs(a2 - alpha2) < self.eps * (a2 + alpha2 + self.eps)):
return 0
#根据新 a2 计算新 a1 Equation(J18)
a1 = alpha1 + s * (alpha2 - a2)
#更新 bias b的值 Equation (J20)
b1 = E1 + y1*(a1 - alpha1) * k11 + y2 * (a2 - alpha2) * k12 + self.b
#equation (J21)
b2 = E2 + y1*(a1 - alpha1) * k12 + y2 * (a2 - alpha2) * k22 + self.b
# Set new threshoold based on if a1 or a2 is bound by L and/or H
if 0 < a1 and a1 < C:
b_new = b1
elif 0 < a2 and a2 < C:
b_new = b2
# Average thresholds if both are bound
b_new = (b1 + b2) * 0.5
# 更新模型的 b
self.b = b_new
# 当所训练模型为线性核函数时
# Equation (J22) 计算w的值
if self.user_linear_optim:
self.w = self.w + y1 * (a1 - alpha1)*self.X[i1] + y2 * (a2 - alpha2) * self.X[i2]
# 在 alphas 矩阵中分别更新a1, a2的值
self.alphas[i1] = a1
self.alphas[i2] = a2
# 优化完了,更新差值矩阵的对应值; 同时更新差值矩阵其它值
self.errors[i1] = 0
self.errors[i2] = 0
# 更新差值 Equation (12)
for i in range(self.m):
if 0 < self.alphas[i] < self.C:
self.errors[i] += y1*(a1 - alpha1) * self.kernel(self.X[i1], self.X[i]) + \
y2*(a2 - alpha2) * self.kernel(self.X[i2], self.X[i]) + self.b - b_new
return 1
def examine_example(self, i2):
启发式寻找 alpha1
y2 = self.y[i2]
alpha2 = self.alphas[i2]
E2 = get_error(self, i2)
r2 = E2 * y2
#XXX: 这一段的重点在于确定 alpha1, 也就是 old_a1,并送到 take_step 去分析优化
# 下面条件之一满足,进入 if 开始找第二个 alpha,送到 take_step 进行优化
if ((r2 < -self.tol and alpha2 < self.C) or (r2 > self.tol and alpha2 > 0)):
if len(self.alphas[(self.alphas != 0) & (self.alphas != self.C)]) > 1:
# 选择Ei矩阵中差值最大的先进行优化
# 要想|E1-E2|最大,只需要在E2为正时,选择最小的Ei作为E1
# 在E2为负时选择最大的Ei作为E1
if self.errors[i2] > 0:
i1 = np.argmin(self.errors)
elif self.errors[i2] <= 0:
i1 = np.argmax(self.errors)
step_result = self.take_step(i1, i2)
if step_result:
return 1
# 循环所有非0 非C alphas值进行优化,随机选择起始点
for i1 in np.roll(np.where((self.alphas != 0) & (self.alphas != self.C))[0],
step_result = self.take_step(i1, i2)
if step_result:
return 1
#a2确定的情况下,如何选择a1? 循环所有(m-1) alphas, 随机选择起始点
for i1 in np.roll(np.arange(self.m), np.random.choice(np.arange(self.m))):
#print("what is the first i1",i1)
step_result = self.take_step(i1, i2)
if step_result:
return 1
return 0
def fit(self):
确定第一个alpha: alpha2
numChanged = 0 # alpha 更新量
examineAll = True # 需要遍历所有样本
# 计数器,记录优化时的循环次数
loopnum = 0
loopnum1 = 0
loopnum2 = 0
当numChanged = 0(alpha没有更新) and examineAll(不再需要遍历样本)时 循环退出
实际是顺序地执行完所有的样本 (第一个if中的循环)
else中的循环没有可优化的 alpha, 目标函数收敛了; 且在容差之内, 并且满足KKT条件; 则循环退出,如果执行3000次循环仍未收敛, 也退出
# XXX: 确定 alpha2,也就是old_a2, 或者说alpha2的下标,old_a2 和 old_a1都是启发式选择
while(numChanged > 0) or (examineAll):
numChanged = 0
if loopnum == self.loop_threshold: # 循环量大于最大阈值
loopnum = loopnum + 1
if examineAll:
loopnum1 = loopnum1 + 1 # 记录顺序一个一个选择 alpha2 时的循环次数
# 从0,1,2,3,...,m顺序选择 a2 的,送给 examine_example 选择 alpha1,总共 m*(m-1)种选法
for i in range(self.alphas.shape[0]):
examine_result = self.examine_example(i) # 是否发生改变
numChanged += examine_result
else: # 上面if里m(m-1)执行完的后执行
loopnum2 = loopnum2 + 1
# 遍历样本中 alphas 未更新的值
for i in np.where((self.alphas != 0) & (self.alphas != self.C))[0]:
examine_result = self.examine_example(i)
numChanged += examine_result
if examineAll:
examineAll = False
elif numChanged == 0:
examineAll = True
print(" loopnum012: %s, : %s, : %s" % (loopnum, loopnum1, loopnum2))
def linear_kernel(x, y, b=1):
""" returns the linear combination of arrays 'x' and 'y' with
the optional bias term 'b' (set to 1 by default). """
result = x @ y.T + b
return result # Note the @ operator for matrix multiplications
def gaussian_kernel(x, y, sigma=1):
"""Returns the gaussian similarity of arrays 'x' and 'y' with
kernel width paramenter 'sigma' (set to 1 by default)"""
if np.ndim(x) == 1 and np.ndim(y) == 1:
result = np.exp(-(np.linalg.norm(x-y,2))**2/(2*sigma**2))
elif(np.ndim(x)>1 and np.ndim(y) == 1) or (np.ndim(x) == 1 and np.ndim(y)>1):
result = np.exp(-(np.linalg.norm(x-y, 2, axis=1)**2)/(2*sigma**2))
elif np.ndim(x) > 1 and np.ndim(y) > 1 :
result = np.exp(-(np.linalg.norm(x[:, np.newaxis]- y[np.newaxis, :], 2, axis = 2) ** 2)/(2*sigma**2))
return result
def get_error(model, i1):
if 0< model.alphas[i1] <model.C:
return model.errors[i1]
return SMOStruct.decision_function_output(model, i1) - model.y[i1]
def plot_decision_boundary(model, ax, resolution=100, colors=('b','k','r'), levels = (-1, 0, 1)):
# Generate coordinate grid of shape [resolution x resolution]
# and evalute the model over the entire space
x_range = np.linspace(model.X[:, 0].min(), model.X[:, 0].max(), resolution)
yrange = np.linspace(model.X[:, 1].min(), model.X[:, 1].max(), resolution)
grid = [[SMOStruct.decision_function(model.alphas, model.y, model.kernel, model.X,
np.array([xr,yr]), model.b) for xr in x_range] for yr in yrange]
grid = np.array(grid).reshape(len(x_range), len(yrange))
# Plot decision contours using grid and make a scatter plot of training data
ax.contour(x_range, yrange, grid, levels=levels, linewidths = (1,1,1),
linestyles=('--', '-', '--'), colors=colors)
ax.scatter(model.X[:, 0], model.X[:, 1],
c=model.y,, lw=0, alpha =0.25)
# Plot support vectors (non-zero alphas) as circled points (linewidth >0)
mask = np.round(model.alphas, decimals = 2) !=0.0
ax.scatter(model.X[mask, 0], model.X[mask, 1],
c=model.y[mask],, lw=1, edgecolors='k')
return grid, ax
if __name__ == '__main__':
# 生成测试数据,训练样本
X_train, y = make_blobs(n_samples = 1000, centers =2, n_features=2, random_state = 2)
# StandardScaler()以及fit_transfrom函数的作用需要解释一下
scaler = StandardScaler() #数据预处理,使得经过处理的数据符合正态分布,即均值为0,标准差为1
X_train_scaled = scaler.fit_transform(X_train, y)
y[y == 0] = -1
# 将训练数据绘制出来
fig, ax = plt.subplots()
ax.scatter(X_train_scaled[:, 0], X_train_scaled[:, 1],
c=y,, lw=0, alpha =0.25)
# SVM超参数
C = 20.0 # 软间隔-惩罚因子
m = len(X_train_scaled) # 样本数
initial_alphas = np.zeros(m) # 初始化 alphas
initial_b = 0.0 # 初始化 b
tol = 0.01 # error 容忍度
eps = 0.01 # alpha 容忍度
model = SMOStruct(X_train_scaled, y, C, linear_kernel, initial_alphas, initial_b, np.zeros(m), tol, eps, user_linear_optim=True)
print("starting to fit...")
start = time.time()
# 开始训练
print("fit cost time: ", time.time()-start)
# 绘制训练完,找到分割平面的图
fig, ax = plt.subplots()
grid, ax = plot_decision_boundary(model, ax)
