Importing Hive Data into ClickHouse
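The Spark job below reads one partition of a Hive table and appends it to a ClickHouse table through the ClickHouse JDBC driver; the target ClickHouse table is assumed to already exist with a compatible schema. The driver class ru.yandex.clickhouse.ClickHouseDriver used in the code ships in the clickhouse-jdbc artifact. A minimal sbt dependency sketch (the versions here are assumptions, pin whatever matches your cluster):

// build.sbt (sketch) -- versions are assumptions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.5" % Provided,  // assumed Spark 2.x line
  "org.apache.spark" %% "spark-hive" % "2.4.5" % Provided,  // needed for enableHiveSupport()
  "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"      // provides ru.yandex.clickhouse.ClickHouseDriver
)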

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HiveToClickHouseDF {
  final val USERNAME = ""
  final val PASSWORD = ""
  final val APPNAME = "HiveToClickHouseDF-JOB"
  final val URL = ""

  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      System.err.println("Usage: OfficialJDBCDriver <tableName> <partition> <batchSize>\n" +
        " <hiveTableName> is the clickhouse database+table name,like default.test \n" +
        " <ckTableName> is the hive database+table name ,like default.test\n" +
        " <partition> is the partition which want insert into clickhouse, like 20200516\n" +
        " <batchSize> is JDBC batch size, may be 1000\n" +
        " <parallel> is spark job parallel, may be 5\n\n")
      System.exit(1)
    }

    val (hiveTableName, ckTableName, partition, batchSize, parallel) = (args(0), args(1), args(2), args(3).toInt, args(4).toInt)
    val spark = SparkSession
      .builder()
      .appName(APPNAME)
      .enableHiveSupport()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer", "10") // initial Kryo buffer; interpreted as KiB when no unit is given
      .getOrCreate()

    // JDBC connection properties for the ClickHouse driver
    val prop = new Properties()
    val ckDriver = "ru.yandex.clickhouse.ClickHouseDriver"
    prop.put("driver", ckDriver)
    prop.put("user", USERNAME)
    prop.put("password", PASSWORD)
    // Read one Hive partition; the value is quoted, assuming a string partition column
    val df = spark.sql(s"select * from $hiveTableName where dt = '$partition'")
    println(df.count()) // log the row count (note: this triggers an extra Spark job)
    df.coalesce(parallel).write
      .mode("append") // append into the existing ClickHouse table
      .option("batchsize", batchSize)
      .option("isolationLevel", "NONE") // ClickHouse JDBC does not support transactions
      .jdbc(URL, ckTableName, prop)

    spark.stop()
  }
}
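
For reference, a hypothetical submission command (the jar name and master are assumptions; adjust to your environment):

spark-submit \
  --class HiveToClickHouseDF \
  --master yarn \
  hive-to-clickhouse.jar \
  default.hive_table default.ck_table 20200516 1000 5

The five positional arguments map to hiveTableName, ckTableName, partition, batchSize, and parallel, in that order.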