[TOC]html
前面在个人另外一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有说起到,这里依然给出下面的图示:前端
前面给出的那篇文章是基于MapReduce
的离线数据分析案例,其经过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV、UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,咱们将会完成下面的几项工做:java
若是你对上面说起的大数据组件已经有所认识,或者对如何构建大数据实时处理系统感兴趣,那么就能够尽情阅读下面的内容了。git
须要注意的是,核心在于如何构建实时处理系统,而这里给出的案例是实时统计某个网站的PV、UV,在实际中,基于每一个人的工做环境不一样,业务不一样,所以业务系统的复杂度也不尽相同,相对来讲,这里统计PV、UV的业务是比较简单的,但也足够让咱们对大数据实时处理系统有一个基本的、清晰的了解与认识,是的,它再也不那么神秘了。github
咱们的实时处理系统总体架构以下:redis
即从上面的架构中咱们能够看出,其由下面的几部分构成:shell
从构建实时处理系统的角度出发,咱们须要作的是,如何让数据在各个不一样的集群系统之间打通(从上面的图示中也能很好地说明这一点),即须要作各个系统以前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合。固然,各个环境是否使用集群,依我的的实际须要而定,在咱们的环境中,Flume、Kafka、Storm都使用集群。数据库
对于Flume而言,关键在于如何采集数据,而且将其发送到Kafka上,而且因为咱们这里了使用Flume集群的方式,Flume集群的配置也是十分关键的。而对于Kafka,关键就是如何接收来自Flume的数据。从总体上讲,逻辑应该是比较简单的,便可以在Kafka中建立一个用于咱们实时处理系统的topic,而后Flume将其采集到的数据发送到该topic上便可。apache
在咱们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,而后其数据的下沉方式都为发送到另一个Flume Agent上,因此这里咱们须要配置三个Flume Agent.后端
该Flume Agent部署在一台Web服务器上,用来采集产生的Web日志,而后发送到Flume Consolidation Agent上,建立一个新的配置文件flume-sink-avro.conf
,其配置内容以下:
######################################################### ## ##主要做用是监听文件中的新增数据,采集到数据以后,输出到avro ## 注意:Flume agent的运行,主要就是配置source channel sink ## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #对于source的配置描述 监听文件中的新增数据 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #对于sink的配置描述 使用avro日志作数据的消费 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #对于channel的配置描述 使用文件作数据的临时缓存 这种的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #经过channel c1将source r1和sink k1关联起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 启动Flume Agent,便可对日志文件进行监听:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
该Flume Agent部署在一台Web服务器上,用来采集产生的Web日志,而后发送到Flume Consolidation Agent上,建立一个新的配置文件flume-sink-avro.conf
,其配置内容以下:
######################################################### ## ##主要做用是监听文件中的新增数据,采集到数据以后,输出到avro ## 注意:Flume agent的运行,主要就是配置source channel sink ## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #对于source的配置描述 监听文件中的新增数据 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #对于sink的配置描述 使用avro日志作数据的消费 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #对于channel的配置描述 使用文件作数据的临时缓存 这种的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #经过channel c1将source r1和sink k1关联起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 启动Flume Agent,便可对日志文件进行监听:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
该Flume Agent用于接收其它两个Agent发送过来的数据,而后将其发送到Kafka上,建立一个新的配置文件flume-source_avro-sink_kafka.conf
,配置内容以下:
######################################################### ## ##主要做用是监听目录中的新增文件,采集到数据以后,输出到kafka ## 注意:Flume agent的运行,主要就是配置source channel sink ## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #对于source的配置描述 监听avro a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #对于sink的配置描述 使用kafka作数据的消费 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = f-k-s a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 #对于channel的配置描述 使用内存缓冲区域作数据的临时缓存 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #经过channel c1将source r1和sink k1关联起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成后, 启动Flume Agent,便可对avro的数据进行监听:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1 &
在咱们的Kafka
中,先建立一个topic
,用于后面接收Flume
采集过来的数据:
kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
启动Kafka
的消费脚本:
$ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
若是在Web服务器上有新增的日志数据,就会被咱们的Flume程序监听到,而且最终会传输到到Kafka的f-k-s
topic中,这里做为验证,咱们上面启动的是一个kafka终端消费的脚本,这时会在终端中看到数据的输出:
$ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164 1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165 1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664 1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166 1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
这样的话,咱们的整合就没有问题,固然kafka中的数据应该是由咱们的storm来进行消费的,这里只是做为整合的一个测试,下面就会来作kafka+storm的整合。
Kafka和Storm的整合其实在Storm的官网上也有很是详细清晰的文档:http://storm.apache.org/releases/1.0.6/storm-kafka.html,想对其有更多了解的同窗能够参考一下。
在此次的大数据实时处理系统的构建中,Kafka至关因而做为消息队列(或者说是消息中间件)的角色,其产生的消息须要有消费者去消费,因此Kafka与Storm的整合,关键在于咱们的Storm如何去消费Kafka消息topic中的消息(kafka消息topic中的消息正是由Flume采集而来,如今咱们须要在Storm中对其进行消费)。
在Storm中,topology是很是关键的概念。
对比MapReduce,在MapReduce中,咱们提交的做业称为一个job,在一个Job中,又包含若干个Mapper和Reducer,正是在Mapper和Reducer中有咱们对数据的处理逻辑:
在Storm中,咱们提交的一个做业称为topology,其又包含了spout和bolt,在Storm中,对数据的处理逻辑正是在spout和bolt中体现:
即在spout中,正是咱们数据的来源,又由于其实时的特性,因此能够把它比做一个“水龙头”,表示其源源不断地产生数据:
因此,问题的关键是spout如何去获取来自kafka的数据?
好在,storm-kafka
的整合库中提供了这样的API来供咱们进行操做。
在代码的逻辑中只须要建立一个由storm-kafka
API提供的KafkaSpout
对象便可:
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); return new KafkaSpout(spoutConf);
下面给出完整的整合代码:
package cn.xpleaf.bigdata.storm.statics; import kafka.api.OffsetRequest; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; /** * Kafka和storm的整合,用于统计实时流量对应的pv和uv */ public class KafkaStormTopology { // static class MyKafkaBolt extends BaseRichBolt { static class MyKafkaBolt extends BaseBasicBolt { /** * kafkaSpout发送的字段名为bytes */ @Override public void execute(Tuple input, BasicOutputCollector collector) { byte[] binary = input.getBinary(0); // 跨jvm传输数据,接收到的是字节数据 // byte[] bytes = input.getBinaryByField("bytes"); // 这种方式也行 String line = new String(binary); System.out.println(line); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 设置spout和bolt的dag(有向无环图) */ KafkaSpout kafkaSpout = createKafkaSpout(); builder.setSpout("id_kafka_spout", kafkaSpout); builder.setBolt("id_kafka_bolt", new MyKafkaBolt()) .shuffleGrouping("id_kafka_spout"); // 经过不一样的数据流转方式,来指定数据的上游组件 // 使用builder构建topology StormTopology topology = builder.createTopology(); String topologyName = KafkaStormTopology.class.getSimpleName(); // 拓扑的名称 Config config = new Config(); // Config()对象继承自HashMap,但自己封装了一些基本的配置 // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter if (args == null || args.length < 1) { // 没有参数时使用本地模式,有参数时使用集群模式 LocalCluster localCluster = new LocalCluster(); // 本地开发模式,建立的对象为LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } /** * BrokerHosts hosts kafka集群列表 * String topic 要消费的topic主题 * String zkRoot kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移量) * String id 当前操做的标识id */ private static KafkaSpout createKafkaSpout() { String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; BrokerHosts hosts = new ZkHosts(brokerZkStr); // 经过zookeeper中的/brokers便可找到kafka的地址 String topic = "f-k-s"; String zkRoot = "/" + topic; String id = "consumer-id"; SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); // 本地环境设置以后,也能够在zk中创建/f-k-s节点,在集群环境中,不用配置也能够在zk中创建/f-k-s节点 //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"}); //spoutConf.zkPort = 2181; spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设置以后,刚启动时就不会把以前的消费也进行读取,会从最新的偏移量开始读取 return new KafkaSpout(spoutConf); } }
其实代码的逻辑很是简单,咱们只建立了 一个由storm-kafka
提供的KafkaSpout
对象和一个包含咱们处理逻辑的MyKafkaBolt
对象,MyKafkaBolt
的逻辑也很简单,就是把kafka的消息打印到控制台上。
须要注意的是,后面咱们分析网站PV、UV的工做,正是在上面这部分简单的代码中完成的,因此其是很是重要的基础。
上面的整合代码,能够在本地环境中运行,也能够将其打包成jar包上传到咱们的Storm集群中并提交业务来运行。若是Web服务器可以产生日志,而且前面Flume+Kafka的整合也没有问题的话,将会有下面的效果。
若是是在本地环境中运行上面的代码,那么能够在控制台中看到日志数据的输出:
...... 45016548 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=f-k-s, partitionMap={0=uplooking02:9092, 1=uplooking03:9092, 2=uplooking01:9092}} 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=uplooking02:9092, topic=f-k-s, partition=0}, Partition{host=uplooking03:9092, topic=f-k-s, partition=1}, Partition{host=uplooking01:9092, topic=f-k-s, partition=2}] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: [] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: [] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing 1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164 1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666 1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165 1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664 1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166 ......
若是是在Storm集群中提交的做业运行,那么也能够在Storm的日志中看到Web服务器产生的日志数据:
这样的话就完成了Kafka+Storm的整合。
其实所谓Storm和Redis的整合,指的是在咱们的实时处理系统中的数据的落地方式,即在Storm中包含了咱们处理数据的逻辑,而数据处理完毕后,产生的数据处理结果该保存到什么地方呢?显然就有不少种方式了,关系型数据库、NoSQL、HDFS、HBase等,这应该取决于具体的业务和数据量,在这里,咱们使用Redis来进行最后分析数据的存储。
因此实际上作这一步的整合,其实就是开始写咱们的业务处理代码了,由于经过前面Flume-Kafka-Storm的整合,已经打通了整个数据的流通路径,接下来关键要作的是,在Storm中,如何处理咱们的数据并保存到Redis中。
而在Storm中,spout已经不须要咱们来写了(由storm-kafka
的API提供了KafkaSpout
对象),因此问题就变成,如何根据业务编写分析处理数据的bolt。
咱们实时获取的日志格式以下:
1002 202.103.24.68 1976dc2e-f03a-44f0-892f-086d85105f7e 14549 1 GET /top HTTP/1.1 200 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806916373 1000 221.8.9.6 80 542ccf0a-9b14-49a0-93cd-891d87ddabf3 12472 1 GET /index HTTP/1.1 500 /top Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT) 1523806916874 1003 211.167.248.22 0e4c1875-116c-400e-a4f8-47a46ad04a42 12536 0 GET /tologin HTTP/1.1 200 /stat Mozilla/5.0 (Windows; U; Windows NT 5.2) AppleWebKit/525.13 (KHTML,like Gecko) Chrome/0.2.149.27 Safari/525.13 1523806917375 1000 219.147.198.230 07eebc1a-740b-4dac-b53f-bb242a45c901 11847 1 GET /userList HTTP/1.1 200 /top Mozilla/4.0 (compatible; MSIE 6.0; Windows NT5.1) 1523806917876 1001 222.172.200.68 4fb35ced-5b30-483b-9874-1d5917286675 13550 1 GET /getDataById HTTP/1.0 504 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806918377
其中须要说明的是第二个字段和第三个字段,由于它对咱们统计pv和uv很是有帮助,它们分别是ip字段和mid字段,说明以下:
ip:用户的IP地址 mid:惟一的id,此id第一次会种在浏览器的cookie里。若是存在则再也不种。做为浏览器惟一标示。移动端或者pad直接取机器码。
所以,根据IP地址,咱们能够经过查询获得其所在的省份,而且建立一个属于该省份的变量,用于记录pv数,每来一条属于该省份的日志记录,则该省份的pv就加1,以此来完成pv的统计。
而对于mid,咱们则能够建立属于该省的一个set集合,每来一条属于该省份的日志记录,则能够将该mid添加到set集合中,由于set集合存放的是不重复的数据,这样就能够帮咱们自动过滤掉重复的mid,根据set集合的大小,就能够统计出uv。
在咱们storm的业务处理代码中,咱们须要编写两个bolt:
固然上面只是说明了总体的思路,实际上还有不少须要注意的细节问题和技巧问题,这都在咱们的代码中进行体现,我在后面写的代码中都加了很是详细的注释进行说明。
根据上面的分析,编写用于数据预处理的bolt,代码以下:
package cn.xpleaf.bigdata.storm.statistic; import cn.xpleaf.bigdata.storm.utils.JedisUtil; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import redis.clients.jedis.Jedis; /** * 日志数据预处理Bolt,实现功能: * 1.提取实现业务需求所须要的信息:ip地址、客户端惟一标识mid * 2.查询IP地址所属地,并发送到下一个Bolt */ public class ConvertIPBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { byte[] binary = input.getBinary(0); String line = new String(binary); String[] fields = line.split("\t"); if(fields == null || fields.length < 10) { return; } // 获取ip和mid String ip = fields[1]; String mid = fields[2]; // 根据ip获取其所属地(省份) String province = null; if (ip != null) { Jedis jedis = JedisUtil.getJedis(); province = jedis.hget("ip_info_en", ip); // 须要释放jedis的资源,不然会报can not get resource from the pool JedisUtil.returnJedis(jedis); } // 发送数据到下一个bolt,只发送实现业务功能须要的province和mid collector.emit(new Values(province, mid)); } /** * 定义了发送到下一个bolt的数据包含两个域:province和mid */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("province", "mid")); } }
这个bolt包含咱们统计网站pv、uv的代码逻辑,所以很是重要,其代码以下:
package cn.xpleaf.bigdata.storm.statistic; import cn.xpleaf.bigdata.storm.utils.JedisUtil; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; import redis.clients.jedis.Jedis; import java.text.SimpleDateFormat; import java.util.*; /** * 日志数据统计Bolt,实现功能: * 1.统计各省份的PV、UV * 2.以天为单位,将省份对应的PV、UV信息写入Redis */ public class StatisticBolt extends BaseBasicBolt { Map<String, Integer> pvMap = new HashMap<>(); Map<String, HashSet<String>> midsMap = null; SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); @Override public void execute(Tuple input, BasicOutputCollector collector) { if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) { // 若是收到非系统级别的tuple,统计信息到局部变量mids String province = input.getStringByField("province"); String mid = input.getStringByField("mid"); pvMap.put(province, pvMap.get(province) + 1); // pv+1 if(mid != null) { midsMap.get(province).add(mid); // 将mid添加到该省份所对应的set中 } } else { // 若是收到系统级别的tuple,则将数据更新到Redis中,释放JVM堆内存空间 /* * 以 广东 为例,其在Redis中保存的数据格式以下: * guangdong_pv(Redis数据结构为hash) * --20180415 * --pv数 * --20180416 * --pv数 * guangdong_mids_20180415(Redis数据结构为set) * --mid * --mid * --mid * ...... * guangdong_mids_20180415(Redis数据结构为set) * --mid * --mid * --mid * ...... */ Jedis jedis = JedisUtil.getJedis(); String dateStr = sdf.format(new Date()); // 更新pvMap数据到Redis中 String pvKey = null; for(String province : pvMap.keySet()) { int currentPv = pvMap.get(province); if(currentPv > 0) { // 当前map中的pv大于0才更新,不然没有意义 pvKey = province + "_pv"; String oldPvStr = jedis.hget(pvKey, dateStr); if(oldPvStr == null) { oldPvStr = "0"; } Long oldPv = Long.valueOf(oldPvStr); jedis.hset(pvKey, dateStr, oldPv + currentPv + ""); pvMap.replace(province, 0); // 将该省的pv从新设置为0 } } // 更新midsMap到Redis中 String midsKey = null; HashSet<String> midsSet = null; for(String province: midsMap.keySet()) { midsSet = midsMap.get(province); if(midsSet.size() > 0) { // 当前省份的set的大小大于0才更新到,不然没有意义 midsKey = province + "_mids_" + dateStr; jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()])); midsSet.clear(); } } // 释放jedis资源 JedisUtil.returnJedis(jedis); System.out.println(System.currentTimeMillis() + "------->写入数据到Redis"); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } /** * 设置定时任务,只对当前bolt有效,系统会定时向StatisticBolt发送一个系统级别的tuple */ @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> config = new HashMap<>(); config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); return config; } /** * 初始化各个省份的pv和mids信息(用来临时存储统计pv和uv须要的数据) */ public StatisticBolt() { pvMap = new HashMap<>(); midsMap = new HashMap<String, HashSet<String>>(); String[] provinceArray = {"shanxi", "jilin", "hunan", "hainan", "xinjiang", "hubei", "zhejiang", "tianjin", "shanghai", "anhui", "guizhou", "fujian", "jiangsu", "heilongjiang", "aomen", "beijing", "shaanxi", "chongqing", "jiangxi", "guangxi", "gansu", "guangdong", "yunnan", "sicuan", "qinghai", "xianggang", "taiwan", "neimenggu", "henan", "shandong", "shanghai", "hebei", "liaoning", "xizang"}; for(String province : provinceArray) { pvMap.put(province, 0); midsMap.put(province, new HashSet()); } } }
咱们须要编写一个topology用来组织前面编写的Bolt,代码以下:
package cn.xpleaf.bigdata.storm.statistic; import kafka.api.OffsetRequest; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.TopologyBuilder; /** * 构建topology */ public class StatisticTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 设置spout和bolt的dag(有向无环图) */ KafkaSpout kafkaSpout = createKafkaSpout(); builder.setSpout("id_kafka_spout", kafkaSpout); builder.setBolt("id_convertIp_bolt", new ConvertIPBolt()).shuffleGrouping("id_kafka_spout"); // 经过不一样的数据流转方式,来指定数据的上游组件 builder.setBolt("id_statistic_bolt", new StatisticBolt()).shuffleGrouping("id_convertIp_bolt"); // 经过不一样的数据流转方式,来指定数据的上游组件 // 使用builder构建topology StormTopology topology = builder.createTopology(); String topologyName = KafkaStormTopology.class.getSimpleName(); // 拓扑的名称 Config config = new Config(); // Config()对象继承自HashMap,但自己封装了一些基本的配置 // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter if (args == null || args.length < 1) { // 没有参数时使用本地模式,有参数时使用集群模式 LocalCluster localCluster = new LocalCluster(); // 本地开发模式,建立的对象为LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } /** * BrokerHosts hosts kafka集群列表 * String topic 要消费的topic主题 * String zkRoot kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移量) * String id 当前操做的标识id */ private static KafkaSpout createKafkaSpout() { String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; BrokerHosts hosts = new ZkHosts(brokerZkStr); // 经过zookeeper中的/brokers便可找到kafka的地址 String topic = "f-k-s"; String zkRoot = "/" + topic; String id = "consumer-id"; SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); // 本地环境设置以后,也能够在zk中创建/f-k-s节点,在集群环境中,不用配置也能够在zk中创建/f-k-s节点 //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"}); //spoutConf.zkPort = 2181; spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设置以后,刚启动时就不会把以前的消费也进行读取,会从最新的偏移量开始读取 return new KafkaSpout(spoutConf); } }
将上面的程序打包成jar包,并上传到咱们的集群提交业务后,若是前面的整合没有问题,而且Web服务也有Web日志产生,那么一段时间后,咱们就能够在Redis数据库中看到数据的最终处理结果,即各个省份的uv和pv信息:
须要说明的是mid信息是一个set集合,只要求出该set集合的大小,也就能够求出uv值。
至此,准确来讲,咱们的统计pv、uv的大数据实时处理系统是构建完成了,处理的数据结果的用途根据不一样的业务需求而不一样,可是对于网站的pv、uv数据来讲,是很是适合用做可视化处理的,即用网页动态将数据展现出来,咱们下一步正是要构建一个简单的Web应用将pv、uv数据动态展现出来。
数据可视化处理目前咱们须要完成两部分的工做:
对于Web项目的开发,因我的的技术栈能力而异,选择的语言和技术也有所不一样,只要可以达到咱们最终数据可视化的目的,其实都行的。这个项目中咱们要展现的是pv和uv数据,难度不大,所以能够选择Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我我的很是喜欢,由于开发很是快,但由于前面一直用的是Java,所以这里我仍是选择使用SpringMVC来完成。
至于UI这一块,我前端能力通常,普通的开发没有问题,可是要作出像上面这种地图类型的UI界面来展现数据的话,确实有点无能为力。好在如今第三方的UI框架比较多,对于图表类展现的,好比就有highcharts和echarts,其中echarts是百度开源的,有丰富的中文文档,很是容易上手,因此这里我选择使用echarts来做为UI,而且其恰好就有可以知足咱们需求的地图类的UI组件。
由于难度不大,具体的开发流程的这里就不说起了,有兴趣的同窗能够直接参考后面我提供的源代码,这里咱们就直接来看一下效果好了。
由于实际上在本次项目案例中,这一块的代码也是很是少的,使用SpringMVC开发的话,只要把JavaEE三层构架搭起来了,把依赖引入好了,后面的开发确实不难的;而若是有同窗会Flask或者Django的话,其项目自己的构建和代码上也都会更容易。
启动咱们的Web项目后,输入地址就能够访问到数据的展现界面了:
能够看到,echarts的这个UI仍是比较好看的,并且也真的可以知足咱们的需求。每一个省份上的两个不一样颜色的点表示目前咱们须要展现的数据有两种,分别为pv和uv,在左上角也有体现,而颜色的深浅就能够体现pv或者uv的数量大小关系了。
在这个界面上,点击左上角的uv,表示不查看uv的数据,这样咱们就会只看到pv的状况:
固然,也能够只查看uv的状况:
当鼠标停留在某个省份上时,就能够查看这个省份具体的pv或uv值,好比下面咱们把鼠标停留在“广东”上时,就能够看到其此时的pv值为170,查看其它省份的也是如此:
那么数据是能够查看了,又怎么体现动态呢?
对于页面数据的动态刷新有两种方案,一种是定时刷新页面,另一种则是定时向后端异步请求数据。
目前我采用的是第一种,页面定时刷新,有兴趣的同窗也能够尝试使用第二种方法,只须要在后端开发相关的返回JSON数据的API便可。
那么至此,从整个大数据实时处理系统的构建到最后的数据可视化处理工做,咱们都已经完成了,能够看到整个过程下来涉及到的知识层面仍是比较多的,不过我我的以为,只要把核心的原理紧紧掌握了,对于大部分状况而言,环境的搭建以及基于业务的开发都可以很好地解决。
写此文,一来是对本身实践中的一些总结,二来也是但愿把一些比较不错的项目案例分享给你们,总之但愿可以对你们有所帮助。
项目案例涉及到的代码我已经上传到GitHub上面,分为两个,一个是storm的项目代码,另一个是数据可视化处理的代码,以下:
storm-statistic:https://github.com/xpleaf/storm-statistic
dynamic-show:https://github.com/xpleaf/dynamic-show