Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark is a fast, general-purpose cluster computing system. It provides high-level APIs in several languages and an optimized engine that supports general execution graphs. It also ships a rich set of higher-level tools: Spark SQL for structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.
Its main components are:
Spark Core
Spark SQL
Spark Streaming
MLlib
GraphX
BlinkDB
Tachyon
Write the code
import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf = new SparkConf().setMaster("local").setAppName("MyApp")
    // Create the SparkContext from the configuration
    val sc = new SparkContext(conf)
    // Read the input file
    val rdd1 = sc.textFile("/Users/README.md")
    // Split into words, map each word to (word, 1) and sum the counts per word
    val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // Print the first ten results
    rdd2.take(10).foreach(println)
  }
}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCountJavaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("myapp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd1 = sc.textFile("/Users/README.md");
        // Split each line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                List<String> list = new ArrayList<>();
                String[] arr = s.split(" ");
                for (String ss : arr) {
                    list.add(ss);
                }
                return list.iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts per word
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        List<Tuple2<String, Integer>> list = rdd4.collect();
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t._1() + " " + t._2());
        }
    }
}
An RDD is stored across multiple nodes, on the same principle as HDFS's distributed storage: an HDFS file is split into blocks stored on different nodes, while an RDD is split into partitions, and different partitions may reside on different nodes.
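As a rough illustration (a sketch assuming the sc provided by spark-shell; the dataset and partition count are arbitrary), you can inspect the partitioning directly:

// Ask Spark to split this small dataset into 4 partitions
val rdd = sc.parallelize(1 to 10, 4)
// Number of partitions the RDD was split into
println(rdd.getNumPartitions)   // 4
// glom() gathers each partition into an array, one array per partition
rdd.glom().collect().foreach(p => println(p.mkString(",")))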
1. Driver
The Driver distributes tasks; before distributing them, it calls RDD methods to obtain the partition locations. It also pulls the tasks' computation results back to the driver side. The Driver is a JVM process.
2. Worker
In the figure, the parallelism of stage 2 is 4, i.e., there are 4 tasks.
Wide dependency
When the relationship between a parent RDD's partitions and a child RDD's partitions is one-to-many, it is a wide dependency. Wide dependencies correspond to shuffles.
Narrow dependency
When the relationship between a parent RDD's partitions and a child RDD's partitions is one-to-one or many-to-one, it is a narrow dependency.
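A minimal sketch of both cases, assuming the sc from spark-shell: mapValues keeps each child partition tied to a single parent partition (narrow), while reduceByKey must combine values for the same key from many partitions (wide, i.e. a shuffle).

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)), 2)
// Narrow dependency: each output partition depends on exactly one input partition
val mapped = pairs.mapValues(_ * 10)
// Wide dependency: values for a key may live in several partitions, so a shuffle is needed
val reduced = pairs.reduceByKey(_ + _)
// The lineage printout shows the shuffle (stage) boundary introduced by reduceByKey
println(reduced.toDebugString)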
Characteristic: the following operators are transformations and are lazily executed.
(1) map
map applies its transformation function to every element of the RDD, producing one output element per input element.
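For example (a spark-shell sketch):

val nums = sc.parallelize(Seq(1, 2, 3))
// One output element per input element
val squared = nums.map(n => n * n)
println(squared.collect().mkString(", "))   // 1, 4, 9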
(2) flatMap
The difference between flatMap and map is that map performs a plain mapping, while flatMap maps first and then flattens: map produces exactly one output object per input element, whereas flatMap takes the extra step of merging all the resulting collections into a single one.
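A small spark-shell sketch of the difference:

val lines = sc.parallelize(Seq("hello world", "spark"))
// map: one array per input line
lines.map(_.split(" ")).collect()      // Array(Array(hello, world), Array(spark))
// flatMap: the arrays are flattened into a single collection of words
lines.flatMap(_.split(" ")).collect()  // Array(hello, world, spark)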
(3) flatMapValues
Each element's value is mapped by the supplied function to a sequence of values, and each of those values is then paired with the original key to form a series of new key-value pairs.
Code
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
Output
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
(4) filter
A filtering operation: the elements of the RDD for which the function inside filter returns true form a new dataset.
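For example (spark-shell sketch):

val nums = sc.parallelize(1 to 6)
// Keep only the elements for which the predicate returns true
nums.filter(n => n % 2 == 0).collect()   // Array(2, 4, 6)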
(5) groupByKey
Its main purpose is to group all key-value pairs with the same key into one collection sequence; the order of the result is not guaranteed.
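A spark-shell sketch (the grouping is correct, but the order of keys and of values within a group is not guaranteed):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1), ("b", 2)))
// All values with the same key end up in one collection
pairs.groupByKey().collect()   // e.g. Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(1, 2)))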
(6) reduceByKey
reduceByKey is similar to groupByKey, but different. For example, given (a,1), (a,2), (b,1), (b,2): groupByKey produces the intermediate result ((a,1), (a,2)), ((b,1), (b,2)), whereas reduceByKey produces (a,3), (b,3).
reduceByKey is mainly used for aggregation, while groupByKey is mainly used for grouping.
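The same comparison in spark-shell form, again only as a sketch:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1), ("b", 2)))
// reduceByKey combines values per key (partially on the map side, before the shuffle)
pairs.reduceByKey(_ + _).collect()              // e.g. Array((a,3), (b,3))
// groupByKey ships every value through the shuffle and only then lets you aggregate
pairs.groupByKey().mapValues(_.sum).collect()   // same totals, but more data moved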
(7) take
take returns an array containing the first n elements of the dataset.
Characteristic: take is an action, so it triggers execution immediately.
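For example (spark-shell sketch):

val rdd = sc.parallelize(1 to 100)
// take is an action: it runs a job and returns the results to the driver
rdd.take(5)   // Array(1, 2, 3, 4, 5)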
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
Spark SQL is a Spark module for processing structured data. It provides SQL-like access to data on Hadoop and carries out the underlying MapReduce-style computation.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
A Dataset is a distributed collection of data.
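A minimal Scala sketch, assuming the spark session provided by spark-shell; the Person case class is made up for the illustration:

// Hypothetical strongly-typed record for the Dataset
case class Person(name: String, age: Long)
// Needed for the toDS() conversion
import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("lisi", 12)).toDS()
// Typed lambda, still executed by Spark SQL's optimized engine
ds.filter(p => p.age > 10).show()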
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
(1) Creating DataFrames
Data
{"id":"1","name":"zhangsan","age":"12"}
{"id":"2","name":"lisi","age":"12"}
{"id":"3","name":"wangwu","age":"12"}
Code
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf = new SparkConf().setMaster("local[4]").setAppName("MyApp")
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // Read the JSON file into a DataFrame
    val df = spark.read.json("/Users/opensource/dev-problem/source/people_sample_json.json")
    df.show()
  }
}
(2) Querying
val df = spark.read.json("/Users/fangzhijie/opensource/dev-problem/source/people_sample_json.json")
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people WHERE name = 'zhangsan'")
sqlDF.show()
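The same query can also be expressed with the DataFrame API instead of an SQL string; a sketch reusing the df from the block above:

// Needed for the $"..." column syntax
import spark.implicits._
// Column-based filter equivalent to the SQL WHERE clause
val dslDF = df.filter($"name" === "zhangsan")
dslDF.show()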
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
(1) Basic usage
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // Create the streaming context with a 1-second batch interval
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create a socket text stream
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}
Use a shell command to listen on the port and type in the text to be processed:
$ nc -lk 9999
Spark Streaming's programming abstraction is the discretized stream (DStream), a sequence of RDDs in which each RDD holds the data from one time slice of the stream.
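Because every batch of a DStream is an ordinary RDD, RDD operations can be applied per batch; a sketch building on the wordCounts stream from the example above (it must be registered before ssc.start()):

// Runs once per batch interval; rdd is the RDD for that time slice
wordCounts.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contains ${rdd.count()} distinct words")
}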
Original source: https://www.cnblogs.com/fonxian/p/11887518.html