推荐系统-多路召回

介绍
第一部分,已经介绍协同推荐,并利用它实现了一个简单的推荐系统,在第二部分,咱们简要的分析了咱们拥有的数据,包括每一个字段的分布,常见的统计信息等,为接下来的多路召回提供了很好的指引。html

回想一下baseline的思路,咱们首先计算了item的之间的类似度,而后基于用户的正反馈item列表,找到与列表中每个item类似度最高的topn个item,组成一个列表,最后直接按照类似度得分进行排序,获得最后的推荐结果。python

在实际的推荐场景中,一般会有两个阶段,第一个阶段是召回阶段,第二个阶段是排序阶段。第一个阶段召回那些类似度较高的N个item列表,它更关注的是召回率,较为粗略;而排序阶段会使用更为复杂的模型进行监督学习(转化为分类的任务),获得分类几率,也就是置信度,最后按照置信度进行排序,取top K个item做为最后的推荐列表。git

baseline中其实只包含了召回这个阶段,虽然但就这个任务而言,它已经够了。本节介绍的是多路召回,什么是多路召回呢,好比在一个推荐场景中,咱们能够选择ItemCF或者UserCF以及基于热点的召回策略等等,由于咱们召回层的目的是为了尽量的确保召回,因此基于单个的策略确定是效果不如多个策略的,这里就引出了多路召回的概念,也就是多个策略并行的进行召回。下面这个图表示了多路召回的一个例子。
image.pnggithub

在多路召回中,每一个策略之间绝不相关,可使用多种不一样的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略实际上是与业务强相关的 ,针对不一样的任务就会有对于该业务真实场景下须要考虑的召回规则。例如新闻推荐,召回规则能够是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”流行趋势“、”类型召回“等等。算法

导入相关库segmentfault

import pandas as pd  
import numpy as np
from tqdm import tqdm  
from collections import defaultdict  
import os, math, warnings, math, pickle
from tqdm import tqdm
import faiss
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from deepmatch.models import *
from deepmatch.utils import sampledsoftmaxloss

warnings.filterwarnings('ignore')
data_path = './data_raw/'
save_path = './temp_results/'
# 作召回评估的一个标志, 若是不进行评估就是直接使用全量数据进行召回
metric_recall = False

读取数据架构

在通常的推荐系统比赛中读取数据部分主要分为三种模式, 不一样的模式对应的不一样的数据集:app

  1. Debug模式: 这个的目的是帮助咱们基于数据先搭建一个简易的baseline并跑通, 保证写的baseline代码没有什么问题。 因为推荐比赛的数据每每很是巨大, 若是一上来直接采用所有的数据进行分析,搭建baseline框架, 每每会带来时间和设备上的损耗, 因此这时候咱们每每须要从海量数据的训练集中随机抽取一部分样原本进行调试(train_click_log_sample), 先跑通一个baseline。
  2. 线下验证模式: 这个的目的是帮助咱们在线下基于已有的训练集数据, 来选择好合适的模型和一些超参数。 因此咱们这一块只须要加载整个训练集(train_click_log), 而后把整个训练集再分红训练集和验证集。 训练集是模型的训练数据, 验证集部分帮助咱们调整模型的参数和其余的一些超参数。
  3. 线上模式: 咱们用debug模式搭建起一个推荐系统比赛的baseline, 用线下验证模式选择好了模型和一些超参数, 这一部分就是真正的对于给定的测试集进行预测, 提交到线上, 因此这一块使用的训练数据集是全量的数据集(train_click_log+test_click_log)

下面就分别对这三种不一样的数据读取模式先创建不一样的代导入函数, 方便后面针对不一样的模式下导入数据。框架

