SparkSQL从2.0开始已经再也不支持ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
这种语法了(下文简称add columns语法)。若是你的Spark项目中用到了SparkSQL+Hive这种模式,从Spark1.x升级到2.x颇有可能遇到这个问题。html
为了解决这个问题,咱们通常有3种方案能够选择:git
OK,接下来,咱们进入主题。github
本文基于最新版的Spark 2.1.0,源码地址:https://github.com/apache/spark/tree/branch-2.1sql
Spark2.1开始使用ANTLR来解析SQL语法,它的语法定义文件借鉴的Presto项目,咱们在Spark源码中找到这个文件sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
,作以下改动:apache
@@ -127,6 +127,8 @@ statement ('(' key=tablePropertyKey ')')? #showTblProperties | SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM | IN) db=identifier)? #showColumns + | ALTER TABLE tableIdentifier ADD COLUMNS + ('(' columns=colTypeList ')')? #addColumns | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions | SHOW identifier? FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions @@ -191,7 +193,6 @@ unsupportedHiveNativeCommands | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT - | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CHANGE kw4=COLUMN? | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS | kw1=START kw2=TRANSACTION
194行的kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
是在unsupportedHiveNativeCommands
列表中,咱们首先把它去掉。session
为了让Spark能解析ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
,咱们还须要在129行处新增| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
最后的#addColumns
是为了让ANTLR插件(这个插件定义在sql/catalyst/pom.xml中)为咱们自动生成addColumns相关方法,便于咱们作语法解析处理。这个语法中有2个参数须要咱们处理table_name和columns。ide
SparkSqlAstBuilder
的做用是将ANTLR的语法树翻译为LogicalPlan/Expression/TableIdentifier
oop
要修改的文件为:sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
,咱们在178行处,新增以下方法:ui
override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) { val tableName = visitTableIdentifier(ctx.tableIdentifier()) val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil) AlterTableAddColumnsCommand(tableName, dataCols) }
visitAddColumns方法是ANTLR插件自动为咱们生成的方法,定义在SparkSqlAstBuilder的父类AstBuilder中(AST,Abstract Syntax Tree ,抽象语法树),这个方法用来处理咱们在SqlBase.g4中定义的| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
,咱们这里重载了visitAddColumns方法用来提取表名及新增的字段列表,并返回一个LogicalPlan:AlterTableAddColumnsCommand,这个类咱们接下来会说明。spa
修改sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
,在120行处,新增AlterTableAddColumnsCommand类:
case class AlterTableAddColumnsCommand( tableName: TableIdentifier, newColumns: Seq[StructField]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) DDLUtils.verifyAlterTableType(catalog, table, isView = false) val newSchema = StructType(table.schema.fields ++ newColumns) val newTable = table.copy(schema = newSchema) catalog.alterTable(newTable) Seq.empty[Row] } }
RunnableCommand类继承自LogicalPlan,run方法用于执行addColumns语法对应的执行逻辑。这个类的处理逻辑比较简单,就不详细介绍了。
咱们在第3步的AlterTableAddColumnsCommand中,虽然调用了catalog.alterTable(newTable)
来修改表信息,但实际上并不能将新的字段添加到表中,由于Spark代码写死了,不能改Hive表的schema,咱们还须要修改HiveExternalCatalog类(sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala),改动以下:
@@ -588,7 +588,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp val newDef = withStatsProps.copy( storage = newStorage, - schema = oldTableDef.schema, + // allow `alter table xxx add columns(xx)` + schema = tableDefinition.schema, partitionColumnNames = oldTableDef.partitionColumnNames, bucketSpec = oldTableDef.bucketSpec, properties = newTableProps)
咱们将591行的schema = oldTableDef.schema
替换为schema = tableDefinition.schema
便可。
至此,咱们完成了整个代码的调整。
最后参考Spark的编译文档:http://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution,将Spark编译打包便可。
Spark 2.x会将编译后的assembly放到jars目录下,咱们此次的改动会影响到如下几个jar包:
若是Spark已经部署过了,能够直接将以上3个jar替换掉。
更新Spark后,咱们就可使用alter table xxx add columns(xx)
了。