点击流(Click Stream)是指用户在网站上持续访问的轨迹。这个概念更注重用户浏览网站的整个流程。用户对网站的每次访问包含了一系列的点击动做行为,这些点击行为数据就构成了点击流数据(Click Stream Data),它表明了用户浏览网站的整个流程。java
点击流和网站日志是两个不一样的概念。node
点击流是从用户的角度出发,注重用户浏览网站的整个流程;jquery
网站日志是面向整个站点,它包含了用户行为数据、服务器响应数据等众多日志信息,咱们经过对网站日志的分析能够得到用户的点击流数据。nginx
网站是由多个网页(Page)构成,当用户在访问多个网页时,网页与网页之间是靠Referrers参数来标识上级网页来源。由此,能够肯定网页被依次访问的顺序,固然也能够经过时间来标识访问的次序。其次,用户对网站的每次访问,可视做是一次会话(Session),在网站日志中将会用不一样的Sessionid来惟一标识每次会话。若是把 Page 视为“点”的话,那么咱们能够很容易的把 Session 描绘成一条“线”,也就是用户的点击流数据轨迹曲线。git
图:点击流概念模型 web
点击流数据在具体操做上是由散点状的点击日志数据梳理所得。点击数据在数据建模时存在两张模型表Pageviews和visits,例如: 正则表达式
页面点击流模型 Pageviews 表 apache
Sessionjson |
时间 浏览器 |
|
访问页面 URL |
停留时长 |
第几步 |
S001 |
2012-01-01 12: |
31:12 |
/a/.... |
30 |
1 |
S002 |
2012-01-01 12: |
31:16 |
/a/.... |
10 |
1 |
S002 |
2012-01-01 12: |
31:26 |
/b/.... |
10 |
2 |
S002 |
2012-01-01 12: |
31:36 |
/e/.... |
30 |
3 |
S003 |
2012-01-01 15: |
35:06 |
/a/.... |
30 |
1 |
点击流模型 Visits 表(按 session 汇集的页面访问信息)
Session |
起始时间 |
结束时间 |
进 入页面 |
离 开页面 |
访问页面数 |
IP |
referal |
S001 |
2012-01-01 12:1:12 |
2012-01-01 12:1:12 |
/a/... |
/a/... |
1 |
101.0.0.1 |
somesite.com |
S002 |
2012-01-01 12:31:16 |
2012-01-01 12:35:06 |
/a/... |
/e/... |
3 |
201.0.0.2 |
- |
S003 |
2012-01-01 12:35:42 |
2012-01-01 12:35:42 |
/c/... |
/c/... |
1 |
234.0.0.3 |
baidu.com |
S004 |
2012-01-01 15:16:39 |
2012-01-01 15:19:23 |
/c/... |
/e/... |
3 |
101.0.0.1 |
google.com |
…… |
…… |
…… |
…… |
…… |
…… |
…… |
…… |
流量分析总体来讲是一个内涵很是丰富的体系,总体过程是一个金字塔结构:
金字塔的顶部是网站的目标:投资回报率(ROI)。
流量对于每一个网站来讲都是很重要,但流量并非越多越好,应该更加看重流量的质量,换句话来讲就是流量能够为咱们带来多少收入。
X 轴表明量,指网站得到的访问量。Y 轴表明质,指能够促进网站目标的事件次数(好比商品浏览、注册、购买等行为)。圆圈大小表示得到流量的成本。
BD 流量是指商务拓展流量。通常指的是互联网通过运营或者竞价排名等方式,从外部拉来的流量。好比电商网站在百度上花钱来竞价排名,产生的流量就是 BD 流量的一部分。
细分是指经过不一样维度对指标进行分割,查看同一个指标在不一样维度下的表现,进而找出有问题的那部分指标,对这部分指标进行优化。
对于全部网站来讲,页面均可以被划分为三个类别:导航页、功能页、内容页
导航页的目的是引导访问者找到信息,功能页的目的是帮助访问者完成特定任务,内容页的目的是向访问者展现信息并帮助访问者进行决策。
首页和列表页都是典型的导航页,站内搜索页面、注册表单页面和购物车页面都是典型的功能页,而产品详情页、新闻和文章页都是典型的内容页。
好比从内容导航分析中,如下两类行为就是网站运营者不但愿看到的行为:
第一个问题:访问者从导航页(首页)尚未看到内容页面以前就从导航页离开网站,须要分析导航页形成访问者中途离开的缘由。
第二个问题:访问者从导航页进入内容页后,又返回到导航页,说明须要分
析内容页的最初设计,并考虑中内容页提供交叉的信息推荐。
所谓转化,即网站业务流程中的一个封闭渠道,引导用户按照流程最终实现业务目标(好比商品成交);而漏斗模型则是指进入渠道的用户在各环节递进过程当中逐渐流失的形象描述;
对于转化渠道,主要进行两部分的分析:
访问者的流失和迷失
阻力的流失
形成流失的缘由不少,如:不恰当的商品或活动推荐对支付环节中专业名词的解释、帮助信息等内容不当
迷失
形成迷失的主要缘由是转化流量设计不合理,访问者在特定阶段得不到须要的信息,而且不能根据现有的信息做出决策,好比在线购买演唱会门票,直到支付也没看到在线选座的提示,这时候就极可能会产生迷失,返回查看。
总之,网站数据分析是一门内容很是丰富的学科,本课程中主要关注网站流量分析过程当中的技术运用,更多关于网站数据分析的业务知识可学习文档首页推荐的资料。
指标是网站分析的基础,用来记录和衡量访问者在网站自的各类行为。好比咱们常常说的流量就是一个网站指标,它是用来衡量网站得到的访问量。在进行流量分析以前,咱们先来了解一些常见的指标。
IP:1 天以内,访问网站的不重复 IP 数。一天内相同 IP 地址屡次访问网站只被计算 1 次。曾经 IP 指标能够用来表示用户访问身份,目前则更多的用来获取访问者的地理位置信息。
PageView 浏览量: 即一般说的 PV 值,用户每打开 1 个网站页面,记录 1 个
PV。用户屡次打开同一页面 PV 累计屡次。通俗解释就是页面被加载的总次数。
Unique PageView: 1 天以内,访问网站的不重复用户数(以浏览器 cookie 为依据),一天内同一访客屡次访问网站只被计算 1 次。
访问次数:访客从进入网站到离开网站的一系列活动记为一次访问,也称会话(session),1 次访问(会话)可能包含多个 PV。
网站停留时间:访问者在网站上花费的时间。
页面停留时间:访问者在某个特定页面或某组网页上所花费的时间。
人均浏览页数:平均每一个独立访客产生的 PV。人均浏览页数=浏览次数/独立访客。体现网站对访客的吸引程度。
跳出率:指某一范围内单页访问次数或访问者与总访问次数的百分比。其中跳出指单页访问或访问者的次数,即在一次访问中访问者进入网站后只访问了一个页面就离开的数量。
退出率:指某一范围内退出的访问者与综合访问量的百分比。其中退出指访问者离开网站的次数,一般是基于某个范围的。
有了上述这些指标以后,就能结合业务进行各类不一样角度的分类分析,主要是如下几大方面:
趋势分析:根据选定的时段,提供网站流量数据,经过流量趋势变化形态,分析网站访客的访问规律、网站发展情况提供参考。
对比分析:根据选定的两个对比时段,提供网站流量在时间上的纵向对比报表,帮您发现网站发展情况、发展规律、流量变化率等。
当前在线:提供当前时刻站点上的访客量,以及最近 15 分钟流量、来源、受访、访客变化状况等,方便用户及时了解当前网站流量情况。
访问明细:提供最近 7 日的访客访问记录,可按每一个 PV 或每次访问行为(访客的每次会话)显示,并可按照来源、搜索词等条件进行筛选。 经过访问明细,用户能够详细了解网站流量的累计过程,从而为用户快速找出流量变更缘由提供最原始、最准确的依据。
来源分类:提供不一样来源形式(直接输入、搜索引擎、其余外部连接、站内来源)、不一样来源项引入流量的比例状况。经过精确的量化数据,帮助用户分析什么类型的来路产生的流量多、效果好,进而合理优化推广方案。
搜索引擎:提供各搜索引擎以及搜索引擎子产品引入流量的比例状况。
搜索词:提供访客经过搜索引擎进入网站所使用的搜索词,以及各搜索词引入流量的特征和分布。帮助用户了解各搜索词引入流量的质量,进而了解访客的兴趣关注点、网站与访客兴趣点的匹配度,为优化 SEO(搜索引擎优化)方案及 SEM(搜索引擎营销)提词方案提供详细依据。
最近 7 日的访客搜索记录:可按每一个 PV 或每次访问行为(访客的每次会话)显示,并可按照访客类型、地区等条件进行筛选。为您搜索引擎优化提供最详细的原始数据。
来路域名:提供具体来路域名引入流量的分布状况,并可按“社会化媒体”、“搜索引擎”、“邮箱”等网站类型对来源域名进行分类。 帮助用户了解哪类推广渠道产生的流量多、效果好,进而合理优化网站推广方案。
来路页面:提供具体来路页面引入流量的分布状况。 尤为对于经过流量置换、包广告位等方式从其余网站引入流量的用户,该功能能够方便、清晰地展示广告引入的流量及效果,为优化推广方案提供依据。
来源升降榜:提供开通统计后任意两日的 TOP10000 搜索词、来路域名引入流量的对比状况,并按照变化的剧烈程度提供排行榜。 用户可经过此功能快速找到哪些来路对网站流量的影响比较大,从而及时排查相应来路问题。
受访域名:提供访客对网站中各个域名的访问状况。 通常状况下,网站不一样域名提供的产品、内容各有差别,经过此功能用户能够了解不一样内容的受欢迎程度以及网站运营成效。
受访页面:提供访客对网站中各个页面的访问状况。 站内入口页面为访客进入网站时浏览的第一个页面,若是入口页面的跳出率较高则须要关注并优化;站内出口页面为访客访问网站的最后一个页面,对于离开率较高的页面须要关注并优化。
受访升降榜:提供开通统计后任意两日的 TOP10000 受访页面的浏览状况对比,并按照变化的剧烈程度提供排行榜。 可经过此功能验证通过改版的页面是否有流量提高或哪些页面有巨大流量波动,从而及时排查相应问题。
热点图:记录访客在页面上的鼠标点击行为,经过颜色区分不一样区域的点击热度;支持将一组页面设置为"关注范围",并可按来路细分点击热度。 经过访客在页面上的点击量统计,能够了解页面设计是否合理、广告位的安排可否获取更多佣金等。
用户视点:提供受访页面对页面上连接的其余站内页面的输出流量,并经过输出流量的高低绘制热度图,与热点图不一样的是,全部记录都是实际打开了下一页面产生了浏览次数(PV)的数据,而不只仅是拥有鼠标点击行为。
访问轨迹:提供观察焦点页面的上下游页面,了解访客从哪些途径进入页面,又流向了哪里。 经过上游页面列表比较出不一样流量引入渠道的效果;经过下游页面列表了解用户的浏览习惯,哪些页面元素、内容更吸引访客点击。
地区运营商:提供各地区访客、各网络运营商访客的访问状况分布。 地方网站、下载站等与地域性、网络链路等结合较为紧密的网站,能够参考此功能数据,合理优化推广运营方案。
终端详情:提供网站访客所使用的浏览终端的配置状况。 参考此数据进行网页设计、开发,可更好地提升网站兼容性,以达到良好的用户交互体验。
新老访客:当日访客中,历史上第一次访问该网站的访客记为当日新访客;历史上已经访问过该网站的访客记为老访客。 新访客与老访客进入网站的途径和浏览行为每每存在差别。该功能能够辅助分析不一样访客的行为习惯,针对不一样访客优化网站,例如为制做新手导航提供数据支持等。
忠诚度:从访客一天内回访网站的次数(日访问频度)与访客上次访问网站的时间两个角度,分析访客对网站的访问粘性、忠诚度、吸引程度。 因为提高网站内容的更新频率、加强用户体验与用户价值能够有更高的忠诚度,所以该功能在网站内容更新及用户体验方面提供了重要参考。
活跃度:从访客单次访问浏览网站的时间与网页数两个角度,分析访客在网站上的活跃程度。 因为提高网站内容的质量与数量能够得到更高的活跃度,所以该功能是网站内容分析的关键指标之一。
转化定义:
访客在您的网站完成了某项您指望的活动,记为一次转化,如注册、下载、购买。
目标示例:
·得到用户目标:在线注册、建立帐号等。
·咨询目标:咨询、留言、电话等。
·互动目标:视频播放、加入购物车、分享等。
·收入目标:在线订单、付款等。
路径分析:
根据设置的特定路线,监测某一流程的完成转化状况,算出每步的转换率和流失率数据,
如注册流程,购买流程等。
转化类型:
l 页面
l 事件
网站流量日志数据分析是一个纯粹的数据分析项目,其总体流程基本上就是
依据数据的处理流程进行。有如下几个大的步骤:
数据采集
数据采集概念,目前行业会有两种解释:一是数据从无到有的过程(web服务器打印的日志、自定义采集的日志等)叫作数据采集;另外一方面也有把经过使用Flume等工具把数据采集到指定位置的这个过程叫作数据采集。
关于具体含义要结合语境具体分析,明白语境中具体含义便可。
数据预处理
经过mapreduce程序对采集到的原始日志数据进行预处理,好比清洗,格式
整理,滤除脏数据等,而且梳理成点击流模型数据。
数据入库
将预处理以后的数据导入到HIVE仓库中相应的库和表中。
数据分析
项目的核心内容,即根据需求开发ETL分析语句,得出各类统计结果。
数据展示
将分析所得数据进行数据可视化,通常经过图表进行展现。
相对于传统的BI数据处理,流程几乎差很少,可是由于是处理大数据,因此流程中各环节所使用的技术则跟传统BI彻底不一样:
数据采集:定制开发采集程序,或使用开源框架Flume
数据预处理:定制开发mapreduce程序运行于hadoop集群数据仓库技术:基于hadoop之上的Hive
数据导出:基于hadoop的sqoop数据导入导出工具数据可视化:定制开发web程序(echarts)
整个过程的流程调度:hadoop生态圈中的azkaban工具
其中,须要强调的是:系统的数据分析不是一次性的,而是按照必定的时间频率反复计算,于是整个处理链条中的各个环节须要按照必定的前后依赖关系紧密衔接,即涉及到大量任务单元的管理调度,因此,项目中须要添加一个任务调度模块。
数据展示的目的是将分析所得的数据进行可视化,以便运营决策人员能更方便地获取数据,更快更简单地理解数据。
市面上有许多开源的数据可视化软件、工具。好比Echarts.
在网站web流量日志分析这种场景中,对数据采集部分的可靠性、容错能力要求一般不会很是严苛,所以使用通用的 flume 日志采集框架彻底能够知足需
求。
Flume 采集系统的搭建相对简单:
一、在服务器上部署 agent 节点,修改配置文件
二、启动 agent 节点,将采集到的数据汇聚到指定的 HDFS 目录中
三、针对nginx日志生成场景,若是经过flume(1.6)收集,不管是Spooling Directory Source和Exec Source均不能知足动态实时收集的需求,在当前flume1.7稳定版本中,提供了一个很是好用的TaildirSource,使用这个source,能够监控一个目录,而且使用正则表达式匹配该目录中的文件名进行实时收集。
核心配置以下:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources = r1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/logs/taildir_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /root/logs/example.log a1.sources.r1.filegroups.f2 = /root/logs/toupload/.*log.* # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 制做log命令: ## while true; do echo example... >> /root/logs/example.log; echo access... >> /root/logs/toupload/access.log.1;sleep 0.3;done 启动命令: bin/flume-ng agent -c conf/ -f conf/kkkk.conf -n a1 -Dflume.root.logger=INFO,console
filegroups:指定filegroups,能够有多个,以空格分隔;(TailSource能够同时监控 tail多个目录中的文件)
positionFile:配置检查点文件的路径,检查点文件会以json格式保存已经tail文件的位置,解决了断点不能续传的缺陷。
filegroups.<filegroupName>:配置每一个filegroup的文件绝对路径,文件名能够用正则表达式匹配。
经过以上配置,就能够监控文件内容的增长和文件的增长。产生和所配置的文件名正则表达式不匹配的文件,则不会被tail。
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
字段解析:
访客ip地址: 58.215.204.118
访客用户信息: - -
请求时间:[18/Sep/2013:06:51:35 +0000]
请求方式:GET
请求的url:/wp-includes/js/jquery/jquery.js?ver=1.10.2
请求所用协议:HTTP/1.1
响应码:304
返回的数据流量:0
访客的来源url:http://blog.fens.me/nodejs-socketio-chat/
访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101
Firefox/23.0
过滤“不合规”数据,清洗无心义的数据格式转换和规整根据后续的统计需求,过滤分离出各类不一样主题(不一样栏目 path)的基础数据。
import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 处理原始日志,过滤出真实pv请求 转换时间格式 对缺失字段填充默认值 对记录标记valid和invalid * */ public class WeblogPreProcess { static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> { // 用来存储网站url分类数据 Set<String> pages = new HashSet<String>(); Text k = new Text(); NullWritable v = NullWritable.get(); /** * 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤 */ @Override protected void setup(Context context) throws IOException, InterruptedException { pages.add("/about"); pages.add("/black-ip-list/"); pages.add("/cassandra-clustor/"); pages.add("/finance-rhive-repurchase/"); pages.add("/hadoop-family-roadmap/"); pages.add("/hadoop-hive-intro/"); pages.add("/hadoop-zookeeper-intro/"); pages.add("/hadoop-mahout-roadmap/"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); WebLogBean webLogBean = WebLogParser.parser(line); if (webLogBean != null) { // 过滤 WebLogParser.filtStaticResource(webLogBean, pages); /* if (!webLogBean.isValid()) return; */ k.set(webLogBean.toString()); context.write(k, v); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeblogPreProcess.class); job.setMapperClass(WeblogPreProcessMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/input")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output")); job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Set; public class WebLogParser { //194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" public static WebLogBean parser(String line) { WebLogBean webLogBean = new WebLogBean(); String[] arr = line.split(" "); if (arr.length > 11) { webLogBean.setRemote_addr(arr[0]); webLogBean.setRemote_user(arr[1]); String time_local = formatDate(arr[3].substring(1)); if(null==time_local || "".equals(time_local)) time_local="-invalid_time-"; webLogBean.setTime_local(time_local); webLogBean.setRequest(arr[6]); webLogBean.setStatus(arr[8]); webLogBean.setBody_bytes_sent(arr[9]); webLogBean.setHttp_referer(arr[10]); //若是useragent元素较多,拼接useragent if (arr.length > 12) { StringBuilder sb = new StringBuilder(); for(int i=11;i<arr.length;i++){ sb.append(arr[i]); } webLogBean.setHttp_user_agent(sb.toString()); } else { webLogBean.setHttp_user_agent(arr[11]); } if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误 webLogBean.setValid(false); } if("-invalid_time-".equals(webLogBean.getTime_local())){ webLogBean.setValid(false); } } else { webLogBean=null; } return webLogBean; } public static void filtStaticResource(WebLogBean bean, Set<String> pages) { if (!pages.contains(bean.getRequest())) { bean.setValid(false); } } //格式化时间方法 public static String formatDate(String time_local) { // 18/Sep/2013:06:49:18 SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US); SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); try { return df2.format(df1.parse(time_local));//dfs2=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); } catch (ParseException e) { return null; } } }
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class WebLogBean implements Writable { private boolean valid = true;// 判断数据是否合法 private String remote_addr;// 记录客户端的ip地址 private String remote_user;// 记录客户端用户名称,忽略属性"-" private String time_local;// 记录访问时间与时区 private String request;// 记录请求的url与http协议 private String status;// 记录请求状态;成功是200 private String body_bytes_sent;// 记录发送给客户端文件主体内容大小 private String http_referer;// 用来记录从那个页面连接访问过来的 private String http_user_agent;// 记录客户浏览器的相关信息 public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) { this.valid = valid; this.remote_addr = remote_addr; this.remote_user = remote_user; this.time_local = time_local; this.request = request; this.status = status; this.body_bytes_sent = body_bytes_sent; this.http_referer = http_referer; this.http_user_agent = http_user_agent; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getRemote_user() { return remote_user; } public void setRemote_user(String remote_user) { this.remote_user = remote_user; } public String getTime_local() { return this.time_local; } public void setTime_local(String time_local) { this.time_local = time_local; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getBody_bytes_sent() { return body_bytes_sent; } public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; } public String getHttp_referer() { return http_referer; } public void setHttp_referer(String http_referer) { this.http_referer = http_referer; } public String getHttp_user_agent() { return http_user_agent; } public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.valid); sb.append("\001").append(this.getRemote_addr()); sb.append("\001").append(this.getRemote_user()); sb.append("\001").append(this.getTime_local()); sb.append("\001").append(this.getRequest()); sb.append("\001").append(this.getStatus()); sb.append("\001").append(this.getBody_bytes_sent()); sb.append("\001").append(this.getHttp_referer()); sb.append("\001").append(this.getHttp_user_agent()); return sb.toString(); } @Override public void readFields(DataInput in) throws IOException { this.valid = in.readBoolean(); this.remote_addr = in.readUTF(); this.remote_user = in.readUTF(); this.time_local = in.readUTF(); this.request = in.readUTF(); this.status = in.readUTF(); this.body_bytes_sent = in.readUTF(); this.http_referer = in.readUTF(); this.http_user_agent = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeBoolean(this.valid); out.writeUTF(null==remote_addr?"":remote_addr); out.writeUTF(null==remote_user?"":remote_user); out.writeUTF(null==time_local?"":time_local); out.writeUTF(null==request?"":request); out.writeUTF(null==status?"":status); out.writeUTF(null==body_bytes_sent?"":body_bytes_sent); out.writeUTF(null==http_referer?"":http_referer); out.writeUTF(null==http_user_agent?"":http_user_agent); } }
因为大量的指标统计从点击流模型中更容易得出,因此在预处理阶段,可使用mr程序来生成点击流模型的数据。
Pageviews 表模型数据生成, 详细见:ClickStreamPageView.java
/** * * 将清洗以后的日志梳理出点击流pageviews模型数据 * * 输入数据是清洗事后的结果数据 * * 区分出每一次会话,给每一次visit(session)增长了session-id(随机uuid) * 梳理出每一次会话中所访问的每一个页面(请求时间,url,停留时长,以及该页面在此次session中的序号) * 保留referral_url,body_bytes_send,useragent * * * @author * */ public class ClickStreamPageView { static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> { Text k = new Text(); WebLogBean v = new WebLogBean(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\001"); if (fields.length < 9) return; //将切分出来的各字段set到weblogbean中 v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]); //只有有效记录才进入后续处理 if (v.isValid()) { //此处用ip地址来标识用户 k.set(v.getRemote_addr()); context.write(k, v); } } } static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> { Text v = new Text(); /* 输入:<ip,[weblogbean,weblogbean] 同一个ip的全部请求,按照时间前后顺序排序了 */ protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException { ArrayList<WebLogBean> requestList = new ArrayList<WebLogBean>(); // 先将一个用户的全部访问记录中的时间拿出来排序 try { for (WebLogBean bean : values) { WebLogBean webLogBean = new WebLogBean(); try { BeanUtils.copyProperties(webLogBean, bean); } catch(Exception e) { e.printStackTrace(); } requestList.add(webLogBean); } //将bean按时间前后顺序排序 Arrays.sort() Collections.sort(requestList, new Comparator<WebLogBean>() { //[b,a,] , c @Override public int compare(WebLogBean o1, WebLogBean o2) { try { Date d1 = toDate(o1.getTime_local()); Date d2 = toDate(o2.getTime_local()); if (d1 == null || d2 == null) return 0; return d1.compareTo(d2); } catch (Exception e) { e.printStackTrace(); return 0; } } }); /** * 如下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step * 核心思想: * 就是比较相邻两条记录中的时间差,若是时间差<30分钟,则该两条记录属于同一个session * 不然,就属于不一样的session * */ int step = 1; String session = UUID.randomUUID().toString(); // 若是仅有1条数据,则直接输出 if (1 == requestList.size()) { WebLogBean bean = requestList.get(0); // 设置默认停留时长为60s v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), v); return; } for (int i = 0; i < requestList.size(); i++) { // 若是不止1条数据,则将第一条跳过不输出,遍历第二条时再输出 if (i == 0) { continue; } /* beans集合 s1 false58.215.204.118-2013-09-18 06:51:35 0 ip1 s1 false58.215.204.118-2013-09-18 06:51:36 1 s2 false58.215.204.118-2013-09-18 07:51:36 2 */ WebLogBean bean1 = requestList.get(i - 1); WebLogBean bean2 = requestList.get(i); // 求近两次时间差 long timeDiff = timeDiff(toDate(bean2.getTime_local()), toDate(bean1.getTime_local())); // 若是本次-上次时间差<30分钟,则输出前一次的页面访问信息 if (timeDiff < 30 * 60 * 1000) { v.set(session+"\001"+key.toString()+"\001"+bean1.getRemote_user() + "\001" + bean1.getTime_local() + "\001" + bean1.getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + bean1.getHttp_referer() + "\001" + bean1.getHttp_user_agent() + "\001" + bean1.getBody_bytes_sent() + "\001" + bean1.getStatus()); context.write(NullWritable.get(), v); step++; } else { // 若是本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit v.set(session+"\001"+key.toString()+"\001"+bean1.getRemote_user() + "\001" + bean1.getTime_local() + "\001" + bean1.getRequest() + "\001" + (step) + "\001" + (60) + "\001" + bean1.getHttp_referer() + "\001" + bean1.getHttp_user_agent() + "\001" + bean1.getBody_bytes_sent() + "\001" + bean1.getStatus()); context.write(NullWritable.get(), v); // 输出完上一条以后,重置step编号 step = 1; session = UUID.randomUUID().toString(); } // 若是这次遍历的是最后一条,则将本条直接输出 if (i == requestList.size() - 1) { // 设置默认停留市场为60s v.set(session+"\001"+key.toString()+"\001"+bean2.getRemote_user() + "\001" + bean2.getTime_local() + "\001" + bean2.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean2.getHttp_referer() + "\001" + bean2.getHttp_user_agent() + "\001" + bean2.getBody_bytes_sent() + "\001" + bean2.getStatus()); context.write(NullWritable.get(), v); } } } catch (ParseException e) { e.printStackTrace(); } } private String toStr(Date date) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.format(date); } private Date toDate(String timeStr) throws ParseException { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.parse(timeStr); } private long timeDiff(String time1, String time2) throws ParseException { Date d1 = toDate(time1); Date d2 = toDate(time2); return d1.getTime() - d2.getTime(); } private long timeDiff(Date time1, Date time2) throws ParseException { return time1.getTime() - time2.getTime(); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamPageView.class); job.setMapperClass(ClickStreamMapper.class); job.setReducerClass(ClickStreamReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(WebLogBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/output")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/pageviews")); job.waitForCompletion(true); } }
public class PageViewsBean implements Writable { private String session; private String remote_addr; private String timestr; private String request; private int step; private String staylong; private String referal; private String useragent; private String bytes_send; private String status; public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) { this.session = session; this.remote_addr = remote_addr; this.useragent = useragent; this.timestr = timestr; this.request = request; this.step = step; this.staylong = staylong; this.referal = referal; this.bytes_send = bytes_send; this.status = status; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getTimestr() { return timestr; } public void setTimestr(String timestr) { this.timestr = timestr; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public int getStep() { return step; } public void setStep(int step) { this.step = step; } public String getStaylong() { return staylong; } public void setStaylong(String staylong) { this.staylong = staylong; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public String getUseragent() { return useragent; } public void setUseragent(String useragent) { this.useragent = useragent; } public String getBytes_send() { return bytes_send; } public void setBytes_send(String bytes_send) { this.bytes_send = bytes_send; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } @Override public void readFields(DataInput in) throws IOException { this.session = in.readUTF(); this.remote_addr = in.readUTF(); this.timestr = in.readUTF(); this.request = in.readUTF(); this.step = in.readInt(); this.staylong = in.readUTF(); this.referal = in.readUTF(); this.useragent = in.readUTF(); this.bytes_send = in.readUTF(); this.status = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(session); out.writeUTF(remote_addr); out.writeUTF(timestr); out.writeUTF(request); out.writeInt(step); out.writeUTF(staylong); out.writeUTF(referal); out.writeUTF(useragent); out.writeUTF(bytes_send); out.writeUTF(status); } }
注:“一次访问”=“N 次连续请求”
直接从原始数据中用hql 语法得出每一个人的“次”访问信息比较困难,可先用mapreduce 程序分析原始数据得出“次”信息数据,而后再用hql 进行更多维度统计用 MR 程序从 pageviews 数据中,梳理出每一次 visit 的起止时间、页面信息详细代码见工程:ClickStreamVisit.java
/** * 输入数据:pageviews模型结果数据 * 从pageviews模型结果数据中进一步梳理出visit模型 * sessionid start-time out-time start-page out-page pagecounts ...... * * @author * */ public class ClickStreamVisit { // 以session做为key,发送数据到reducer static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> { PageViewsBean pvBean = new PageViewsBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //5abe467e-d500-4889-82c1-36695b7affbf101.226.167.201-2013-09-18 09:30:36/hadoop-mahout-roadmap/160"http://blog.fens.me/hadoop-mahout-roadmap/""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"10335200 String line = value.toString(); String[] fields = line.split("\001"); int step = Integer.parseInt(fields[5]); //(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status) //299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200 pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]); k.set(pvBean.getSession()); context.write(k, pvBean); } } static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> { /** 2 1 3 session001,[PageViewsBean1,PageViewsBean2,PageViewsBean3] session001,[PageViewsBean2,PageViewsBean1,PageViewsBean3] */ protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException { // 将pvBeans按照step排序 ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>(); for (PageViewsBean pvBean : pvBeans) { PageViewsBean bean = new PageViewsBean(); try { BeanUtils.copyProperties(bean, pvBean); pvBeansList.add(bean); } catch (Exception e) { e.printStackTrace(); } } Collections.sort(pvBeansList, new Comparator<PageViewsBean>() { @Override public int compare(PageViewsBean o1, PageViewsBean o2) { return o1.getStep() > o2.getStep() ? 1 : -1; } }); // 取此次visit的首尾pageview记录,将数据放入VisitBean中 VisitBean visitBean = new VisitBean(); // 取visit的首记录 visitBean.setInPage(pvBeansList.get(0).getRequest()); visitBean.setInTime(pvBeansList.get(0).getTimestr()); // 取visit的尾记录 visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest()); visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr()); // visit访问的页面数 visitBean.setPageVisits(pvBeansList.size()); // 来访者的ip visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr()); // 本次visit的referal visitBean.setReferal(pvBeansList.get(0).getReferal()); visitBean.setSession(session.toString()); context.write(NullWritable.get(), visitBean); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamVisit.class); job.setMapperClass(ClickStreamVisitMapper.class); job.setReducerClass(ClickStreamVisitReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PageViewsBean.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(VisitBean.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/pageviews")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/visitout")); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
public class VisitBean implements Writable { private String session; private String remote_addr; private String inTime; private String outTime; private String inPage; private String outPage; private String referal; private int pageVisits; public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) { this.session = session; this.remote_addr = remote_addr; this.inTime = inTime; this.outTime = outTime; this.inPage = inPage; this.outPage = outPage; this.referal = referal; this.pageVisits = pageVisits; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getInTime() { return inTime; } public void setInTime(String inTime) { this.inTime = inTime; } public String getOutTime() { return outTime; } public void setOutTime(String outTime) { this.outTime = outTime; } public String getInPage() { return inPage; } public void setInPage(String inPage) { this.inPage = inPage; } public String getOutPage() { return outPage; } public void setOutPage(String outPage) { this.outPage = outPage; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public int getPageVisits() { return pageVisits; } public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; } @Override public void readFields(DataInput in) throws IOException { this.session = in.readUTF(); this.remote_addr = in.readUTF(); this.inTime = in.readUTF(); this.outTime = in.readUTF(); this.inPage = in.readUTF(); this.outPage = in.readUTF(); this.referal = in.readUTF(); this.pageVisits = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(session); out.writeUTF(remote_addr); out.writeUTF(inTime); out.writeUTF(outTime); out.writeUTF(inPage); out.writeUTF(outPage); out.writeUTF(referal); out.writeInt(pageVisits); } @Override public String toString() { return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits; } }
pom.xml
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass></mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build>