Spark基本命令

1、spark所在目录

cd usr/local/spark

2、启动spark

/usr/local/spark/sbin/start-all.sh

启动Hadoop以及Spark:html

bash ./starths.sh

浏览器查看:python

172.16.31.17:8080

中止Hadoop以及Sparksql

bash ./stophs.sh

3、基础使用

参考连接:https://www.cnblogs.com/dasn/articles/5644919.htmlshell

1.运行Spark示例(SparkPi)

在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。apache

咱们能够先运行一个示例程序 SparkPi(即计算 π 的近似值),执行以下命令:编程

./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

执行时会输出很是多的运行信息,输出结果不容易找到,能够经过 grep 命令进行过滤(命令中的 2>&1 能够将全部的信息都输出到 stdout 中,不然因为输出日志的性质,仍是会输出到屏幕中)json

Python 版本的 SparkPi 则须要经过 spark-submit 运行:浏览器

./bin/spark-submit examples/src/main/python/pi.py

2.经过Spark-shell进行交互分析

Spark Shell 支持 Scala 和 Python缓存

Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。bash

Scala 是 Spark 的主要编程语言,若是仅仅是写 Spark 应用,并不是必定要用 Scala,用 Java、Python 都是能够的。 使用 Scala 的优点是开发效率更高,代码更精简,而且能够经过 Spark Shell 进行交互式实时查询,方便排查问题。

2.1 启动Spark Shell

./bin/spark-shell

2.2 基础操做

Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操做。RDDs 能够经过 Hadoop InputFormats 建立(如 HDFS),或者从其余 RDDs 转化而来。

2.2.1 RDD建立方式

参考连接:

https://blog.csdn.net/sundujing/article/details/51397209 http://www.javashuo.com/article/p-nimgstgx-go.html https://www.iteblog.com/archives/1512.html https://www.jianshu.com/p/a7503b26cb73

(1)使用本地文件建立RDD:

val textFile = sc.textFile("file:///usr/local/spark/README.md")

代码中经过 “file://” 前缀指定读取本地文件。

(2)使用hdfs文件建立RDD:

val textfile = sc.textFile("/winnie/htest/test01.txt")

Spark shell 默认是读取 HDFS 中的文件,须要先上传文件到 HDFS 中,不然会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/local/spark/README.md”的错误。

(3)使用集合建立RDD:

从集合中建立RDD主要有下面两个方法:makeRDD 和 parallelize

makeRDD示例:

sc.makeRDD(1 to 50)

sc.makeRDD(Array("1", "2", "3"))

sc.makeRDD(List(1,3,5))

parallelize示例:

同上

2.2.2 RDD两种操做

参考连接: https://blog.csdn.net/HANLIPENGHANLIPENG/article/details/53508746

(1)RDDs支持两种类型的操做:

actions: 执行操做,在数据集上运行计算后返回值

transformations: 转化操做,从现有数据集建立一个新的数据集

转化操做并不会当即执行,而是到了执行操做才会被执行

转化操做:

map()          参数是函数,函数应用于RDD每个元素,返回值是新的RDD 
flatMap()      参数是函数,函数应用于RDD每个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD 
filter()       参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD 
distinct()     没有参数,将RDD里的元素进行去重操做 
union()        参数是RDD,生成包含两个RDD全部元素的新RDD 
intersection() 参数是RDD,求出两个RDD的共同元素 
subtract()     参数是RDD,将原RDD里和参数RDD里相同的元素去掉 
cartesian()    参数是RDD,求两个RDD的笛卡儿积

行动操做:

collect()       返回RDD全部元素 
count()         RDD里元素个数,对于文本文件,为总行数
first()			RRD中的第一个item,对于文本文件,为首行内容 
countByValue()  各元素在RDD中出现次数 
reduce()        并行整合全部RDD数据,例如求和操做 
fold(0)(func)   和reduce功能同样,不过fold带有初始值 
aggregate(0)(seqOp,combop) 和reduce功能同样,可是返回的RDD数据类型和原RDD不同 
foreach(func)   对RDD每一个元素都是使用特定函数

行动操做每次的调用是不会存储前面的计算结果的,若想要存储前面的操做结果,要把须要缓存中间结果的RDD调用cache(),cache()方法是把中间结果缓存到内存中,也能够指定缓存到磁盘中(也能够只用persisit())

(2)实例

textFile.count()
textFile.first()

val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行
linesWithSpark.count() // 统计行数

action 和 transformation 能够用链式操做的方式结合使用,使代码更为简洁:

textFile.filter(line => line.contains("Spark")).count()

找到包含单词最多的那一行内容共有几个单词:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

代码首先将每一行内容 map 为一个整数,这将建立一个新的 RDD,并在这个 RDD 中执行 reduce 操做,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),而且能够使用语言特征或 Scala/Java 的库。

Hadoop MapReduce 是常见的数据流模式,在 Spark 中一样能够实现(下面这个例子也就是 WordCount):

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  // 实现单词统计
wordCounts.collect() // 输出单词统计结果

2.2.3 Spark shell退出

参考连接:http://www.javashuo.com/article/p-hwerenoj-db.html

:quit

3.Spark SQL 和 DataFrames

Spark SQL 是 Spark 内嵌的模块,用于结构化数据。

在 Spark 程序中能够使用 SQL 查询语句或 DataFrame API。

DataFrames 和 SQL 提供了通用的方式来链接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,而且能够在多种数据源之间执行 join 操做。

3.1 Spark SQL 基本操做

Spark SQL 的功能是经过 SQLContext 类来使用的,而建立 SQLContext 是经过 SparkContext 建立的。

在 Spark shell 启动时,输出日志的最后有这么几条信息:

这些信息代表 SparkContent 和 SQLContext 都已经初始化好了,可经过对应的 sc、sqlContext 变量直接进行访问。

使用 SQLContext 能够从现有的 RDD 或数据源建立 DataFrames。

经过 Spark 提供的 JSON 格式的数据源文件 ./examples/src/main/resources/people.json 来进行演示,该数据源内容以下:


【补充:xshell上传文件】

rz

【补充:xshell下载文件】

sz
相关文章
相关标签/搜索