@royguo
Created November 11, 2012 08:31
NavieBayes
#!/usr/bin/env python
# encoding: utf-8
"""
author: royguo1988@gmail.com
"""
import os
import random
import re


class DataPrepare(object):
    """Preprocess the raw data so it is ready for training the model."""
    def __init__(self, input_dir, train_data_file, test_data_file, train_file_percentage):
        self.input_dir = input_dir
        self.train_data_file = open(train_data_file, 'w')
        self.test_data_file = open(test_data_file, 'w')
        self.train_file_percentage = train_file_percentage
        self.unique_words = []
        # Each word is represented by a numeric id; integer lookups are faster in Python.
        self.word_ids = {}

    def __del__(self):
        self.train_data_file.close()
        self.test_data_file.close()

    def prepare(self):
        file_num = 0
        output_file = self.test_data_file
        for file_name in os.listdir(self.input_dir):
            # e.g. arr = ('1234', 'business')
            arr = re.findall(r'(\d+)(\w+)', file_name)[0]
            category = arr[1]
            # Randomly assign each file to the training or test set
            # according to train_file_percentage.
            if random.random() < self.train_file_percentage:
                output_file = self.train_data_file
            else:
                output_file = self.test_data_file
            # Read the file and collect its words.
            words = []
            with open(self.input_dir + '/' + file_name, 'r') as f:
                words = f.read().decode('utf-8').split()
            output_file.write(category + ' ')
            for word in words:
                if word not in self.word_ids:
                    self.unique_words.append(word)
                    # A hash would also work; for simplicity use the current
                    # length of the list (also unique).
                    self.word_ids[word] = len(self.unique_words)
                output_file.write(str(self.word_ids[word]) + " ")
            output_file.write("#" + file_name + "\n")
            # There are many raw files, so print progress as we go.
            file_num += 1
            if file_num % 100 == 0:
                print file_num, ' files processed'
        print file_num, " files loaded!"
        print len(self.unique_words), " unique words found!"


if __name__ == '__main__':
    dp = DataPrepare('newsdata', 'news.train', 'news.test', 0.8)
    dp.prepare()

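# Data format (illustrative): assuming the raw files follow the naming scheme the
# regex above expects, e.g. newsdata/1234business.txt (a hypothetical name), each
# file produces one line in news.train or news.test of the form
#
#   business 1 2 3 2 ... #1234business.txt
#
# i.e. the category, the numeric word ids of the document, and finally the source
# file name prefixed with '#'.
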
#!/usr/bin/env python
#coding: utf-8
"""
author: royguo1988@gmail.com
"""
import math


class NavieBayesPredict(object):
    """Use the trained model to make predictions."""
    def __init__(self, test_data_file, model_data_file, result_file):
        self.test_data_file = open(test_data_file, 'r')
        self.model_data_file = open(model_data_file, 'r')
        # Output file for the predictions on the test data set.
        self.result_file = open(result_file, 'w')
        # Prior probability of each class.
        self.class_probabilities = {}
        # Laplace smoothing, to avoid zero probabilities.
        self.laplace_smooth = 0.1
        # The trained model: P(word | class) for every class.
        self.class_word_prob_matrix = {}
        # Default probability (after Laplace smoothing) used when a word
        # never appeared in a class.
        self.class_default_prob = {}
        # All known words.
        self.unique_words = {}
        # Actual news categories.
        self.real_classes = []
        # Predicted news categories.
        self.predict_classes = []

    def __del__(self):
        self.test_data_file.close()
        self.model_data_file.close()
        self.result_file.close()

    def loadModel(self):
        # Read the class priors from the first line of the model file.
        class_probs = self.model_data_file.readline().split('#')
        for cls in class_probs:
            arr = cls.split()
            if len(arr) == 3:
                self.class_probabilities[arr[0]] = float(arr[1])
                self.class_default_prob[arr[0]] = float(arr[2])
        # Read the per-class word probabilities from the rest of the model file.
        line = self.model_data_file.readline().strip()
        while len(line) > 0:
            arr = line.split()
            assert(len(arr) % 2 == 1)
            assert(arr[0] in self.class_probabilities)
            self.class_word_prob_matrix[arr[0]] = {}
            i = 1
            while i < len(arr):
                word_id = int(arr[i])
                probability = float(arr[i + 1])
                if word_id not in self.unique_words:
                    self.unique_words[word_id] = 1
                self.class_word_prob_matrix[arr[0]][word_id] = probability
                i += 2
            line = self.model_data_file.readline().strip()
        print len(self.class_probabilities), " classes loaded!", len(self.unique_words), "words!"

    def caculate(self):
        # Read the test data set line by line.
        line = self.test_data_file.readline().strip()
        while len(line) > 0:
            arr = line.split()
            class_id = arr[0]
            words = arr[1:len(arr) - 1]
            # Remember the true class.
            self.real_classes.append(class_id)
            # Score the current line (one news item) against every class.
            class_score = {}
            for key in self.class_probabilities.keys():
                class_score[key] = math.log(self.class_probabilities[key])
            for word_id in words:
                word_id = int(word_id)
                if word_id not in self.unique_words:
                    continue
                for cls_id in self.class_probabilities.keys():
                    if word_id not in self.class_word_prob_matrix[cls_id]:
                        class_score[cls_id] += math.log(self.class_default_prob[cls_id])
                    else:
                        class_score[cls_id] += math.log(self.class_word_prob_matrix[cls_id][word_id])
            # Pick the class with the highest score for the current news item.
            max_class_score = max(class_score.values())
            for key in class_score.keys():
                if class_score[key] == max_class_score:
                    self.predict_classes.append(key)
                    # Record exactly one prediction per document, even on ties.
                    break
            line = self.test_data_file.readline().strip()
        print len(self.real_classes), len(self.predict_classes)

    def evaluation(self):
        # Evaluate the overall accuracy of the classifier.
        accuracy = 0
        i = 0
        while i < len(self.real_classes):
            if self.real_classes[i] == self.predict_classes[i]:
                accuracy += 1
            i += 1
        accuracy = float(accuracy) / float(len(self.real_classes))
        print "Accuracy:", accuracy
        # Evaluate per-class precision and recall.
        # Precision: of all items predicted as this class, the fraction that are correct.
        # Recall: of all items that truly belong to this class, the fraction predicted correctly.
        for class_id in self.class_probabilities:
            correctNum = 0
            allNum = 0
            predNum = 0
            i = 0
            while i < len(self.real_classes):
                if self.real_classes[i] == class_id:
                    allNum += 1
                    if self.predict_classes[i] == self.real_classes[i]:
                        correctNum += 1
                if self.predict_classes[i] == class_id:
                    predNum += 1
                i += 1
            precision = float(correctNum) / float(predNum)
            recall = float(correctNum) / float(allNum)
            print class_id, ' -> precision = ', precision, ' recall = ', recall

    def predict(self):
        self.loadModel()
        self.caculate()
        self.evaluation()


if __name__ == '__main__':
    nbp = NavieBayesPredict('news.test', 'news.model', 'news.result')
    nbp.predict()

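# A minimal sketch of the log-space scoring used in caculate() above: for each
# class c, score(c) = log P(c) + sum over words w of log P(w | c), and the class
# with the highest score is predicted. The priors and word probabilities below
# are made-up values for illustration only.
import math

toy_class_prob = {'business': 0.6, 'sport': 0.4}
toy_word_prob = {'business': {1: 0.02, 2: 0.001},
                 'sport': {1: 0.001, 2: 0.03}}
toy_doc = [1, 2, 2]  # word ids of one test document
toy_score = {}
for c in toy_class_prob:
    toy_score[c] = math.log(toy_class_prob[c])
    for w in toy_doc:
        toy_score[c] += math.log(toy_word_prob[c][w])
print max(toy_score, key=toy_score.get)  # -> 'sport'
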
#!/usr/bin/env python
# coding: utf-8
"""
author: royguo1988@gmail.com
"""


class NavieBayes(object):
    """Naive Bayes model."""
    def __init__(self, train_data_file, model_file):
        self.train_data_file = open(train_data_file, 'r')
        self.model_file = open(model_file, 'w')
        # Number of documents seen for each class.
        self.class_count = {}
        # Number of occurrences of each word within each class.
        self.class_word_count = {}
        # All unique words.
        self.unique_words = {}
        # ~~~~~~~~~~ NavieBayes parameters ~~~~~~~~~~~~ #
        # Prior probability of each class.
        self.class_probabilities = {}
        # Laplace smoothing, to avoid zero probabilities.
        self.laplace_smooth = 0.1
        # The trained model: P(word | class) for every class.
        self.class_word_prob_matrix = {}
        # Default probability (after Laplace smoothing) used when a word
        # never appeared in a class.
        self.class_default_prob = {}

    def __del__(self):
        self.train_data_file.close()
        self.model_file.close()

    def loadData(self):
        line_num = 0
        line = self.train_data_file.readline().strip()
        while len(line) > 0:
            words = line.split('#')[0].split()
            category = words[0]
            if category not in self.class_count:
                self.class_count[category] = 0
                self.class_word_count[category] = {}
                self.class_word_prob_matrix[category] = {}
            self.class_count[category] += 1
            for word in words[1:]:
                word_id = int(word)
                if word_id not in self.unique_words:
                    self.unique_words[word_id] = 1
                if word_id not in self.class_word_count[category]:
                    self.class_word_count[category][word_id] = 1
                else:
                    self.class_word_count[category][word_id] += 1
            line = self.train_data_file.readline().strip()
            line_num += 1
            if line_num % 100 == 0:
                print line_num, ' lines processed'
        print line_num, ' training instances loaded'
        print len(self.class_count), " categories!", len(self.unique_words), "words!"

    def computeModel(self):
        # Compute the class priors P(Yi).
        news_count = 0
        for count in self.class_count.values():
            news_count += count
        for class_id in self.class_count.keys():
            self.class_probabilities[class_id] = float(self.class_count[class_id]) / news_count
        # Compute P(X|Yi) <===> the product of all P(Xi|Yi) <===> the sum of all log(P(Xi|Yi)).
        for class_id in self.class_word_count.keys():
            # Total number of word occurrences in the current class.
            word_total = 0.0
            for word_id in self.class_word_count[class_id].keys():
                word_total += self.class_word_count[class_id][word_id]
            count_Yi = float(word_total + len(self.unique_words) * self.laplace_smooth)
            # Compute P(word | class) for every word seen in this class and store it in
            # the result matrix; words not seen in this class get the default
            # (Laplace-smoothed) probability.
            for word_id in self.class_word_count[class_id].keys():
                self.class_word_prob_matrix[class_id][word_id] = \
                    float(self.class_word_count[class_id][word_id] + self.laplace_smooth) / count_Yi
            self.class_default_prob[class_id] = float(self.laplace_smooth) / count_Yi
            print class_id, ' matrix finished, length = ', len(self.class_word_prob_matrix[class_id])
        return

    def saveModel(self):
        # Write the prior and default probability of each class to the first line.
        for class_id in self.class_probabilities.keys():
            self.model_file.write(class_id)
            self.model_file.write(' ')
            self.model_file.write(str(self.class_probabilities[class_id]))
            self.model_file.write(' ')
            self.model_file.write(str(self.class_default_prob[class_id]))
            self.model_file.write('#')
        self.model_file.write('\n')
        # Write the per-class word probabilities, one class per line.
        for class_id in self.class_word_prob_matrix.keys():
            self.model_file.write(class_id + ' ')
            for word_id in self.class_word_prob_matrix[class_id].keys():
                self.model_file.write(str(word_id) + ' '
                    + str(self.class_word_prob_matrix[class_id][word_id]))
                self.model_file.write(' ')
            self.model_file.write('\n')
        return

    def train(self):
        self.loadData()
        self.computeModel()
        self.saveModel()


if __name__ == '__main__':
    nb = NavieBayes('news.train', 'news.model')
    nb.train()

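# For reference, news.model as written by saveModel() has the following layout
# (the numbers are illustrative, not real output):
#
#   business 0.25 1.1e-06#sport 0.25 9.8e-07#   <- one line: per-class prior and
#                                                  default probability, '#'-separated
#   business 1 0.00031 2 0.00012 ...            <- one line per class: word_id /
#                                                  P(word_id | class) pairs
#
# which is exactly what NavieBayesPredict.loadModel() parses. Typical run order
# (the script names are hypothetical; the gist does not name its files):
#   python prepare.py   # DataPrepare        -> news.train, news.test
#   python train.py     # NavieBayes         -> news.model
#   python predict.py   # NavieBayesPredict  -> news.result + accuracy / precision / recall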