import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object HiveToClickHouseDF {
  // ClickHouse connection settings; fill these in before submitting the job.
  final val USERNAME = ""
  final val PASSWORD = ""
  final val APPNAME = ""
  final val URL = ""
  // Note: the target table is taken from the command line below; this constant is unused.
  final val CLICKHOUSE_TABLE = "HiveToClickHouseDF-JOB"
  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      System.err.println(
        "Usage: HiveToClickHouseDF <hiveTableName> <ckTableName> <partition> <batchSize> <parallel>\n" +
          "  <hiveTableName> is the Hive database+table name, like default.test\n" +
          "  <ckTableName> is the ClickHouse database+table name, like default.test\n" +
          "  <partition> is the partition to insert into ClickHouse, like 20200516\n" +
          "  <batchSize> is the JDBC batch size, e.g. 1000\n" +
          "  <parallel> is the Spark job parallelism, e.g. 5\n")
      System.exit(1)
    }
    val (hiveTableName, ckTableName, partition, batchSize, parallel) =
      (args(0), args(1), args(2), args(3).toInt, args(4).toInt)
    val sparkConf = new SparkConf().setAppName(APPNAME)
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .enableHiveSupport()
      .appName("mazenn-clickhouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer", "10")
      .getOrCreate()
    // JDBC connection properties for the ClickHouse driver.
    val prop = new Properties()
    val ckDriver = "ru.yandex.clickhouse.ClickHouseDriver"
    prop.put("driver", ckDriver)
    prop.put("user", USERNAME)
    prop.put("password", PASSWORD)
    // Assumes dt is a numeric partition column; quote $partition in the predicate if dt is a string.
    val df = spark.sql(s"select * from $hiveTableName where dt = $partition")
    println(df.count()) // note: count() triggers an extra scan of the Hive table
    df.coalesce(parallel)
      .write
      .mode("append")
      .option("batchsize", batchSize.toString)
      .option("isolationLevel", "NONE") // ClickHouse has no transactions, so skip setTransactionIsolation
      .jdbc(URL, ckTableName, prop)
  }
}
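After filling in URL, USERNAME, and PASSWORD, the job takes the five positional arguments in the order the usage message lists them. A minimal sketch of the invocation, assuming the job is packaged as hive-to-clickhouse.jar and the ClickHouse JDBC driver jar sits next to it (both jar names, the master, and the table names are placeholders, not part of the original job):

spark-submit \
  --class HiveToClickHouseDF \
  --master yarn \
  --jars clickhouse-jdbc-0.2.4.jar \
  hive-to-clickhouse.jar \
  default.src_table default.dst_table 20200516 1000 5

Since each DataFrame partition writes over its own JDBC connection, coalesce(parallel) caps the number of concurrent connections to ClickHouse, and batchsize controls how many rows each connection buffers per batched INSERT.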