DataFrame原生支持直接输出到JDBC,但若是目标表有自增字段(好比id),那么DataFrame就不能直接进行写入了。由于DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须彻底一致(甚至字段顺序都要一致),不然会抛异常,固然,若是你SaveMode选择了Overwrite,那么Spark删除你原有的表,而后根据DataFrame的Schema生成一个。。。。字段类型会很是很是奇葩。。。。
因而咱们只能经过DataFrame.collect(),把整个DataFrame转成List<Row>到Driver上,而后经过原生的JDBC方法进行写入。可是若是DataFrame体积过于庞大,很容易致使Driver OOM(特别是咱们通常不会给Driver配置太高的内存)。这个问题真的很让人纠结。
翻看Spark的JDBC源码,发现其实是经过foreachPartition方法,在DataFrame每个分区中,对每一个Row的数据进行JDBC插入,那么为何咱们就不能直接用呢?java
Spark JdbcUtils.scala部分源码:web
def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER case LongType => java.sql.Types.BIGINT case DoubleType => java.sql.Types.DOUBLE case FloatType => java.sql.Types.REAL case ShortType => java.sql.Types.INTEGER case ByteType => java.sql.Types.INTEGER case BooleanType => java.sql.Types.BIT case StringType => java.sql.Types.CLOB case BinaryType => java.sql.Types.BLOB case TimestampType => java.sql.Types.TIMESTAMP case DateType => java.sql.Types.DATE case t: DecimalType => java.sql.Types.DECIMAL case _ => throw new IllegalArgumentException( s"Can't translate null value for field $field") }) } val rddSchema = df.schema val driver: String = DriverRegistry.getDriverClassName(url) val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties) // ****************** here ****************** df.foreachPartition { iterator => savePartition(getConnection, table, iterator, rddSchema, nullTypes) } }
嗯。。。既然Scala能实现,那么做为他的爸爸,Java也应该能玩!
咱们看看foreachPartition的方法原型:sql
def foreachPartition(f: Iterator[Row] => Unit)
又是函数式语言最爱的匿名函数。。。很是讨厌写lambda,因此咱们仍是实现个匿名类吧。要实现的抽象类为:
scala.runtime.AbstractFunction1<Iterator<Row>,BoxedUnit> 两个模板参数,第一个很直观,就是Row的迭代器,做为函数的参数。第二个BoxedUnit,是函数的返回值。不熟悉Scala的可能会很困惑,其实这就是Scala的void。因为Scala函数式编程的特性,代码块的末尾必须返回点什么,因而他们就搞出了个unit来代替本应什么都没有的void(解释得可能不是很准确,我是这么理解的)。对于Java而言,咱们能够直接使用BoxedUnit.UNIT,来获得这个“什么都没有”的东西。
来玩耍一下吧!apache
df.foreachPartition(new AbstractFunction1<Iterator<Row>, BoxedUnit>() { @Override public BoxedUnit apply(Iterator<Row> it) { while (it.hasNext()){ System.out.println(it.next().toString()); } return BoxedUnit.UNIT; } });
嗯,maven complete一下,spark-submit看看~
好勒~抛异常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想一想以前实现UDF的时候,UDF1/2/3/4...各接口,都extends Serializable,也就是说,在Spark运行期间,Driver会把UDF接口实现类序列化,并在Executor中反序列化,执行call方法。。。这就不难理解了,咱们foreachPartition丢进去的类,也应该implements Serializable。这样,咱们就得本身搞一个继承AbstractFunction1<Iterator<Row>, BoxedUnit>,又实现Serializable的抽象类,给咱们这些匿名类去实现!编程
import org.apache.spark.sql.Row; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable { }
但是每次都要return BoxedUnit.UNIT 搞得太别扭了,没一点Java的风格。app
import org.apache.spark.sql.Row; import scala.collection.Iterator; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable { @Override public BoxedUnit apply(Iterator<Row> it) { call(it); return BoxedUnit.UNIT; } public abstract void call(Iterator<Row> it); }
因而咱们能够直接Override call方法,就能够用满满Java Style的代码去玩耍了!maven
df.foreachPartition(new JavaForeachPartitionFunc() { @Override public void call(Iterator<Row> it) { while (it.hasNext()){ System.out.println(it.next().toString()); } } });
注意!咱们实现的匿名类的方法,其实是在executor上执行的,因此println是输出到executor机器的stdout上。这个咱们能够经过Spark的web ui,点击具体Application的Executor页面去查看(调试用的虚拟机集群,手扶拖拉机同样的配置,别吐槽了~)ide
至于foreach方法同理。只不过把Iterator<Row> 换成 Row。具体怎么搞,慢慢玩吧~~~
have fun~函数式编程