1. Introduction
The built-in SQL functions often cannot cover our day-to-day development needs, so we have to define functions of our own. At the same time, Spark supports a wide range of data sources; JSON, MySQL, and others can all serve as input.
2.1 Function definition

A UDF is just an ordinary Scala function. For example, one that appends a suffix to its argument:

val fun1 = (arg: String) => { arg + "aaa" }
2.2 Function registration

Registering the function with the SparkSession makes it callable from SQL under the given name:

spark.udf.register("addString", fun1)
2.3 Example

package cn.edu360.spark07

import org.apache.spark.sql._

object AutoFun {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkDateSet1")
      .master("local[2]")
      .getOrCreate()
    val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
    import spark.implicits._
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // register a temporary view so the Dataset can be queried with SQL
    words.createTempView("v_wc")
    // define the function
    val fun1 = (arg: String) => { arg + "aaa" }
    // register the function
    spark.udf.register("addString", fun1)
    val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
    result.show()
    spark.stop()
  }
}
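Inside the same main method, the function could also be applied through the DataFrame/Dataset API rather than SQL. A minimal sketch using org.apache.spark.sql.functions.udf; the names addStringUdf and result2 are illustrative, not from the original:

import org.apache.spark.sql.functions.udf

// wrap the same Scala function as a column-level UDF
val addStringUdf = udf((arg: String) => arg + "aaa")

// equivalent to the SQL query above, expressed with the DataFrame API
val result2: DataFrame = words.groupBy($"value")
  .count()
  .select(addStringUdf($"value"), $"count")
result2.show()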
3.1 JSON

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JsonDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // read JSON data (the column names come from the JSON keys)
    val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
    val filtered: DataFrame = jsons.where($"age" <= 500)
    filtered.printSchema()
    filtered.show()
    spark.stop()
  }
}
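Schema inference requires an extra pass over the JSON files. If the structure is known up front, a schema can be supplied explicitly instead. A sketch under the assumption that the files contain name and age fields (the path and field names are illustrative):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true)
))

// with an explicit schema, Spark skips the inference pass
val jsons: DataFrame = spark.read.schema(schema).json("/Users/zx/Desktop/json")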
3.2 JDBC

package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // read the "log" table from MySQL over JDBC
    val log: DataFrame = spark.read.format("jdbc").options(
      Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "log",
        "user" -> "root",
        "password" -> "qwe123"
      )
    ).load()
    val result: DataFrame = log.select($"id", $"name", $"age")
    result.show()
    spark.stop()
  }
}
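DataFrames can be written back over JDBC as well. A minimal sketch reusing the connection settings above; the target table name log_result is illustrative:

import java.util.Properties

val props = new Properties()
props.put("user", "root")
props.put("password", "qwe123")

// append the selected rows to a result table
result.write
  .mode("append")
  .jdbc("jdbc:mysql://localhost:3306/test?useSSL=true", "log_result", props)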
3.3 Parquet

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Created by zx on 2017/9/18.
 */
object ParquetDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ParquetDataSource")
      .master("local[*]")
      .getOrCreate()
    // read Parquet data
    val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
    //val parquetLine: DataFrame = spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    // show() is an action
    parquetLine.show()
    spark.stop()
  }
}
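Writing works the same way. Because Parquet stores the schema alongside the data, a file written like this can be read back without any extra type information (the output path is illustrative):

// write the DataFrame out as Parquet, replacing any existing output
parquetLine.write.mode("overwrite").parquet("/Users/zx/Desktop/parquet_out")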
3.4 CSV

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Created by zx on 2017/9/18.
 */
object CsvDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()
    // read CSV data
    val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
    csv.printSchema()
    // assign meaningful column names (csv() defaults to _c0, _c1, ...)
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
  }
}
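If the CSV files carry a header row, the reader can take the column names (and, optionally, inferred types) from the file itself instead of toDF. A sketch assuming the same directory as above:

val csvWithHeader: DataFrame = spark.read
  .option("header", "true")      // take column names from the first row
  .option("inferSchema", "true") // infer column types instead of all-string
  .csv("/Users/zx/Desktop/csv")
csvWithHeader.printSchema()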