实时计算可以及时捕获用户短时兴趣,同时可以快速反馈分发当前系统的用户兴趣内容。大量实践以及发表的文章都显示了推荐系统实时化,对推荐精准度的提高的有效性和必要性。php
实时推荐相关工做很是多,腾讯和北大合做的两篇SIGMOD文章是比较实际和详细的实现,采用的计算框架可以支持大规模数据的实时推荐,如下将会分开简述如下两篇文章。python
Huang发表了基于Storm和KV存储的大规模实时推荐系统 (TencentRec: Real-time Stream Recommendation in Practice)git
腾讯采用使用storm缘由,支持实时数据流式计算,良好的可扩展性、可容错性,采用简单编程模型。
文章核心包括实时增量计算的ItemCF,以及用户隐式反馈计算、实时剪枝算法、基于用户画像的数据稀疏性策略。应用在多个业务上都有不一样程度的提高,最明显的是腾讯视频的全局表现提高高达30%。redis
全文核心应该是下图六道公式,阐述腾讯如何具体实现的增量itemcf。算法
文章中的co-rating,其实就是咱们常说的user bias. 公式3和4解决了用户隐式反馈问题,细节的计算能够参考2016的文章,实际是一个log函数融合了用户的浏览、点击、分享、购买等行为,转化成rating.apache
请注意公式4,因为他们定义了corating,实际是将类似度的增量计算从L2范数的计算转化成了L1范数计算.(当Rup取x的时候,y=1/x)。编程
可扩展的增量计算数据结构
腾讯视频的推荐应用(Real-time Video Recommendation Exploration)架构
在咱们看来,全文核心在于实时计算的数据流转,以下图所示:app
基于storm的实时计算topology图:
糖豆总体推荐框架,从离线,近线,在线三套计算流程组合而成。在线流程基于Spark Streaming框架实现,部署在近线集群。 在线推荐框架实时根据用户行为,生成实时推荐列表,从而知足用户瞬时兴趣,提升推荐系统的推荐新鲜度。简单架构图以下:
实时计算流程以下图所示:
分解步骤:
python实现:
if __name__ == "__main__": print sys.argv reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext(appName="real_time_etl") #20秒 ssc = StreamingContext(sc, 15) brokers = "kafka-servers:9092" topic = "logstash" #读取kafka kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) #解析日志、过滤无关数据、读取类似视频 lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x)) #lines.pprint() #写入推荐结果 lines.foreachRDD(lambda rdd: list2Redis(rdd)) ssc.start() ssc.awaitTermination()
部署在集群Master节点的监控脚本会每30s扫描一次实时计算代码进程,若是发现进程被failed,会自动拉起实时计算Spark Steaming进程。若是进程拉起失败会触发邮件、短信报警
#! /bin/sh MOBILE="your phone numbers" RT_HOME=/home/realtime/recommend.py DIR=/data/rtdamon PID_FILE=$DIR/.run/rt-litetl-damon.pid LOG_FILE=$DIR/.log/rt-litetl-damon.log t=$(date -d "today" +"%Y-%m-%d %H:%M:%S") source /etc/profile echo $PID_FILE $LOG_FILE if [ -e "$PID_FILE" ]; then pid=`cat $PID_FILE` echo $pid damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v \<defunct\> ` echo "damon process exists : $process_exists" if [ -n "$damon_process_exists" ] then echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE exit fi fi pid=$$ echo "$pid" > $PID_FILE while : do process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l` echo "process exists : $process_exists" >>$LOG_FILE if [ "$process_exists" == "0" ]; then /hadoop/spark/bin/spark-submit --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log 2>&1 & /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py" echo "realtime recommendation process already restarted at $t" >> $LOG_FILE fi #sleep `expr 3600 \* 3` sleep `expr 60 \* 1` done
根据咱们的AB测试数据来看,总体CTR提高25%。用推荐系统的A版对比无推荐的B版,用户观看时长提高47%。