首先咱们要建立SparkSessionjava
val spark = SparkSession.builder() .appName("test") .master("local") .getOrCreate() import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操做
而后咱们经过SparkSession来建立DataFramemysql
1.使用toDF
函数建立DataFramesql
经过导入(importing)spark.implicits, 就能够将本地序列(seq), 数组或者RDD转为DataFrame。shell
只要这些数据的内容能指定数据类型便可。数据库
import spark.implicits._ val df = Seq( (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) ).toDF("id", "name", "created_time")
注意:若是直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"apache
能够经过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名json
2.使用createDataFrame
函数建立DataFrame数组
经过schema + row 来建立app
咱们能够通俗的理解为schema为表的表头,row为表的数据记录函数
import org.apache.spark.sql.types._ //定义dataframe的结构的schema val schema = StructType(List( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("create_time", DateType, nullable = true) )) //定义dataframe内容的rdd val rdd = sc.parallelize(Seq( Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) )) //建立dataframe val df = spark.createDataFrame(rdd, schema)
不过,咱们能够把文件结构当作参数来使用,经过rdd自动产生schema和row,不用本身手动生成。
import org.apache.spark.sql.types._ //传入属性参数 val schemaString = " id name create_time" //解析参数变成StructField val fields = schemaString.split(" ") .map(fieldName => StructField(fieldname, StringType, nullable = true)) //定义dataframe的结构的schema val schema = StructType(fields) //定义dataframe内容的rdd val lines = sc.textFile("file:///people.txt") val rdd = lines.spilt(_.split(",")) .map(attributes=>ROW(attributes(0),attributes(1).trim) ) //建立dataframe val df = spark.createDataFrame(rdd, schema)
3.经过反射机制建立DataFrame
首先要定义一个case class,由于只有case class才能被Spark隐式转化为DataFrame
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder import spark.implicits._ //建立匹配类 case class Person(id:Int,name:String,age:Long) //读取文件生成rdd val rdd = sc.textFile("file:///") //经过匹配类把rdd转化成dataframe val df = rdd.map(_.split(",")) .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()
4.经过文件直接建立DataFrame
(1)使用parquet文件read建立
val df = spark.read.parquet("hdfs:/path/to/file")
(2)使用json文件read建立
val df = spark.read.json("examples/src/main/resources/people.json")
(3)使用csv文件load建立
val df = spark.read .format("com.databricks.spark.csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("csv/file/path")
(4)使用Hive表建立
spark.table("test.person") // 库名.表名 的格式 .registerTempTable("person") // 注册成临时表 spark.sql( """ | select * | from person | limit 10 """.stripMargin).show()
记得,最后咱们要调用spark.stop()来关闭SparkSession。
5.保存
(1)经过df.write.format().save("file:///")保存
write.format()支持输出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式
,save()定义保存的位置
当咱们保存成功后能够在保存位置的目录下看到文件,可是这个文件并非一个文件而是一个目录。
里面的内容通常为
不用担忧,这是没错的。
咱们读取的时候,并不须要使用文件夹里面的part-xxxx文件,直接读取目录便可。
(2)经过df.rdd.saveAsTextFile("file:///")转化成rdd再保存
咱们对于不一样格式的文件读写来讲,咱们通常使用两套对应方式
val df = spark.read.格式("file:///")//读取文件 df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//读取文件 df.write.save("file:///")//保存文件
具体read和load方法有什么不一样,我还不是很清楚,弄明白了回来补充。
6.经过JDBC建立DataFrame
咱们在启动Spark-shell或者提交任务的时候须要添加相应的jar包
spark-shell(spark-submit)
--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar
val jdbcDf = spark.read.format("jdbc") .option("driver", "com.mysql.jdbc.Driver") //驱动 .option("url", "jdbc:mysql://ip:3306") //数据库地址 .option("dbtable", "db.user_test") //表名:数据库名.表名 .option("user", "test") //用户名 .option("password", "123456") //密码 .load() jdbcDf.show()