PySpark Streaming 1.6.1 official documentation (StreamingContext.checkpoint):
http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.checkpoint
Introduction to Spark Streaming sliding windows:
http://ju.outofmemory.cn/entry/96018
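Before the real job, here is a minimal standalone sketch of the two window parameters used later (window length vs. slide interval). The socket source on localhost:9999 is just a hypothetical stand-in for illustration; in the real pipeline the source is Kafka.

#! /usr/bin/env python
# encoding: utf-8
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="window-demo")
ssc = StreamingContext(sc, 2)                     # batch interval: 2 seconds

# Hypothetical text source; the job below reads from Kafka instead.
lines = ssc.socketTextStream("localhost", 9999)

counts = lines.flatMap(lambda line: line.split()) \
              .map(lambda word: (word, 1)) \
              .reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10)
# window length 30s, slide interval 10s: every 10 seconds, emit the
# counts accumulated over the most recent 30 seconds of batches.
counts.pprint()

ssc.start()
ssc.awaitTermination()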
Goal: Flume collects nginx access logs --> Kafka -> Spark Streaming counts IPs with too many requests -> write those IPs to Redis -> block the IPs for anti-scraping.
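For reference, the streaming job below expects each Kafka message value to be a JSON object containing at least an "ip" field. The exact schema emitted by the Flume agent is not shown in this post, so the sample record here is hypothetical.

# encoding: utf-8
import json

# Hypothetical sample of one Kafka message value; only the "ip" field is
# actually required by the job below, the other fields are illustrative.
sample_value = '{"ip": "203.0.113.7", "url": "/item/123", "time": "2016-08-01 12:00:01"}'

record = json.loads(sample_value)
print record["ip"]   # -> 203.0.113.7  (Python 2 print, matching the job below)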
#! /usr/bin/env python
# encoding: utf-8
'''Goal: read the nginx log data that Flume pushed into Kafka -> parse -> count IPs with
too many requests -> collect to the driver and write to Redis -> block those IPs (anti-scraping)'''
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re, json
import redis, sys, socket, traceback, os


class RedisClient:
    pool = None

    def __init__(self, redisIp, redisPort, redisDB):
        self.redisIp = redisIp
        self.redisPort = redisPort
        self.redisDB = redisDB
        self.getRedisPool(redisIp, redisPort, redisDB)

    def getRedisPool(self, redisIp, redisPort, redisDB):
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def insertRedis(self, key, field, value):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.hset(key, field, value)

    def expire(self, key, times):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.expire(key, times)


def check_spider(ip):
    '''Simple reverse-DNS check: treat any IP that resolves to a hostname as a search-engine spider.'''
    try:
        result = socket.gethostbyaddr(ip)
        print result
        if result:
            return True
        else:
            return False
    except Exception:
        return False


if __name__ == '__main__':
    sc = SparkContext(appName="pyspark kafka-streaming-redis")
    # print(sc.version)
    lengthtime = int(sys.argv[1])   # window length: look back over the last N seconds
    slicetime = int(sys.argv[-1])   # slide interval: how often the window is evaluated
    # print "----------------", lengthtime, slicetime
    # Build the StreamingContext; one batch every `slicetime` seconds
    ssc = StreamingContext(sc, slicetime)
    paths = '/tmp/checkpoint'   # the directory is created automatically
    ssc.checkpoint(paths)       # enable checkpointing
    kstream = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=['statis-detailinfo-collect'],
        kafkaParams={"group.id": "gyf_test",
                     "auto.offset.reset": "largest",
                     "metadata.broker.list": "172.17.13.100:9092,172.17.13.101:9092"})
    # Each record is a (key, value) pair: info[0] is the Kafka message key,
    # info[1] is the actual message payload.
    # Note: PySpark's reduceByKeyAndWindow signature is (func, invFunc, windowDuration,
    # slideDuration), so pass None for invFunc to get a `lengthtime`-second window
    # that slides every `slicetime` seconds.
    ipcount = kstream.map(lambda info: json.loads(info[1])["ip"]) \
                     .filter(lambda ip: ip != '') \
                     .map(lambda ip: (ip, 1)) \
                     .reduceByKeyAndWindow(lambda x, y: x + y, None, lengthtime, slicetime) \
                     .filter(lambda x: x[1] > 100)
    ipcount.pprint()

    def rdd_handle(rdd):
        r = RedisClient("192.168.8.177", 6379, 0)
        ip_list = rdd.collect()
        # IPs that are not spiders are treated as scraper IPs
        ip_list = [i for i in ip_list if not check_spider(i[0])]
        for ip, count in ip_list:
            if count >= 100 and count < 500:
                r.insertRedis("ip_100_count", ip, count)
            elif count >= 500 and count < 1000:
                r.insertRedis("ip_500_count", ip, count)
            elif count >= 1000:
                r.insertRedis("ip_1000_count", ip, count)

    ipcount.foreachRDD(rdd_handle)
    ssc.start()             # start the streaming computation
    ssc.awaitTermination()  # block until the computation terminates

######### How to run
# The spark-streaming-kafka-assembly_2.10-1.6.1.jar version must match your Spark version.
# Every 2 seconds, count each IP's requests over the previous 180 seconds:
# spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar test.py 180 2
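Once the job is running, the anti-scraping side only needs to look the client IP up in the Redis hashes written by rdd_handle. A minimal sketch, assuming the web/application layer can reach the same Redis instance; the blocking policy itself is hypothetical.

# encoding: utf-8
import redis

r = redis.Redis(host="192.168.8.177", port=6379, db=0)

def should_block(ip):
    # Hash names match those written by rdd_handle above; here the two
    # heaviest buckets are blocked outright, while the 100-500 bucket
    # could instead trigger a captcha or a rate limit.
    return r.hexists("ip_1000_count", ip) or r.hexists("ip_500_count", ip)

if __name__ == '__main__':
    print should_block("203.0.113.7")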