本文是用机器学习打造聊天机器人系列的第四篇,将先对主要模块的代码进行展现和解读,末尾会给出完整代码的地址。建议先看主要模块的代码解读,有助于理解核心代码的思路,而后浏览完整项目代码的README文档,将项目跑起来体验如下,再针对性的根据接口去阅读各模块代码的实现。web
特征向量的构造有两种思想,一种是one-hot,一种是Dristributed Representation(这里用word2vec实现),通常来讲后者可以更好的表示词的含义,可是有时候咱们使用的句子来自特殊的领域,word2vec模型的预训练语料未必可以表示的很好,因此这个时候用one-hot就可能会表现的更好。算法
def build_feature(self, sentence, w_i_dict): """ 根据词汇表构造句子向量,其中用到的'w_i_dict'参数会经过如下方法先构造好: # 构建训练语料库 build_corpus_vocabulary() # 训练语料库分词 cut_corpus_vocabulary() # 构建训练语料库词汇反向索引 word_index_dict_ = load_vocabulary() # 存储训练语料库词汇反向索引 dump_word_index(word_index_dict_) :param sentence: 句子 :param w_i_dict: 词汇-位置索引字典 :return: one-hot 向量 """ # 分词 sentence_seg = jieba.cut(sentence) # 用0初始化one-hot向量,维数为词汇表的词的个数 sen_vec = np.zeros(len(w_i_dict)) # 词汇表的词的列表 w_i_dict_keys = w_i_dict.keys() # one-hot向量对应词在词典中的位置至1 for word in sentence_seg: if w_i_dict_keys.__contains__(word): sen_vec[w_i_dict[word]] = 1 return sen_vec
def sum_vecs_avg(self, text): """ 根据词向量模型构建句子向量 :param text: 句子 :return: """ # 加载词向量模型 word_vec_model = ModelsLoader().sf_words_vec_model # 用0值初始化一个同维数的向量,若是你知道你的词向量模型是多少维的,能够直接指定,不用采用下面的野路子 vec = np.zeros(word_vec_model['是'].shape[0]) # 分词 words_list = list(jieba.cut(text)) for w in words_list: try: # 将全部词的向量累加 vec = vec + word_vec_model[w] except KeyError as e: logging.warning('词‘%s’,不在词向量模型词汇表中', w) continue except ValueError as e: logging.error('Error:', e) break # 计算平均向量 vec = vec / len(words_list) return vec
和特征向量的构建同样,分两种方式,一种是基于贝叶斯算法(对应上面的one-hot特征),另外一种是基于句子向量各份量的算数平均值构成的向量和输入向量的夹角余弦类似度来分类(对应上面的词向量特征)。前者的训练是根据样本计算几率模型,后者的训练是提早计算好每一个类别的中心向量。json
def train_clf(self): """ 基于贝叶斯算法训练意图分类器,并存储为文件,以便下次使用 :return: """ dump_path = "%s/classifier_mnb.m" % get_resources_trained_models() # 加载训练样本数据 features_np, labels_np = load_train_data() features_np = np.array(features_np) labels_np = np.array(labels_np) # 开始训练 starttime = datetime.datetime.now() print("开始训练分类器...") # 使用多项式朴素贝叶斯算法训练模型 clf = MultinomialNB(alpha=0.1, fit_prior=True, class_prior=None) # 从第10个开始归入训练,前10将作为验证集评估模型的表现 clf.fit(features_np[10:], labels_np[10:]) endtime = datetime.datetime.now() print("===========训练耗时: %s" % (endtime - starttime).seconds) # 评估分类器在验证集上的表现 print("评估结果:%s" % clf.score(features_np[:10], labels_np[:10])) self.clf_nb = clf # 存储分类器 dump_clf(self) print("分类器存储位置:%s" % dump_path) return self def predict(self, feature_vec, clf): """ 预测(基于贝叶斯模型) :param feature_vec: 输入句子的特征向量 :param clf: 训练好的贝叶斯模型 :return: """ proba_pred_np = clf.clf_nb.predict_proba(np.array([feature_vec]))[0] logging.debug("预测结果的几率:%s", proba_pred_np) # 加载类别集合 labels_set = load_labels_set() label_score_list = [] for i, num in enumerate(proba_pred_np): # if num != 0.00000000e+00: if num >= current_app.config['THRESHOLD_INTENT_RECOGNITION']: label_score_list.append((labels_set[i], num)) if len(label_score_list) == 0: # 正常阈值下没有匹配项,就降级匹配 logging.debug("意图识别在正常分数阈值下没有匹配到任何项,进行降级匹配...") for i, num in enumerate(proba_pred_np): # if num != 0.00000000e+00: if num >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']: label_score_list.append((labels_set[i], num)) rs = sorted(label_score_list, key=lambda item: item[1], reverse=True) return rs, [c for c, v in rs] def train_clf(self): """ 训练分类器(基于中心向量的方式) :return: """ data = DataLoader().load_train_data() logging.info("开始训练...") _, labels_centroids_dict = self.cal_centroid_vec(data) self.labels_centroids_dict = labels_centroids_dict self.labels = list(labels_centroids_dict.keys()) logging.info("训练完成!") # 存储分类器模型 self.dump(self) return self def cal_centroid_vec(self, data): """ 构建“类别-中心向量”字典 :param data: {'类别':{examples:'句子样本',centroid:'中心向量'}} :return: """ labels_centroids_dict = {} for the_label in data.keys(): centroid = self.get_centroid(data[the_label]["examples"]) data[the_label]["centroid"] = centroid labels_centroids_dict[the_label] = centroid return data, labels_centroids_dict def get_centroid(self, examples): """ 获取当前意图类别的中心向量。中心向量由examples中全部句子向量各份量上的算数平均数表示 :param examples: 当前类别下的全部样本句子 :return: """ word_vec_model = ModelsLoader().sf_words_vec_model word_dim = word_vec_model['是'].shape[0] C = np.zeros((len(examples), word_dim)) for idx, text in enumerate(examples): C[idx, :] = self.sum_vecs_avg(text) centroid = np.mean(C, axis=0) assert centroid.shape[0] == word_dim return centroid def predict(self, feature_vec, clf): """ 预测意图类别(基于向量夹角余弦值) :param feature_vec: 输入句子的特征向量 :param clf: 从接口继承下来的参数,这里用不到 :return: """ intents = self.labels # 分数计算规则:计算新句子的向量和当前意图类别的中心向量的夹角余弦值,下面其实能够改进如下,用矩阵并行计算代替for循环,可是由于类别目前很少,影响暂时不大。 scores = [(label_, np.dot(feature_vec, self.labels_centroids_dict[label_]) / ( np.linalg.norm(feature_vec) * np.linalg.norm(self.labels_centroids_dict[label_]))) for label_ in intents] rs = sorted(scores, key=lambda item: item[1], reverse=True) top1scores = rs[0][1] top1label = rs[0][0] logging.debug("top1的分数:%s,label:%s", top1scores, top1label) if top1scores >= current_app.config['THRESHOLD_INTENT_RECOGNITION']: rs = rs[:1] elif top1scores >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']: logging.debug("意图识别在正常分数阈值下没有匹配到任何项,进行降级匹配...") elif top1scores < current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']: logging.debug("意图识别在最小分数阈值下没有匹配到任何项...") rs = [] return rs, [c for c, v in rs]
def compare(self, statement, statement_vec): """ 比较夹角余弦值 :param statement: 输入句子对象 :param statement_vec: 句子样本特征向量,是一个二维list :return: 输入句子和各句子样本的类似度构成的二维数组 """ statement_text_vec = statement.text_vector statement_vec = np.array(statement_vec) # 向量化并行计算余弦值 similarity = np.dot(statement_text_vec, statement_vec.T) / ( np.linalg.norm(statement_text_vec) * np.linalg.norm(statement_vec, axis=1)).T print("similarity.shape %s" % similarity.shape) return similarity
本项目里,做者把训练语料的类型分红了闲聊和业务两大类,下面你会看到不少SF关键字,就是指业务,至于为何叫SF,是历史遗留(lan)的问题,没必要过于纠结。闲聊类目前咱们不拆分,因此代码和上面介绍chatterbot的时候的代码相似,可是对于业务类的样本,因为咱们须要分红多个类型,因此这里要建立多个chatterbot实例,下面展现的是业务类的chatbot的实例化过程:数组
def train_sf_chatbot(): data_root_dir = path_configer.get_classifier_train_samples() for file_name in os.listdir(data_root_dir): if file_name.startswith("QA_sf_"): __train(('%s/%s' % (get_chatter_corpus(), file_name)), file_name[:file_name.find('-')]) def __train(corpus_path, collection_name): print("开始训练SF...") starttime = datetime.now() chatbot = SF().chatters[collection_name] chatbot.set_trainer(ListTrainer) chatbot.train(read_custom(corpus_path)) print("SF训练完成!") endtime = datetime.now() print("===========训练耗时: %s秒" % (endtime - starttime).seconds) @singleton class SF(object): def __init__(self): logging.info('预加载sf词向量模型...') logging.info('预加载SF全部实例...') labels = [file_name[:file_name.find("-")] for file_name in os.listdir(path_configer.get_chatter_corpus()) if file_name.startswith("QA_sf_")] chatters = {} bot_name = current_app.config['DATABASE'] # 根据不一样的类型,建立不一样的ChatBot实例 for label in labels: chatters[label] = ( ChatBot( bot_name, database=bot_name, database_uri=current_app.config['DATABASE_URI'], # 使用合适的词向量模型时开启 preprocessors=[ 'kbqa_sf.train.chatter.sf.sf_preprocessors.sum_vecs_avg' ], statement_comparison_function=WordVecComparator(), # statement_comparison_function=levenshtein_distance, logic_adapters=[{'import_path': 'kbqa_sf.train.chatter.sf.sf_adapter.BestMatchExtLogicAdapter'}], storage_adapter="kbqa_sf.train.chatter.sf.sf_mongo_storage.MongoDatabaseExtAdapter", ext_collection_name=label, read_only=True) ) self.chatters = chatters logging.info('SF全部实例预加载完成!')
chatterbot提供了学习接口,就是方便之后再追加新的问答对,代码以下:微信
# a:问题对象Statement,q:回答对象Statement chatbot_.learn_response(a, q)
可是光是执行上面的代码,在咱们的项目中是不够的,由于当样本库变更了,咱们的意图分类器,词汇-索引字典,句子-句向量字典都要从新生成。若是你的样本库数量不大,那么这个过程仍是很快的,可是若是数据量比较大的话,好比上万条,那么这个过程须要几十秒到几分钟。因此不建议让用户可以直接经过web页面就使用这个学习的接口,而是采用异步的方式,先记录下用户提交的反馈,而后定时由程序在后台执行比较合适。固然,若是你是本身随便玩玩,数据量不大的话,直接经过web页面使用这个接口是最方便的了。在线学习的代码以下,分为记录和学习2个接口:app
@qac.route('/record', methods=['POST']) def record(): """ 将要学习的问题、答案、类别,写入文件learn目录下的wait-learn.txt、history-learn.txt :return: """ qac_list = request.get_json() learn_path = path_configer.get_learn() wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt") history_learn_path = "%s/%s" % (learn_path, "history-learn.txt") with __record_lock: fa_wait = codecs.open(wait_learn_path, "a", encoding="utf-8") fa_history = codecs.open(history_learn_path, "a", encoding="utf-8") for qac_item in qac_list: q = qac_item["q"] a = qac_item["a"] c = qac_item["c"] if 0 < len(a) <= 300 and len(q) > 0 and len(c) > 0: content = 'Q %s\nA %s\nC %s\n' % (q, a, c) fa_wait.write(content) fa_history.write( '%sT %s\n' % (content, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))) else: return make_response(jsonify({'error': '参数不符合要求,请检查!'}), 400) fa_wait.close() fa_history.close() logging.debug("=========待学习问题记录完成!") return "success" @qac.route('/learn/batch', methods=['GET']) def learn_batch(): """ 批量学习给定的问题和答案: 重命名wait-learn.txt为learning.txt,读取learning.txt的内容进行学习 :return: """ _learn_new_batch_lock = threading.Lock() logging.debug("开始学习...") starttime = datetime.datetime.now() learn_path = path_configer.get_learn() wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt") learning_path = "%s/%s" % (learn_path, "learning.txt") with __record_lock: if os.path.exists(learning_path): # 若上一次的临时文件未能删除,就在这里删除。 os.remove(learning_path) logging.info("=========发现上一次的临时文件未能删除,已删除!") if not os.path.exists(wait_learn_path): msg = "nothing" logging.info(msg) return msg os.rename(wait_learn_path, learning_path) logging.debug("重命名wait-learn.txt为learning.txt ...") with _learn_new_batch_lock: logging.debug("读取learning.txt的内容进行学习 ...") with codecs.open(learning_path, "r", encoding="utf-8") as fr: q = fr.readline().strip("\n\r") while q != "": a = fr.readline().strip("\n\r") assert a.strip("\n\r") != "", 'q,a,c格式没法匹配!缺乏a!' c = fr.readline().strip("\n\r") assert c.strip("\n\r") != "", 'q,a,c格式没法匹配!缺乏a!' # 添加q,a到指定的c类别文件;训练c对应的chatterbot logging.debug("添加%s,%s到指定的%s类别文件;训练对应的chatterbot ...", q, a, c) # 开始学习 learn_(q, a, c[c.find(" ") + 1:]) q = fr.readline().strip("\n\r") logging.debug("learning.txt学习所有完成...") logging.debug("完整的从新训练分类器模型 ...") IntentClassifier().full_retrain_clf() logging.debug("构建文本-向量索引文件,并存储 ...") IntentClassifier().build_text_vec_indx() logging.debug("加载文本向量索引文件 ...") IntentClassifier().load_text_vec_indx() # 删除临时的学习文件 os.remove(learning_path) endtime = datetime.datetime.now() print("===========本次学习耗时: %s秒" % (endtime - starttime).seconds) logging.info("=========本次学习已所有完成!") return "success" def learn_(q, a, c): """ 添加q,a到指定的c类别文件;训练c对应的chatterbot :param q: 问题 :param a: 答案 :param c: 分类 :return: """ file_names = [file_name for file_name in os.listdir(path_configer.get_chatter_corpus()) if file_name.startswith(c)] if not file_names: logging.warning("未知的类别:%s,已忽略", c) return file_name = file_names[0] file_path = "%s/%s" % (path_configer.get_chatter_corpus(), file_name) # 追加到c对应的意图分类文件中 with codecs.open(file_path, "a", encoding="utf-8") as fa: if len(q) > 0 and len(a) > 0: if os.path.getsize(file_path) == 0: fa.write('%s' % q) else: fa.write('\n%s' % q) fa.write('\n%s' % a) # 学习问答 qa_learn(q, a, c) return "success" def qa_learn(q, a, c): a_statement = Statement(a) q_statement = Statement(q) if c.startswith("QA_talk"): chat_bot = Talk().chat else: chat_bot = SF().chatters[c] chat_bot.learn_response(a_statement, q_statement)
以上是主要功能的代码,若要获取可运行的完整代码,能够加做者微信(jiabao512859468)获取,有任何相关技术问题,都欢迎和做者探讨O(∩_∩)O~机器学习
ok,有了代码,下一篇将介绍如何将聊天机器人项目应用到不一样的业务领域,以及如何接入其余项目中。异步
本篇就这么多内容啦~,感谢阅读O(∩_∩)O。学习