经过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

转载自:http://blog.csdn.net/erfucun/article/details/52312682java

本博文主要内容包括:mysql

  • 技术实现foreachRDD与foreachPartition解析
  • foreachRDD与foreachPartition实现实战

一:技术实现foreach解析:sql

一、首先咱们看一下Output Operations on DStreams提供的API: 
这里写图片描述 
这里写图片描述数据库

SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它容许将数据发送到外部系统。然而,重要的是要了解如何正确有效地使用这种原始方法。一些常见的错误,以免以下: 
写数据到外部系统,须要创建一个数据链接对象(例如TCP链接到远程的服务器),使用它将数据发送到外部存储系统。为此开发者可能会在Driver中尝试建立一个链接,而后在worker中使用它来保存记录到外部数据。代码以下:apache

 
 
  1. dstream.foreachRDD { rdd =>
  2.   val connection = createNewConnection()  // executed at the driver
  3.   rdd.foreach { record =>
  4.     connection.send(record) // executed at the worker
  5.   }}

上面的代码是一个错误的演示,由于链接是在Driver中建立的,而写数据是在worker中完成的。此时链接就须要被序列化而后发送到worker中。可是咱们知道,链接的信息是不能被序列化和反序列化的(不一样的机器链接服务器须要使用不一样的服务器端口,即使链接被序列化了也不能使用)编程

进而咱们能够将链接移动到worker中实现,代码以下:数组

 
 
  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreach { record =>
  3.     val connection = createNewConnection()
  4.     connection.send(record)
  5.     connection.close()
  6.   }}

 

可是此时,每处理一条数据记录,就须要链接一次外部系统,对于性能来讲是个严重的问题。这也不是一个完美的实现。服务器

Spark基于RDD进行编程,RDD的数据不能改变,若是擅长foreachPartition底层的数据可能改变,作到的方式foreachPartition操做一个数据结构,RDD里面一条条数据,可是一条条的记录是能够改变的spark也能够运行在动态数据源上。(就像数组的数据不变,可是指向的索引能够改变) 
咱们能够将代码作以下的改进:微信

 
 
  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     val connection = createNewConnection()
  4.     partitionOfRecords.foreach(record => connection.send(record))
  5.     connection.close()
  6.   }}

 

这样一个partition,只需链接一次外部存储。性能上有大幅度的提升。可是不一样的partition之间不能复用链接。咱们可使用链接池的方式,使得partition之间能够共享链接。代码以下:markdown

 
 
  1. stream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     // ConnectionPool is a static, lazily initialized pool of connections
  4.     val connection = ConnectionPool.getConnection()
  5.     partitionOfRecords.foreach(record => connection.send(record))
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  7.   }}

 

二:foreachRDD与foreachPartition实现实战

一、须要注意的是: 
(1)、你最好使用forEachPartition函数来遍历RDD,而且在每台Work上面建立数据库的connection。 
(2)、若是你的数据库并发受限,能够经过控制数据的分区来减小并发。 
(3)、在插入MySQL的时候最好使用批量插入。 
(4),确保你写入的数据库过程可以处理失败,由于你插入数据库的过程可能会通过网络,这可能致使数据插入数据库失败。 
(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

二、下面咱们使用SparkStreaming实现将数据写到MySQL中:

(1)在pom.xml中加入以下依赖包

 
 
  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>5.1.38</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>commons-dbcp</groupId>
  8.     <artifactId>commons-dbcp</artifactId>
  9.     <version>1.4</version>
  10. </dependency>

(2)在MySql中建立数据库和表,命令操做以下:

 
 
  1. mysql -uroot -p
  2. create database spark;
  3. use spark;
  4. show tables;
  5. create table streaming_itemcount(keyword varchar(30));

 

使用Java编写一个数据库链接池类

 
 
  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.util.LinkedList;
  4.  
  5. /**
  6.  * Created by zpf on 2016/8/26.
  7.  */
  8. public class ConnectionPool {
  9.     private static LinkedList<Connection> connectionQueue;
  10.  
  11.     static {
  12.         try {
  13.             Class.forName("com.mysql.jdbc.Driver");
  14.         } catch (ClassNotFoundException e) {
  15.             e.printStackTrace();
  16.         }
  17.     }
  18.  
  19.     public synchronized static Connection getConnection() {
  20.         try {
  21.             if (connectionQueue == null) {
  22.                 connectionQueue = new LinkedList<Connection>();
  23.                 for (int i = 0; i < 5; i++) {
  24.                     Connection conn = DriverManager.getConnection(
  25.                             "jdbc:mysql://Master:3306/sparkstreaming",
  26.                             "root",
  27.                             "12345");
  28.                     connectionQueue.push(conn);
  29.                 }
  30.             }
  31.         } catch (Exception e) {
  32.             e.printStackTrace();
  33.         }
  34.         return connectionQueue.poll();
  35.  
  36.     }
  37.     public  static void returnConnection(Connection conn){
  38.      connectionQueue.push(conn);
  39.     }
  40. }

 

编写Spark代码:

 
 
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3.  
  4. /**
  5.   * Created by zpf on 2016/8/26.
  6.   */
  7. object OnlineForeachRDD2DB {
  8.   def main(args: Array[String]) {
  9.     val conf = new SparkConf().setAppName("OnlineForeachRDD2DB").setMaster("local[2]")
  10.     val ssc = new StreamingContext(conf, Seconds(5))
  11.  
  12.     val lines = ssc.socketTextStream("Master", 9999)
  13.     val words = lines.flatMap(_.split(" "))
  14.     val wordCounts = words.map(=> (x, 1)).reduceByKey(+ _)
  15.     wordCounts.foreachRDD { rdd =>
  16.       rdd.foreachPartition { partitionOfRecords => {
  17.         val connection = ConnectionPool.getConnection()
  18.         partitionOfRecords.foreach(record => {
  19.           val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
  20.           val stmt = connection.createStatement
  21.           stmt.executeUpdate(sql)
  22.         })
  23.         ConnectionPool.returnConnection(connection)
  24.  
  25.       }
  26.  
  27.       }
  28.     }
  29.   }
  30. }

 

打开netcat发送数据

 
 
  1. root@spark-master:~# nc -lk 9999
  2. spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

 

打包运行spark代码

 
 
  1. /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

 

查看数据库中的结果:

博文内容源自DT大数据梦工厂Spark课程总结的笔记相关课程内容视频能够参考: 百度网盘连接:http://pan.baidu.com/s/1slvODe1(若是连接失效或须要后续的更多资源,请联系QQ460507491或者微信号:DT1219477246 获取上述资料)。

相关文章
相关标签/搜索