X-Pack Spark服务经过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。java
一键关联POLARDB到Spark集群python
一键关联主要是作好spark访问RDS & POLARDB的准备工做。
mysql
POLARDB表存储sql
在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...
数据库
具体的建表语句以下:服务器
*请左右滑动阅览session
CREATE TABLE `test1` ( `a` int(11) NOT NULL, `b` time DEFAULT NULL, `c` double DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
归档到Spark的调试app
x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。机器学习
一、首先建立一个交互式查询的session,在其中添加mysql-connector的jar包。学习
*请左右滑动阅览
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar
二、建立交互式查询
以pyspark为例,下面是具体归档demo的代码:
*请左右滑动阅览
spark.sql("drop table sparktest").show() # 建立一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb里面建立了databse test1,具备三张表test1 ,test2,test3,这里遍历这三张表,每一个表存储spark的一个5min的分区 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #构造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表关联polardb对应的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "name") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表数据要写入的spark表的分区信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #执行导数据sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #删除临时的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分区以及统计下数据,主要用来作测试验证,实际运行过程能够删除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show()
归档做业上生产
交互式查询定位为临时查询及调试,生产的做业仍是建议使用spark做业的方式运行,使用文档参考。这里以pyspark做业为例:
/polardb/polardbArchiving.py 内容以下:
*请左右滑动阅览
# -*- coding: UTF-8 -*- from __future__ import print_function import sys from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PolardbArchiving") \ .enableHiveSupport() \ .getOrCreate() spark.sql("drop table sparktest").show() # 建立一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb里面建立了databse test1,具备三张表test1 ,test2,test3,这里遍历这三张表,每一个表存储spark的一个5min的分区 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #构造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表关联polardb对应的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "ma,e") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表数据要写入的spark表的分区信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #执行导数据sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #删除临时的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分区以及统计下数据,主要用来作测试验证,实际运行过程能够删除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show() spark.stop()
扫描下方 ⬇️二维码
了解关于X-Pack Spark计算服务的更多信息
双十一还不知道买什么?
阿里云数据库双11爆款直降
这份购物清单 ⬇️给你拿去!
双11福利来了!先来康康#怎么买云服务器最便宜# [并不简单]参团购买指定配置云服务器仅86元/年,开团拉新享三重礼:1111红包+瓜分百万现金+31%返现,爆款必买清单,还有iPhone 11 Pro、卫衣、T恤等你来抽,立刻来试试手气 https://www.aliyun.com/1111/2019/home?utm_content=g_1000083110
本文为云栖社区原创内容,未经容许不得转载。