Spark是一个通用的大规模数据快速处理引擎。能够简单理解为Spark就是一个大数据分布式处理框架。基于内存计算的Spark的计算速度要比Hadoop的MapReduce快上50倍以上,基于磁盘的计算速度也快于10倍以上。Spark运行在Hadoop第二代的yarn集群管理之上,能够轻松读取Hadoop的任何数据。可以读取HBase、HDFS等Hadoop的数据源。java
从Spark 1.0版本起,Spark开始支持Spark SQL,它最主要的用途之一就是可以直接从Spark平台上面获取数据。而且Spark SQL提供比较流行的Parquet列式存储格式以及从Hive表中直接读取数据的支持。以后,Spark SQL还增长了对JSON等其余格式的支持。到了Spark 1.3 版本Spark还可使用SQL的方式进行DataFrames的操做。咱们经过JDBC的方式经过前台业务逻辑执行相关sql的增删改查,经过远程链接linux对文件进行导入处理,使项目可以初步支持Spark平台,现现在已支持Spark1.6版本。那么从应用的前台与后台两个部分来简介基于Spark的项目开发实践。mysql
一、 JDBC链接方式。linux
前台咱们使用ThriftServer链接后台SparkSQL,它是一个JDBC/ODBC接口,经过配置Hive-site.xml,就可使前台用JDBC/ODBC链接ThriftServer来访问HDFS的数据。ThriftServer经过调用hive元数据信息找到表或文件信息在hdfs上的具体位置,并经过Spark的RDD实现了hive的接口。对于业务的增、删、改、查都是经过SparkSQL对HDFS上存储的相应表文件进行操做。项目前台中须要引入相应hive-jdbc等的jar包。sql
<dependency>shell
<groupId>org.apache.hadoop</groupId>数据库
<artifactId>hadoop-common</artifactId>apache
<version>${hadoop-common.version}</version>api
<exclusions>缓存
<exclusion>tomcat
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive-common.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive-exec.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive-jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive-metastore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive-service.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>${libfb303.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
二、Parquet列式文件存储格式
咱们使用Parquet面向列存存储的文件存储结构现现在的Spark版本已经支持了列式存储格式parquet,由于Parquet具备高压缩比的特色且适合嵌套数据类型的存储,可以避免没必要要的IO性能。但在Spark1.3时并无默认支持,这里就再也不对该文件格式进行过多的说明,建立parquet格式表结构建表语句以下:
Create table yangsy as select * from table name
三、数据的导入。
使用的是Apache的一个项目,最先做为Hadoop的一个第三方模块存在,主要功能是在Hadoop(hive)与传统的数据库(mysql、oracle等)间进行数据的传递,能够将一个关系型数据库中的数据导入到Hadoop的HDFS中,也能够将HDFS的数据导进到关系数据库中。
这里注意,执行sqoop导入须要占用yarn的资源进行mapreduce,因为spark开启后即使在空闲状态下也不释放内存,故修改spark-env.sh配置下降memory或暂时停用thfitserver,分以便运行sqoop。
Create job -f 3 -t 4
Creating job for links with from id 3 and to id 4
Please fill following values to create new job object
Name: Sqoopy
From database configuration
Schema name: hive
Table name: TBLS
Table SQL statement:
Table column names:
Partition column name:
Null value allowed for the partition column:
Boundary query:
ToJob configuration
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
successfully created with validation status OK and persistent id 2
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: hdfs://hadoop000:8020/sqoop2
Throttling resources
Extractors:
Loaders:
New job was
四、先后台的交互实现工具类。
工具类提供静态的方法,能够进行相应业务逻辑的调用,因为Hadoop集群存在于服务器端,前台须要实现跨平台服务器的链接,才能执行相应的Hadoop命令,实现对HDFS上文件的操做。这次设计的ShellUtils类,经过jsch链接Linux服务器执行shell命令.须要引入jsch的jar包:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
private static JSch jsch;
private static Session session;
public static void connect(String user, String passwd, String host) throws JSchException {
jsch = new JSch();
session = jsch.getSession(user, host,22);
session.setPassword(passwd);
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
五、数据的下载:
经过传入的Linux命令、用户名、密码等参数对远程linux服务器进行链接。调用hadoop的cat命令直接将文件从HDFS上合并下来经过ftp方式传入tomcat所在服务器,拿到相应的清单文件,大大减小了读取生成文件所须要的时间。命令以下:
String command = "cd " + ftpPath + " && " + hadoopPath + "hadoop fs -cat '" + hdfsPath+ listRandomName + "/*'>" + listName1+".csv;"+ "sed -i '1i"+ title +"' " + listName1+".csv;"
CodecUtil类,用来实现不一样类型压缩文件的解压工做,经过传入的压缩类型,利用反射机制锁定压缩的类型,因为存储在hdfs上的文件都是以文件块的形式存在的,因此首先须要获取hdfs中文件的二级子目录,遍历查询到每个文件块的文件路径,随后经过输入输出流进行文件的解压工做。而后将此类打包成jar包放入集群中,经过前台远程链接服务端,执行hadoop命令操做执行,实现类部分代码以下:
public class CodecUtil{
public static void main(String[] args) throws Exception {
//compress("org.apache.hadoop.io.compress.GzipCodec");
String listName = args[0];
String codecType = args[1];
String hdfsPath = args[2];
uncompress(listName,codecType,hdfsPath);
//解压缩
public static void uncompress(String listName,String CodecType,String hdfsPath) throws Exception{
Class<?> codecClass = Class.forName(CodecType);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path listf =new Path(hdfsPath+listName);
//获取根目录下的全部2级子文件目录
FileStatus stats[]=fs.listStatus(listf);
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
int i;
for ( i = 0; i < stats.length; i++){
//得到子文件块的文件路径
String Random = findRandom();
Path list = new Path(stats[i].getPath().toString());
InputStream in = codec.createInputStream(inputStream);
FSDataOutputStream output = fs.create(new Path(hdfsPath + listName+"/"+unListName));
IOUtils.copyBytes(in, output, conf);
IOUtils.closeStream(in);
}
}
}
六、功能性导入文件
经过功能选择,将须要导入的CSV文件经过ftp方式上传到Spark所在服务器,再将文件经过load的方式导入表中,实现导入文件的业务导入。执行sql以下:
String sql = " LOAD DATA LOCAL INPATH '" + Path + fileName
+ "' OVERWRITE INTO TABLE " + tabName;
七、获取表头信息。
可使用describe table,从而获取HDFS上对应的表头信息,从而根据业务进行相应的业务逻辑处理。
八、JDBC链接问题
这里简要说一下执行的性能问题,咱们经过JDBC方式提交SQL给spark,假若SQL中含有大量的窗口函数像row_number over()一类的,在大数据量的状况下会形成任务执行完毕,但前台jdbc卡死,程序没法继续进行的状况。这是因为像窗口函数以及聚合函数都是至关于MapReduce的Shuffle操做。在提交至Spark运行过程当中, DAGScheduler会把Shuffle的过程切分红map和reduce两个Stage(以前一直被我叫作shuffle前和shuffle后),map的中间结果是写入到本地硬盘的,而不是内存,因此对磁盘的读写要求很是高,(最好是固态硬盘比较快,本人亲自尝试,一样的性能参数下,固态硬盘会比普通磁盘快10倍。还有,经过调大shuffle partition的数目从而减小每一个task所运算的时间,能够减小运行的时间,不至于前台卡死。但不建议前台使用窗口函数进行业务逻辑处理,前台卡死的概率仍是很大。
九、性能调优部分参数
Spark默认序列化方式为Java的ObjectOutputStream序列化一个对象,速度较慢,序列化产生的结果有时也比较大。因此项目中咱们使用kryo序列化方式,经过kryo序列化,使产生的结果更为紧凑,减小内存的占用空间,同时减小了对象自己的元数据信息与基本数据类型的开销,从而更好地提升了性能。
Spark默认用于缓存RDD的空间为一个executor的60%,项目中因为考虑到标签数量为成百个,使用一样规则与数量的标签进行客户群探索及客户群生成的几率很小。因此修改spark.storage.memoryFaction=0.4,这样使百分之60%的内存空间能够在task执行过程当中缓存建立新对象,从而加大task的任务执行效率,以及spark.shuffle.memoryFraction参数。不过从至今Spark1.6已经动态的调整计算内存与缓存内存的大小,这个参数也可不比手动配置,具体要根据项目是缓存的数据仍是计算数据的比例来决定。
十、decimal数据类型改成double数据类型
Decimal数据类型在spark1.3及spark1.4版本没法更好的支持parquet文件格式,生成文件时会报没法识别该类型,现现在的版本已经更加优化了decimal,但具体是否支持暂时还没有尝试。
至此,前台的相关方法就介绍完毕,开始后台
所谓的后台,就是进行真正的数据处理,用Scala编写处理逻辑生成jar包提交于spark-submit,生成从而服务于上层应用的数据表。
一、 环境变量的加载
val sparkConf = new SparkConf()
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
这里能够经过直接set参数从而告诉spark要申请多少内存,多少个核,启动多少个executer例如:
val sparkConf = new SparkConf().setMaster("yarn").setAppName("app")
.set("spark.executor.memory", "4g")
不过不建议在代码中写死,能够写个配置文件加载类往里面传入参数,也能够经过在提交spark-submit的时候指定参数:
./bin/spark-submit --conf spark.ui.port=4444 --name "app" --master yarn-client --num-executors(num) --executor-cores (num) --executor-memory (num) --class main.asiainfo.coc.CocDss $CocBackHome/jar_name.jar
加载mysql中的配置信息表,从而进行相应的业务逻辑处理:
val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
val table_name = sqlContext.jdbc(mySQLUrl,"table_name").cache()
二、多表关联:
val table_join = table1.join(table2, table1 ("id") === table2 ("id"),"inner") 或
Val table_join = table1.join(table2,”id”)
这里要注意一点,多表进行join的时候很容易形成ID冲突,因为应用于生产环境的依旧是Spark1.4版本(Spark1.5,1.6是否稳定有待测试,因此暂时没有用),因此仍是使用第一种方法稳妥,该方法为Spark1.3的使用方法,毕竟稳定第一。
二、 读取本地数据文件,根据某个字段排序并注册成表:
case class test(column1 : String,column2: String,column3: String)
(这里要注意,case class必定要写在业务处理方法以外)
val loadData = sc.textFile(path)
val data = loadData.map(_.split(",")).sortBy(line => line.indexOf(1))
val dataTable = sqlContext.repartition(400).createDataFrame(data).registerTempTable("test")
sqlContext.sql("drop table if exists asiainfo_yangsy")
sqlContext.sql("select * from test").toDF().saveAsTable("asiainfo_yangsy ")
这里要注意的是,要调用registerTempTable函数,必须调用createDataFrame,通过资料查阅,读取文件生成的RDD只是个普通的RDD,而registerTempTable并不属于RDD类,因此经过建立SchemaRDD实例进行调用。随后注册成表后,转化为DataFrame,保存表至HDFS。,
顺便提一下repartition函数,经过此函数来设置patition的数量。由于一个partition对应的就是stage的一个task,那么根据真实的数据量进行设置,从而减小OOM的可能性。不过现现在Spark1.6版本已经支持自行调整parition数量,代码中可不比添加。
四、读取HDFS中的表或数据文件:
val loadData2 = sqlContext.read.table("asiainfo_")
五、describe函数
val select_table_cache = sqlContext.table(save_table_name)
al describeTable1 = select_table_cache.describe().show()
这里要强调下,describe函数的调用并非前台那样,获取表头信息。而是获取相应列数据的count、mean、stddev、min以及max值。用于作一些简单的统计。
六、根据join后的DF生成须要业务数据的DF,并根据某个table某一字段进行排序
val select_table= table.select(table1("id"),
table("update_cycle"), table1("column_id"),
table2("table_id")).repartition(400).sort(table1("label_id"))
七、数据的MapReduce
val loadData = sc.textFile(path)
val map_reduce = loadData.
flatMap(line =>line.split(",")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)
剩下还有不少使用的函数就不一一说明了,具体应用查官网API便可。