利用Spark SQL实现轻量级用户数据查询

简介

当人们把愈来愈多的大数据存储在HDFS或者AWS的S3上,一般下一个问题是如何让全公司范围的员工可以方便的查询这些数据。一个选项是创建一个SQL-on-Hadoop系统,让用户使用SQL或者类SQL语言来查询数据,可是这些SQL-on-Hadoop系统每每比较复杂,须要必定的开发和维护工做量。html

另外一个选项是,若是你已经有了Spark或者Hadoop YARN集群,那么利用Spark SQL,经过编写少许的代码,你就能够创建一个轻量级的工具,让用户本身提交SQL语句,来获取他们须要的数据。java

主要思路

这里的思路是编写一个Spark程序,在其中设置DataFrame(Spark SQL中的数据表)的数据格式(schema),而后用户能够经过Spark程序的参数,指定一个SQL查询,进而执行这个查询。sql

示例代码

让咱们经过一个具体的例子,来展现如何经过代码实现这样的功能。具体代码参见这里,下面是一些简略解释。apache

数据文件

咱们在AWS S3中有两个文件: "s3n://bopublic/demo/selfservicequery/customers.json" 和 "s3n://bopublic/demo/selfservicequery/orders.json"。json

Spark程序

咱们编写完Spark程序后,用户能够经过如下命令行执行SQL语句:微信

java -cp ... YourJob -q "select * from customers join orders on customers.key = orders.customerKey"ide

建立DataFrame数据格式(schema)

在Spark中,StructType类用来定义DataFrame的数据格式(schema)。下面代码展现如何建立"customers"数据表的schema.工具

private static StructType createCustomerTableSchema() {
StructField[] fields = new StructField[] {
new StructField("key", DataTypes.IntegerType, true,
Metadata.empty()),
new StructField("name", DataTypes.StringType, true,
Metadata.empty()),
new StructField("address", DataTypes.StringType, true,
Metadata.empty())
};oop

StructType structType = new StructType(fields);
return structType;
}大数据

在Spark中加载数据

SparkConf conf = new SparkConf().setMaster(master).setAppName(
SparkSqlWithExplicitSchema.class.getSimpleName());

JavaSparkContext sc = new JavaSparkContext(conf);

String customerS3Path = "s3n://bopublic/demo/selfservicequery/customers.json";

JavaRDD customerRDD = sc.textFile(customerS3Path).mapPartitions(new ParseJson(customerTableSchema));

建立SQLContext和DataFrame

SQLContext sqlContext = new SQLContext(sc);

DataFrame customerDF = sqlContext.createDataFrame(customerRDD, createCustomerTableSchema());
customerDF.registerTempTable("customers");

运行SQL

String query = (get SQL query from program arguments);
DataFrame resultDF = sqlContext.sql(query);

输出结果

// SerializeToCsv is a class to convert DataFrame row data to CSV. See full source code for details.

JavaRDD csvRDD = resultDF.toJavaRDD().map(new SerializeToCsv());

一点讨论

显式建立Schema

因为Spark SQL能够自动根据JSON检测出数据格式,也许有人认为咱们不须要显式地建立DataFrame Schema。这里咱们仍然显式建立schema,有两个缘由:

  • JSON文件运行忽略某些属性,当这些属性的值是缺省值的时候。当这种状况发生的时候,Spark SQL检测不出这些被忽略的属性格式。

  • 对于其余数据格式,好比CSV,Spark SQL无法检测出schema,显式建立schema使得咱们仍然能够查询这些数据源。

Spark Thrift JDBC/ODBC Server

Spark自带一个Thrift JDBC/ODBC Server,人们可使用"beeline"工具链接上来执行SQL查询,参考Spark文档

这是方法也值得一试,可是目前Spark Thrift Server还未成熟到能够产品化应用的阶段。


扫描微信二维码联系做者
扫描微信二维码联系做者

相关文章
相关标签/搜索