Spark Core, Spark SQL, and Spark Streaming applications are all submitted the same way: once the application is written, package it into a JAR and submit it to the cluster with the spark-submit command to run it:
$SPARK_HOME/bin#./spark-submit --master spark://Master01:7077 --class MainClassFullName [--files $HIVE_HOME/conf/hive-site.xml] JarNameFullPath [slices]
Notes:
--master specifies the entry point of the Spark cluster to submit to, normally the Spark Master node (that is, the node where the Master process, or the ResourceManager process, runs); to point at a high-availability cluster, separate the cluster nodes with commas.
--class specifies the main entry class of the Spark Driver (the fully qualified class name is required).
If the Spark application works against a Hive warehouse, use the --files parameter to pass Hive's configuration file (hive-site.xml).
If the Spark application accesses a relational database, place the database's JDBC driver JAR in the library directory under the Spark installation directory (in Spark 2.x it goes in the jars directory), for example:
[hadoop@CloudDeskTop jars]$ pwd
/software/spark-2.1.1/jars
JarNameFullPath is the full name of the JAR that contains the Spark application being submitted (preferably given as an absolute path).
slices is the read parallelism (a numeric value chosen according to the actual physical memory available; with little memory, set it to 1 or simply omit it). It is usually not needed for Streaming applications; the sketch below shows how such an argument maps onto the API.
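As a purely illustrative sketch (the class name, input path, and argument handling are assumptions, not part of the original project), this shows how a slices argument received by the Driver's main method can be forwarded to JavaSparkContext.textFile(path, minPartitions) to control read parallelism:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SlicesExample {
    public static void main(String[] args) {
        // Fall back to a single partition when no slices argument is supplied.
        int slices = (args.length > 0) ? Integer.parseInt(args[0]) : 1;

        SparkConf conf = new SparkConf().setAppName("Slices Example");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // minPartitions controls how many partitions (and therefore tasks) the input is read with.
        JavaRDD<String> lines = jsc.textFile("/spark/input", slices);
        System.out.println("line count: " + lines.count());

        jsc.close();
    }
}

Defaulting to 1 partition when the argument is omitted matches the low-memory recommendation above.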
Typical business scenario: data stored locally on the CloudDeskTop client is processed by Spark, and the result is written to a remote relational database for use by a front-end online transaction system.
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src
2.1. Copy the MySQL JDBC driver JAR into the lib directory under the RDDToJDBC project directory
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.2. Add the Spark development library Spark2.1.1-All to the classpath of the RDDToJDBC project (for example, by adding it as a user library in the IDE).
package com.mmzs.bigdata.spark.core.local;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class RDDToDB {
    /**
     * Global counter
     */
    private static int count;

    /**
     * Database connection
     */
    private static Connection conn;

    /**
     * Prepared statement
     */
    private static PreparedStatement pstat;

    private static final File OUT_PATH = new File("/home/hadoop/test/output");

    static {
        delDir(OUT_PATH);
        try {
            String sql = "insert into wordcount(word,count) values(?,?)";
            String url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(url, "root", "123456");
            pstat = conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Recursively delete a directory or file
     * @param f
     */
    private static void delDir(File f) {
        if (!f.exists()) return;
        if (f.isFile() || (f.isDirectory() && f.listFiles().length == 0)) {
            f.delete();
            return;
        }
        File[] files = f.listFiles();
        for (File fp : files) delDir(fp);
        f.delete();
    }

    // Save in batches
    private static void batchSave(Tuple2<String, Integer> line, boolean isOver) {
        try {
            pstat.setString(1, line._1());
            pstat.setInt(2, line._2());

            if (isOver) { // the iteration has finished, flush whatever is left in the batch
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            } else { // not finished yet, just add the statement to the batch
                pstat.addBatch();
                count++;
                if (count % 100 == 0) { // flush the batch every 100 rows
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Store the data of the RDD into the MySQL relational database
     * @param statResRDD
     */
    private static void saveToDB(JavaPairRDD<String, Integer> statResRDD) {
        final long rddNum = statResRDD.count();
        statResRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private long count = 0;
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                if (++count < rddNum) {
                    batchSave(line, false);
                } else {
                    batchSave(line, true);
                }
            }
        });

        try {
            if (null != pstat) pstat.close();
            if (null != conn) conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java Spark local");
        conf.setMaster("local");

        // Create the Spark context from the configuration
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the local text file into an in-memory RDD
        JavaRDD<String> lineRdd = jsc.textFile("/home/hadoop/test/jdbc");

        // Split each line into words and flatten the word arrays into the outer JavaRDD
        JavaRDD<String> flatMapRdd = lineRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                List<String> list = Arrays.asList(words);
                Iterator<String> its = list.iterator();
                return its;
            }
        });

        // Map each word to a (word, 1) tuple
        JavaPairRDD<String, Integer> mapRdd = flatMapRdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Group by key (the word) and count the occurrences
        JavaPairRDD<String, Integer> reduceRdd = mapRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer pre, Integer next) throws Exception {
                return pre + next;
            }
        });

        // Swap the tuple elements so the count can be used as the sort key
        JavaPairRDD<Integer, String> mapRdd02 = reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
                return new Tuple2<Integer, String>(wordTuple._2, wordTuple._1);
            }
        });

        // Sort the words by occurrence count in descending order
        JavaPairRDD<Integer, String> sortRdd = mapRdd02.sortByKey(false, 1);

        // Swap the tuple elements back after sorting
        JavaPairRDD<String, Integer> mapRdd03 = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
                return new Tuple2<String, Integer>(wordTuple._2, wordTuple._1);
            }
        });

        // Alternatively, save the result to a file on disk
        //mapRdd03.saveAsTextFile("/home/hadoop/test/jdbc/output");

        saveToDB(mapRdd03);

        // Close the Spark context
        jsc.close();
    }
}
A. Start the MySQL database service
[root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/
[root@DB03 3306]# ls
data my.cnf my.cnf.bak mysqld
[root@DB03 3306]# ./mysqld start
Starting MySQL...
B. Create the test database
[root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
+--------------------+
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| test |
+--------------------+
C. Create the wordcount table
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.wordcount(wid int(11) auto_increment primary key,word varchar(30),count int(3))engine=myisam charset=utf8;"
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "desc test.wordcount;"
+-------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+----------------+
| wid | int(11) | NO | PRI | NULL | auto_increment |
| word | varchar(30) | YES | | NULL | |
| count | int(3) | YES | | NULL | |
+-------+-------------+------+-----+---------+----------------+
# The table contains no data yet
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
[hadoop@CloudDeskTop jdbc]$ pwd
/home/hadoop/test/jdbc
[hadoop@CloudDeskTop jdbc]$ ls
myuser testJDBC.txt
[hadoop@CloudDeskTop jdbc]$ cat testJDBC.txt myuser
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi
wo zai zhe li zuo le yi ge ce shi
yi ge guan yu scala de ce shi
welcome to mmzs
欢迎 欢迎
lisi 123456 165 1998-9-9
lisan 123ss 187 2009-10-19
wangwu 123qqwe 177 1990-8-3
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
Typical business scenario: data stored in the HDFS cluster is processed by Spark, and the result is written to a remote relational database for use by a front-end online transaction system.
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p package
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin package src
Upload the required mysql-connector-java-3.0.17-ga-bin.jar on the client:
[hadoop@CloudDeskTop jars]# pwd
/software/spark-2.1.1/jars
Then distribute it to the cluster nodes:
[hadoop@CloudDeskTop software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave03:/software/spark-2.1.1/jars/
package com.mmzs.bigdata.spark.core.cluster;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class RDDToDB {
    /**
     * Global counter
     */
    private static int count;

    /**
     * Database connection
     */
    private static Connection conn;

    /**
     * Prepared statement
     */
    private static PreparedStatement pstat;

    static {
        try {
            String sql = "insert into wordcount(word,count) values(?,?)";
            String url = "jdbc:mysql://192.168.154.134:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(url, "root", "123456");
            pstat = conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Save data in batches
     * @param line
     * @throws SQLException
     */
    private static void batchSave(Tuple2<String, Integer> line, boolean isOver) {
        try {
            pstat.setString(1, line._1());
            pstat.setInt(2, line._2());

            if (isOver) {
                // The RDD iteration has finished: execute whatever is left in the batch.
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            } else {
                // The RDD iteration has not finished yet: add the current statement to the batch,
                // flushing the batch and resetting the counter once it reaches 100 statements.
                pstat.addBatch();
                count++;
                if (count % 100 == 0) { // flush the batch every 100 rows
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Store the data of the RDD into the MySQL relational database.
     * The work done in the inner-class callback (call) must be moved into a separate
     * method (batchSave), because the inner-class function objects provided by Spark are
     * serializable, while the Statement and Connection used for JDBC are not.
     * @param statResRDD
     * @throws ClassNotFoundException
     */
    private static void saveToDB(JavaPairRDD<String, Integer> statResRDD) {
        final long rddNum = statResRDD.count();
        statResRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private long count = 0;
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                if (++count < rddNum) {
                    batchSave(line, false);
                } else {
                    batchSave(line, true);
                }
            }
        });

        try {
            if (null != pstat) pstat.close();
            if (null != conn) conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java Spark Cluster");

        // Create the Spark context from the configuration
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the text file from HDFS into an RDD
        JavaRDD<String> lineRdd = jsc.textFile("/spark/input", 1);

        // Split each line into words and flatten the word arrays into the outer JavaRDD
        JavaRDD<String> flatMapRdd = lineRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                List<String> list = Arrays.asList(words);
                Iterator<String> its = list.iterator();
                return its;
            }
        });

        // Map each word to a (word, 1) tuple
        JavaPairRDD<String, Integer> mapRdd = flatMapRdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Group by key (the word) and count the occurrences
        JavaPairRDD<String, Integer> reduceRdd = mapRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer pre, Integer next) throws Exception {
                return pre + next;
            }
        });

        // Swap the tuple elements so the count can be used as the sort key
        JavaPairRDD<Integer, String> mapRdd02 = reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
                return new Tuple2<Integer, String>(wordTuple._2, wordTuple._1);
            }
        });

        // Sort the words by occurrence count in descending order
        JavaPairRDD<Integer, String> sortRdd = mapRdd02.sortByKey(false, 1);

        // Swap the tuple elements back after sorting
        JavaPairRDD<String, Integer> mapRdd03 = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
                return new Tuple2<String, Integer>(wordTuple._2, wordTuple._1);
            }
        });

        // Alternatively, save the result to a file on disk
        //mapRdd03.saveAsTextFile("/spark/output");

        saveToDB(mapRdd03);

        // Close the Spark context
        jsc.close();
    }
}
Notes:
In cluster mode, Spark's relational-database operations are carried out by a Job, and a Job is only triggered by operations on an RDD. In Spark cluster mode, therefore, all relational-database operations are effective only when they sit at the level of RDD operations; otherwise they cannot reach the relational database at all. Everything outside the RDD-level operations belongs to the client Driver level of Spark Core (the level at which, for example, Spark SQL and Spark Streaming driver code runs). In the code above, only when the RDD is processed by a foreachXXX action does execution enter a Spark Core Job; everything outside the RDD operations runs on the Driver and does not start a Job.
In RDD-level Spark Core operations, the work is packaged into a Job, submitted to the cluster, and executed as Tasks distributed across the cluster nodes. Data passed between Task nodes must be serializable, so the anonymous inner-class function objects that appear so frequently in Spark applications (such as the VoidFunction above) must be serializable, which means every member they reference must be serializable as well. When writing code in the context of those inner classes, you must therefore avoid capturing non-serializable objects or references (such as transient, stream-like objects: Connection, Statement, Thread, and so on); every object reference appearing in that context must implement java.io.Serializable.
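A minimal sketch of an alternative that keeps every JDBC object inside the RDD operation, assuming the same MySQL URL, table, and credentials as the code above (the class and method names here are illustrative, not part of the original project): the connection is opened inside foreachPartition, so the closure Spark serializes and ships to the executors never captures a Connection or PreparedStatement.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class SaveByPartition {
    public static void save(JavaPairRDD<String, Integer> statResRDD) {
        statResRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            @Override
            public void call(Iterator<Tuple2<String, Integer>> it) throws Exception {
                // Everything below runs on an executor; the connection is opened there
                // and never needs to be serialized.
                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://192.168.154.134:3306/test?useUnicode=true&characterEncoding=utf8",
                        "root", "123456");
                PreparedStatement pstat =
                        conn.prepareStatement("insert into wordcount(word,count) values(?,?)");
                try {
                    while (it.hasNext()) {
                        Tuple2<String, Integer> line = it.next();
                        pstat.setString(1, line._1());
                        pstat.setInt(2, line._2());
                        pstat.addBatch();
                    }
                    // Flush the whole partition as one batch.
                    pstat.executeBatch();
                } finally {
                    pstat.close();
                    conn.close();
                }
            }
        });
    }
}

Because each partition flushes its own batch when its iterator is exhausted, this variant also avoids the static global counter and the count()-based end-of-data check used above.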
[hadoop@CloudDeskTop ~]$ cd /project/RDDToJDBC/bin/
[hadoop@CloudDeskTop bin]$ ls
com mysql-connector-java-3.0.17-ga-bin.jar
[hadoop@CloudDeskTop bin]$ jar -cvfe /project/RDDToJDBC/package/RDDToJDBC.jar com.mmzs.bigdata.spark.core.cluster.RDDToDB com/
[hadoop@CloudDeskTop bin]$ cd ../package
[hadoop@CloudDeskTop package]$ ls
RDDToJDBC.jar
A. Start the Spark cluster runtime environment: [hadoop@master01 install]$ sh start-total.sh
#!/bin/bash
echo "Please make sure you have switched to the hadoop user first"
# Start the zookeeper cluster
for node in hadoop@slave01 hadoop@slave02 hadoop@slave03;do ssh $node "source /etc/profile; cd /software/zookeeper-3.4.10/bin/; ./zkServer.sh start; jps";done
# Start the DFS cluster
cd /software/ && start-dfs.sh && jps
# Start the Spark cluster
# Start the Master process on master01 and the Worker processes on the slave nodes
cd /software/spark-2.1.1/sbin/ && ./start-master.sh && ./start-slaves.sh && jps
# Start the Master process on master02
ssh hadoop@master02 "cd /software/spark-2.1.1/sbin/; ./start-master.sh; jps"
# Spark history server; usually left off because it is resource-hungry
#cd /software/spark-2.1.1/sbin/ && ./start-history-server.sh && cd - && jps
B. Submit the Spark application from the CloudDeskTop client node
# Delete the old data from the database
[root@CloudDeskTop bin]# pwd
/software/mysql-5.5.32/bin
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "truncate table test.wordcount;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
# Prepare the source data
[hadoop@CloudDeskTop jdbc]$ hdfs dfs -put testJDBC.txt /spark/input/
[hadoop@master02 ~]$ hdfs dfs -ls /spark/
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2018-02-26 21:56 /spark/input
[hadoop@master02 ~]$ hdfs dfs -ls /spark/input
Found 1 items
-rw-r--r-- 3 hadoop supergroup 156 2018-02-26 21:56 /spark/input/testJDBC.txt
[hadoop@master02 ~]$ hdfs dfs -cat /spark/input/testJDBC.txt
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi
wo zai zhe li zuo le yi ge ce shi
yi ge guan yu scala de ce shi
welcome to mmzs
欢迎 欢迎
# Submit the Spark application
First:
[hadoop@CloudDeskTop lib]$ cd /software/spark-2.1.1/bin/
Then submit in one of two ways:
Method 1 (may fail with a NullPointerException):
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB /project/RDDToJDBC/package/RDDToJDBC.jar
Method 2:
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB --jars /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/package/RDDToJDBC.jar
C. Verify that the relational database now contains data
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"