Kafka + Spark Streaming + Redis Real-Time Computation Integration in Practice

Spark, as a general-purpose computation platform, extends well to many kinds of computational workloads, especially since it ships with built-in libraries such as Spark Streaming, Spark SQL, MLlib, and GraphX. These libraries offer high-level abstractions that let you express complex computation logic in very concise code, thanks in no small part to the conciseness of the Scala language. Here we built our computation platform on Spark 1.3.0 and implemented real-time computation on top of Spark Streaming.
Our use case is analyzing user behavior in a mobile app, described as follows:

  • The mobile client collects user behavior events (we take click events as the example) and sends them to a data server; here we assume the data goes straight into a Kafka message queue

  • A backend real-time service consumes the data from Kafka, reads it out, and analyzes it in real time; we choose Spark Streaming because it ships with built-in support for Kafka integration

  • The Spark Streaming job analyzes the data and writes the results to Redis, so user behavior data can be fetched in real time and also exported for offline, aggregate statistical analysis

Spark Streaming Overview

Spark Streaming provides a high-level abstraction called a DStream (Discretized Stream). A DStream represents a continuous stream of input data and can be created from input sources such as Kafka, TCP sockets, or Flume. Internally, a DStream is a sequence of RDDs. Because Spark Streaming is built on the Spark platform, it inherits Spark's characteristics such as fault tolerance, scalability, and high throughput.
In Spark Streaming, each DStream contains the collection of data items that arrive within one time interval; you can think of this as a batch over the given interval, and each batch forms one RDD. A DStream is therefore an ordered sequence of batches: time is continuous, and the stream is split by the batch interval into a series of discrete RDDs, as shown in the figure (from the official documentation):
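
To make the batching concrete, here is a minimal sketch against the Spark 1.3 Streaming API that builds a DStream over 5-second batches from a TCP socket source; the local master URL, host, and port are placeholders chosen for illustration only.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch")
    // Every 5 seconds the received records form one batch, i.e. one RDD in the DStream
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // Print the number of records in each 5-second batch
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}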

As we know, Spark supports two kinds of operations: transformations and actions. A transformation derives a new RDD from an existing one; transformations include map, filter, flatMap, union, join, and so on. Transformations are lazy: invoking them does not immediately run any computation on the source RDD; the computation only happens once an operation of the other kind, an action, is invoked. An action actually operates on the RDD and either returns a result to the driver program, or returns nothing, for example when the results are persisted to storage. Actions include reduce, count, foreach, and collect. See the official documentation for more details on transformations and actions.
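
As a small illustration, assuming an existing SparkContext named sc (the data here is made up for the example), the transformations below only build up a lineage, and nothing runs until an action is called:

val nums = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)           // transformation: nothing is computed yet
val evens = doubled.filter(_ % 4 == 0)  // still lazy, just builds up the lineage
println(evens.count())                  // action: triggers the actual computation
evens.collect().foreach(println)        // action: brings the results back to the driver
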
Similarly, Spark Streaming offers two kinds of operations analogous to Spark's, namely transformations and output operations, whose operands are DStreams: a transformation derives a new DStream from an existing one. Spark Streaming additionally provides a family of window operations; they are still transformations, but they give finer control over the slice of the DStream being processed (its time span and hence the number of elements), for example window(windowLength, slideInterval), countByWindow(windowLength, slideInterval), and reduceByWindow(func, windowLength, slideInterval). Output operations let us write DStream data to an external storage system such as a database or a file system; executing an output operation is analogous to executing a Spark action, in that it forces the lazy chain of transformations before it to actually run.
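
As an example, here is a hedged sketch of one windowed transformation followed by an output operation; pairs is assumed to be a DStream[(String, Int)] of (uid, clickCount) tuples, and Seconds comes from org.apache.spark.streaming. Every 10 seconds (the slide interval) it aggregates the last 60 seconds of data (the window length), and only the output operation forces the lazy transformation chain to run:

val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))

// Output operation: only now does the lazy transformation chain actually execute;
// note that println runs on the executors, not on the driver
windowedCounts.foreachRDD(rdd => {
  rdd.foreach { case (uid, count) => println(uid + " -> " + count + " clicks in the last minute") }
})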

Kafka + Spark Streaming + Redis Programming Practice

Now, let's implement this real-time computation application for the scenario described above. First, we wrote a Kafka producer simulator that continuously writes user behavior events into Kafka. The data is in JSON format, for example:

1 {"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}

An event contains four fields (a hypothetical case-class view of them is sketched after the list):

  • uid: user id

  • event_time: timestamp of when the event occurred

  • os_type: mobile OS type of the phone

  • click_count: number of clicks
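
Purely for illustration, the four fields could be modeled with a case class like the hypothetical one below; the code in this article works on JSON objects directly, so this type is an assumption rather than part of the implementation.

case class UserClickEvent(
  uid: String,        // user id
  eventTime: String,  // event timestamp in milliseconds, kept as a string
  osType: String,     // mobile OS type, e.g. "Android"
  clickCount: Int)    // number of clicks in this event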

The producer implementation is shown below:

package org.shirdrn.spark.streaming.utils

import java.util.Properties

import scala.util.Random

import org.codehaus.jettison.json.JSONObject

import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig

object KafkaEventProducer {

  private val users = Array(
      "4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
      "011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
      "068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
      "d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
      "6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

  private val random = new Random()

  private var pointer = -1

  // Cycle through the fixed list of user ids
  def getUserID(): String = {
    pointer = pointer + 1
    if (pointer >= users.length) {
      pointer = 0
    }
    users(pointer)
  }

  // Random click count between 0 and 9
  def click(): Int = random.nextInt(10)

  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2
  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list
  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events
  // bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --topic user_events --from-beginning
  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os_type", "Android")
        .put("click_count", click)

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}

The simulated write rate is controlled by the sleep interval at the end of the loop above. Next we implement a real-time count of each user's clicks. Counts are accumulated per user, so the logic itself is simple; the key is to watch out for a few issues along the way, such as object serialization. Let's look at the code first and discuss the details afterwards:

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val dbIndex = 1
    val clickHashKey = "app::users::click"

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.select(dbIndex)
          jedis.hincrBy(clickHashKey, uid, clickCount)
          RedisClient.pool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The code above uses the Jedis client to access Redis and accumulates the per-group counts into Redis; any other system that needs this data in real time can simply read it straight from Redis. The RedisClient implementation is shown below:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object RedisClient extends Serializable {
  val redisHost = "10.10.4.130"
  val redisPort = 6379
  val redisTimeout = 30000
  lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

  // Destroy the pool when the JVM shuts down
  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}

We have run the code above successfully both in local[K] mode and on a Spark Standalone cluster.
When debugging in a development environment, i.e. in the local[K] deployment mode, Spark starts K worker threads locally, all inside a single JVM instance. The code above defaults to local[K] when no argument is passed, so creating the Redis connection pool or connections this way tends to work on the first try during debugging. However, under the Spark Standalone, YARN Client (or YARN Cluster), or Mesos deployment modes it fails, mainly because the handling of the Redis connection pool or connections goes wrong. To see why, consider the Spark architecture, shown in the figure below (from the official documentation):

Whether running in local mode, Standalone mode, Mesos, or YARN, the structure of a Spark cluster can be abstracted as in the figure above; only the runtime environments of the components differ, which is why a component may be distributed, local, or confined to a single JVM instance. In local mode, for example, the whole picture collapses into multiple components inside a single process on one node; in YARN Client mode, the driver program submits the Spark application from a node outside the YARN cluster, while the other components run on nodes managed by YARN.
Once the application is deployed on a Spark cluster, the functions applied to the RDDs (in Spark Streaming, the operations applied to DStreams) are shipped to the executors on the cluster's worker nodes. The objects (elements) those functions operate on and close over must therefore be serializable; otherwise they cannot be correctly deserialized and reconstructed into usable objects after being transferred across nodes. In Scala this can also be addressed with a lazy reference, which is what the code above does:

// lazy pool reference
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
...
partitionOfRecords.foreach(pair => {
  val uid = pair._1
  val clickCount = pair._2
  val jedis = RedisClient.pool.getResource
  jedis.select(dbIndex)
  jedis.hincrBy(clickHashKey, uid, clickCount)
  RedisClient.pool.returnResource(jedis)
})

另外一种方式,咱们将代码修改成,把对Redis链接的管理放在操做DStream的Output操做范围以内,由于咱们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,以下所示:

package org.shirdrn.spark.streaming

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import redis.clients.jedis.JedisPool

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val dbIndex = 1
    val clickHashKey = "app::users::click"

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {

        /**
         * Internal Redis client managing a JedisPool. It is defined inside the
         * output operation, so it is created on the executor and never has to be
         * serialized and shipped from the driver.
         */
        object InternalRedisClient extends Serializable {

          @transient private var pool: JedisPool = null

          def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
              maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
            makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
          }

          def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
              maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
              testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
            if (pool == null) {
              val poolConfig = new GenericObjectPoolConfig()
              poolConfig.setMaxTotal(maxTotal)
              poolConfig.setMaxIdle(maxIdle)
              poolConfig.setMinIdle(minIdle)
              poolConfig.setTestOnBorrow(testOnBorrow)
              poolConfig.setTestOnReturn(testOnReturn)
              poolConfig.setMaxWaitMillis(maxWaitMillis)
              pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

              // Destroy the pool when the executor JVM shuts down
              val hook = new Thread {
                override def run = pool.destroy()
              }
              sys.addShutdownHook(hook.run)
            }
          }

          def getPool: JedisPool = {
            assert(pool != null)
            pool
          }
        }

        // Redis configurations; create the pool once per partition, before processing records
        val maxTotal = 10
        val maxIdle = 10
        val minIdle = 1
        val redisHost = "10.10.4.130"
        val redisPort = 6379
        val redisTimeout = 30000
        InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

        // Accumulate the click counts of this partition into the Redis hash
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = InternalRedisClient.getPool.getResource
          jedis.select(dbIndex)
          jedis.hincrBy(clickHashKey, uid, clickCount)
          InternalRedisClient.getPool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

This implementation takes advantage of a Scala feature: a class or object can be defined anywhere in the code. By placing the Redis connection management code inside the specific operation, we avoid the problem of serializing a transient object across nodes. Doing this well also requires understanding how Spark operates on RDD data sets internally; see the RDD and Spark documentation for more.
To run on the cluster in Standalone mode, execute the following commands:

cd /usr/local/spark
./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.1-SNAPSHOT.jar spark://hadoop1:7077

You can then check the status of the computation tasks on each worker node in the cluster, which is also very convenient to do through the web UI.
Now let's look at the computed results stored in Redis:

127.0.0.1:6379[1]> HGETALL app::users::click
 1) "4A4D769EB9679C054DE81B973ED5D768"
 2) "7037"
 3) "8dfeb5aaafc027d89349ac9a20b3930f"
 4) "6992"
 5) "011BBF43B89BFBF266C865DF0397AA71"
 6) "7021"
 7) "97edfc08311c70143401745a03a50706"
 8) "6874"
 9) "d7f141563005d1b5d0d3dd30138f3f62"
10) "7057"
11) "a95f22eabc4fd4b580c011a3161a9d9d"
12) "7092"
13) "6b67c8c700427dee7552f81f3228c927"
14) "7266"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "7188"
17) "c8ee90aade1671a21336c721512b817a"
18) "6950"
19) "068b746ed4620d25e26055a9f804385f"

For more details about Spark Streaming, see the official documentation.

Appendix

For reference, here are the dependencies of the application developed above and the Maven configuration used to package the Spark Streaming application. Note that if the maven-shade-plugin is misconfigured, submitting the packaged application to the Spark cluster may fail with "Invalid signature file digest for Manifest main attributes". A reference Maven configuration is shown below:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.shirdrn.spark</groupId>
     <artifactId>spark</artifactId>
     <version>0.0.1-SNAPSHOT</version>

     <dependencies>
          <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-core_2.10</artifactId>
               <version>1.3.0</version>
          </dependency>
          <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-streaming_2.10</artifactId>
               <version>1.3.0</version>
          </dependency>
          <dependency>
               <groupId>net.sf.json-lib</groupId>
               <artifactId>json-lib</artifactId>
               <version>2.3</version>
          </dependency>
          <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-streaming-kafka_2.10</artifactId>
               <version>1.3.0</version>
          </dependency>
          <dependency>
               <groupId>redis.clients</groupId>
               <artifactId>jedis</artifactId>
               <version>2.5.2</version>
          </dependency>
          <dependency>
               <groupId>org.apache.commons</groupId>
               <artifactId>commons-pool2</artifactId>
               <version>2.2</version>
          </dependency>
     </dependencies>

     <build>
          <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
          <resources>
               <resource>
                    <directory>${basedir}/src/main/resources</directory>
               </resource>
          </resources>
          <testResources>
               <testResource>
                    <directory>${basedir}/src/test/resources</directory>
               </testResource>
          </testResources>
          <plugins>
               <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                         <source>1.6</source>
                         <target>1.6</target>
                    </configuration>
               </plugin>
               <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.2</version>
                    <configuration>
                         <createDependencyReducedPom>true</createDependencyReducedPom>
                    </configuration>
                    <executions>
                         <execution>
                              <phase>package</phase>
                              <goals>
                                   <goal>shade</goal>
                              </goals>
                              <configuration>
                                   <artifactSet>
                                        <includes>
                                             <include>*:*</include>
                                        </includes>
                                   </artifactSet>
                                   <filters>
                                        <filter>
                                             <artifact>*:*</artifact>
                                             <excludes>
                                                  <exclude>META-INF/*.SF</exclude>
                                                  <exclude>META-INF/*.DSA</exclude>
                                                  <exclude>META-INF/*.RSA</exclude>
                                             </excludes>
                                        </filter>
                                   </filters>
                                   <transformers>
                                        <transformer
                                             implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                        <transformer
                                             implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                             <resource>reference.conf</resource>
                                        </transformer>
                                        <transformer
                                             implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                             <resource>log4j.properties</resource>
                                        </transformer>
                                   </transformers>
                              </configuration>
                         </execution>
                    </executions>
               </plugin>
          </plugins>
     </build>
</project>



This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, provided you keep the attribution to the author 时延军 (including the link: http://shiyanjun.cn), do not use it for commercial purposes, and release any derivative works under the same license. If you have any questions, please contact me.
