Anti-Crawler Project Development
Project Introduction
Project Background
Why an anti-crawler project?
Crawlers consume large amounts of our system resources, such as bandwidth and compute capacity.
Crawlers that place bookings or grab tickets interfere with our normal business.
Importing the anti-crawler web project
Data flow through the anti-crawler project
Logical architecture
Feature description
Data management module
Process management
Non-functional requirements
Data composition:
Clickstream (from the collection servers) / business logs (from the business servers) / third-party integrations
Estimating the data volume:
How large is the peak volume?
Spreading 800 million records evenly over one day gives the average rate; the peak is estimated at 200,000 records per second.
With 5 million registered users and 200,000 daily active users, 30 clicks per active user gives about 6 million clickstream records; adding roughly 60 million business-log records yields a total of about 66 million records per day.
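A rough cross-check of these figures (assuming a 24-hour day and the counts as stated above):

200,000 DAU × 30 clicks ≈ 6,000,000 clickstream records/day
6,000,000 clickstream + 60,000,000 business-log records ≈ 66,000,000 records/day
800,000,000 records/day ÷ 86,400 s ≈ 9,300 records/s on average, against an assumed peak of 200,000 records/s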
Company cluster categories:
Database ER diagram
Using PowerDesigner
Create a new model
Export the SQL file
Anti-crawling rules
Data sources
The data sources feed the various anti-crawling metric calculations. They mainly consist of the parameters carried by user requests, such as the referring URL, the requested URL, the user's session ID, and query-related fields like origin, destination, and departure date.
Anti-crawling rules
Characteristics of crawler programs
Data Collection
Installing OpenResty
Configuring OpenResty
./configure --prefix=/usr/local/openresty --with-http_stub_status_module
If any dependencies are missing, install them:
yum install readline-devel pcre-devel openssl-devel perl gcc
If you do not want to compile it yourself, the course materials include a pre-built copy at 反扒参考资料\OpenResty\编译后\openresty; copy it onto the Linux machine and use it directly.
Lua Syntax Basics
Ways to run Lua
Interactive mode
Script-file mode
Data types
Data types in Java
Numeric: integer types byte/short/int/long; floating-point types float/double; boolean type: boolean; character type: char
Lua data types
nil: the equivalent of Java's null; boolean: true/false; number: numbers, with no distinction between integer and float; string: strings; userdata: C data structures; function: functions (methods); thread: threads (coroutines); table: arrays / lists / maps
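The built-in type() function returns these type names as strings; a quick sketch to check the types listed above:

-- print the type name of various values
print(type(nil))           --> nil
print(type(true))          --> boolean
print(type(3.14))          --> number
print(type("hello"))       --> string
print(type(print))         --> function
print(type({1, 2, 3}))     --> table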
Lua Operators
Assignment operator
-- Assignment operator
a = 3
b = "hello"
print(a)
print(b)
c,d = 1,2
print(c,d)
e,f = 1,2,3   -- the extra value 3 is discarded
print(e,f)
g,h = 1       -- h is assigned nil
print(g,h)
str = "hello" .. "world"
print(str)
Strings cannot be concatenated with "+"; use the ".." operator instead.
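A quick illustration of why "+" is not concatenation: Lua treats "+" as arithmetic and tries to coerce the strings to numbers, so it only "works" for numeric strings and raises an error otherwise.

str = "hello" .. " " .. "world"   -- concatenation: hello world
print(str)
print("1" + "2")                  -- numeric coercion: prints 3, not "12"
--print("hello" + "world")        -- error: attempt to perform arithmetic on a string value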
Arithmetic operators
-- Arithmetic operators
-- addition, subtraction, multiplication, division, modulo
a,b = 10,20
print("a+b=" .. a + b)
print("a-b=" .. a - b)
print("a*b=" .. a * b)
print("a/b=" .. a / b)
print("a%b=" .. a % b)
Relational operators
-- Relational operators
print("========= relational operators =========")
a,b = 1,2
-- note: a boolean cannot be concatenated with "..";
-- pass it as a separate argument or wrap it in tostring()
print("a == b:", a == b)
print("a ~= b:", a ~= b)
print(a > b)
print(a >= b)
print(a < b)
print(a <= b)
Logical operators
-- Logical operators
print("========= logical operators =========")
a,b = true,false
print(a and b)
print(a or b)
print(not b)
Other operators
"#" returns the length of a string or of an array-style table
--Other operators
print("========other operators===========")
str = "hello java"
print(#str)
Lua Control Flow
if statements
-- Control flow
-- if statement
a = 10
if a > 5 then
    print("a>5")
end
-- if-else statement
if a > 11 then
    print("a>11")
else
    print("a<=11")
end
-- nested if
if a > 5 then
    print("a>5")
    if a < 12 then
        print("a<12")
    end
end
Loops
while loop
--While loop
print("=======while loop=======")
while a > 0 do
    print(a)
    a = a - 1
end
repeat loop
-- repeat loop
print("=======repeat loop=======")
repeat
    print(a)
    a = a + 1
until a > 10
A repeat loop always executes at least once.
Exercise: a sheet of paper is 0.04 thick; how many times must it be folded (doubling the thickness each time) before it exceeds the height of Mount Everest, 8847 (both numbers taken in the same unit)?
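A possible solution with a repeat loop, taking both numbers in the same unit as stated:

thickness = 0.04
height = 8847
count = 0
repeat
    thickness = thickness * 2   -- each fold doubles the thickness
    count = count + 1
until thickness > height
print(count)   -- 18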
for loop
--for loop
print("=======for loop=======")
for b = 10, 1, -1 do
    print(b)
end
The numeric for loop takes three parameters: the start value, the end value, and the step; the step defaults to 1 when omitted.
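For example, with the default step versus an explicit step:

for i = 1, 5 do        -- step omitted, defaults to 1: prints 1 2 3 4 5
    print(i)
end
for i = 1, 10, 2 do    -- explicit step of 2: prints 1 3 5 7 9
    print(i)
end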
Lua Arrays
--Lua arrays
arr = {"zhangsan","lisi","wangwu"}
print(#arr)
for a = 1, #arr do
    print(arr[a])
end
--generic for loop
-- i is the index
-- name is the value at that index
for i,name in ipairs(arr) do
    print(i,name)
end
Note:
ipairs only works on array-style tables; to iterate a map-style table you must use pairs.
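A small sketch of the difference:

arr = {"a", "b", "c"}
map = {name = "zhangsan", age = 13}
for i, v in ipairs(arr) do print(i, v) end   -- 1 a / 2 b / 3 c
for k, v in pairs(map) do print(k, v) end    -- name zhangsan / age 13 (order not guaranteed)
for i, v in ipairs(map) do print(i, v) end   -- prints nothing: the map has no numeric indices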
Lua Type Conversion
Converting other types to string
tostring()
-- Converting other types to string
-- tostring()
-- boolean to string
boo = true
print(type(boo))
print(type(tostring(boo)))
print(tostring(boo))
-- number to string
num = 20
print(type(num))
print(type(tostring(num)))
print(tostring(num))
-- table to string
tbl = {"tom","cat"}
print(type(tbl))
print(type(tostring(tbl)))
print(tostring(tbl))
In practice it is usually numbers that get converted to strings.
tostring() on a function or table does not yield its contents by default, only a type-and-address string such as "table: 0x...".
Converting other types to number
-- Converting other types to number:
-- tonumber()
num = "12"
print(type(num))
print(type(tonumber(num)))
print(tonumber(num))
num = "AF"
print(type(num))
print(type(tonumber(num,16)))
print(tonumber(num,16))
tbl = {"tom","cat"}
print(tonumber(tbl))
boo = false
print(tonumber(boo))
Values that are not in a numeric format cannot be converted; tonumber() returns nil for booleans, tables, and strings such as "hello".
Lua Functions
Lua function definition syntax:
[scope] function function_name(param1, param2, ...)
    function body; return result1, result2, ...
end
--Defining Lua functions
function f1(a,b)
    print("hello function")
    return a+b
end
result = f1(3,4)
print(result)
--multiple return values
local function f2(a,b)
    return a,b
end
c,d = f2(1,2)
print(c,d)
Lua Variable Scope
Lua variables are global by default.
Adding the local keyword makes a variable local.
When using global variables, take care not to reuse a name, or the original variable will be overwritten.
-- Variable scope
a = 10
if a > 3 then
    b = 20          -- global
    local c = 30    -- local to this block
    print(a)
    print(b)
    print(c)
end
a = "hello"
print(a)
print(b)
print(c) -- nil
Lua Tables
A Lua table can play the role of a Java array, List, or Map.
If the table holds array-style data, iterate it with ipairs; for map-style data, use pairs.
--Define an array-style table
local arr = {"zhangsan","lisi","wangwu"}
print(arr[1])
--Iterate the table by index
for i = 1, #arr do
    print(arr[i])
end
print("========generic for loop=========")
for index, value in ipairs(arr) do
    print(index, value)
end
print("========map-style table=========")
map = {name="zhangsan", sex="男", age = 13}
print(map["name"])
print(map.name)
print(map.age)
-- Values can be set or read with the "." form
map.address = "深圳"
print(map.address)
print("========iterate a map-style table=========")
for key, value in pairs(map) do
    print(key, value)
end
Lua Modules
Lua's module mechanism is built on tables: first define an empty table to hold the module's variables and functions.
Load a module with the require keyword: require "module_name" (note that the ".lua" suffix is not included).
Simulating sending messages to Kafka
kafka.lua
-- Simulate sending messages to Kafka
_M = {}
--default number of partitions
_M.default_partition_num = 5
function _M.new(props)
    -- create a client from the given props
    return "Kafka client ..."
end
-- send a message to Kafka
function _M.send(topic, key, message)
    print("Sending message to Kafka, topic: "..topic..", body: "..message)
    -- return status information so the caller can react to the result
    return nil,"error"
end
testKafka.lua
-- Test the custom Kafka module
require "kafka"   -- the module file is kafka.lua, so require it by that name
dpn = _M.default_partition_num
print("Default partition count: "..dpn)
--create the client object
--props must be passed in
props = {{hosts="192.168.80.81", port="9092"},{hosts="192.168.80.81", port="9092"}}
_M.new(props)
--send a message
ok, err = _M.send("sz07", "1", "test message to Kafka")
if not ok then
    --if the result is not ok, print the error
    print(err)
    return
end
Integrating Lua with Nginx
Two ways to combine Lua with Nginx
Lua code block
location / {
    #root html;
    #index index.html index.htm;
    default_type text/html;
    content_by_lua_block {
        -- write Lua code here
        print("hello")
        ngx.say("hello openresty")
    }
}
Lua script file
location / {
    #root html;
    #index index.html index.htm;
    default_type text/html;
    content_by_lua_file /export/servers/openresty/test.lua;
}
content_by_lua_file /export/servers/openresty/test.lua;
Do not forget the trailing ";" at the end of the directive.
Getting HTTP Request Parameters with Lua
Getting GET request parameters
-- Getting HTTP request parameters with Lua
-- GET parameters
getArgs = ngx.req.get_uri_args()
--iterate over the parameters
for k,v in pairs(getArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end
Getting POST request parameters
ngx.say("=======POST request parameters========")
-- POST parameters
-- the request body must be read first with read_body()
ngx.req.read_body()
postArgs = ngx.req.get_post_args()
--iterate over the parameters
for k,v in pairs(postArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end
Any operation that touches the request body must first call ngx.req.read_body().
Getting the request headers
ngx.say("=======request headers========")
headerArgs = ngx.req.get_headers()
for k,v in pairs(headerArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end
Getting the raw request body (for JSON request parameters)
ngx.say("=======request body========")
-- read_body() must be called first
ngx.req.read_body()
bodyData = ngx.req.get_body_data()
-- a JSON body cannot be iterated directly, so just print it
ngx.say(bodyData)
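If the body is JSON, the cjson library bundled with OpenResty can decode it into a table. A minimal sketch, assuming the client posts a JSON object such as {"username":"zhangsan"} (the field name is only an example):

local cjson = require "cjson"
ngx.req.read_body()
local bodyData = ngx.req.get_body_data()
if bodyData then
    local obj = cjson.decode(bodyData)   -- JSON string -> Lua table
    ngx.say(obj.username)
end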
Connecting to MySQL from Lua
First require the MySQL module, located at openresty/lualib/resty/mysql.lua.
-- 链接MySQL操做 -- 引入MySQL的模块 local restyMysql = require "resty.mysql" -- Lua调用方法默认用"."就能够了,但若是第一个参数是self,那么能够经过":"来调用,就能够省略掉第一个self参数 local mysql = restyMysql:new() --设置链接超时时间 mysql:set_timeout(20000) --开始链接MySQL --定义链接MySQL的配置 local opts = {} opts.host = "192.168.80.81" opts.port = 3306 opts.database = "test" opts.user = "root" opts.password = "root" local ok, err = mysql:connect(opts) if not ok then ngx.say("链接MySQL失败" .. err) return end --定义SQL local sql = "select * from user" local result, err = mysql:query(sql) if not result then ngx.say("查询数据失败:" .. err) return end -- 从查询结果中获取数据 for i,row in ipairs(result) do for key,value in pairs(row) do ngx.say("列名:"..key.." 值为:" .. value) end ngx.say("<br>") end ngx.say("全部数据打印完毕")
Insert, delete, and update operations on MySQL
--新增数据 local sql = "insert into user values('lisi','123','深圳','0','2019-01-01')" local result, err = mysql:query(sql) if not result then ngx.say("插入数据失败:" .. err) return end ngx.say("数据插入成功") --删除数据 local sql = "delete from user where username='lisi'" local result, err = mysql:query(sql) if not result then ngx.say("数据删除失败:" .. err) return end ngx.say("数据删除成功") for i,row in ipairs(result) do for key,value in pairs(row) do ngx.say("列名:"..key.." 值为:" .. value) end ngx.say("<br>") end --修改数据 local sql = "update user set username = 'lisi' where username='zhangsan'" local result, err = mysql:query(sql) if not result then ngx.say("数据修改失败:" .. err) return end ngx.say("数据修改为功")
Connecting to Redis from Lua
Standalone Redis installation
Redis is an in-memory NoSQL database that stores key-value pairs.
If you do not want to build it from source, copy the pre-built 反扒参考资料\Redis\redis-5.0.4 from the course materials into the virtual machine and use it directly.
The redis.conf configuration file
#address to bind to
bind 0.0.0.0
#port to listen on
port 6379
#run as a daemon; by default the redis server occupies a foreground process
daemonize yes
#pid file for the redis process
pidfile /var/run/redis_6379.pid
#RDB dump file name
dbfilename dump.rdb
Start the Redis server
./redis-server redis.conf
Check the Redis process
ps -ef | grep redis
Connect with the Redis CLI
./redis-cli
Connecting to Redis from Lua
--Connect to Redis from Lua
--require the Redis module
local restyRedis = require "resty.redis"
--create the redis client with new()
local redis = restyRedis:new()
--set the timeout
redis:set_timeout(20000)
--open the connection
ok, err = redis:connect("192.168.80.83", 6379)
if not ok then
    ngx.say("connect failed: "..err)
    return
end
-- connected successfully
ok, err = redis:set("username", "zhangsan")
if not ok then
    ngx.say("set failed: "..err)
    return
end
ngx.say("set succeeded")
--read the value back from Redis
ok, err = redis:get("username")
if not ok then
    ngx.say("get failed: "..err)
    return
end
ngx.say(ok)
Redis Cluster
How it works
Cluster setup
See 反扒参考资料\Redis\Redis集群搭建步骤.md.
Each node's directory contains a 700x.conf file.
Each configuration file contains path-related settings, so keep the directory layout used in the course; otherwise the paths have to be adjusted by hand.
7001.conf:
port 7001
dir /export/servers/redis-5.0.4/cluster/7001/data
cluster-enabled yes
cluster-config-file /export/servers/redis-5.0.4/cluster/7001/data/nodes.conf
Start the cluster:
bin/redis-server cluster/7001/7001.conf
bin/redis-server cluster/7002/7002.conf
bin/redis-server cluster/7003/7003.conf
bin/redis-server cluster/7004/7004.conf
bin/redis-server cluster/7005/7005.conf
bin/redis-server cluster/7006/7006.conf
Check that all six processes are listening with netstat -nltp.
Initialization:
If you connect a client and try to store data right after the servers are started for the first time, you will get an error saying that the hash slots have not been assigned.
The initialization below only needs to be run once, the first time; it does not have to be repeated afterwards.
# Replace the IP addresses below with your own machine's IP
bin/redis-cli --cluster create --cluster-replicas 1 YOUR_MACHINE_IP:7001 192.168.80.83:7002 192.168.80.83:7003 192.168.80.83:7004 192.168.80.83:7005 192.168.80.83:7006
--cluster-replicas 1 sets the number of replicas per master to 1.
Connect to the cluster:
bin/redis-cli -c -p 7001
set hello world
get hello
-c connects in cluster mode; without this flag, key redirection between nodes fails.
-p specifies the port to connect to.
Connecting to Kafka from Lua
Write the Lua script
-- Connect to Kafka and send a message
-- require the Kafka producer module
local kafka = require "resty.kafka.producer"
--create the producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
--send data
local ok, err = producer:send("test", "1", "hello openresty")
if not ok then
    ngx.say("Kafka send failed: "..err)
    return
end
ngx.say("Message sent successfully")
Start the Kafka cluster
Start ZooKeeper first
zkServer.sh start
Start Kafka
nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &
> /dev/null redirects standard output to /dev/null, discarding it
2>&1 redirects standard error to standard output
& runs the command in the background
List all topics
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --list
Start a console consumer
/export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic test
Writing the Lua Data-Collection Script
Modify nginx.conf
http { include mime.types; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' # '$status $body_bytes_sent "$http_referer" ' # '"$http_user_agent" "$http_x_forwarded_for"'; #access_log logs/access.log main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65; #gzip on; #开启共享词典功能, 开启的空间为10Mb大小,由于咱们只是存储一些数字,10Mb够用了 lua_shared_dict shared_data 10m; #配置本地域名解析 resolver 127.0.0.1; server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { #root html; #index index.html index.htm; #开启 nginx 监控 stub_status on; default_type text/html; #content_by_lua_block{ # print("hello") # ngx.say("hello openresty") #} content_by_lua_file /export/servers/openresty/mylua/controller.lua; }
Write controller.lua
--过载保护功能,若是链接超出必定范围,再也不进行信息采集 --定义过载的最大值 local maxConnectNum = 10000 --获取当前链接数量 local currentConnect = tonumber(ngx.var.connections_active) --若是当前链接数大于过载范围,再也不进行信息采集 if currentConnect > maxConnectNum then return end -- 均衡分区操做 --定义Kafka分区数量 local partition_num = 6 --定义共享词典中的变量名 local sharedKey = "publicValue" --共享词典操做对象 local shared_data = ngx.shared.shared_data --从共享词典中取出数据 local num = shared_data:get(sharedKey) --若是第一运行,num没有值 if not num then --初始化一个值存入共享词典 num = 0 shared_data:set(sharedKey, 0) end --进行取余操做,肯定分区ID local patitionID = num % partition_num --调用共享词典自带的自增功能进行累加 shared_data:incr(sharedKey, 1) -- 数据采集 -- 获取当前系统时间 local time_local = ngx.var.time_local if time_local == nil then time_local = "" end -- 请求的URL local request = ngx.var.request if request == nil then request = "" end -- 获取请求方式 local request_method = ngx.var.request_method if request_method == nil then request_method = "" end -- 获取请求的内容类型,text/html,application/json local content_type = ngx.var.content_type if content_type == nil then content_type = "" end -- 读取请求体内容 ngx.req.read_body() --获取请求体数据 local request_body = ngx.var.request_body if request_body == nil then request_body = "" end -- 获取来源的URL local http_referer = ngx.var.http_referer if http_referer == nil then http_referer = "" end -- 客户端的IP地址 local remote_addr = ngx.var.remote_addr if remote_addr == nil then remote_addr = "" end -- 获取请求携带的UA信息 local http_user_agent = ngx.var.http_user_agent if http_user_agent == nil then http_user_agent = "" end -- 请求携带的时间 local time_iso8601 = ngx.var.time_iso8601 if time_iso8601 == nil then time_iso8601 = "" end -- 请求的IP地址(服务器地址) local server_addr = ngx.var.server_addr if server_addr == nil then server_addr = "" end --获取用户的Cookie信息 local http_cookie = ngx.var.http_cookie if http_cookie == nil then http_cookie = "" end --封装数据 local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie; -- 链接Kafka,将message发送出去 -- 引用Kafka模块 local kafka = require "resty.kafka.producer" --建立producer local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}} local producer = kafka:new(broker_list) --发送数据(主题,key(使用partitionid(0-5)做为key),消息) local ok, err = producer:send("sz07", tostring(patitionID), message) if not ok then ngx.say("Kafka发送失败"..err) return end
Note:
The number of partitions cannot be set from Lua; it has to be set manually with the Kafka scripts.
View the help for topic operations:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --help
Change the partition count to 6:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --alter --partitions 6 --topic sz07
Data Preprocessing
Consuming Messages from Kafka
Create the project
Import the pom.xml configuration
Import the configuration files
Copy the files under 反扒参考资料\配置文件 into the project's resources directory.
Update the IP-related settings in the configuration files.
Import the entity and utility classes the project needs
Copy the classes from 反扒参考资料\工具包 into the project.
Two ways to consume Kafka data
The consumed offsets are saved by Spark in its checkpoint.
Advantages:
1. No duplicate consumption; exactly-once semantics can be guaranteed.
Link Statistics
Write the main application
package com.air.antispider.stream.dataprocess import com.air.antispider.stream.common.util.jedis.PropertiesUtil import com.air.antispider.stream.dataprocess.businessprocess.BusinessProcess import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils /** * 数据预处理的主程序 */ object DataProcessApp { def main(args: Array[String]): Unit = { //建立Spark配置对象 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataProcessApp") //建立SparkStreamingContext对象 val ssc = new StreamingContext(sparkConf, Seconds(2)) //消费Kafka消息,有几种方式?2种 var kafkaParams = Map[String, String]() //从kafkaConfig.properties配置文件中获取broker列表信息 val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties") kafkaParams += ("metadata.broker.list" -> brokerList) val topics = Set[String]("sz07") //使用Direct方式从Kafka中消费数据 //StringDecoder:默认状况下,java的序列化性能不高,Kafka为了提升序列化性能,须要使用kafka本身的序列化机制 val inputDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) //获取的消息是(key,message)的形式, val messageDStream: DStream[String] = inputDStream.map(_._2) messageDStream.foreachRDD(messageRDD =>{ //开启链路统计功能 BusinessProcess.linkCount(messageRDD) messageRDD.foreach(println) }) //启动Spark程序 ssc.start() ssc.awaitTermination() } }
Link statistics code
package com.air.antispider.stream.dataprocess.businessprocess import java.util.Date import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil} import org.apache.spark.rdd.RDD import org.json4s.DefaultFormats import org.json4s.jackson.Json import redis.clients.jedis.JedisCluster /** * 链路统计功能 */ object BusinessProcess { def linkCount(messageRDD: RDD[String]) = { //信息采集量 val serverCountRDD: RDD[(String, Int)] = messageRDD.map(message => { val arr: Array[String] = message.split("#CS#") if (arr.length > 9) { //有数据 val serverIP = arr(9) //(ip,1次) (serverIP, 1) } else { ("", 1) } }) //按照Key进行累加操做 .reduceByKey(_ + _) //当前活跃链接数 val activeNumRDD: RDD[(String, Int)] = messageRDD.map(message => { val arr: Array[String] = message.split("#CS#") if (arr.length > 11) { //取IP val serverIP = arr(9) //取本IP的活跃链接数量 val activeNum = arr(11) //(ip,1次) (serverIP, activeNum.toInt) } else { ("", 1) } }) //舍弃一个值,主须要一个活跃链接数就ok了 .reduceByKey((x, y) => y) //进行数据展现 //经过跟踪java代码,发现咱们须要封装一个json数据,存入Redis中,让前端进行数据展现 if (!serverCountRDD.isEmpty() && !activeNumRDD.isEmpty()) { //若是数据不为空,开始数据处理 //将RDD的结果转换为Map val serversCountMap: collection.Map[String, Int] = serverCountRDD.collectAsMap() val activeNumMap: collection.Map[String, Int] = activeNumRDD.collectAsMap() val map = Map[String, collection.Map[String, Int]]( "serversCountMap" -> serversCountMap, "activeNumMap" -> activeNumMap ) //将map转换为JSON val jsonData: String = Json(DefaultFormats).write(map) //将jsonData存入Redis中 //获取Redis链接 val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster //存入数据 //使用CSANTI_MONITOR_LP + 时间戳 格式来做为Key val key: String = PropertiesUtil.getStringByKey("cluster.key.monitor.linkProcess", "jedisConfig.properties") + new Date().getTime val ex: Int = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt //当前数据是以天为单位进行存储的,因此有效时间,设置为1天就好了 // jedis.set(key, jsonData) //设置超时时间为2分钟 jedis.setex(key, ex, jsonData) } } }
URL Filtering
Process:
1. First load the URL filter rules from the MySQL database.
Writing the code
Code:
import com.air.antispider.stream.common.util.database.QueryDB import scala.collection.mutable.ArrayBuffer /** * 加载MySQL中的规则,方便Spark进行计算 */ object AnalyzeRuleDB { /** * 获取MySQL中的URL过滤规则 */ def getFilterRule(): ArrayBuffer[String] = { val sql = "select value from nh_filter_rule" val field = "value" //查询数据库的value列 val filterRule: ArrayBuffer[String] = QueryDB.queryData(sql, field) filterRule } }
After creating the SparkContext and before consuming the Kafka data, load the rules from the database and put them into a broadcast variable.
//加载数据库规则,放入广播变量 val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule() //将过滤规则列表放入广播变量 //@volatile 让多个线程可以安全的修改广播变量 @volatile var filterRuleBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(filterRuleList)
The @volatile annotation:
Updating the broadcast variable
//先检查数据库,更新广播变量
var filterRuleChangeFlag = jedis.get("FilterRuleChangeFlag")
//检查标记是否存在
if (StringUtils.isBlank(filterRuleChangeFlag)) {
filterRuleChangeFlag = "true"
//从新设置到Redis中
jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
}
//更新广播变量
if (filterRuleChangeFlag.toBoolean) {
//FilterRuleChangeFlag为true,表明须要从新更新广播变量
//加载数据库规则,放入广播变量
val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
//将过滤规则列表放入广播变量
//@volatile 让多个线程可以安全的修改广播变量
filterRuleBroadcast = sc.broadcast(filterRuleList)
filterRuleChangeFlag = "false"
jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
}
Create the URLFilter class
import scala.collection.mutable.ArrayBuffer
Use the URLFilter class in the main program:
//URL filtering
val filterRDD: RDD[String] = messageRDD.filter(message => URLFilter.filterURL(message, filterRuleBroadcast.value))
Data Masking (Encryption)
Code:
package com.air.antispider.stream.dataprocess.businessprocess import java.util.regex.{Matcher, Pattern} import com.air.antispider.stream.common.util.decode.MD5 import org.apache.spark.rdd.RDD /** * 对用户的敏感信息进行加密操做 */ object EncryptedData { /** * 加密身份证号 * @param encryptedPhoneRDD * @return */ def encryptedID(encryptedPhoneRDD: RDD[String]): RDD[String] = { //如何找到手机号 encryptedPhoneRDD.map(message => { //建立加密对象 val md5 = new MD5 //找message中的手机号 //可使用正则表达式来找 val pattern: Pattern = Pattern.compile("(\\d{18})|(\\d{17}(\\d|X|x))|(\\d{15})") //使用正则对象,对message进行匹配,matcher是匹配结果 val matcher: Matcher = pattern.matcher(message) var tempMessage = message // while (iterator.hasNext()) { // iterator.next() // } //循环结果,看有没有匹配到的数据 while (matcher.find()) { //取出匹配结果 val id: String = matcher.group() //加密/替换 val encryptedID: String = md5.getMD5ofStr(id) tempMessage = tempMessage.replace(id, encryptedID) } //返回加密以后的数据 tempMessage }) } //手机号加密 def encryptedPhone(filterRDD: RDD[String]): RDD[String] = { //如何找到手机号 filterRDD.map(message => { //建立加密对象 val md5 = new MD5 //找message中的手机号 //可使用正则表达式来找 val pattern: Pattern = Pattern.compile("((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(17[0-9])|(18[0,5-9]))\\d{8}") //使用正则对象,对message进行匹配,matcher是匹配结果 val matcher: Matcher = pattern.matcher(message) var tempMessage = message // while (iterator.hasNext()) { // iterator.next() // } //循环结果,看有没有匹配到的数据 while (matcher.find()) { //取出匹配结果 val phone: String = matcher.group() //加密/替换 val encryptedPhone: String = md5.getMD5ofStr(phone) tempMessage = tempMessage.replace(phone, encryptedPhone) } //返回加密以后的数据 tempMessage }) } }
Main program:
//进行数据脱敏操做 //加密手机号 val encryptedPhoneRDD: RDD[String] = EncryptedData.encryptedPhone(filterRDD) //加密身份证号 val encryptedRDD: RDD[String] = EncryptedData.encryptedID(encryptedPhoneRDD)
Data Splitting
Code:
package com.air.antispider.stream.dataprocess.businessprocess import java.util.regex.Pattern import com.air.antispider.stream.common.util.decode.{EscapeToolBox, RequestDecoder} import com.air.antispider.stream.common.util.jedis.PropertiesUtil import com.air.antispider.stream.common.util.string.CsairStringUtils /** * 数据切割主程序 */ object DataSplit { /** * 将源数据进行切割,获得具体的参数 * @param message * @return */ def split(message: String):(String,String,String,String,String,String,String,String,String,String,String,String ) = { val values: Array[String] = message.split("#CS#") //从arr中取出这12个参数,进行赋值操做 //记录数据长度 val valuesLength = values.length //request 原始数据 val regionalRequest = if (valuesLength > 1) values(1) else "" //分割出 request 中的 url val request = if (regionalRequest.split(" ").length > 1) { regionalRequest.split(" ")(1) } else { "" } //请求方式 GET/POST val requestMethod = if (valuesLength > 2) values(2) else "" //content_type val contentType = if (valuesLength > 3) values(3) else "" //Post 提交的数据体 val requestBody = if (valuesLength > 4) values(4) else "" //http_referrer val httpReferrer = if (valuesLength > 5) values(5) else "" //客户端 IP val remoteAddr = if (valuesLength > 6) values(6) else "" //客户端 UA val httpUserAgent = if (valuesLength > 7) values(7) else "" //服务器时间的 ISO8610 格式 val timeIso8601 = if (valuesLength > 8) values(8) else "" //服务器地址 val serverAddr = if (valuesLength > 9) values(9) else "" //Cookie 信息 //原始信息中获取 Cookie 字符串,去掉空格,制表符 val cookiesStr = CsairStringUtils.trimSpacesChars(if (valuesLength > 10) values(10) else "") //提取 Cookie 信息并保存为 K-V 形式 val cookieMap = { var tempMap = new scala.collection.mutable.HashMap[String, String] if (!cookiesStr.equals("")) { cookiesStr.split(";").foreach { s => val kv = s.split("=") //UTF8 解码 if (kv.length > 1) { try { val chPattern = Pattern.compile("u([0-9a-fA-F]{4})") val chMatcher = chPattern.matcher(kv(1)) var isUnicode = false while (chMatcher.find()) { isUnicode = true } if (isUnicode) { tempMap += (kv(0) -> EscapeToolBox.unescape(kv(1))) } else { tempMap += (kv(0) -> RequestDecoder.decodePostRequest(kv(1))) } } catch { case e: Exception => e.printStackTrace() } } } } tempMap } //Cookie 关键信息解析 //从配置文件读取 Cookie 配置信息 val cookieKey_JSESSIONID = PropertiesUtil.getStringByKey("cookie.JSESSIONID.key", "cookieConfig.properties") val cookieKey_userId4logCookie = PropertiesUtil.getStringByKey("cookie.userId.key", "cookieConfig.properties") //Cookie-JSESSIONID val cookieValue_JSESSIONID = cookieMap.getOrElse(cookieKey_JSESSIONID, "NULL") //Cookie-USERID-用户 ID val cookieValue_USERID = cookieMap.getOrElse(cookieKey_userId4logCookie, "NULL") (request,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr,cookieValue_JSESSIONID,cookieValue_USERID) } }
Main program:
encryptedRDD.map(message => { //获取到消息后开始进行数据切割/打标签等操做 //数据切割 val (request, requestMethod, contentType, requestBody, httpReferrer, remoteAddr, httpUserAgent, timeIso8601, serverAddr, cookiesStr, cookieValue_JSESSIONID, cookieValue_USERID) = DataSplit.split(message) })
Tagging the Data
To make later parsing easier, we must know what kind of request each record represents, e.g. domestic/query/one-way versus international/query/round-trip.
Tagging by request category
Load the classification rules from the database
/**
* 查询标签规则的数据
*/
def getClassifyRule(): Map[String, ArrayBuffer[String]] = {
//获取"国内查询"的全部URL
val nationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Query.id
val nationalQueryList: ArrayBuffer[String] = QueryDB.queryData(nationalQuerySQL, "expression")
//获取"国内预约"的全部URL
val nationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Book.id
val nationalBookList: ArrayBuffer[String] = QueryDB.queryData(nationalBookSQL, "expression")
//获取"国际查询"的全部URL
val internationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Query.id
val internationalQueryList: ArrayBuffer[String] = QueryDB.queryData(internationalQuerySQL, "expression")
//获取"国际预约"的全部URL
val internationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Book.id
val internationalBookList: ArrayBuffer[String] = QueryDB.queryData(internationalBookSQL, "expression")
//Put the four lists above into a single Map
val map = Map[String, ArrayBuffer[String]](
  "nationalQueryList" -> nationalQueryList,
  "nationalBookList" -> nationalBookList,
  "internationalQueryList" -> internationalQueryList,
  "internationalBookList" -> internationalBookList
)
map
}
Load the classification rules into a broadcast variable
//将分类规则加载到广播变量 val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule() @volatile var classifyRuleBroadcast: Broadcast[Map[String, ArrayBuffer[String]]] = sc.broadcast(classifyRuleMap)
Update the broadcast variable
//更新分类规则信息
var classifyRuleChangeFlag: String = jedis.get("ClassifyRuleChangeFlag")
//先判断classifyRuleChangeFlag是否为空
if (StringUtils.isBlank(classifyRuleChangeFlag)){
classifyRuleChangeFlag = "true"
//从新设置到Redis中
jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
}
if (classifyRuleChangeFlag.toBoolean) {
classifyRuleBroadcast.unpersist()
//将分类规则加载到广播变量
val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
classifyRuleBroadcast = sc.broadcast(classifyRuleMap)
classifyRuleChangeFlag = "false"
//从新设置到Redis中
jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
}
Tag the current request according to the rules in the broadcast variable
package com.air.antispider.stream.dataprocess.businessprocess
import com.air.antispider.stream.common.bean.RequestType
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum}
import com.air.antispider.stream.dataprocess.constants.FlightTypeEnum.FlightTypeEnum
import scala.collection.mutable.ArrayBuffer
object RequestTypeClassifier {
/**
* 对请求的分类进行判断
* @param request
* @param classifyRuleMap
* @return 用户的请求分类信息(国内,查询)
*/
def requestTypeClassifier(request: String, classifyRuleMap: Map[String, ArrayBuffer[String]]): RequestType = {
//取出分类集合中的数据
val nationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalQueryList", null)
val nationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalBookList", null)
val internationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalQueryList", null)
val internationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalBookList", null)
//变量这4个集合,看当前的request在哪一个集合中匹配 //国内查询 if (nationalQueryList != null) { // fira code for (expression <- nationalQueryList) { //判断当前请求的URL是否和本正则匹配 if (request.matches(expression)) { return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Query) } } } //国内预约 if (nationalBookList != null) { // fira code for (expression <- nationalBookList) { //判断当前请求的URL是否和本正则匹配 if (request.matches(expression)) { return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Book) } } } //国际查询 if (internationalQueryList != null) { // fira code for (expression <- internationalQueryList) { //判断当前请求的URL是否和本正则匹配 if (request.matches(expression)) { return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Query) } } } //国际预约 if (internationalBookList != null) { // fira code for (expression <- internationalBookList) { //判断当前请求的URL是否和本正则匹配 if (request.matches(expression)) { return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Book) } } } //若是上面没有任何一个匹配上,那么返回一个默认值 return RequestType(FlightTypeEnum.Other, BehaviorTypeEnum.Other)
}
}
5. Call the tagging method from the main program
//对请求的分类进行打标签操做 val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)
Tagging the Travel Type (One-Way vs. Round-Trip)
The request itself does not carry the travel type, so we infer it from the number of dates in the HTTP referer: one date means one-way, two dates mean round-trip.
Code:
package com.air.antispider.stream.dataprocess.businessprocess import java.util.regex.{Matcher, Pattern} import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum /** * 往返信息打标签 */ object TravelTypeClassifier { def travelTypeClassifier(httpReferrer: String): TravelTypeEnum = { val pattern: Pattern = Pattern.compile("(\\d{4})-(0\\d{1}|1[0-2])-(0\\d{1}|[12]\\d{1}|3[01])") val matcher: Matcher = pattern.matcher(httpReferrer) //建立一个计数器 var num = 0 //调用find方法的时候,游标会自动向下 while (matcher.find()) { num = num + 1 } if (num == 1) { //是单程 return TravelTypeEnum.OneWay } else if (num == 2) { //是往返 return TravelTypeEnum.RoundTrip } else { //不知道啊 return TravelTypeEnum.Unknown } } }
Main program:
//对往返数据进行打标签操做 val travelTypeEnum: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)
Data Parsing
Because the airline's systems were built up over a long time, the request parameters and formats are not consistent across modules. Based on the flight type, operation type, requested URL, request method, and so on, we look up the analyzerule table in the database to obtain the parsing rules and parse the data according to those configured rules.
Two things are determined here: 1. the parsing method, e.g. JSON or XML; 2. which fields need to be extracted.
Loading the parsing rules from the database
The method that loads the parsing rules, queryRule, can be found in 反扒参考资料\工具包\解析类\AnalyzeRuleDB.scala.
/** * 查询"查询规则"或者“预约规则”正则表达式,添加到广播变量 * * @return */ def queryRule(behaviorType: Int): List[AnalyzeRule] = { //mysql中解析规则(0-查询,1-预订)数据 var analyzeRuleList = new ArrayBuffer[AnalyzeRule]() val sql: String = "select * from analyzerule where behavior_type =" + behaviorType var conn: Connection = null var ps: PreparedStatement = null var rs: ResultSet = null try { conn = c3p0Util.getConnection ps = conn.prepareStatement(sql) rs = ps.executeQuery() while (rs.next()) { val analyzeRule = new AnalyzeRule() analyzeRule.id = rs.getString("id") analyzeRule.flightType = rs.getString("flight_type").toInt analyzeRule.BehaviorType = rs.getString("behavior_type").toInt analyzeRule.requestMatchExpression = rs.getString("requestMatchExpression") analyzeRule.requestMethod = rs.getString("requestMethod") analyzeRule.isNormalGet = rs.getString("isNormalGet").toBoolean analyzeRule.isNormalForm = rs.getString("isNormalForm").toBoolean analyzeRule.isApplicationJson = rs.getString("isApplicationJson").toBoolean analyzeRule.isTextXml = rs.getString("isTextXml").toBoolean analyzeRule.isJson = rs.getString("isJson").toBoolean analyzeRule.isXML = rs.getString("isXML").toBoolean analyzeRule.formDataField = rs.getString("formDataField") analyzeRule.book_bookUserId = rs.getString("book_bookUserId") analyzeRule.book_bookUnUserId = rs.getString("book_bookUnUserId") analyzeRule.book_psgName = rs.getString("book_psgName") analyzeRule.book_psgType = rs.getString("book_psgType") analyzeRule.book_idType = rs.getString("book_idType") analyzeRule.book_idCard = rs.getString("book_idCard") analyzeRule.book_contractName = rs.getString("book_contractName") analyzeRule.book_contractPhone = rs.getString("book_contractPhone") analyzeRule.book_depCity = rs.getString("book_depCity") analyzeRule.book_arrCity = rs.getString("book_arrCity") analyzeRule.book_flightDate = rs.getString("book_flightDate") analyzeRule.book_cabin = rs.getString("book_cabin") analyzeRule.book_flightNo = rs.getString("book_flightNo") analyzeRule.query_depCity = rs.getString("query_depCity") analyzeRule.query_arrCity = rs.getString("query_arrCity") analyzeRule.query_flightDate = rs.getString("query_flightDate") analyzeRule.query_adultNum = rs.getString("query_adultNum") analyzeRule.query_childNum = rs.getString("query_childNum") analyzeRule.query_infantNum = rs.getString("query_infantNum") analyzeRule.query_country = rs.getString("query_country") analyzeRule.query_travelType = rs.getString("query_travelType") analyzeRule.book_psgFirName = rs.getString("book_psgFirName") analyzeRuleList += analyzeRule } } catch { case e: Exception => e.printStackTrace() } finally { c3p0Util.close(conn, ps, rs) } analyzeRuleList.toList }
Put the rules into broadcast variables
//加载解析规则信息到广播变量 val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0) val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1) @volatile var queryRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(queryRuleList) @volatile var bookRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(bookRuleList)
Update the broadcast variables
//更新解析规则信息 var analyzeRuleChangeFlag: String = jedis.get("AnalyzeRuleChangeFlag") //先判断classifyRuleChangeFlag是否为空 if (StringUtils.isBlank(analyzeRuleChangeFlag)){ analyzeRuleChangeFlag = "true" //从新设置到Redis中 jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag) } if (analyzeRuleChangeFlag.toBoolean) { queryRuleBroadcast.unpersist() bookRuleBroadcast.unpersist() //将解析规则加载到广播变量 //加载解析规则信息到广播变量 val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0) val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1) queryRuleBroadcast = sc.broadcast(queryRuleList) bookRuleBroadcast = sc.broadcast(bookRuleList) analyzeRuleChangeFlag = "false" //从新设置到Redis中 jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag) }
Adding the parsing code
Copy the classes AnalyzeBookRequest and AnalyzeRequest from 反扒参考资料\工具包\解析类 into the com.air.antispider.stream.dataprocess.businessprocess package.
Call them from the main program
//开始解析数据 //解析查询数据 val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest( requestType, requestMethod, contentType, request, requestBody, travelTypeEnum, queryRuleBroadcast.value) //解析预约数据 val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest( requestType, requestMethod, contentType, request, requestBody, travelTypeEnum, bookRuleBroadcast.value )
Data Enrichment
Compare the requesting IP against the blacklist stored in MySQL to decide whether it is a high-frequency IP; if it is, mark it so that later business logic can use the flag.
Load the blacklist from MySQL
/**
* 查询MySQL数据库中的黑名单数据
* @return
*/
def getIpBlackList (): ArrayBuffer[String] = {
val sql = "select ip_name from nh_ip_blacklist"
val blackIPList: ArrayBuffer[String] = QueryDB.queryData(sql, "ip_name")
blackIPList
}
Put the blacklist into a broadcast variable
//将黑名单数据加载到广播变量 val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList() @volatile var blackIPBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(blackIPList)
Update the blacklist in the broadcast variable
//更新黑名单信息 var blackIPChangeFlag: String = jedis.get("BlackIPChangeFlag") //先判断classifyRuleChangeFlag是否为空 if (StringUtils.isBlank(blackIPChangeFlag)){ blackIPChangeFlag = "true" //从新设置到Redis中 jedis.set("BlackIPChangeFlag", blackIPChangeFlag) } if (blackIPChangeFlag.toBoolean) { blackIPBroadcast.unpersist() //将黑名单数据加载到广播变量 val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList() blackIPBroadcast = sc.broadcast(blackIPList) blackIPChangeFlag = "false" //从新设置到Redis中 jedis.set("BlackIPChangeFlag", blackIPChangeFlag) }
Write the high-frequency IP check
package com.air.antispider.stream.dataprocess.businessprocess
import scala.collection.mutable.ArrayBuffer
object IpOperation {
/**
* 判断当前客户端IP是不是高频IP
* @param remoteAddr
* @param blackIPList
* @return
*/
def operationIP(remoteAddr: String, blackIPList: ArrayBuffer[String]): Boolean = {
//遍历blackIPList,判断remoteAddr在集合中是否存在
for (blackIP <- blackIPList) {
if (blackIP.equals(remoteAddr)){
//若是相等,当前IP是高频IP
return true
}
}
return false
}
}
Main program code
//数据加工操做
val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)
Structuring the Data
The values obtained and computed so far are scattered and cannot be handed to later business logic as they are, so they need to be packaged into a structured record.
Code:
package com.air.antispider.stream.dataprocess.businessprocess import com.air.antispider.stream.common.bean.{BookRequestData, CoreRequestParams, ProcessedData, QueryRequestData, RequestType} import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum object DataPackage { /** * 对下方散乱的数据,进行封装,封装为ProcessedData * @param source * @param requestMethod * @param request * @param remoteAddr * @param httpUserAgent * @param timeIso8601 * @param serverAddr * @param highFrqIPGroup * @param requestType * @param travelType * @param cookieValue_JSESSIONID * @param cookieValue_USERID * @param queryParams * @param bookParams * @param httpReferrer * @return */ def dataPackage(sourceData: String, requestMethod: String, request: String, remoteAddr: String, httpUserAgent: String, timeIso8601: String, serverAddr: String, highFrqIPGroup: Boolean, requestType: RequestType, travelType: TravelTypeEnum, cookieValue_JSESSIONID: String, cookieValue_USERID: String, queryParams: Option[QueryRequestData], bookParams: Option[BookRequestData], httpReferrer: String): ProcessedData = { //由于建立ProcessedData的时候,还须要核心请求参数, //但这些参数在queryParams/bookParams中 //定义出发时间/始发地/目的地等参数 var flightDate: String = "" //出发地 var depcity: String = "" //目的地 var arrcity: String = "" //看查询请求参数中有没有值 queryParams match { //Option有值的状况,queryData:若是有值,就使用此变量操做 case Some(x) => flightDate = x.flightDate depcity = x.depCity arrcity = x.arrCity //None:没有值 case None => //若是查询请求参数没有值,就去预约请求参数中获取 bookParams match { //Option有值的状况,queryData:若是有值,就使用此变量操做 case Some(bookData) => //为了确保安全,须要加上长度判断,只有长度大于0才能这样取值 flightDate = bookData.flightDate.mkString depcity = bookData.depCity.mkString arrcity = bookData.arrCity.mkString //None:没有值 case None => } } //建立核心请求参数 val requestParams = CoreRequestParams(flightDate, depcity, arrcity) ProcessedData( sourceData, requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, requestParams, cookieValue_JSESSIONID, cookieValue_USERID, queryParams, bookParams, httpReferrer) } }
Main program code
//进行数据信息提取/转换等操做,获得ProcessedDataRDD val processedDataRDD: RDD[ProcessedData] = encryptedRDD.map(message => { //获取到消息后开始进行数据切割/打标签等操做 //数据切割 val (request, //请求URL requestMethod, contentType, requestBody, //请求体 httpReferrer, //来源URL remoteAddr, //客户端IP httpUserAgent, timeIso8601, serverAddr, cookiesStr, cookieValue_JSESSIONID, cookieValue_USERID) = DataSplit.split(message) //对请求的分类进行打标签操做 val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value) //对往返数据进行打标签操做 val travelType: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer) //开始解析数据 //解析查询数据 val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest( requestType, requestMethod, contentType, request, requestBody, travelType, queryRuleBroadcast.value) //解析预约数据 val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest( requestType, requestMethod, contentType, request, requestBody, travelType, bookRuleBroadcast.value ) //数据加工操做 val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value) //对上面的散乱数据进行封装 val processedData: ProcessedData = DataPackage.dataPackage( "", //原始数据,此处直接置为空 requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, cookieValue_JSESSIONID, cookieValue_USERID, queryParams, bookParams, httpReferrer) processedData })
Data Push Module
For better decoupling, data is pushed to different topics depending on the request type, e.g. query versus booking, so that downstream jobs can pull exactly the messages they need.
Code:
package com.air.antispider.stream.dataprocess.businessprocess import com.air.antispider.stream.common.bean.ProcessedData import com.air.antispider.stream.common.util.jedis.PropertiesUtil import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum.BehaviorTypeEnum import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.spark.rdd.RDD object SendData { /** * 发送预约数据到Kafka * * @param processedDataRDD */ def sendBookDataKafka(processedDataRDD: RDD[ProcessedData]) = { sendToKafka(processedDataRDD, 1) } /** * 发送查询数据到Kafka * * @param processedDataRDD */ def sendQueryDataKafka(processedDataRDD: RDD[ProcessedData]) = { sendToKafka(processedDataRDD, 0) } /** * 根据指定的类型,发送到Kafka * * @param processedDataRDD * @param topicType 0: 查询,1: 预约 */ def sendToKafka(processedDataRDD: RDD[ProcessedData], topicType: Int) = { //将processedData数据发送到Kafka中 val messageRDD: RDD[String] = processedDataRDD //根据类型进行过滤 .filter(processedData => processedData.requestType.behaviorType.id == topicType) //将数据转换为字符串 .map(processedData => processedData.toKafkaString()) //若是通过过滤操做后,还有数据,那么就发送 if (!messageRDD.isEmpty()) { //定义Kafka相关配置 //查询数据的 topic:target.query.topic = processedQuery var topicKey = "" if (topicType == 0) { topicKey = "target.query.topic" } else if (topicType == 1) { topicKey = "target.book.topic" } val queryTopic = PropertiesUtil.getStringByKey(topicKey, "kafkaConfig.properties") //建立 map 封装 kafka 参数 val props = new java.util.HashMap[String, Object]() //设置 brokers props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")) //key 序列化方法 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties")) //value 序列化方法 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties")) //批发送设置:32KB 做为一批次或 10ms 做为一批次 props.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties")) props.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties")) messageRDD.foreachPartition(iter => { //先建立Kafka链接 val producer = new KafkaProducer[String, String](props) //发送数据 iter.foreach(message => { //发送数据 producer.send(new ProducerRecord[String, String](queryTopic, message)) }) //关闭Kafka链接 producer.close() }) } } }
Main program:
//将结构化的数据ProcessedData根据不一样的请求发送到不一样的Topic中 //发送查询数据到Kafka SendData.sendQueryDataKafka(processedDataRDD) //发送预约数据到Kafka SendData.sendBookDataKafka(processedDataRDD)
Real-Time Job Monitoring
Spark has built-in performance metrics; they need to be enabled when the SparkConf is created:
//当应用被中止的时候,进行以下设置能够保证当前批次执行完以后再中止应用。 System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true") //建立Spark配置对象 val sparkConf: SparkConf = new SparkConf() .setMaster("local[*]") .setAppName("DataProcessApp") //开启Spark性能监控功能 .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
The metrics can then be viewed in a browser at http://localhost:4040/metrics/json/
Code
package com.air.antispider.stream.dataprocess.businessprocess import com.air.antispider.stream.common.bean.ProcessedData import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil} import com.air.antispider.stream.common.util.spark.SparkMetricsUtils import com.alibaba.fastjson.JSONObject import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.json4s.DefaultFormats import org.json4s.jackson.Json import redis.clients.jedis.JedisCluster object SparkStreamingMonitor { /** * Spark性能监控, * * @param sc * @param processedDataRDD * @param serversCountMap */ def streamMonitor(sc: SparkContext, processedDataRDD: RDD[ProcessedData], serversCountMap: collection.Map[String, Int]) = { //1. 获取到Spark的状态信息 /* //在项目上线后,使用下方的方式获取URL //监控数据获取 val sparkDriverHost = sc.getConf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY _URI_BASES") //在 yarn 上运行的监控数据 json 路径 val url = s"${sparkDriverHost}/metrics/json" */ val url = "http://localhost:4040/metrics/json/" val sparkDataInfo: JSONObject = SparkMetricsUtils.getMetricsJson(url) val gaugesObj: JSONObject = sparkDataInfo.getJSONObject("gauges") //获取应用ID和应用名称,用来构建json中的key val id: String = sc.applicationId val appName: String = sc.appName //local-1561617727065.driver.DataProcessApp.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime val startKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingStartTime" val endKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingEndTime" val startTime = gaugesObj.getJSONObject(startKey) //{"value": 1561617812011} .getLong("value") val endTime = gaugesObj.getJSONObject(endKey) //{"value": 1561617812011} .getLong("value") //将结束时间进行格式化yyyy-MM-dd HH:mm:ss,注意,web平台使用的是24小时制,因此此处须要使用HH val endTimeStr: String = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(endTime) //2. 计算时间差 val costTime = endTime - startTime //3. 根据时间差计算数据处理速度,速度= 数量/时间 //获取处理的数量 val count: Long = processedDataRDD.count() //计算处理速度 var countPer = 0.0 if (costTime != 0) { countPer = count / costTime } //4. 交给JavaWeb进行结果展现 //对serversCountMap进行转换,转换为JSON val serversCountMapJson: String = Json(DefaultFormats).write(serversCountMap) //根据web平台的代码,发现须要存入Redis中 val message = Map[String, Any]( "costTime" -> costTime.toString, //时间差 "applicationId" -> id, //应用ID "applicationUniqueName" -> appName, //应用名称 "countPerMillis" -> countPer.toString,//计算速度 "endTime" -> endTimeStr, //结束时间:2019-06-27 15:44:32 "sourceCount" -> count.toString, //数据的数量 "serversCountMap" -> serversCountMap //数据采集信息 ) //将message转换为json val messageJson: String = Json(DefaultFormats).write(message) //将messageJson发送到Kafka val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster //存入Redis的Key.CSANTI_MONITOR_DP + 时间戳 val key = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + System.currentTimeMillis() val ex = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt jedis.setex(key, ex, messageJson) //若是须要最后一批数据,那么可使用下面的方式, val lastKey = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + "_LAST" jedis.set(lastKey, messageJson) } }
Main program code:
Because the third parameter, serversCountMap, comes from the earlier link-statistics step, the return value of the link-statistics method has to be changed so that it returns this map.
//开启Spark性能监控 //SparkContext, 数据集RDD, 数据采集结果信息 SparkStreamingMonitor.streamMonitor(sc, processedDataRDD, serversCountMap)
Real-Time Rule Computation
Managing Offsets Manually
Code for reading the offsets:
package com.air.antispider.stream.rulecompute import com.air.antispider.stream.common.util.jedis.PropertiesUtil import com.air.antispider.stream.common.util.kafka.KafkaOffsetUtil import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.I0Itec.zkclient.ZkClient import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 黑名单实时计算主程序 */ object RuleComputeApp { def main(args: Array[String]): Unit = { //建立Spark执行环境 //当应用被中止的时候,进行以下设置能够保证当前批次执行完以后再中止应用。 System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true") //建立Spark配置对象 val sparkConf: SparkConf = new SparkConf() .setMaster("local[*]") .setAppName("RuleComputeApp") //开启Spark性能监控功能 .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource") //建立SparkContext val sc = new SparkContext(sparkConf) //建立SparkStreamingContext对象 val ssc = new StreamingContext(sc, Seconds(2)) val inputStream: InputDStream[(String, String)] = createKafkaStream(ssc) inputStream.print() //启动程序 ssc.start() ssc.awaitTermination() } /** * 消费Kafka数据,建立InputStream对象 * @param ssc * @return */ def createKafkaStream(ssc: StreamingContext): InputDStream[(String, String)] = { //链接Kafka //封装Kafka参数信息 var kafkaParams = Map[String, String]() //从kafkaConfig.properties配置文件中获取broker列表信息 val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties") kafkaParams += ("metadata.broker.list" -> brokerList) //zookeeper主机地址 val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties") //topic信息存储位置 val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties") //topic val topic: String = PropertiesUtil.getStringByKey("source.query.topic", "kafkaConfig.properties") //封装topic的集合 val topics = Set[String](topic) //建立zk客户端对象 val zkClient = new ZkClient(zkHosts) //使用KafkaOffsetUtil来获取TopicAndPartition数据 val topicAndPartitionOption: Option[Map[TopicAndPartition, Long]] = KafkaOffsetUtil.readOffsets(zkClient, zkHosts, zkPath, topic) val inputStream: InputDStream[(String, String)] = topicAndPartitionOption match { //若是有数据:从Zookeeper中读取偏移量 case Some(topicAndPartition) => val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, topicAndPartition, messageHandler) //若是没有数据,还按照之前的方式来读取数据 case None => KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } inputStream } }
Code for saving the offsets:
//将数据偏移量到zookeeper中 inputStream.foreachRDD(rdd => { //保存偏移量 saveOffsets(rdd) }) /** * 保存偏移量信息 * @param rdd */ def saveOffsets(rdd: RDD[(String, String)]): Unit = { //zookeeper主机地址 val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties") //建立zk客户端对象 val zkClient = new ZkClient(zkHosts) //topic信息存储位置 val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties") KafkaOffsetUtil.saveOffsets(zkClient, zkHosts, zkPath, rdd) }
Data Packaging
Convert the received strings into ProcessedData objects; this code can be copied directly from the course notes.
Code:
package com.air.antispider.stream.rulecompute.businessprocess import com.air.antispider.stream.common.bean._ import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum, TravelTypeEnum} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.streaming.dstream.DStream /** * 数据分割封装 */ object QueryDataPackage { /** * 数据分割封装 * * @param lines * @return */ def queryDataLoadAndPackage(lines: DStream[String]): DStream[ProcessedData] = { //使用 mapPartitions 减小包装类的建立开销 lines.mapPartitions { partitionsIterator => //建立 json 解析 val mapper = new ObjectMapper mapper.registerModule(DefaultScalaModule) //将数据进行 map,一条条处理 partitionsIterator.map { sourceLine => //分割数据 val dataArray = sourceLine.split("#CS#", -1) //原始数据,站位,并没有数据 val sourceData = dataArray(0) val requestMethod = dataArray(1) val request = dataArray(2) val remoteAddr = dataArray(3) val httpUserAgent = dataArray(4) val timeIso8601 = dataArray(5) val serverAddr = dataArray(6) val highFrqIPGroup: Boolean = dataArray(7).equalsIgnoreCase("true") val requestType: RequestType = RequestType(FlightTypeEnum.withName(dataArray(8)), BehaviorTypeEnum.withName(dataArray(9))) val travelType: TravelTypeEnum = TravelTypeEnum.withName(dataArray(10)) val requestParams: CoreRequestParams = CoreRequestParams(dataArray(11), dataArray(12), dataArray(13)) val cookieValue_JSESSIONID: String = dataArray(14) val cookieValue_USERID: String = dataArray(15) //分析查询请求的时候不须要 book 数据 val bookRequestData: Option[BookRequestData] = None //封装 query 数据 val queryRequestData = if (!dataArray(16).equalsIgnoreCase("NULL")) { mapper.readValue(dataArray(16), classOf[QueryRequestData]) match { case value if value != null => Some(value) case _ => None } } else { None } val httpReferrer = dataArray(18) //封装流程数据,返回 ProcessedData("", requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, requestParams, cookieValue_JSESSIONID, cookieValue_USERID, queryRequestData, bookRequestData, httpReferrer) } } } }
Main program:
//从inputStream中取出消息 val dStream: DStream[String] = inputStream.map(_._2) //将消息转换为ProcessedData对象 val processedDataDStream: DStream[ProcessedData] = QueryDataPackage.queryDataLoadAndPackage(dStream)
Loading the Rules
Load from MySQL: 1. critical pages, 2. blacklisted IPs, 3. flow (process) rules.
Query the real name of each rule
/**
* Get the list of flows
* n = 0: anti-crawler flows
* n = 1: anti-seat-blocking flows
* @return ArrayBuffer[FlowCollocation]
*/
def createFlow(n:Int) :ArrayBuffer[FlowCollocation] = {
var array = new ArrayBuffer[FlowCollocation]
var sql:String = ""
if(n == 0){ sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.crawler_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=0"}
else if(n == 1){sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.occ_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=1"}
var conn: Connection = null var ps: PreparedStatement = null var rs:ResultSet = null try{ conn = c3p0Util.getConnection ps = conn.prepareStatement(sql) rs = ps.executeQuery() while (rs.next()) { val flowId = rs.getString("id") val flowName = rs.getString("process_name") if(n == 0){ val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds") array += new FlowCollocation(flowId, flowName,createRuleList(flowId,n), flowLimitScore, flowId) }else if(n == 1){ val flowLimitScore = rs.getDouble("occ_blacklist_thresholds") array += new FlowCollocation(flowId, flowName,createRuleList(flowId,n), flowLimitScore, flowId) } } }catch{ case e : Exception => e.printStackTrace() }finally { c3p0Util.close(conn, ps, rs) } array
}
/**
* Get the list of rules
* @param process_id query the rules belonging to this flow ID
* @return the list of rules
*/
def createRuleList(process_id:String,n:Int):List[RuleCollocation] = {
var list = new ListBuffer[RuleCollocation]
val sql = "select * from(select nh_rule.id,nh_rule.process_id,nh_rules_maintenance_table.rule_real_name,nh_rule.rule_type,nh_rule.crawler_type,"+
"nh_rule.status,nh_rule.arg0,nh_rule.arg1,nh_rule.score from nh_rule,nh_rules_maintenance_table where nh_rules_maintenance_table."+
"rule_name=nh_rule.rule_name) as tab where process_id = '"+process_id + "'and crawler_type="+n
//and status="+n
var conn: Connection = null
var ps: PreparedStatement = null
var rs:ResultSet = null
try{
conn = c3p0Util.getConnection
ps = conn.prepareStatement(sql)
rs = ps.executeQuery()
while ( rs.next() ) {
val ruleId = rs.getString("id")
val flowId = rs.getString("process_id")
val ruleName = rs.getString("rule_real_name")
val ruleType = rs.getString("rule_type")
val ruleStatus = rs.getInt("status")
val ruleCrawlerType = rs.getInt("crawler_type")
val ruleValue0 = rs.getDouble("arg0")
val ruleValue1 = rs.getDouble("arg1")
val ruleScore = rs.getInt("score")
val ruleCollocation = new RuleCollocation(ruleId,flowId,ruleName,ruleType,ruleStatus,ruleCrawlerType,ruleValue0,ruleValue1,ruleScore)
list += ruleCollocation
}
}catch {
case e : Exception => e.printStackTrace()
}finally {
c3p0Util.close(conn, ps, rs)
}
list.toList
}
FlowCollocation and RuleCollocation need to be copied into the project from 反扒参考资料\工具包\ruleComputeBean.
Put the flow information into a broadcast variable
//将流程数据加载到广播变量 val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow() @volatile var flowCollocationsBroadcast: Broadcast[ArrayBuffer[FlowCollocation]] = sc.broadcast(flowCollocations)
Update the broadcast variable
//更新流程的广播变量flowCollocationsBroadcast var flowCollocationChangeFlag: String = jedis.get("flowCollocationChangeFlag") //先判断classifyRuleChangeFlag是否为空 if (StringUtils.isBlank(flowCollocationChangeFlag)){ flowCollocationChangeFlag = "true" //从新设置到Redis中 jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag) } if (flowCollocationChangeFlag.toBoolean) { flowCollocationsBroadcast.unpersist() //将黑名单数据加载到广播变量 val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow() flowCollocationsBroadcast = sc.broadcast(flowCollocations) flowCollocationChangeFlag = "false" //从新设置到Redis中 jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag) }
Rule Computation
IP-block metric (first two octets of the IP)
package com.air.antispider.stream.rulecompute.businessprocess import com.air.antispider.stream.common.bean.ProcessedData import org.apache.spark.rdd.RDD /** * 按照不一样的维度进行计算的工具类 */ object CoreRule { /** * IP段指标计算 * @param processedDataRDD */ def ipBlockCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = { val mapRDD: RDD[(String, Int)] = processedDataRDD.map(processedData => { //获取客户端IP 192.168.80.81 val ip: String = processedData.remoteAddr //获取IP的前2位, 192.168 val arr: Array[String] = ip.split("\\.") if (arr.length == 4) { //表明这是一个完整的IP val ipBlock = arr(0) + "." + arr(1) //(ip段, 1) (ipBlock, 1) } else { ("", 1) } }) //按照IP段进行分组,聚合计算 val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((x, y) => x + y) //将结果采集为Map类型返回 resultRDD.collectAsMap() } }
Main program:
//开始根据各个指标维度进行计算 //计算IP段的访问量 val ipBlockCountMap: collection.Map[String, Int] = CoreRule.ipBlockCount(processedDataRDD)
IP request count
Code:
/** * 计算IP5分钟访问量 * @param processedDataRDD * @return */ def ipCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = { processedDataRDD.map(processedData => { val ip: String = processedData.remoteAddr //(ip, 次数) (ip, 1) }) //累加 .reduceByKey(_ + _) //采集数据 .collectAsMap() }
IP visits to critical pages
/** * 计算IP访问关键页面的次数 * @param processedDataRDD * @param criticalPagesList * @return */ def ipCriticalPagesCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = { processedDataRDD.map(processedData => { val ip: String = processedData.remoteAddr val url: String = processedData.request //定义访问次数,默认为0次 var count = 0 for (criticalPages <- criticalPagesList) { if (url.matches(criticalPages)){ //若是匹配上,表明访问了1次关键页面 count = 1 } } (ip, count) }) //累加 .reduceByKey(_ + _) //采集数据 .collectAsMap() }
Number of distinct User-Agents carried by an IP
/**
 * Number of distinct User-Agents carried by an IP within the 5-minute window
 * @param processedDataRDD
 * @return
 */
def ipUACount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  //Convert processedData into (ip, ua)
  val mapData: RDD[(String, String)] = processedDataRDD.map(processedData => {
    val ip: String = processedData.remoteAddr
    val ua: String = processedData.httpUserAgent
    (ip, ua)
  })
  //(ip, ua) => (ip, (ua1, ua2, ua1))
  val groupRDD: RDD[(String, Iterable[String])] = mapData.groupByKey()
  //Turn (ip, (ua1, ua2, ua1)) into (ip, count)
  groupRDD.map(line => {
    val ip: String = line._1
    val sourceData: Iterable[String] = line._2
    //Use a Set to deduplicate the raw values
    var set = Set[String]()
    for (ua <- sourceData) {
      //Add the UA to the set
      set += ua
    }
    (ip, set.size)
  })
  .collectAsMap()
}
Minimum time interval between an IP's accesses to critical pages
/**
 * Minimum time interval between an IP's accesses to critical pages within the 5-minute window
 *
 * @param processedDataRDD
 * @param criticalPagesList
 * @return
 */
def ipCriticalPagesMinTimeCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Long] = {
  //Keep only the critical pages first
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access count, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //A match means this request hit a critical page once
          count = 1
        }
      }
      //count == 1 means this request is for a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //Transform to (ip, timestamp)
    .map(processedData => {
      val ip: String = processedData.remoteAddr
      val time: String = processedData.timeIso8601
      //time looks like 2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      (ip, timeStamp)
    })
    //Group to (ip, (time1, time2, time3...))
    .groupByKey()
    //Transform to (ip, minimum time difference)
    .map(line => {
      val ip: String = line._1
      //Iterator holding all timestamps of this IP
      val sourceData: Iterable[Long] = line._2
      //Convert the iterator to an Array
      val sourceArray: Array[Long] = sourceData.toArray
      //Sort the raw timestamps
      util.Arrays.sort(sourceArray)
      //Collection for the differences between adjacent timestamps
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //Current element
        val currentTime: Long = sourceArray(i)
        //Next element
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //Store the difference
        resultArray += result
      }
      //Sort the differences
      val array: Array[Long] = resultArray.toArray
      util.Arrays.sort(array)
      //If the IP hit a critical page only once there is no interval; fall back to Integer.MAX_VALUE
      //(the same default used downstream) instead of array(0), which would throw on an empty array
      if (array.isEmpty) {
        (ip, Integer.MAX_VALUE.toLong)
      } else {
        (ip, array(0))
      }
    })
    //Collect the result
    .collectAsMap()
}
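For example, if an IP hits critical pages at 08:46:56, 08:46:58 and 08:47:05, the adjacent gaps are 2000 ms and 7000 ms, so the resulting entry is (ip, 2000).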
Number of critical-page access intervals below the preset time for an IP
Code:
/**
 * Number of times an IP's interval between critical-page accesses is below the preset value within the 5-minute window
 * @param processedDataRDD
 * @param criticalPagesList
 * @return
 */
def ipCriticalPagesMinNumCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[(String, String), Int] = {
  //Keep only the critical pages first
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access count, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //A match means this request hit a critical page once
          count = 1
        }
      }
      //count == 1 means this request is for a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //Transform to ((ip, url), timestamp)
    .map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      val time: String = processedData.timeIso8601
      //time looks like 2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      ((ip, url), timeStamp)
    })
    //Group to ((ip, url), (time1, time2, time3...))
    .groupByKey()
    //Transform to ((ip, url), number of intervals below the preset value)
    .map(line => {
      val key: (String, String) = line._1
      //Iterator holding all timestamps of this key
      val sourceData: Iterable[Long] = line._2
      //Convert the iterator to an Array
      val sourceArray: Array[Long] = sourceData.toArray
      //Sort the raw timestamps
      util.Arrays.sort(sourceArray)
      //Collection for the qualifying differences
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //Current element
        val currentTime: Long = sourceArray(i)
        //Next element
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //Keep only the differences below the preset value (hard-coded to 5 seconds here)
        if (result < 5000) {
          resultArray += result
        }
      }
      //Return ((ip, url), count)
      (key, resultArray.size)
    })
    .collectAsMap()
}
Number of different flights queried by an IP within 5 minutes
/**
 * Number of different flights queried by an IP within the 5-minute window
 * @param processedDataRDD
 * @return
 */
def ipCityCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  //(ip, departure->arrival)
  processedDataRDD.map(line => {
    val ip: String = line.remoteAddr
    //Departure city
    val depcity: String = line.requestParams.depcity
    //Arrival city
    val arrcity: String = line.requestParams.arrcity
    (ip, depcity + "->" + arrcity)
  })
  .groupByKey()
  //(ip, number of distinct routes)
  .map(line => {
    val ip: String = line._1
    val sourceCitys: Iterable[String] = line._2
    //Use a Set to deduplicate
    var set = Set[String]()
    //Loop and deduplicate
    for (city <- sourceCitys) {
      set += city
    }
    (ip, set.size)
  })
  .collectAsMap()
}
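For example, an IP that queries CAN->PEK, CAN->PEK and CAN->SHA within the window has two distinct routes, so the entry is (ip, 2).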
Number of distinct cookies carried by an IP within 5 minutes
/**
 * Number of distinct cookies (JSESSIONID) carried by an IP within the 5-minute window
 * @param processedDataRDD
 * @param criticalPagesList
 * @return
 */
def ipCookieCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
  //Keep only the critical pages first
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access count, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //A match means this request hit a critical page once
          count = 1
        }
      }
      //count == 1 means this request is for a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //(ip, jSessionID)
    .map(line => {
      val ip: String = line.remoteAddr
      //SessionID
      val sessionID: String = line.cookieValue_JSESSIONID
      (ip, sessionID)
    })
    .groupByKey()
    //(ip, (sID1, sID2, sID1))
    .map(line => {
      val ip: String = line._1
      val sourceSessionIDs: Iterable[String] = line._2
      //Use a Set to deduplicate
      var set = Set[String]()
      //Loop and deduplicate
      for (sessionID <- sourceSessionIDs) {
        set += sessionID
      }
      (ip, set.size)
    })
    .collectAsMap()
}
Blacklist score computation
The flow information loaded from the database contains each flow's own rule list. Since the count for every metric has already been computed, we only need to compare those counts against the rules from the database to obtain the list of scores for metrics that exceed their thresholds, as well as the list of scores for the rules that are enabled.
Code:
package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.{FlowCollocation, ProcessedData, RuleCollocation}
import com.air.antispider.stream.rulecompute.bean.{AntiCalculateResult, FlowScoreResult}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

object RuleUtil {

  /**
   * Compute the final flow result from the individual rule metrics
   *
   * @param processedDataRDD
   * @param ipBlockCountMap
   * @param ipCountMap
   * @param ipCriticalPagesMap
   * @param ipUAMap
   * @param ipCriticalPagesMinTimeMap
   * @param ipCriticalPagesMinNumMap
   * @param ipCityCountMap
   * @param ipCookieCountMap
   * @param flowCollocationList
   */
  def calculateAntiResult(
      processedDataRDD: RDD[ProcessedData],
      ipBlockCountMap: collection.Map[String, Int],
      ipCountMap: collection.Map[String, Int],
      ipCriticalPagesMap: collection.Map[String, Int],
      ipUAMap: collection.Map[String, Int],
      ipCriticalPagesMinTimeMap: collection.Map[String, Long],
      ipCriticalPagesMinNumMap: collection.Map[(String, String), Int],
      ipCityCountMap: collection.Map[String, Int],
      ipCookieCountMap: collection.Map[String, Int],
      flowCollocationList: ArrayBuffer[FlowCollocation]
  ): RDD[AntiCalculateResult] = {
    //Look up each metric value for the current record in the corresponding map
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      //Take the first two octets of the IP, e.g. 192.168
      val arr: Array[String] = ip.split("\\.")
      var ipBlock = ""
      if (arr.length == 4) {
        //This is a complete IP
        ipBlock = arr(0) + "." + arr(1)
      }
      //IP block count
      val ipBlockCounts: Int = ipBlockCountMap.getOrElse(ipBlock, 0)
      //IP count
      val ipCounts: Int = ipCountMap.getOrElse(ip, 0)
      //Critical page count
      val ipCriticalPagesCounts: Int = ipCriticalPagesMap.getOrElse(ip, 0)
      val ipUACounts: Int = ipUAMap.getOrElse(ip, 0)
      //Minimum access interval; if the IP is missing, use Int max value as the default, never 0
      val ipCriticalPagesMinTimeCounts: Int = ipCriticalPagesMinTimeMap.getOrElse(ip, Integer.MAX_VALUE).toInt
      val ipCriticalPagesMinNumCounts: Int = ipCriticalPagesMinNumMap.getOrElse((ip, url), 0)
      val ipCityCounts: Int = ipCityCountMap.getOrElse(ip, 0)
      val ipCookieCounts: Int = ipCookieCountMap.getOrElse(ip, 0)
      //Map that carries the metric values keyed by rule name
      val map = Map[String, Int](
        "ipBlock" -> ipBlockCounts,
        "ip" -> ipCounts,
        "criticalPages" -> ipCriticalPagesCounts,
        "userAgent" -> ipUACounts,
        "criticalPagesAccTime" -> ipCriticalPagesMinTimeCounts,
        "criticalPagesLessThanDefault" -> ipCriticalPagesMinNumCounts,
        "flightQuery" -> ipCityCounts,
        "criticalCookies" -> ipCookieCounts
      )
      val flowsScore: Array[FlowScoreResult] = computeFlowScore(map, flowCollocationList)
      AntiCalculateResult(
        processedData,
        ip,
        ipBlockCounts,
        ipCounts,
        ipCriticalPagesCounts,
        ipUACounts,
        ipCriticalPagesMinTimeCounts,
        ipCriticalPagesMinNumCounts,
        ipCityCounts,
        ipCookieCounts,
        //flowsScore is computed above but not passed in yet at this step; the last field is still null
        null
      )
    })
  }

  /**
   * Run the scoring against each flow and return the final per-flow results
   * @param map
   * @param flowCollocationList
   * @return
   */
  def computeFlowScore(map: Map[String, Int], flowCollocationList: ArrayBuffer[FlowCollocation]): Array[FlowScoreResult] = {
    //flowCollocationList holds several flows, so loop over the flows first
    for (flow <- flowCollocationList) {
      //Get the rules that belong to this flow
      val rules: List[RuleCollocation] = flow.rules
      //Scores of the rules whose metric exceeded the threshold
      var array1 = new ArrayBuffer[Int]()
      //Scores of the rules that exceeded the threshold and are enabled
      var array2 = new ArrayBuffer[Int]()
      for (rule <- rules) {
        val ruleName: String = rule.ruleName
        val num: Int = map.getOrElse(ruleName, 0)
        //The database rule name matches a computed metric, so compare the values
        if (num > rule.ruleValue0) {
          //The computed value exceeds the threshold configured in the database, so this rule is hit; record its score
          array1 += rule.ruleScore
          if (rule.ruleStatus == 0) {
            //The rule is in the enabled state
            array2 += rule.ruleScore
          }
        }
      }
      // val result = xxx(array1, array2)
    }
    //The final score assembly is not implemented yet at this step, so the method still returns null
    null
  }
}
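A sketch of how the driver might invoke this entry point, assuming the metric maps computed in the previous steps and the flow broadcast variable are in scope (the variable names follow the earlier snippets):

//Run the scoring over the current batch
val antiCalculateResults: RDD[AntiCalculateResult] = RuleUtil.calculateAntiResult(
  processedDataRDD,
  ipBlockCountMap,
  ipCountMap,
  ipCriticalPagesMap,
  ipUAMap,
  ipCriticalPagesMinTimeMap,
  ipCriticalPagesMinNumMap,
  ipCityCountMap,
  ipCookieCountMap,
  flowCollocationsBroadcast.value
)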