介绍
在第一部分,已经介绍协同推荐,并利用它实现了一个简单的推荐系统,在第二部分,咱们简要的分析了咱们拥有的数据,包括每一个字段的分布,常见的统计信息等,为接下来的多路召回提供了很好的指引。html
回想一下baseline的思路,咱们首先计算了item的之间的类似度,而后基于用户的正反馈item列表,找到与列表中每个item类似度最高的topn个item,组成一个列表,最后直接按照类似度得分进行排序,获得最后的推荐结果。python
在实际的推荐场景中,一般会有两个阶段,第一个阶段是召回阶段,第二个阶段是排序阶段。第一个阶段召回那些类似度较高的N个item列表,它更关注的是召回率,较为粗略;而排序阶段会使用更为复杂的模型进行监督学习(转化为分类的任务),获得分类几率,也就是置信度,最后按照置信度进行排序,取top K个item做为最后的推荐列表。git
baseline中其实只包含了召回
这个阶段,虽然但就这个任务而言,它已经够了。本节介绍的是多路召回,什么是多路召回呢,好比在一个推荐场景中,咱们能够选择ItemCF或者UserCF以及基于热点的召回策略等等,由于咱们召回层的目的是为了尽量的确保召回,因此基于单个的策略确定是效果不如多个策略的,这里就引出了多路召回的概念,也就是多个策略并行的进行召回。下面这个图表示了多路召回的一个例子。github
在多路召回中,每一个策略之间绝不相关,可使用多种不一样的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略实际上是与业务强相关
的 ,针对不一样的任务就会有对于该业务真实场景下须要考虑的召回规则。例如新闻推荐,召回规则能够是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”流行趋势“、”类型召回“等等。算法
导入相关库segmentfault
import pandas as pd import numpy as np from tqdm import tqdm from collections import defaultdict import os, math, warnings, math, pickle from tqdm import tqdm import faiss import collections import random from sklearn.preprocessing import MinMaxScaler from sklearn.preprocessing import LabelEncoder from datetime import datetime from deepctr.feature_column import SparseFeat, VarLenSparseFeat from sklearn.preprocessing import LabelEncoder from tensorflow.python.keras import backend as K from tensorflow.python.keras.models import Model from tensorflow.python.keras.preprocessing.sequence import pad_sequences from deepmatch.models import * from deepmatch.utils import sampledsoftmaxloss warnings.filterwarnings('ignore')
data_path = './data_raw/' save_path = './temp_results/' # 作召回评估的一个标志, 若是不进行评估就是直接使用全量数据进行召回 metric_recall = False
读取数据架构
在通常的推荐系统比赛中读取数据部分主要分为三种模式, 不一样的模式对应的不一样的数据集:app
下面就分别对这三种不一样的数据读取模式先创建不一样的代导入函数, 方便后面针对不一样的模式下导入数据。框架
# debug模式: 从训练集中划出一部分数据来调试代码 def get_all_click_sample(data_path, sample_nums=10000): """ 训练集中采样一部分数据调试 data_path: 原数据的存储路径 sample_nums: 采样数目(这里因为机器的内存限制,能够采样用户作) """ all_click = pd.read_csv(data_path + 'train_click_log.csv') all_user_ids = all_click.user_id.unique() sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) all_click = all_click[all_click['user_id'].isin(sample_user_ids)] all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click # 读取点击数据,这里分红线上和线下,若是是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中 # 若是是为了线下验证模型的有效性或者特征的有效性,能够只使用训练集 def get_all_click_df(data_path='./data', offline=True): if offline: all_click = pd.read_csv(data_path + '/train_click_log.csv') else: trn_click = pd.read_csv(data_path + '/train_click_log.csv') tst_click = pd.read_csv(data_path + '/testA_click_log.csv') all_click = trn_click.append(tst_click) all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click
# 读取文章的基本属性 def get_item_info_df(data_path): item_info_df = pd.read_csv(data_path + 'articles.csv') # 为了方便与训练集中的click_article_id拼接,须要把article_id修改为click_article_id item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'}) return item_info_df
# 读取文章的Embedding数据 def get_item_emb_dict(data_path): item_emb_df = pd.read_csv(data_path + '/articles_emb.csv') item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols]) # 进行归一化 item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True) item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np)) pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb')) return item_emb_dict
# min-max 归一化函数 max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据 # all_click_df = get_all_click_sample(data_path) # 全量训练集 all_click_df = get_all_click_df(offline=False) # 对时间戳进行归一化,用于在关联规则的时候计算权重 all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler) item_info_df = get_item_info_df(data_path) item_emb_dict = get_item_emb_dict(data_path)
工具函数
获取用户-文章-时间函数dom
这个在基于关联规则的用户协同过滤的时候会用到
# 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} def get_user_item_time(click_df): click_df = click_df.sort_values('click_timestamp') def make_item_time_pair(df): return list(zip(df['click_article_id'], df['click_timestamp'])) user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\ .reset_index().rename(columns={0: 'item_time_list'}) user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list'])) return user_item_time_dict
获取文章-用户-时间函数
这个在基于关联规则的文章协同过滤的时候会用到
# 根据时间获取商品被点击的用户序列 {item1: [(user1, time1), (user2, time2)...]...} # 这里的时间是用户点击当前商品的时间,好像没有直接的关系。 def get_item_user_time_dict(click_df): def make_user_time_pair(df): return list(zip(df['user_id'], df['click_timestamp'])) click_df = click_df.sort_values('click_timestamp') item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\ .reset_index().rename(columns={0: 'user_time_list'}) item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list'])) return item_user_time_dict
获取历史和最后一次点击
这个在评估召回结果, 特征工程和制做标签转成监督学习测试集的时候回用到
# 获取当前数据的历史点击和最后一次点击 def get_hist_and_last_click(all_click): all_click = all_click.sort_values(by=['user_id', 'click_timestamp']) click_last_df = all_click.groupby('user_id').tail(1) # 若是用户只有一个点击,hist为空了,会致使训练的时候这个用户不可见,此时默认泄露一下 def hist_func(user_df): if len(user_df) == 1: return user_df else: return user_df[:-1] click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True) return click_hist_df, click_last_df
获取文章属性特征
# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用 def get_item_info_dict(item_info_df): max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x)) item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler) item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id'])) item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count'])) item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts'])) return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息
def get_user_hist_item_info_dict(all_click): # 获取user_id对应的用户历史点击文章类型的集合字典 user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index() user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id'])) # 获取user_id对应的用户点击文章的集合 user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index() user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id'])) # 获取user_id对应的用户历史点击的文章的平均字数字典 user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index() user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count'])) # 获取user_id对应的用户最后一次点击的文章的建立时间 all_click_ = all_click.sort_values('click_timestamp') user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index() max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x)) user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler) user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \ user_last_item_created_time['created_at_ts'])) return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的Top-k个文章
获取近期点击最多的文章
def get_item_topk_click(click_df, k): topk_click = click_df['click_article_id'].value_counts().index[:k] return topk_click
定义多路召回字典
获取文章的属性信息,保存成字典的形式方便查询
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df) # 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中 user_multi_recall_dict = {'itemcf_sim_itemcf_recall': {}, 'embedding_sim_item_recall': {}, 'youtubednn_recall': {}, 'youtubednn_usercf_recall': {}, 'cold_start_recall': {}} # 提取最后一次点击做为召回评估,若是不须要作召回评估直接使用全量的训练集进行召回(线下验证模型) # 若是不是召回评估,直接使用全量数据进行召回,不用将最后一次提取出来 trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
召回效果评估
作完了召回有时候也须要对当前的召回方法或者参数进行调整以达到更好的召回效果,由于召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法
# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率 def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5): last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id'])) user_num = len(user_recall_items_dict) for k in range(10, topk+1, 10): hit_num = 0 for user, item_list in user_recall_items_dict.items(): # 获取前k个召回的结果 tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]] if last_click_item_dict[user] in set(tmp_recall_items): hit_num += 1 hit_rate = round(hit_num * 1.0 / user_num, 5) print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
计算类似性矩阵
这一部分主要是经过协同过滤以及向量检索获得类似性矩阵,类似性矩阵主要分为user2user和item2item,下面依次获取基于itemCF的item2item的类似性矩阵。
itemCF i2i_sim
借鉴KDD2020的去偏商品推荐,在计算item2item类似性矩阵时,使用关联规则,使得计算的文章的类似性还考虑到了:
def itemcf_sim(df, item_created_time_dict): """ 文章与文章之间的类似性矩阵计算 :param df: 数据表 :item_created_time_dict: 文章建立时间的字典 return : 文章与文章的类似性矩阵 思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则 """ user_item_time_dict = get_user_item_time(df) # 计算物品类似度 i2i_sim = {} item_cnt = defaultdict(int) for user, item_time_list in tqdm(user_item_time_dict.items()): # 在基于商品的协同过滤优化的时候能够考虑时间因素 for loc1, (i, i_click_time) in enumerate(item_time_list): item_cnt[i] += 1 i2i_sim.setdefault(i, {}) for loc2, (j, j_click_time) in enumerate(item_time_list): if(i == j): continue # 考虑文章的正向顺序点击和反向顺序点击 loc_alpha = 1.0 if loc2 > loc1 else 0.7 # 位置信息权重,其中的参数能够调节 loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1)) # 点击时间权重,其中的参数能够调节 click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time)) # 两篇文章建立时间的权重,其中的参数能够调节 created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j])) i2i_sim[i].setdefault(j, 0) # 考虑多种因素的权重计算最终的文章之间的类似度 i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1) i2i_sim_ = i2i_sim.copy() for i, related_items in i2i_sim.items(): for j, wij in related_items.items(): i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j]) # 将获得的类似性矩阵保存到本地 pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb')) return i2i_sim_ i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]
userCF u2u_sim
在计算用户之间的类似度的时候,也可使用一些简单的关联规则,好比用户活跃度权重,这里将用户的点击次数做为用户活跃度的指标
def get_user_activate_degree_dict(all_click_df): all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index() # 用户活跃度归一化 mm = MinMaxScaler() all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']]) user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id'])) return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict): """ 用户类似性矩阵计算 :param all_click_df: 数据表 :param user_activate_degree_dict: 用户活跃度的字典 return 用户类似性矩阵 思路: 基于用户的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则 """ item_user_time_dict = get_item_user_time_dict(all_click_df) u2u_sim = {} user_cnt = defaultdict(int) for item, user_time_list in tqdm(item_user_time_dict.items()): for u, click_time in user_time_list: user_cnt[u] += 1 u2u_sim.setdefault(u, {}) for v, click_time in user_time_list: u2u_sim[u].setdefault(v, 0) if u == v: continue # 用户平均活跃度做为活跃度的权重,这里的式子也能够改善 activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v]) u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1) u2u_sim_ = u2u_sim.copy() for u, related_users in u2u_sim.items(): for v, wij in related_users.items(): u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v]) # 将获得的类似性矩阵保存到本地 pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb')) return u2u_sim_
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 因为usercf计算时候太耗费内存了,这里就不直接运行了 # 若是是采样的话,是能够运行的 user_activate_degree_dict = get_user_activate_degree_dict(all_click_df) u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
item embedding sim
使用Embedding计算item之间的类似度是为了后续冷启动的时候能够获取未出如今点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。
aiss是Facebook的AI团队开源的一套用于作聚类或者类似性搜索的软件库,底层是用C++实现。Faiss由于超级优越的性能,被普遍应用于推荐相关的业务当中.
faiss工具包通常使用在推荐系统中的向量召回部分。在作向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.咱们知道在实际的场景中user和item的数量都是海量的,咱们最容易想到的基于向量类似度的召回就是使用两层循环遍历user列表或者item列表计算两个向量的类似度,可是这样作在面对海量数据是不切实际的,faiss就是用来加速计算某个查询向量最类似的topk个索引向量。
faiss查询的原理:
faiss使用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,固然还使用了其余的技术进行优化,可是PCA和PQ是其中最核心部分。
faiss使用
# 向量检索类似度计算 # topk指的是每一个item, faiss搜索后返回最类似的topk个item def embdding_sim(click_df, item_emb_df, save_path, topk): """ 基于内容的文章embedding类似性矩阵计算 :param click_df: 数据表 :param item_emb_df: 文章的embedding :param save_path: 保存路径 :patam topk: 找最类似的topk篇 return 文章类似性矩阵 思路: 对于每一篇文章, 基于embedding的类似性返回topk个与其最类似的文章, 只不过因为文章数量太多,这里用了faiss进行加速 """ # 文章索引与文章id的字典映射 item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id'])) item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32) # 向量进行单位化 item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True) # 创建faiss索引 item_index = faiss.IndexFlatIP(item_emb_np.shape[1]) item_index.add(item_emb_np) # 类似度查询,给每一个索引位置上的向量返回topk个item以及类似度 sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表 # 将向量检索的结果保存成原始id的对应关系 item_sim_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)): target_raw_id = item_idx_2_rawid_dict[target_idx] # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = item_idx_2_rawid_dict[rele_idx] item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value # 保存i2i类似度矩阵 pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb')) return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv') emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk能够自行设置
召回
这个就是咱们开篇提到的那个问题, 面的36万篇文章, 20多万用户的推荐, 咱们又有哪些策略来缩减问题的规模? 咱们就能够再召回阶段筛选出用户对于点击文章的候选集合, 从而下降问题的规模。召回经常使用的策略:
基于文章的召回
基于用户的召回
上面的各类召回方式一部分在基于用户已经看得文章的基础上去召回与这些文章类似的一些文章, 而这个类似性的计算方式不一样, 就获得了不一样的召回方式, 好比文章的协同过滤, 文章内容的embedding等。还有一部分是根据用户的类似性进行推荐,对于某用户推荐与其类似的其余用户看过的文章,好比用户的协同过滤和用户embedding。 还有一种思路是相似矩阵分解的思路,先计算出用户和文章的embedding以后,就能够直接算用户和文章的类似度, 根据这个类似度进行推荐, 好比YouTube DNN。 咱们下面详细来看一下每个召回方法:
(这一步是直接获取用户召回的候选文章列表)
Youtubednn召回架构
关于YoutubeDNN原理和应用推荐看王喆的两篇博客:
# 获取双塔召回时的训练验证数据 # negsample指的是经过滑窗构建样本的时候,负样本的数量 def gen_data_set(data, negsample=0): data.sort_values("click_timestamp", inplace=True) item_ids = data['click_article_id'].unique() train_set = [] test_set = [] for reviewerID, hist in tqdm(data.groupby('user_id')): pos_list = hist['click_article_id'].tolist() if negsample > 0: candidate_set = list(set(item_ids) - set(pos_list)) # 用户没看过的文章里面选择负样本 neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True) # 对于每一个正样本,选择n个负样本 # 长度只有一个的时候,须要把这条数据也放到训练集中,否则的话最终学到的embedding就会有缺失 if len(pos_list) == 1: train_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list))) test_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list))) # 滑窗构造正负样本 for i in range(1, len(pos_list)): hist = pos_list[:i] if i != len(pos_list) - 1: train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1]))) # 正样本 [user_id, his_item, pos_item, label, len(his_item)] for negi in range(negsample): train_set.append((reviewerID, hist[::-1], neg_list[i*negsample+negi], 0,len(hist[::-1]))) # 负样本 [user_id, his_item, neg_item, label, len(his_item)] else: # 将最长的那一个序列长度做为测试数据 test_set.append((reviewerID, hist[::-1], pos_list[i],1,len(hist[::-1]))) random.shuffle(train_set) random.shuffle(test_set) return train_set, test_set # 将输入的数据进行padding,使得序列特征的长度都一致 def gen_model_input(train_set,user_profile,seq_max_len): train_uid = np.array([line[0] for line in train_set]) train_seq = [line[1] for line in train_set] train_iid = np.array([line[2] for line in train_set]) train_label = np.array([line[3] for line in train_set]) train_hist_len = np.array([line[4] for line in train_set]) train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0) train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad, "hist_len": train_hist_len} return train_model_input, train_label
def youtubednn_u2i_dict(data, topk=20): sparse_features = ["click_article_id", "user_id"] SEQ_LEN = 30 # 用户点击序列的长度,短的填充,长的截断 user_profile_ = data[["user_id"]].drop_duplicates('user_id') item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id') # 类别编码 features = ["click_article_id", "user_id"] feature_max_idx = {} for feature in features: lbe = LabelEncoder() data[feature] = lbe.fit_transform(data[feature]) feature_max_idx[feature] = data[feature].max() + 1 # 提取user和item的画像,这里具体选择哪些特征还须要进一步的分析和考虑 user_profile = data[["user_id"]].drop_duplicates('user_id') item_profile = data[["click_article_id"]].drop_duplicates('click_article_id') user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id'])) item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id'])) # 划分训练和测试集 # 因为深度学习须要的数据量一般都是很是大的,因此为了保证召回的效果,每每会经过滑窗的形式扩充训练样本 train_set, test_set = gen_data_set(data, 0) # 整理输入数据,具体的操做能够看上面的函数 train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN) test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN) # 肯定Embedding的维度 embedding_dim = 16 # 将数据整理成模型能够直接输入的形式 user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim), VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim, embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),] item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)] # 模型的定义 # num_sampled: 负采样时的样本数量 model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim)) # 模型编译 model.compile(optimizer="adam", loss=sampledsoftmaxloss) # 模型训练,这里能够定义验证集的比例,若是设置为0的话就是全量数据直接进行训练 history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0) # 训练完模型以后,提取训练的Embedding,包括user端和item端 test_user_model_input = test_model_input all_item_model_input = {"click_article_id": item_profile['click_article_id'].values} user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding) item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding) # 保存当前的item_embedding 和 user_embedding 排序的时候可能可以用到,可是须要注意保存的时候须要和原始的id对应 user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12) item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12) # embedding保存以前归一化一下 user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True) item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True) # 将Embedding转换成字典的形式方便查询 raw_user_id_emb_dict = {user_index_2_rawid[k]: v for k, v in zip(user_profile['user_id'], user_embs)} raw_item_id_emb_dict = {item_index_2_rawid[k]: v for k, v in zip(item_profile['click_article_id'], item_embs)} # 将Embedding保存到本地 pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb')) pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb')) # faiss紧邻搜索,经过user_embedding 搜索与其类似性最高的topk个item index = faiss.IndexFlatIP(embedding_dim) # 上面已经进行了归一化,这里能够不进行归一化了 # faiss.normalize_L2(user_embs) # faiss.normalize_L2(item_embs) index.add(item_embs) # 将item向量构建索引 sim, idx = index.search(np.ascontiguousarray(user_embs), topk) # 经过user去查询最类似的topk个item user_recall_items_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)): target_raw_id = user_index_2_rawid[target_idx] # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = item_index_2_rawid[rele_idx] user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {}) .get(rele_raw_id, 0) + sim_value user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()} # 将召回的结果进行排序 # 保存召回的结果 # 这里是直接经过向量的方式获得了召回结果,相比于上面的召回方法,上面的只是获得了i2i及u2u的类似性矩阵,还须要进行协同过滤召回才能获得召回结果 # 能够直接对这个召回结果进行评估,为了方即可以统一写一个评估函数对全部的召回结果进行评估 pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb')) return user_recall_items_dict
# 因为这里须要作召回评估,因此讲训练集中的最后一次点击都提取了出来 if not metric_recall: user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20) else: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(trn_hist_click_df, topk=20) # 召回效果评估 metrics_recall(user_multi_recall_dict['youtubednn_recall'], trn_last_click_df, topk=20)
itemCF recall
上面已经经过协同过滤,Embedding检索的方式获得了文章的类似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章类似的文章。
这里在召回的时候,也是用了关联规则的方式:
# 基于商品的召回i2i def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} :param i2i_sim: 字典,文章类似性矩阵 :param sim_item_topk: 整数, 选择与当前文章最类似的前k篇文章 :param recall_item_num: 整数, 最后的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param emb_i2i_sim: 字典基于内容embedding算的文章类似矩阵 return: 召回的文章列表 [(item1, score1), (item2, score2)...] """ # 获取用户历史交互的文章 user_hist_items = user_item_time_dict[user_id] user_hist_items_ = {user_id for user_id, _ in user_hist_items } item_rank = {} for loc, (i, click_time) in enumerate(user_hist_items): for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]: if j in user_hist_items_: continue # 文章建立时间差权重 created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j])) # 类似文章和历史点击文章序列中历史文章所在的位置权重 loc_weight = (0.9 ** (len(user_hist_items) - loc)) content_weight = 1.0 if emb_i2i_sim.get(i, {}).get(j, None) is not None: content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None) is not None: content_weight += emb_i2i_sim[j][i] item_rank.setdefault(j, 0) item_rank[j] += created_time_weight * loc_weight * content_weight * wij # 不足10个,用热门商品补全 if len(item_rank) < recall_item_num: for i, item in enumerate(item_topk_click): if item in item_rank.items(): # 填充的item应该不在原来的列表中 continue item_rank[item] = - i - 100 # 随便给个负数就行 if len(item_rank) == recall_item_num: break item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num] return item_rank
itemCF sim召回
# 先进行itemcf召回, 为了召回评估,因此提取最后一次点击 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else: trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb')) emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb')) sim_item_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50) for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \ i2i_sim, sim_item_topk, recall_item_num, \ item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb')) if metric_recall: # 召回效果评估 metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)
embedding sim 召回
# 这里是为了召回评估,因此提取最后一次点击 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else: trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb')) sim_item_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50) for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb')) if metric_recall: # 召回效果评估 metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)
userCF召回
基于用户协同过滤,核心思想是给用户推荐与其类似的用户历史点击文章,由于这里涉及到了类似用户的历史文章,这里仍然能够加上一些关联规则来给用户可能点击的文章进行加权,这里使用的关联规则主要是考虑类似用户的历史点击文章与被推荐用户历史点击商品的关系权重,而这里的关系就能够直接借鉴基于物品的协同过滤类似的作法,只不过这里是对被推荐物品关系的一个累加的过程,下面是使用的一些关系权重,及相关的代码:
# 基于用户的召回 u2u2i def user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} :param u2u_sim: 字典,文章类似性矩阵 :param sim_user_topk: 整数, 选择与当前用户最类似的前k个用户 :param recall_item_num: 整数, 最后的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param item_created_time_dict: 文章建立时间列表 :param emb_i2i_sim: 字典基于内容embedding算的文章类似矩阵 return: 召回的文章列表 [(item1, score1), (item2, score2)...] """ # 历史交互 user_item_time_list = user_item_time_dict[user_id] # {item1: time1, item2: time2...} user_hist_items = set([i for i, t in user_item_time_list]) # 存在一个用户与某篇文章的屡次交互, 这里得去重 items_rank = {} for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]: for i, click_time in user_item_time_dict[sim_u]: if i in user_hist_items: continue items_rank.setdefault(i, 0) loc_weight = 1.0 content_weight = 1.0 created_time_weight = 1.0 # 当前文章与该用户看的历史文章进行一个权重交互 for loc, (j, click_time) in enumerate(user_item_time_list): # 点击时的相对位置权重 loc_weight += 0.9 ** (len(user_item_time_list) - loc) # 内容类似性权重 if emb_i2i_sim.get(i, {}).get(j, None) is not None: content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None) is not None: content_weight += emb_i2i_sim[j][i] # 建立时间差权重 created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j])) items_rank[i] += loc_weight * content_weight * created_time_weight * wuv # 热度补全 if len(items_rank) < recall_item_num: for i, item in enumerate(item_topk_click): if item in items_rank.items(): # 填充的item应该不在原来的列表中 continue items_rank[item] = - i - 100 # 随便给个复数就行 if len(items_rank) == recall_item_num: break items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num] return items_rank
userCF sim召回
# 这里是为了召回评估,因此提取最后一次点击 # 因为usercf中计算user之间的类似度的过程太费内存了,全量数据这里就没有跑,跑了一个采样以后的数据 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else: trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict) user_item_time_dict = get_user_item_time(trn_hist_click_df) u2u_sim = pickle.load(open(save_path + 'usercf_u2u_sim.pkl', 'rb')) sim_user_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50) for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) pickle.dump(user_recall_items_dict, open(save_path + 'usercf_u2u2i_recall.pkl', 'wb')) if metric_recall: # 召回效果评估 metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)
user embedding sim召回
虽然没有直接跑usercf的计算用户之间的类似度,为了验证上述基于用户的协同过滤的代码,下面使用了YoutubeDNN过程当中产生的user embedding来进行向量检索每一个user最类似的topk个user,在使用这里获得的u2u的类似性矩阵,使用usercf进行召回,具体代码以下
# 使用Embedding的方式获取u2u的类似性矩阵 # topk指的是每一个user, faiss搜索后返回最类似的topk个user def u2u_embdding_sim(click_df, user_emb_dict, save_path, topk): user_list = [] user_emb_list = [] for user_id, user_emb in user_emb_dict.items(): user_list.append(user_id) user_emb_list.append(user_emb) user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)} user_emb_np = np.array(user_emb_list, dtype=np.float32) # 创建faiss索引 user_index = faiss.IndexFlatIP(user_emb_np.shape[1]) user_index.add(user_emb_np) # 类似度查询,给每一个索引位置上的向量返回topk个item以及类似度 sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表 # 将向量检索的结果保存成原始id的对应关系 user_sim_dict = collections.defaultdict(dict) for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx)): target_raw_id = user_index_2_rawid_dict[target_idx] # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1 for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): rele_raw_id = user_index_2_rawid_dict[rele_idx] user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value # 保存i2i类似度矩阵 pickle.dump(user_sim_dict, open(save_path + 'youtube_u2u_sim.pkl', 'wb')) return user_sim_dict
# 读取YoutubeDNN过程当中产生的user embedding, 而后使用faiss计算用户之间的类似度 # 这里须要注意,这里获得的user embedding其实并非很好,由于YoutubeDNN中使用的是用户点击序列来训练的user embedding, # 若是序列广泛都比较短的话,其实效果并非很好 user_emb_dict = pickle.load(open(save_path + 'user_youtube_emb.pkl', 'rb')) u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)
经过YoutubeDNN获得的user_embedding
# 使用召回评估函数验证当前召回方式的效果 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else: trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict) user_item_time_dict = get_user_item_time(trn_hist_click_df) u2u_sim = pickle.load(open(save_path + 'youtube_u2u_sim.pkl', 'rb')) sim_user_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50) for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['youtubednn_usercf_recall'] = user_recall_items_dict pickle.dump(user_multi_recall_dict['youtubednn_usercf_recall'], open(save_path + 'youtubednn_usercf_recall.pkl', 'wb')) if metric_recall: # 召回效果评估 metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall'], trn_last_click_df, topk=recall_item_num)
冷启动问题
冷启动问题能够分红三类:文章冷启动,用户冷启动,系统冷启动。
当前场景下冷启动问题的分析:
对当前的数据进行分析会发现,日志中全部出现过的点击文章只有3w多个,而整个文章库中却有30多万,那么测试集中的用户最后一次点击是否会点击没有出如今日志中的文章呢?若是存在这种状况,说明用户点击的文章以前没有任何的交互信息,这也就是咱们所说的文章冷启动。经过数据分析还能够发现,测试集用户只有一次点击的数据占得比例还很多,其实仅仅经过用户的一次点击就给用户推荐文章使用模型的方式也是比较难的,这里其实也能够考虑用户冷启动的问题,可是这里只给出物品冷启动的一些解决方案及代码,关于用户冷启动的话提一些可行性的作法。
文章冷启动(没有冷启动的探索问题)
其实咱们这里不是为了作文章的冷启动而作冷启动,而是猜想用户可能会点击一些没有在log数据中出现的文章,咱们要作的就是如何从将近27万的文章中选择一些文章做为用户冷启动的文章,这里其实也能够当作是一种召回策略,咱们这里就采用简单的比较好理解的基于规则的召回策略来获取用户可能点击的未出如今log数据中的文章。
如今的问题变成了:如何给每一个用户考虑从27万个商品中获取一小部分商品?随机选一些多是一种方案。下面给出一些参考的方案。
注意:
这里看似和基于embedding计算的item之间类似度而后作itemcf是一致的,可是如今咱们的目的不同,咱们这里的目的是找到类似的向量,而且尚未出如今log日志中的商品,再加上一些其余的冷启动的策略,这里须要找回的数量会偏多一点,否则被筛选完以后可能都没有文章了
# 先进行itemcf召回,这里不须要作召回评估,这里只是一种策略 trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb')) sim_item_topk = 150 recall_item_num = 100 # 稍微召回多一点文章,便于后续的规则筛选 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50) for user in tqdm(trn_hist_click_df['user_id'].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim) pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
# 基于规则进行文章过滤 # 保留文章主题与用户历史浏览主题类似的文章 # 保留文章字数与用户历史浏览文章字数相差不大的文章 # 保留最后一次点击当天的文章 # 按照类似度返回最终的结果 def get_click_article_ids_set(all_click_df): return set(all_click_df.click_article_id.values) def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \ user_last_item_created_time_dict, item_type_dict, item_words_dict, item_created_time_dict, click_article_ids_set, recall_item_num): """ 冷启动的状况下召回一些文章 :param user_recall_items_dict: 基于内容embedding类似性召回来的不少文章, 字典, {user1: [item1, item2, ..], } :param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射 :param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射 :param user_last_item_created_time_idct: 字典,用户点击的历史文章建立时间映射 :param item_tpye_idct: 字典,文章主题映射 :param item_words_dict: 字典,文章字数映射 :param item_created_time_dict: 字典, 文章建立时间映射 :param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章 :param recall_item_num: 召回文章的数量, 这个指的是没有出如今日志里面的文章数量 """ cold_start_user_items_dict = {} for user, item_list in tqdm(user_recall_items_dict.items()): cold_start_user_items_dict.setdefault(user, []) for item, score in item_list: # 获取历史文章信息 hist_item_type_set = user_hist_item_typs_dict[user] hist_mean_words = user_hist_item_words_dict[user] hist_last_item_created_time = user_last_item_created_time_dict[user] hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time) # 获取当前召回文章的信息 curr_item_type = item_type_dict[item] curr_item_words = item_words_dict[item] curr_item_created_time = item_created_time_dict[item] curr_item_created_time = datetime.fromtimestamp(curr_item_created_time) # 首先,文章不能出如今用户的历史点击中, 而后根据文章主题,文章单词数,文章建立时间进行筛选 if curr_item_type not in hist_item_type_set or \ item in click_article_ids_set or \ abs(curr_item_words - hist_mean_words) > 200 or \ abs((curr_item_created_time - hist_last_item_created_time).days) > 90: continue cold_start_user_items_dict[user].append((item, score)) # {user1: [(item1, score1), (item2, score2)..]...} # 须要控制一下冷启动召回的数量 cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \ for k, v in cold_start_user_items_dict.items()} pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb')) return cold_start_user_items_dict
all_click_df_ = all_click_df.copy() all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id') user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_) click_article_ids_set = get_click_article_ids_set(all_click_df) # 须要注意的是 # 这里使用了不少规则来筛选冷启动的文章,因此前面再召回的阶段就应该尽量的多召回一些文章,不然很容易被删掉 cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, user_last_item_created_time_dict, item_type_dict, item_words_dict, item_created_time_dict, click_article_ids_set, recall_item_num) user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict
多路召回合并
多路召回合并就是将前面全部的召回策略获得的用户文章列表合并起来,下面是对前面全部召回结果的汇总
注意:
在作召回评估的时候就会发现有些召回的效果不错有些召回的效果不好,因此对每一路召回的结果,咱们能够认为的定义一些权重,来作最终的类似度融合
def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25): final_recall_items_dict = {} # 对每一种召回结果按照用户进行归一化,方便后面多种召回结果,相同用户的物品之间权重相加 def norm_user_recall_items_sim(sorted_item_list): # 若是冷启动中没有文章或者只有一篇文章,直接返回,出现这种状况的缘由多是冷启动召回的文章数量太少了, # 基于规则筛选以后就没有文章了, 这里还能够作一些其余的策略性的筛选 if len(sorted_item_list) < 2: return sorted_item_list min_sim = sorted_item_list[-1][1] max_sim = sorted_item_list[0][1] norm_sorted_item_list = [] for item, score in sorted_item_list: if max_sim > 0: norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0 else: norm_score = 0.0 norm_sorted_item_list.append((item, norm_score)) return norm_sorted_item_list print('多路召回合并...') for method, user_recall_items in tqdm(user_multi_recall_dict.items()): print(method + '...') # 在计算最终召回结果的时候,也能够为每一种召回结果设置一个权重 if weight_dict == None: recall_method_weight = 1 else: recall_method_weight = weight_dict[method] for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化 user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list) for user_id, sorted_item_list in user_recall_items.items(): # print('user_id') final_recall_items_dict.setdefault(user_id, {}) for item, score in sorted_item_list: final_recall_items_dict[user_id].setdefault(item, 0) final_recall_items_dict[user_id][item] += recall_method_weight * score final_recall_items_dict_rank = {} # 多路召回时也能够控制最终的召回数量 for user, recall_item_dict in final_recall_items_dict.items(): final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk] # 将多路召回后的最终结果字典保存到本地 pickle.dump(final_recall_items_dict, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb')) return final_recall_items_dict_rank
# 这里直接对多路召回的权重给了一个相同的值,其实能够根据前面召回的状况来调整参数的值 weight_dict = {'itemcf_sim_itemcf_recall': 1.0, 'embedding_sim_item_recall': 1.0, 'youtubednn_recall': 1.0, 'youtubednn_usercf_recall': 1.0, 'cold_start_recall': 1.0} # 最终合并以后每一个用户召回150个商品进行排序 final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)
总结
上述实现了以下召回策略:
对于上述实现的召回策略其实都不是最优的结果,咱们只是作了个简单的尝试,其中还有不少地方能够优化,包括已经实现的这些召回策略的参数或者新加一些,修改一些关联规则均可以。固然还能够尝试更多的召回策略,好比对新闻进行热度召回等等。