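foreachRDD is typically used to save the results computed by Spark Streaming to an external system such as HDFS, MySQL, or Redis. Understanding the points below helps avoid several common pitfalls.
Pitfall 1: creating the external connection object in the wrong place, as in the following code: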
- dstream.foreachRDD { rdd =>
-   val connection = createNewConnection() // executed at the driver
-   rdd.foreach { record =>
-     connection.send(record) // executed at the worker
-   }
- }
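Here the connection object is instantiated on the driver and would then have to be serialized and sent to each worker. In practice this usually fails, because connection objects generally cannot be serialized correctly.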
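The serialization problem can be reproduced without Spark. The following is a minimal sketch using plain Java serialization; `FakeConnection`, `SendTask`, and `SerializationCheck` are hypothetical names introduced only for illustration, and `SendTask` merely stands in for the closure that would be shipped to the workers.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a real connection (socket, JDBC handle, ...).
// It deliberately does NOT extend Serializable, like most real connections.
class FakeConnection {
  def send(record: String): Unit = println(s"sent $record")
}

// Stand-in for the closure sent to the workers: it is Serializable itself,
// but it captures the connection that was created on the driver.
class SendTask(val connection: FakeConnection) extends Serializable {
  def run(record: String): Unit = connection.send(record)
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds, false if it
  // fails with NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Calling `SerializationCheck.canSerialize(new SendTask(new FakeConnection))` returns `false`, because the captured connection field cannot be written, even though the task class itself is marked `Serializable`.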
Pitfall 2: creating a connection object for every record:
- dstream.foreachRDD { rdd =>
-   rdd.foreach { record =>
-     val connection = createNewConnection()
-     connection.send(record)
-     connection.close()
-   }
- }
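This solves the problem in pitfall 1, but connections to external systems such as MySQL are usually an expensive resource. Acquiring a new connection for every single record makes performance very poor.
A better approach is to create one connection object per partition, as follows: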
- dstream.foreachRDD { rdd =>
-   rdd.foreachPartition { partitionOfRecords =>
-     val connection = createNewConnection()
-     partitionOfRecords.foreach(record => connection.send(record))
-     connection.close()
-   }
- }
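To make the difference between the per-record and per-partition strategies concrete, here is a small sketch that only counts how many connections each strategy would open. It does not use Spark; partitions are modelled as plain `Seq[Seq[String]]`, and `ConnectionCount` and `CountingFactory` are hypothetical names.

```scala
object ConnectionCount {
  // Hypothetical factory that only counts how often a connection is opened.
  final class CountingFactory {
    var opened = 0
    // Returns a no-op "send" function standing in for a real connection.
    def create(): String => Unit = { opened += 1; _ => () }
  }

  // One connection per record, as in pitfall 2.
  def perRecord(partitions: Seq[Seq[String]]): Int = {
    val factory = new CountingFactory
    partitions.foreach(_.foreach { record =>
      val send = factory.create()
      send(record)
    })
    factory.opened
  }

  // One connection per partition, reused for every record in it.
  def perPartition(partitions: Seq[Seq[String]]): Int = {
    val factory = new CountingFactory
    partitions.foreach { partition =>
      val send = factory.create()
      partition.foreach(send)
    }
    factory.opened
  }
}
```

For three partitions holding six records in total, `perRecord` opens six connections while `perPartition` opens three; the gap grows with the number of records per partition.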
Finally, an even better solution is to use a connection pool to manage the connection objects:
- dstream.foreachRDD { rdd =>
-   rdd.foreachPartition { partitionOfRecords =>
-     // ConnectionPool is a static, lazily initialized pool of connections
-     val connection = ConnectionPool.getConnection()
-     partitionOfRecords.foreach(record => connection.send(record))
-     ConnectionPool.returnConnection(connection) // return to the pool for future reuse
-   }
- }
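As the comment in the code above says, the connection objects should be initialized lazily, for example by marking them with the lazy keyword, so that they are instantiated only when first used.
Below is an example found online that saves Spark Streaming results to MySQL: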
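The snippet above does not define `ConnectionPool`, so the following is only one possible sketch of such an object, not a production pool. `DemoConnection` and `createdCount` are hypothetical, and the pool has no size limit, health checks, or shutdown logic.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical connection type used only for illustration.
final class DemoConnection(val id: Int) {
  def send(record: String): Unit = println(s"[conn $id] $record")
}

// A Scala `object` is initialized on first access in each JVM, so every
// executor would build its own pool rather than receive one from the driver.
object ConnectionPool {
  private val created = new AtomicInteger(0)
  // `lazy val`: the queue is allocated only when it is first used.
  private lazy val idle = new ConcurrentLinkedQueue[DemoConnection]()

  // Reuse an idle connection if there is one, otherwise create a new one.
  def getConnection(): DemoConnection = {
    val pooled = idle.poll() // null when the pool is empty
    if (pooled != null) pooled
    else new DemoConnection(created.incrementAndGet())
  }

  // Hand the connection back so that a later batch can reuse it.
  def returnConnection(connection: DemoConnection): Unit = {
    idle.offer(connection)
    ()
  }

  // Number of connections created so far (for inspection only).
  def createdCount: Int = created.get()
}
```

With this sketch, getting a connection, returning it, and getting one again yields the same `DemoConnection` instance, so consecutive batches on the same executor would not pay the connection setup cost again.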
- package spark.examples.streaming
-
- import java.sql.{Connection, DriverManager, PreparedStatement}
-
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object SparkStreamingForPartition {
-   def main(args: Array[String]): Unit = {
-     val conf = new SparkConf().setAppName("NetCatWordCount")
-     conf.setMaster("local[3]")
-     val ssc = new StreamingContext(conf, Seconds(5))
-     // A DStream is a sequence of RDDs, so foreachRDD is called once per batch RDD
-     val dstream = ssc.socketTextStream("192.168.26.140", 9999)
-     dstream.foreachRDD(rdd => {
-       // Runs on the executors, once per partition
-       def func(records: Iterator[String]): Unit = {
-         var conn: Connection = null
-         var stmt: PreparedStatement = null
-         try {
-           val url = "jdbc:mysql://192.168.26.140:3306/person"
-           val user = "root"
-           val password = ""
-           conn = DriverManager.getConnection(url, user, password)
-           // Prepare the statement once per partition and reuse it for every word;
-           // preparing it inside the loop would leak all but the last statement
-           stmt = conn.prepareStatement("insert into TBL_WORDS(word) values (?)")
-           records.flatMap(_.split(" ")).foreach(word => {
-             stmt.setString(1, word)
-             stmt.executeUpdate()
-           })
-         } catch {
-           case e: Exception => e.printStackTrace()
-         } finally {
-           if (stmt != null) {
-             stmt.close()
-           }
-           if (conn != null) {
-             conn.close()
-           }
-         }
-       }
-       val repartitionedRDD = rdd.repartition(3)
-       repartitionedRDD.foreachPartition(func)
-     })
-     ssc.start()
-     ssc.awaitTermination()
-   }
- }
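Details to note:
Like RDDs, DStreams are evaluated lazily: computation only happens when an action is encountered. The RDDs inside a DStream must therefore include an action for the received data to be processed. Even if the code calls foreachRDD, if no RDD action runs inside it, Spark Streaming simply receives the data and does nothing with it.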
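The effect of a missing action can be illustrated with an analogy in plain Scala, without Spark. `Iterator.map` is lazy in the same way an RDD transformation is, and forcing the iterator plays the role of an action. This is only an analogy, not Spark code, and `LazyDemo` is a hypothetical name.

```scala
object LazyDemo {
  // Only a lazy transformation is declared; returns how many records ran.
  def processedWithoutAction(records: Seq[String]): Int = {
    var processed = 0
    // Nothing executes here: the mapping function is merely registered.
    val mapped = records.iterator.map { r => processed += 1; r.toUpperCase }
    processed
  }

  // The same transformation, followed by a step that forces evaluation.
  def processedWithAction(records: Seq[String]): Int = {
    var processed = 0
    val mapped = records.iterator.map { r => processed += 1; r.toUpperCase }
    mapped.foreach(_ => ()) // forcing the iterator stands in for an RDD action
    processed
  }
}
```

For a two-element input, `processedWithoutAction` reports 0 processed records and `processedWithAction` reports 2, mirroring a foreachRDD body that contains only transformations versus one that ends in an action such as foreach or foreachPartition.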