接下来作的操做是:(这个操做,将程序打成jar包到集群中运行)
(1)编写spark程序在线上的hive中建立表并导入数据
(2)查询hive中的数据
(3)将查询结果保存到MySQL中
代码:python
object SparkSqlTest {
def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() //这句话表示支持hive .getOrCreate() //建立sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //建立sparkContext val sc: SparkContext = spark.sparkContext //建立数据库 var sql= """ |create database if not exists `test` """.stripMargin spark.sql(sql) //使用当前建立的数据库 sql= """ |use `test` """.stripMargin spark.sql(sql) //建立hive表 sql= """ |create table if not exists `test`.`teacher_basic`( |name string, |age int, |married boolean, |children int |) row format delimited |fields terminated by ',' """.stripMargin spark.sql(sql) //加载数据 sql= """ |load data local inpath 'file:///home/hadoop/teacher_info.txt' |into table `test`.`teacher_basic` """.stripMargin spark.sql(sql) //执行查询操做 sql= """ |select * from `test`.`teacher_basic` """.stripMargin val hiveDF=spark.sql(sql) val url="jdbc:mysql://localhost:3306/test" val table_name="teacher_basic" val pro=new Properties() pro.put("password","123456") pro.put("user","root") hiveDF.write.mode(SaveMode.Append).jdbc(url,table_name,pro) } }
打jar包到集群中运行:https://blog.51cto.com/14048416/2337760mysql
做业提交shell:sql
spark-submit \
--class com.zy.sql.SparkSqlTest \ --master yarn \ --deploy-mode cluster \ --driver-memory 512M \ --executor-memory 512M \ --total-executor-cores 1 \ file:////home/hadoop/SparkSqlTest-1.0-SNAPSHOT.jar \
而后满怀期待的等待着success,不幸的是,当程序运行到一半的时候异常终止了:
我查看了一下打印的日志:
我上网查了好多资料,都说是hive的版本太高,what? I‘not why!!
而后想了想,我在集群中,使用spark的程序,去在hive表中进行操做,那么是否是spark须要和hive整合一下啊,而后我又上网查了spark如何整合hive,总的来讲就是将hive的元数据库共享出去,让spark能够访问。
具体操做:
①在hive的hive-site.xml加入:shell
<property> <name>hive.metastore.uris</name> <value>thrift://hadoop01:9083</value> #在哪里启动这个进程 </property>
②在相应的节点上启动在hive-site.xml中配置的进程数据库
nohup hive --service metastore 1>/home/hadoop/logs/hive_thriftserver.log 2>&1 &
ps:这里须要注意一下,nohup是后台启动的,并且全部的信息都是定向输出的,这条命令使用以后,必定要检查一下这个命令是否真的执行成功了:
使用:jsp查一下是否有相应的进程启动,若是没有表示启动失败,确定是 /home/hadoop/logs这个父目录没有建立,而后建立这个目录以后,在启动,在检查是否启动成功!!!!!!!
③将hive-site.xml复制到$SPARK_HOME/conf下(注意是每个节点都要复制)
④测试是否成功:spark-sql,若是正确进入而且能够访问hive的表,表示spark整合hive成功!!!apache
以后我有将原来的程序,从新跑了一次,结果 没有报错,程序运行成功!!!
我不敢相信,我又查看了一下MySQL的表:
确认 程序成功!!!!!!编程