# debug模式: 从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
    """
        训练集中采样一部分数据调试
        data_path: 原数据的存储路径
        sample_nums: 采样数目(这里因为机器的内存限制,能够采样用户作)
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) 
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

# 读取点击数据,这里分红线上和线下,若是是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中
# 若是是为了线下验证模型的有效性或者特征的有效性,能够只使用训练集
def get_all_click_df(data_path='./data', offline=True):
    if offline:
        all_click = pd.read_csv(data_path + '/train_click_log.csv')
    else:
        trn_click = pd.read_csv(data_path + '/train_click_log.csv')
        tst_click = pd.read_csv(data_path + '/testA_click_log.csv')

        all_click = trn_click.append(tst_click)
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click
# 读取文章的基本属性
def get_item_info_df(data_path):
    item_info_df = pd.read_csv(data_path + 'articles.csv')
    
    # 为了方便与训练集中的click_article_id拼接,须要把article_id修改为click_article_id
    item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
    
    return item_info_df
# 读取文章的Embedding数据
def get_item_emb_dict(data_path):
    item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
    # 进行归一化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)

    item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
    pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
    
    return item_emb_dict
# min-max 归一化函数
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据
# all_click_df = get_all_click_sample(data_path)

# 全量训练集
all_click_df = get_all_click_df(offline=False)

# 对时间戳进行归一化,用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)

item_info_df = get_item_info_df(data_path)

item_emb_dict = get_item_emb_dict(data_path)

工具函数
获取用户-文章-时间函数dom

这个在基于关联规则的用户协同过滤的时候会用到

# 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
    
    click_df = click_df.sort_values('click_timestamp')
    
    def make_item_time_pair(df):
        return list(zip(df['click_article_id'], df['click_timestamp']))
    
    user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'item_time_list'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    
    return user_item_time_dict

获取文章-用户-时间函数
这个在基于关联规则的文章协同过滤的时候会用到

# 根据时间获取商品被点击的用户序列  {item1: [(user1, time1), (user2, time2)...]...}
# 这里的时间是用户点击当前商品的时间,好像没有直接的关系。
def get_item_user_time_dict(click_df):
    def make_user_time_pair(df):
        return list(zip(df['user_id'], df['click_timestamp']))
    
    click_df = click_df.sort_values('click_timestamp')
    item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'user_time_list'})
    
    item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
    return item_user_time_dict

获取历史和最后一次点击

这个在评估召回结果, 特征工程和制做标签转成监督学习测试集的时候回用到

# 获取当前数据的历史点击和最后一次点击
def get_hist_and_last_click(all_click):
    
    all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
    click_last_df = all_click.groupby('user_id').tail(1)

    # 若是用户只有一个点击,hist为空了,会致使训练的时候这个用户不可见,此时默认泄露一下
    def hist_func(user_df):
        if len(user_df) == 1:
            return user_df
        else:
            return user_df[:-1]

    click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)

    return click_hist_df, click_last_df

获取文章属性特征

# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用
def get_item_info_dict(item_info_df):
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
    
    item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
    item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
    item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
    
    return item_type_dict, item_words_dict, item_created_time_dict

获取用户历史点击的文章信息

def get_user_hist_item_info_dict(all_click):
    
    # 获取user_id对应的用户历史点击文章类型的集合字典
    user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
    user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
    
    # 获取user_id对应的用户点击文章的集合
    user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
    user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
    
    # 获取user_id对应的用户历史点击的文章的平均字数字典
    user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
    user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
    
    # 获取user_id对应的用户最后一次点击的文章的建立时间
    all_click_ = all_click.sort_values('click_timestamp')
    user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
    
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
    
    user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
                                                user_last_item_created_time['created_at_ts']))
    
    return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict

获取点击次数最多的Top-k个文章
获取近期点击最多的文章

def get_item_topk_click(click_df, k):
    topk_click = click_df['click_article_id'].value_counts().index[:k]
    return topk_click

定义多路召回字典
获取文章的属性信息,保存成字典的形式方便查询

item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)

# 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
user_multi_recall_dict =  {'itemcf_sim_itemcf_recall': {},
                           'embedding_sim_item_recall': {},
                           'youtubednn_recall': {},
                           'youtubednn_usercf_recall': {}, 
                           'cold_start_recall': {}}
                           
                           
# 提取最后一次点击做为召回评估,若是不须要作召回评估直接使用全量的训练集进行召回(线下验证模型)
# 若是不是召回评估,直接使用全量数据进行召回,不用将最后一次提取出来
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)

召回效果评估
作完了召回有时候也须要对当前的召回方法或者参数进行调整以达到更好的召回效果,由于召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法

# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
    last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
    user_num = len(user_recall_items_dict)
    
    for k in range(10, topk+1, 10):
        hit_num = 0
        for user, item_list in user_recall_items_dict.items():
            # 获取前k个召回的结果
            tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
            if last_click_item_dict[user] in set(tmp_recall_items):
                hit_num += 1
        
        hit_rate = round(hit_num * 1.0 / user_num, 5)
        print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)

计算类似性矩阵
这一部分主要是经过协同过滤以及向量检索获得类似性矩阵,类似性矩阵主要分为user2user和item2item,下面依次获取基于itemCF的item2item的类似性矩阵。

itemCF i2i_sim
借鉴KDD2020的去偏商品推荐,在计算item2item类似性矩阵时,使用关联规则,使得计算的文章的类似性还考虑到了:

  1. 用户点击的时间权重
  2. 用户点击的顺序权重
  3. 文章建立的时间权重
def itemcf_sim(df, item_created_time_dict):
    """
        文章与文章之间的类似性矩阵计算
        :param df: 数据表
        :item_created_time_dict:  文章建立时间的字典
        return : 文章与文章的类似性矩阵
        
        思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
    """
    
    user_item_time_dict = get_user_item_time(df)
    
    # 计算物品类似度
    i2i_sim = {}
    item_cnt = defaultdict(int)
    for user, item_time_list in tqdm(user_item_time_dict.items()):
        # 在基于商品的协同过滤优化的时候能够考虑时间因素
        for loc1, (i, i_click_time) in enumerate(item_time_list):
            item_cnt[i] += 1
            i2i_sim.setdefault(i, {})
            for loc2, (j, j_click_time) in enumerate(item_time_list):
                if(i == j):
                    continue
                    
                # 考虑文章的正向顺序点击和反向顺序点击    
                loc_alpha = 1.0 if loc2 > loc1 else 0.7
                # 位置信息权重,其中的参数能够调节
                loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
                # 点击时间权重,其中的参数能够调节
                click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
                # 两篇文章建立时间的权重,其中的参数能够调节
                created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                i2i_sim[i].setdefault(j, 0)
                # 考虑多种因素的权重计算最终的文章之间的类似度
                i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
                
    i2i_sim_ = i2i_sim.copy()
    for i, related_items in i2i_sim.items():
        for j, wij in related_items.items():
            i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
    
    # 将获得的类似性矩阵保存到本地
    pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
    
    return i2i_sim_

i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]

userCF u2u_sim
在计算用户之间的类似度的时候,也可使用一些简单的关联规则,好比用户活跃度权重,这里将用户的点击次数做为用户活跃度的指标

def get_user_activate_degree_dict(all_click_df):
    all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index()
    
    # 用户活跃度归一化
    mm = MinMaxScaler()
    all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']])
    user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id']))
    
    return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict):
    """
        用户类似性矩阵计算
        :param all_click_df: 数据表
        :param user_activate_degree_dict: 用户活跃度的字典
        return 用户类似性矩阵
        
        思路: 基于用户的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
    """
    item_user_time_dict = get_item_user_time_dict(all_click_df)
    
    u2u_sim = {}
    user_cnt = defaultdict(int)
    for item, user_time_list in tqdm(item_user_time_dict.items()):
        for u, click_time in user_time_list:
            user_cnt[u] += 1
            u2u_sim.setdefault(u, {})
            for v, click_time in user_time_list:
                u2u_sim[u].setdefault(v, 0)
                if u == v:
                    continue
                # 用户平均活跃度做为活跃度的权重,这里的式子也能够改善
                activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v])   
                u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1)
    
    u2u_sim_ = u2u_sim.copy()
    for u, related_users in u2u_sim.items():
        for v, wij in related_users.items():
            u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v])
    
    # 将获得的类似性矩阵保存到本地
    pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb'))

    return u2u_sim_
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 因为usercf计算时候太耗费内存了,这里就不直接运行了
# 若是是采样的话,是能够运行的
user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)
u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)

item embedding sim
使用Embedding计算item之间的类似度是为了后续冷启动的时候能够获取未出如今点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。

aiss是Facebook的AI团队开源的一套用于作聚类或者类似性搜索的软件库,底层是用C++实现。Faiss由于超级优越的性能,被普遍应用于推荐相关的业务当中.

faiss工具包通常使用在推荐系统中的向量召回部分。在作向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.咱们知道在实际的场景中user和item的数量都是海量的,咱们最容易想到的基于向量类似度的召回就是使用两层循环遍历user列表或者item列表计算两个向量的类似度,可是这样作在面对海量数据是不切实际的,faiss就是用来加速计算某个查询向量最类似的topk个索引向量。

faiss查询的原理:

faiss使用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,固然还使用了其余的技术进行优化,可是PCA和PQ是其中最核心部分。

  1. PCA降维算法细节参考下面这个连接进行学习
    主成分分析(PCA)原理总结 4
  2. PQ编码的细节下面这个连接进行学习
    实例理解product quantization算法 4

faiss使用

faiss官方教程 7

# 向量检索类似度计算
# topk指的是每一个item, faiss搜索后返回最类似的topk个item
def embdding_sim(click_df, item_emb_df, save_path, topk):
    """
        基于内容的文章embedding类似性矩阵计算
        :param click_df: 数据表
        :param item_emb_df: 文章的embedding
        :param save_path: 保存路径
        :patam topk: 找最类似的topk篇
        return 文章类似性矩阵
        
        思路: 对于每一篇文章, 基于embedding的类似性返回topk个与其最类似的文章, 只不过因为文章数量太多,这里用了faiss进行加速
    """
    
    # 文章索引与文章id的字典映射
    item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
    # 向量进行单位化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    
    # 创建faiss索引
    item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
    item_index.add(item_emb_np)
    # 类似度查询,给每一个索引位置上的向量返回topk个item以及类似度
    sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
    
    # 将向量检索的结果保存成原始id的对应关系
    item_sim_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
        target_raw_id = item_idx_2_rawid_dict[target_idx]
        # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_idx_2_rawid_dict[rele_idx]
            item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
    
    # 保存i2i类似度矩阵
    pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))   
    
    return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk能够自行设置

召回
这个就是咱们开篇提到的那个问题, 面的36万篇文章, 20多万用户的推荐, 咱们又有哪些策略来缩减问题的规模? 咱们就能够再召回阶段筛选出用户对于点击文章的候选集合, 从而下降问题的规模。召回经常使用的策略:

  • Youtube DNN 召回
  • 基于文章的召回

    • 文章的协同过滤
    • 基于文章embedding的召回
  • 基于用户的召回

    • 用户的协同过滤
    • 用户embedding

上面的各类召回方式一部分在基于用户已经看得文章的基础上去召回与这些文章类似的一些文章, 而这个类似性的计算方式不一样, 就获得了不一样的召回方式, 好比文章的协同过滤, 文章内容的embedding等。还有一部分是根据用户的类似性进行推荐,对于某用户推荐与其类似的其余用户看过的文章,好比用户的协同过滤和用户embedding。 还有一种思路是相似矩阵分解的思路,先计算出用户和文章的embedding以后,就能够直接算用户和文章的类似度, 根据这个类似度进行推荐, 好比YouTube DNN。 咱们下面详细来看一下每个召回方法:

YoutubeDNN召回

(这一步是直接获取用户召回的候选文章列表)

Youtubednn召回架构
image.png
关于YoutubeDNN原理和应用推荐看王喆的两篇博客:

# 获取双塔召回时的训练验证数据
# negsample指的是经过滑窗构建样本的时候,负样本的数量
def gen_data_set(data, negsample=0):
    data.sort_values("click_timestamp", inplace=True)
    item_ids = data['click_article_id'].unique()

    train_set = []
    test_set = []
    for reviewerID, hist in tqdm(data.groupby('user_id')):
        pos_list = hist['click_article_id'].tolist()
        
        if negsample > 0:
            candidate_set = list(set(item_ids) - set(pos_list))   # 用户没看过的文章里面选择负样本
            neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True)  # 对于每一个正样本,选择n个负样本
            
        # 长度只有一个的时候,须要把这条数据也放到训练集中,否则的话最终学到的embedding就会有缺失
        if len(pos_list) == 1:
            train_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
            test_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
            
        # 滑窗构造正负样本
        for i in range(1, len(pos_list)):
            hist = pos_list[:i]
            
            if i != len(pos_list) - 1:
                train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1])))  # 正样本 [user_id, his_item, pos_item, label, len(his_item)]
                for negi in range(negsample):
                    train_set.append((reviewerID, hist[::-1], neg_list[i*negsample+negi], 0,len(hist[::-1]))) # 负样本 [user_id, his_item, neg_item, label, len(his_item)]
            else:
                # 将最长的那一个序列长度做为测试数据
                test_set.append((reviewerID, hist[::-1], pos_list[i],1,len(hist[::-1])))
                
    random.shuffle(train_set)
    random.shuffle(test_set)
    
    return train_set, test_set

# 将输入的数据进行padding,使得序列特征的长度都一致
def gen_model_input(train_set,user_profile,seq_max_len):

    train_uid = np.array([line[0] for line in train_set])
    train_seq = [line[1] for line in train_set]
    train_iid = np.array([line[2] for line in train_set])
    train_label = np.array([line[3] for line in train_set])
    train_hist_len = np.array([line[4] for line in train_set])

    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)
    train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad,
                         "hist_len": train_hist_len}

    return train_model_input, train_label
def youtubednn_u2i_dict(data, topk=20):    
    sparse_features = ["click_article_id", "user_id"]
    SEQ_LEN = 30 # 用户点击序列的长度,短的填充,长的截断
    
    user_profile_ = data[["user_id"]].drop_duplicates('user_id')
    item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id')  
    
    # 类别编码
    features = ["click_article_id", "user_id"]
    feature_max_idx = {}
    
    for feature in features:
        lbe = LabelEncoder()
        data[feature] = lbe.fit_transform(data[feature])
        feature_max_idx[feature] = data[feature].max() + 1
    
    # 提取user和item的画像,这里具体选择哪些特征还须要进一步的分析和考虑
    user_profile = data[["user_id"]].drop_duplicates('user_id')
    item_profile = data[["click_article_id"]].drop_duplicates('click_article_id')  
    
    user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id']))
    item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id']))
    
    # 划分训练和测试集
    # 因为深度学习须要的数据量一般都是很是大的,因此为了保证召回的效果,每每会经过滑窗的形式扩充训练样本
    train_set, test_set = gen_data_set(data, 0)
    # 整理输入数据,具体的操做能够看上面的函数
    train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN)
    test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN)
    
    # 肯定Embedding的维度
    embedding_dim = 16
    
    # 将数据整理成模型能够直接输入的形式
    user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
                            VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim,
                                                        embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),]
    item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)]
    
    # 模型的定义 
    # num_sampled: 负采样时的样本数量
    model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim))
    # 模型编译
    model.compile(optimizer="adam", loss=sampledsoftmaxloss)  
    
    # 模型训练,这里能够定义验证集的比例,若是设置为0的话就是全量数据直接进行训练
    history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0)
    
    # 训练完模型以后,提取训练的Embedding,包括user端和item端
    test_user_model_input = test_model_input
    all_item_model_input = {"click_article_id": item_profile['click_article_id'].values}

    user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
    item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
    
    # 保存当前的item_embedding 和 user_embedding 排序的时候可能可以用到,可是须要注意保存的时候须要和原始的id对应
    user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
    item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
    
    # embedding保存以前归一化一下
    user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
    item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)
    
    # 将Embedding转换成字典的形式方便查询
    raw_user_id_emb_dict = {user_index_2_rawid[k]: 
                                v for k, v in zip(user_profile['user_id'], user_embs)}
    raw_item_id_emb_dict = {item_index_2_rawid[k]: 
                                v for k, v in zip(item_profile['click_article_id'], item_embs)}
    # 将Embedding保存到本地
    pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb'))
    pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb'))
    
    # faiss紧邻搜索,经过user_embedding 搜索与其类似性最高的topk个item
    index = faiss.IndexFlatIP(embedding_dim)
    # 上面已经进行了归一化,这里能够不进行归一化了
#     faiss.normalize_L2(user_embs)
#     faiss.normalize_L2(item_embs)
    index.add(item_embs) # 将item向量构建索引
    sim, idx = index.search(np.ascontiguousarray(user_embs), topk) # 经过user去查询最类似的topk个item
    
    user_recall_items_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)):
        target_raw_id = user_index_2_rawid[target_idx]
        # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_index_2_rawid[rele_idx]
            user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {})
                                                                    .get(rele_raw_id, 0) + sim_value
            
    user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
    # 将召回的结果进行排序
    
    # 保存召回的结果
    # 这里是直接经过向量的方式获得了召回结果,相比于上面的召回方法,上面的只是获得了i2i及u2u的类似性矩阵,还须要进行协同过滤召回才能获得召回结果
    # 能够直接对这个召回结果进行评估,为了方即可以统一写一个评估函数对全部的召回结果进行评估
    pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb'))
    return user_recall_items_dict
# 因为这里须要作召回评估,因此讲训练集中的最后一次点击都提取了出来
if not metric_recall:
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20)
else:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(trn_hist_click_df, topk=20)
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['youtubednn_recall'], trn_last_click_df, topk=20)

itemCF recall

上面已经经过协同过滤,Embedding检索的方式获得了文章的类似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章类似的文章。
这里在召回的时候,也是用了关联规则的方式:

  1. 考虑类似文章与历史点击文章顺序的权重(细节看代码)
  2. 考虑文章建立时间的权重,也就是考虑类似文章与历史点击文章建立时间差的权重
  3. 考虑文章内容类似度权重(使用Embedding计算类似文章类似度,可是这里须要注意,在Embedding的时候并无计算全部商品两两之间的类似度,因此类似的文章与历史点击文章不存在类似度,须要作特殊处理)
# 基于商品的召回i2i
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim):
    """
        基于文章协同过滤的召回
        :param user_id: 用户id
        :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
        :param i2i_sim: 字典,文章类似性矩阵
        :param sim_item_topk: 整数, 选择与当前文章最类似的前k篇文章
        :param recall_item_num: 整数, 最后的召回文章数量
        :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
        :param emb_i2i_sim: 字典基于内容embedding算的文章类似矩阵
        
        return: 召回的文章列表 [(item1, score1), (item2, score2)...]
        
    """
    # 获取用户历史交互的文章
    user_hist_items = user_item_time_dict[user_id]
    user_hist_items_ = {user_id for user_id, _ in user_hist_items }

    item_rank = {}
    for loc, (i, click_time) in enumerate(user_hist_items):
        for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
            if j in user_hist_items_:
                continue
            
            # 文章建立时间差权重
            created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
            # 类似文章和历史点击文章序列中历史文章所在的位置权重
            loc_weight = (0.9 ** (len(user_hist_items) - loc))
            
            content_weight = 1.0
            if emb_i2i_sim.get(i, {}).get(j, None) is not None:
                content_weight += emb_i2i_sim[i][j]
            if emb_i2i_sim.get(j, {}).get(i, None) is not None:
                content_weight += emb_i2i_sim[j][i]
                
            item_rank.setdefault(j, 0)
            item_rank[j] += created_time_weight * loc_weight * content_weight * wij
    
    # 不足10个,用热门商品补全
    if len(item_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in item_rank.items(): # 填充的item应该不在原来的列表中
                continue
            item_rank[item] = - i - 100 # 随便给个负数就行
            if len(item_rank) == recall_item_num:
                break
    
    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
        
    return item_rank

itemCF sim召回

# 先进行itemcf召回, 为了召回评估,因此提取最后一次点击

if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)

i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))
emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))

sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \
                                                        i2i_sim, sim_item_topk, recall_item_num, \
                                                        item_topk_click, item_created_time_dict, emb_i2i_sim)

user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)

embedding sim 召回

# 这里是为了召回评估,因此提取最后一次点击
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 20
recall_item_num = 10

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
    
user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)

userCF召回
基于用户协同过滤,核心思想是给用户推荐与其类似的用户历史点击文章,由于这里涉及到了类似用户的历史文章,这里仍然能够加上一些关联规则来给用户可能点击的文章进行加权,这里使用的关联规则主要是考虑类似用户的历史点击文章与被推荐用户历史点击商品的关系权重,而这里的关系就能够直接借鉴基于物品的协同过滤类似的作法,只不过这里是对被推荐物品关系的一个累加的过程,下面是使用的一些关系权重,及相关的代码:

  1. 计算被推荐用户历史点击文章与类似用户历史点击文章的类似度,文章建立时间差,相对位置的总和,做为各自的权重
# 基于用户的召回 u2u2i
def user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num, 
                         item_topk_click, item_created_time_dict, emb_i2i_sim):
    """
        基于文章协同过滤的召回
        :param user_id: 用户id
        :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
        :param u2u_sim: 字典,文章类似性矩阵
        :param sim_user_topk: 整数, 选择与当前用户最类似的前k个用户
        :param recall_item_num: 整数, 最后的召回文章数量
        :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
        :param item_created_time_dict: 文章建立时间列表
        :param emb_i2i_sim: 字典基于内容embedding算的文章类似矩阵
        
        return: 召回的文章列表 [(item1, score1), (item2, score2)...]
    """
    # 历史交互
    user_item_time_list = user_item_time_dict[user_id]    # {item1: time1, item2: time2...}
    user_hist_items = set([i for i, t in user_item_time_list])   # 存在一个用户与某篇文章的屡次交互, 这里得去重
    
    items_rank = {}
    for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
        for i, click_time in user_item_time_dict[sim_u]:
            if i in user_hist_items:
                continue
            items_rank.setdefault(i, 0)
            
            loc_weight = 1.0
            content_weight = 1.0
            created_time_weight = 1.0
            
            # 当前文章与该用户看的历史文章进行一个权重交互
            for loc, (j, click_time) in enumerate(user_item_time_list):
                # 点击时的相对位置权重
                loc_weight += 0.9 ** (len(user_item_time_list) - loc)
                # 内容类似性权重
                if emb_i2i_sim.get(i, {}).get(j, None) is not None:
                    content_weight += emb_i2i_sim[i][j]
                if emb_i2i_sim.get(j, {}).get(i, None) is not None:
                    content_weight += emb_i2i_sim[j][i]
                
                # 建立时间差权重
                created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                
            items_rank[i] += loc_weight * content_weight * created_time_weight * wuv
        
    # 热度补全
    if len(items_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in items_rank.items(): # 填充的item应该不在原来的列表中
                continue
            items_rank[item] = - i - 100 # 随便给个复数就行
            if len(items_rank) == recall_item_num:
                break
        
    items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]    
    
    return items_rank

userCF sim召回

# 这里是为了召回评估,因此提取最后一次点击
# 因为usercf中计算user之间的类似度的过程太费内存了,全量数据这里就没有跑,跑了一个采样以后的数据
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df
    
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)

u2u_sim = pickle.load(open(save_path + 'usercf_u2u_sim.pkl', 'rb'))

sim_user_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)    

pickle.dump(user_recall_items_dict, open(save_path + 'usercf_u2u2i_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)

user embedding sim召回

虽然没有直接跑usercf的计算用户之间的类似度,为了验证上述基于用户的协同过滤的代码,下面使用了YoutubeDNN过程当中产生的user embedding来进行向量检索每一个user最类似的topk个user,在使用这里获得的u2u的类似性矩阵,使用usercf进行召回,具体代码以下

# 使用Embedding的方式获取u2u的类似性矩阵
# topk指的是每一个user, faiss搜索后返回最类似的topk个user
def u2u_embdding_sim(click_df, user_emb_dict, save_path, topk):
    
    user_list = []
    user_emb_list = []
    for user_id, user_emb in user_emb_dict.items():
        user_list.append(user_id)
        user_emb_list.append(user_emb)
        
    user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)}    
    
    user_emb_np = np.array(user_emb_list, dtype=np.float32)
    
    # 创建faiss索引
    user_index = faiss.IndexFlatIP(user_emb_np.shape[1])
    user_index.add(user_emb_np)
    # 类似度查询,给每一个索引位置上的向量返回topk个item以及类似度
    sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表
   
    # 将向量检索的结果保存成原始id的对应关系
    user_sim_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx)):
        target_raw_id = user_index_2_rawid_dict[target_idx]
        # 从1开始是为了去掉商品自己, 因此最终得到的类似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = user_index_2_rawid_dict[rele_idx]
            user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
    
    # 保存i2i类似度矩阵
    pickle.dump(user_sim_dict, open(save_path + 'youtube_u2u_sim.pkl', 'wb'))   
    return user_sim_dict
# 读取YoutubeDNN过程当中产生的user embedding, 而后使用faiss计算用户之间的类似度
# 这里须要注意,这里获得的user embedding其实并非很好,由于YoutubeDNN中使用的是用户点击序列来训练的user embedding,
# 若是序列广泛都比较短的话,其实效果并非很好
user_emb_dict = pickle.load(open(save_path + 'user_youtube_emb.pkl', 'rb'))
u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)

经过YoutubeDNN获得的user_embedding

# 使用召回评估函数验证当前召回方式的效果
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
u2u_sim = pickle.load(open(save_path + 'youtube_u2u_sim.pkl', 'rb'))

sim_user_topk = 20
recall_item_num = 10

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
    
user_multi_recall_dict['youtubednn_usercf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['youtubednn_usercf_recall'], open(save_path + 'youtubednn_usercf_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果评估
    metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall'], trn_last_click_df, topk=recall_item_num)

冷启动问题

冷启动问题能够分红三类:文章冷启动,用户冷启动,系统冷启动。

  • 文章冷启动:对于一个平台系统新加入的文章,该文章没有任何的交互记录,如何推荐给用户的问题。(对于咱们场景能够认为是,日志数据中没有出现过的文章均可以认为是冷启动的文章)
  • 用户冷启动:对于一个平台系统新来的用户,该用户尚未文章的交互信息,如何给该用户进行推荐。(对于咱们场景就是,测试集中的用户是否在测试集对应的log数据中出现过,若是没有出现过,那么能够认为该用户是冷启动用户。可是有时候并无这么严格,咱们也能够本身设定某些指标来判别哪些用户是冷启动用户,好比经过使用时长,点击率,留存率等等)
  • 系统冷启动:就是对于一个平台刚上线,尚未任何的相关历史数据,此时就是系统冷启动,其实也就是前面两种的一个综合。

当前场景下冷启动问题的分析:

对当前的数据进行分析会发现,日志中全部出现过的点击文章只有3w多个,而整个文章库中却有30多万,那么测试集中的用户最后一次点击是否会点击没有出如今日志中的文章呢?若是存在这种状况,说明用户点击的文章以前没有任何的交互信息,这也就是咱们所说的文章冷启动。经过数据分析还能够发现,测试集用户只有一次点击的数据占得比例还很多,其实仅仅经过用户的一次点击就给用户推荐文章使用模型的方式也是比较难的,这里其实也能够考虑用户冷启动的问题,可是这里只给出物品冷启动的一些解决方案及代码,关于用户冷启动的话提一些可行性的作法。

  1. 文章冷启动(没有冷启动的探索问题)
    其实咱们这里不是为了作文章的冷启动而作冷启动,而是猜想用户可能会点击一些没有在log数据中出现的文章,咱们要作的就是如何从将近27万的文章中选择一些文章做为用户冷启动的文章,这里其实也能够当作是一种召回策略,咱们这里就采用简单的比较好理解的基于规则的召回策略来获取用户可能点击的未出如今log数据中的文章。
    如今的问题变成了:如何给每一个用户考虑从27万个商品中获取一小部分商品?随机选一些多是一种方案。下面给出一些参考的方案。

    1. 首先基于Embedding召回一部分与用户历史类似的文章
    2. 从基于Embedding召回的文章中经过一些规则过滤掉一些文章,使得留下的文章用户更可能点击。咱们这里的规则,能够是,留下那些与用户历史点击文章主题相同的文章,或者字数相差不大的文章。而且留下的文章尽可能是与测试集用户最后一次点击时间更接近的文章,或者是当天的文章也行。
  2. 用户冷启动
    这里对测试集中的用户点击数据进行分析会发现,测试集中有百分之20的用户只有一次点击,那么这些点击特别少的用户的召回是否是能够单独作一些策略上的补充呢?或者是在排序后直接基于规则加上一些文章呢?这些均可以去尝试,这里没有提供具体的作法。

注意:

这里看似和基于embedding计算的item之间类似度而后作itemcf是一致的,可是如今咱们的目的不同,咱们这里的目的是找到类似的向量,而且尚未出如今log日志中的商品,再加上一些其余的冷启动的策略,这里须要找回的数量会偏多一点,否则被筛选完以后可能都没有文章了

# 先进行itemcf召回,这里不须要作召回评估,这里只是一种策略
trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 150
recall_item_num = 100 # 稍微召回多一点文章,便于后续的规则筛选

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
# 基于规则进行文章过滤
# 保留文章主题与用户历史浏览主题类似的文章
# 保留文章字数与用户历史浏览文章字数相差不大的文章
# 保留最后一次点击当天的文章
# 按照类似度返回最终的结果

def get_click_article_ids_set(all_click_df):
    return set(all_click_df.click_article_id.values)

def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
                     user_last_item_created_time_dict, item_type_dict, item_words_dict, 
                     item_created_time_dict, click_article_ids_set, recall_item_num):
    """
        冷启动的状况下召回一些文章
        :param user_recall_items_dict: 基于内容embedding类似性召回来的不少文章, 字典, {user1: [item1, item2, ..], }
        :param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射
        :param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射
        :param user_last_item_created_time_idct: 字典,用户点击的历史文章建立时间映射
        :param item_tpye_idct: 字典,文章主题映射
        :param item_words_dict: 字典,文章字数映射
        :param item_created_time_dict: 字典, 文章建立时间映射
        :param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章
        :param recall_item_num: 召回文章的数量, 这个指的是没有出如今日志里面的文章数量
    """
    
    cold_start_user_items_dict = {}
    for user, item_list in tqdm(user_recall_items_dict.items()):
        cold_start_user_items_dict.setdefault(user, [])
        for item, score in item_list:
            # 获取历史文章信息
            hist_item_type_set = user_hist_item_typs_dict[user]
            hist_mean_words = user_hist_item_words_dict[user]
            hist_last_item_created_time = user_last_item_created_time_dict[user]
            hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)
            
            # 获取当前召回文章的信息
            curr_item_type = item_type_dict[item]
            curr_item_words = item_words_dict[item]
            curr_item_created_time = item_created_time_dict[item]
            curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)

            # 首先,文章不能出如今用户的历史点击中, 而后根据文章主题,文章单词数,文章建立时间进行筛选
            if curr_item_type not in hist_item_type_set or \
                item in click_article_ids_set or \
                abs(curr_item_words - hist_mean_words) > 200 or \
                abs((curr_item_created_time - hist_last_item_created_time).days) > 90: 
                continue
                
            cold_start_user_items_dict[user].append((item, score))      # {user1: [(item1, score1), (item2, score2)..]...}
    
    # 须要控制一下冷启动召回的数量
    cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
                                  for k, v in cold_start_user_items_dict.items()}
    
    pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb'))
    
    return cold_start_user_items_dict
all_click_df_ = all_click_df.copy()
all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)
click_article_ids_set = get_click_article_ids_set(all_click_df)
# 须要注意的是
# 这里使用了不少规则来筛选冷启动的文章,因此前面再召回的阶段就应该尽量的多召回一些文章,不然很容易被删掉
cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, 
                                              user_last_item_created_time_dict, item_type_dict, item_words_dict, 
                                              item_created_time_dict, click_article_ids_set, recall_item_num)

user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict

多路召回合并

多路召回合并就是将前面全部的召回策略获得的用户文章列表合并起来,下面是对前面全部召回结果的汇总

  1. 基于itemcf计算的item之间的类似度sim进行的召回
  2. 基于embedding搜索获得的item之间的类似度进行的召回
  3. YoutubeDNN召回
  4. YoutubeDNN获得的user之间的类似度进行的召回
  5. 基于冷启动策略的召回

注意:
在作召回评估的时候就会发现有些召回的效果不错有些召回的效果不好,因此对每一路召回的结果,咱们能够认为的定义一些权重,来作最终的类似度融合

def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
    final_recall_items_dict = {}
    
    # 对每一种召回结果按照用户进行归一化,方便后面多种召回结果,相同用户的物品之间权重相加
    def norm_user_recall_items_sim(sorted_item_list):
        # 若是冷启动中没有文章或者只有一篇文章,直接返回,出现这种状况的缘由多是冷启动召回的文章数量太少了,
        # 基于规则筛选以后就没有文章了, 这里还能够作一些其余的策略性的筛选
        if len(sorted_item_list) < 2:
            return sorted_item_list
        
        min_sim = sorted_item_list[-1][1]
        max_sim = sorted_item_list[0][1]
        
        norm_sorted_item_list = []
        for item, score in sorted_item_list:
            if max_sim > 0:
                norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
            else:
                norm_score = 0.0
            norm_sorted_item_list.append((item, norm_score))
            
        return norm_sorted_item_list
    
    print('多路召回合并...')
    for method, user_recall_items in tqdm(user_multi_recall_dict.items()):
        print(method + '...')
        # 在计算最终召回结果的时候,也能够为每一种召回结果设置一个权重
        if weight_dict == None:
            recall_method_weight = 1
        else:
            recall_method_weight = weight_dict[method]
        
        for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化
            user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)
        
        for user_id, sorted_item_list in user_recall_items.items():
            # print('user_id')
            final_recall_items_dict.setdefault(user_id, {})
            for item, score in sorted_item_list:
                final_recall_items_dict[user_id].setdefault(item, 0)
                final_recall_items_dict[user_id][item] += recall_method_weight * score  
    
    final_recall_items_dict_rank = {}
    # 多路召回时也能够控制最终的召回数量
    for user, recall_item_dict in final_recall_items_dict.items():
        final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]

    # 将多路召回后的最终结果字典保存到本地
    pickle.dump(final_recall_items_dict, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb'))

    return final_recall_items_dict_rank
# 这里直接对多路召回的权重给了一个相同的值,其实能够根据前面召回的状况来调整参数的值
weight_dict = {'itemcf_sim_itemcf_recall': 1.0,
               'embedding_sim_item_recall': 1.0,
               'youtubednn_recall': 1.0,
               'youtubednn_usercf_recall': 1.0, 
               'cold_start_recall': 1.0}
               
               
# 最终合并以后每一个用户召回150个商品进行排序
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)

总结

上述实现了以下召回策略:

  1. 基于关联规则的itemcf
  2. 基于关联规则的usercf
  3. youtubednn召回
  4. 冷启动召回

对于上述实现的召回策略其实都不是最优的结果,咱们只是作了个简单的尝试,其中还有不少地方能够优化,包括已经实现的这些召回策略的参数或者新加一些,修改一些关联规则均可以。固然还能够尝试更多的召回策略,好比对新闻进行热度召回等等。

Reference

  1. 重读Youtube深度学习推荐系统论文,字字珠玑,惊为神文
  2. YouTube深度学习推荐系统的十大工程问题
  3. YouTubeDNN原理
  4. Word2Vec知乎众赞文章 — word2vec放到排序中的w2v的介绍部分
相关文章
相关标签/搜索