pyspark中的DataFrame等价于Spark SQL中的一个关系表。在pyspark中,DataFrame由Column和Row构成。html
在操做DataFrame以前,首先须要建立SparkSession,经过SparkSession来操做DataFrame。python
1,建立SparkSessionsql
经过Builder类来建立SparkSession,在Databricks Notebook中,spark是默认建立,表示一个SparkSession对象:apache
spark = SparkSession.builder \ .master("local") \ .appName("Word Count") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
函数注释:json
master(master):用于设置要链接的Spark的master URL,例如local表示在本地运行,local[4] 在本地使用4核运行,api
appName(name):为application设置一个名字app
config(key=None, value=None, conf=None):设置SparkSession的配置选项,函数
getOrCreate():得到一个已存在的或者建立一个新的SparkSessionfetch
2,从常量数据中建立DataFrameui
从RDD、list或pandas.DataFrame 建立DataFrame:
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
3,从SQL查询中建立DataFrame
从一个给定的SQL查询或Table中获取DataFrame,举个例子:
df.createOrReplaceTempView("table1") #use SQL query to fetch data df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1") #use table to fetch data df2 = spark.table("table1")
4,属性
read:该属性是DataFrameReader 对象,用于读取数据,返回DataFrame对象
readStream:该属性是DataStreamReader对象,用于读取Data Stream,返回 流式的DataFrame对象( streaming DataFrame)
从外部存储系统中读取数据,返回DataFrame对象,一般使用SparkSession.read来访问,通用语法是先调用format()函数来指定输入数据的格式,后调用load()函数从数据源加载数据,并返回DataFrame对象:
df = spark.read.format('json').load('python/test_support/sql/people.json')
对于不一样的格式,DataFrameReader类有细分的函数来加载数据:
df_csv = spark.read.csv('python/test_support/sql/ages.csv') df_json = spark.read.json('python/test_support/sql/people.json') df_txt = spark.read.text('python/test_support/sql/text-test.txt') df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned') # read a table as a DataFrame df = spark.read.parquet('python/test_support/sql/parquet_partitioned') df.createOrReplaceTempView('tmpTable') spark.read.table('tmpTable')
还能够经过jdbc,从JDBC URL中构建DataFrame
jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
用于把DataFrame写入到外部存储系统中,经过DataFrame.write来访问。
(df.write.format('parquet') .mode("overwrite") .saveAsTable('bucketed_table'))
函数注释:
saveAsTable
(name, format=None, mode=None, partitionBy=None, **options):把DataFrame 存储为表save
(path=None, format=None, mode=None, partitionBy=None, **options):把DataFrame存储到数据源中对于不一样的格式,DataFrameWriter类有细分的函数来加载数据:
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) df.write.txt(os.path.join(tempfile.mkdtemp(), 'data')) #wirte data to external database via jdbc jdbc(url, table, mode=None, properties=None)
DataFrame等价于Spark SQL中的关系表,
1,常规操做
从parquet 文件中读取数据,返回一个DataFrame对象:
people = spark.read.parquet("...")
从DataFrame对象返回一列:
ageCol = people.age
从DataFrame对象中row的集合:
people.collect()
从DataFrame对象中删除列:
people.drop(*cols)
2,建立临时视图
能够建立全局临时视图,也能够建立本地临时视图,对于local view,临时视图的生命周期和SparkSession相同;对于global view,临时视图的生命周期由Spark application决定。
createOrReplaceGlobalTempView(name)
createGlobalTempView(name)
createOrReplaceTempView(name)
createTempView(name)
3,DataFrame数据的查询
df.filter(df.age > 3) df.select('name', 'age') # join cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age) #group by df.groupBy('name').agg({'age': 'mean'})
DataFrame.groupBy() 返回的是GroupedData类,能够对分组数据应用聚合函数、apply函数。
df3.groupBy().max('age', 'height').collect()
请参考官方手册,再也不赘述。
参考文档: