========================实时流量统计web
1. 实时热门商品HotItems
每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。
抽取出业务时间戳,告诉 Flink 框架基于业务时间作窗口
• 过滤出点击行为数据
• 按一小时的窗口大小,每 5 分钟统计一次,作滑动窗口聚合( Sliding Window)
• 按每一个窗口聚合,输出每一个窗口中点击量前 N 名的商品
2. 实时流量统计 NetworkFlow
"实时流量统计" 对于一个电商平台而言,用户登
录的入口流量、不一样页面的访问流量 都是值得分析的重要数据,而这些数据,能够
简单地从 web 服务器的日志中提取出来。
实现"热门页面浏览数"的统计,也就是读取服务器日志中的每
一行log统计在一段时间内用户访问每个url的次数,而后排序输出显示。
具体作法为:
每隔 5 秒,输出最近 10 分钟内访问 量最多的前 N 个 URL。能够看出,这个需求与以前“实时热门商品统计”很是相似,
因此咱们彻底能够借鉴此前的代码。
3. PV 网站页面流量 - PageView
衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View PV );
用户每次打开一个页面便记录 1 次 PV ,屡次打开同一页面则浏览量累计。通常来讲PV 与来访者的数量成正比,可是 PV 并不直接决定页面的真实来访者数量,
如同一个来访者经过不断的刷新页面,也能够制造出很是高的 PV 。
咱们知道,用户浏览页面时,会从浏览器向网络服务器 发出一个请求 Request网络服务器接到这个请求后,会将该请求对应的一个网页( Page )发送给浏览器
从而产生了一个 PV。因此咱们的统计方法,能够是从 web 服务器的日志中去提取对应的页面访问而后统计,就向上一节中的作法同样;也能够直接从埋点日志中提
取用户发来的页面请求,从而统计出总浏览量。
实现一个网站总浏览量的统计。能够设置滚动时间窗口,实时统计每小时内的网站 PV
4. UV 独立访客数
* 上例中,咱们统计的是全部用户对页面的全部浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,咱们每每还会关注,在一段
* 时间内到底有多少不一样的用户访问了网站。另一个统计流量的重要指标是网站的独立访客数(Unique Visitor UV )。 UV指的是一段时间(好比一小时)内访问网站 的 总人数, 1 天内同一访客的屡次访问
* 只记录为一个访客。经过 IP 和 cookie 通常 是判断 UV 值的两种方式。 当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出 一个 Cookie
* 一般放在这个客户端电脑的 C 盘当中。在这个 Cookie 中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下次再访问这个服务器的时候,服务器就能够直接从你的电脑中找到上一次放进去的
* Cookie 文件,而且对其进行一些更新,但那个独一无二的编号是不会变的。
* 此例中能够根据 userId 来区分不一样的用户。
5. 使用布隆过滤器查重-过滤的UV统计
/**
* 上例中,把全部数据的userId 都存在了窗口计算的状态里,在窗口收集数据的过程当中,状态会不断增大。通常状况下,只要不超出内存的承受范围,
* 这种作法也没什么问题;但若是咱们遇到的数据量很大呢?把全部数据暂存放到内存里,显然不是一个好注意。咱们会想到,能够利用 redis这种内存级 k v 数据库,为咱们作一个缓存。
* 但若是咱们遇到的状况很是极端,数据大到惊人呢?好比上亿级的用户,要去重计算 UV 。
* 若是放到redis 中,亿级的用户id (每一个 20 字节左右的话)可能须要几G甚至几十G的空间来存储。固然放到 redis 中,用集群进行扩展也不是不能够,但明显
* 代价太大了。一个更好的想法是,其实咱们不须要完整地存储用户ID 的信息,只要知道他在不在就好了。因此其实咱们能够进行压缩处理,用一位( bit )就能够表示一个用户
* 的状态。这个思想的具体实现就是布隆过滤器( Bloom Filter )。
* 本质上布隆过滤器是一种数据结构,比较巧妙的几率型数据结构(probabilisticdata structure ),特色是高效地插入和查询,能够用来告诉你 “某样东西必定不存在或者可能存在”。
* 它自己是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0 ,就是 1 。 相比于传统的 List 、 Set 、 Map 等数据结构,它更高效、占用空间更少,
* 可是缺点是其返回的结果是几率性的,而不是确切的。
* 咱们的目标就是,利用某种方法(通常是Hash 函数)把每一个数据,对应到一个位图的某一位上去;若是数据存在,那一位就是1,不存在则为 0 。
*/
判断当前最大的时间戳 <= 当前的watermark,就返回一个TriggerResult.FIRE(触发);不然就注册一个定时器(关窗的操做)redis
TriggerResult的类型:CONTINUE-什么都不作继续处理窗口;FIRE触发窗口的计算操做但并不会关闭窗口清除它的状态;PURGE清除窗口的状态;FIRE_AND_PURGE触发并清除掉;
redis:数据库
==========================市场营销商业指标统计分析===========
* 对于电商企业来讲,通常会经过各类不一样的渠道对本身的APP进行市场推广,而这些渠道的统计数据(好比,不一样网站上广告连接的点击量、APP下载量)就成了市场营销的重要商业指标。
* 首先考察分渠道的市场推广统计。
* 须要自定义一个测试源SimulatedEventSource来生成用户行为的事件流。
*1. 分渠道统计 AppMarketingByChannel.scala
/**
* 2. 不分渠道(总量)统计
* 一样咱们还能够考察不分渠道的市场推广统计,这样获得的就是全部渠道推广的总量 AppMarketing.scala 。
* /
/**
* 电商网站的市场营销商业指标中,除了自身的APP 推广,还会考虑到页面上的广告投放(包括本身经营的产品和其它网站的广告)。 因此广告相关的统计分析,也是市场营销的重要指标。
* 对于广告的统计,最简单也最重要的就是页面广告的点击量,网站每每须要根据广告点击量来制定订价策略和调整推广方式,并且也能够借此收集用户的偏好信息。
* 更加具体的应用是,咱们能够根据用户的地理位置进行划分,从而总结出不一样省份用户对不一样广告的偏好,这样更有助于广告的精准投放。
* 3. 页面广告点击量统计
* 接下来咱们就进行页面广告按照省份划分的点击量的统计。AdStatisticsByGeo .scala 文件 。
* 自定义一些测试数据AdClickLog,用来生成用户点击广告行为的事件流。
* 主函数以 province 进行 keyBy ,而后开一小时的时间窗口,滑动距离为5秒,统计窗口内的点击事件数量。
*
* 广告点击量统计,同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;
* 可是若是用户在一段时间很是频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。因此咱们能够对一段时间内(好比一天内)的用户点击行为进行约束,
* 若是对同一个广告点击超过必定限额(好比 100 次),应该把该用户加入黑名单并报警,此后其点击行为不该该再统计。
* 4. 黑名单过滤
==========================恶意登陆监控==================
* 对于网站而言,用户登陆并非频繁的业务操做。若是一个用户短期内频繁登陆失败,就有多是出现了程序的恶意攻击,好比密码暴力破解。所以咱们考虑,
* 应该对用户的登陆失败动做进行统计,具体来讲,若是同一用户(能够是不一样 IP)在2秒以内连续两次登陆失败,就认为存在恶意登陆的风险,输出相关的信息进行
* 报警提示。这是电商网站、也是几乎全部 网站风控的基本一环。
* 1. 状态编程的方式实现:LoginFail .scala
* 因为一样引入了时间,咱们能够想到,最简单的方法其实与以前的热门统计相似,只须要按照用户 ID 分流,而后遇到登陆失败的事件时将其保存在 ListState 中,
* 而后设置一个定时器,2秒后触发。定时器触发时检查状态中的登陆失败事件个数,若是大于等于2,那么就输出报警信息。
*
* 新建一个单例对象。 定义样例类LoginEvent ,这是输入的登陆事件流。登陆数据本应该从UserBehavior日志里提取
* 因为UserBehavior.csv中没有作相关埋点,从另外一个文件 LoginLog.csv 中读取登陆数据 。
*
*
* 2. 优化操做:
* 第一次的代码实现中咱们能够看到,直接把每次登陆失败的数据存起来、设置定时器一段时间后再读取,这种作法尽管简单,但和咱们开始的需求仍是略有差别
* 的。这种作法只能隔 2 秒以后去判断一下这期间是否有屡次失败登陆,而不是在一次登陆失败以后、再一次登陆失败时就马上报警。这个需求若是严格实现起来,相
* 当于要判断任意紧邻的事件,是否符合某种模式。因而咱们能够想到,这个需求其实能够不用定时器触发,直接在状态中存取上一次登陆失败的事件,每次都作判断和比对,就能够实现最初的需求。
* 在代码MatchFunction中删掉onTimer processElement
*
* 咱们经过对状态编程的改进,去掉了定时器,在 process function 中作了
* 更多的逻辑处 理,实现了最初的需求。不过这种方法里有不少的条件判断,目前仅仅实现的是检测“连续2次登陆失败”,这是最简单的情形。
* 若是须要检测更屡次,内部逻辑显然会变得很是复杂。那有什么方式能够方便地实现呢?
* flink为咱们提供了CEP Complex Event Processing ,复琐事件处理库,用于在流中筛选符合某种复杂模式的事件。
* 3. 基于 CEP 来完成这个模块的实现。
========================订单支付实时监控=========================
在电商网站中,订单的支付做为直接与营销收入挂钩的一环,在业务流程中很是重要。对于订单而言,为了正确控制业务流程,也为了增长用户的支付意愿,网
站通常会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,咱们还应保证用户支付的正确性,这能够经过第三方支付平台的
对于订单的支付,咱们还应保证用户支付的正确性,这能够经过第三方支付平台的交易数据来作一个实时对帐。编程
将实现这两个需求。交易数据来作一个实时对帐。浏览器
* 在电商平台中最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动做的时候。用户下单的行为能够代表用户对商品的需求,但
* 在现实中,并非每次下单都会被用户马上支付。当拖延一段时间后,用户支付的意愿会下降。因此为了让用户更有紧迫感从而提升支付转化率,同时也为了防范订
* 单支付环节的安全风险,电商网站每每会对订单状态进行监控,设置一个失效时间(好比 15 分钟),若是下单后一段时间仍未支付,订单就会被 取消。
* 使用 CEP 实现
* 利用 CEP 库来实现这个功能。咱们先将事件流按照订单号orderId分流,
* 定义这样的一个事件模式:在15分钟内,事件“create”与pay非严格紧邻,这样调用.select 方法时,就能够同时获取到匹配出的事件和超时未匹配的事件。
* 1. CEP实现订单超时报警
* 2. 用状态编程来实现:
* 咱们一样能够利用Process Function ,自定义实现检测订单超时的功能。为了简化问题,咱们只考虑超时报警的情形,在 pay 事件超时未发生的状况下,输出超时报警信息。
* 一个简单的思路是,能够在订单的create 事件到来后注册定时器,15分钟后触发;而后再用一个布尔类型的 Value 状态来做为标识位,代表 pay 事件是否发生过。
* 若是 pay 事件已经发生,状态被置为 true ,那么就再也不须要 作什么操做;而若是 pay事件一直没来,状态一直为 false,到定时器触发时,就应该输出超时报警信息。
* 如今只考虑两种状况:①来一个create,来一个pay create后边有pay就正常匹配,若是没来就超时报警
* 乱序的数据,有可能create和pay的前后顺序
* 超时报警的状况: 遇到create设一个定时器,遇到pay改一个状态(或者不删定时器,直接设定一个状态看有没有pay来过,有则定时器触发时说是正常的,没有就超时报警
-----来自两条流的订单交易匹配----------
* 对于订单支付事件,用户支付完成其实并不算完,咱们还得确认平台帐户上是否到帐了。而每每这会来自不一样的日志信息,因此咱们要同时读入两条流的数据来
* 作合并处理。这里咱们利用 connect 将两条流进行链接,
* 1. 用自定义的CoProcessFunction 进行处理。
* 2. 双流join
* window join(Tumbling Window Join、 Sliding Window Join)适用于两条流join,后边还要开窗口的分析
*Interval join(区间join)适用于传感器报警(温度烟雾出现异常,它俩时间得匹配上在同一时间范围内同时出现,温度又升高的很快)
* Join中当作状态保存起来
*此需求是两条流匹配上就能够了
统计类:读取数据、作简单包装转换map、filter、按某个字段分组,开窗,作聚合
排序| TopN:再作一个ProcessFunction,把全部数据都收集到排序输出;
以上是基于DataStreamAPI,也能够用高级API、TableAPI和FlinkSQL
业务流程中的状态作检测输出和警告:自定义编程、状态
事件逻辑、风控:CEP