1、
如下环境在pycharm中运行java
确保jdk,hadoop,spark,scala 软件在电脑端安装完毕,环境搭建python
pycharm 环境配置es6
环境变量配置:apache
PYTHONUNBUFFERED=1; -u 不缓冲stdin、stdout和stderr 默认是缓冲的。同PYTHONUNBUFFERED=1 不用buffer的意思数组
PYTHONPATH=C:\spark-2.2.0-bin-hadoop2.7\python;jvm
SPARK_HOME=C:\spark-2.2.0-bin-hadoop2.7\分布式
from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("liujinjie") sc = SparkContext(conf=conf) textFile = sc.textFile("D:\hello.txt") wordCount = textFile.flatMap(lambda line: line.split(",")).map(lambda word:(word,1)).reduceByKey(lambda a,b :a + b) wordCount.foreach(print)
其中flatMap,Map,reduceByKey 含义讲解 link函数
2、oop
scala 代码spa
collect: 收集一个弹性分布式数据集的全部元素到一个数组中,这样便于咱们观察,毕竟分布式数据集比较抽象。Spark的collect方法,是Action类型的一个算子,会从远程集群拉取数据到driver端。最后,将大量数据
聚集到一个driver节点上,将数据用数组存放,占用了jvm堆内存,很是用意形成内存溢出,只用做小型数据的观察。
val arr = res.collect();
import org.apache.spark.{SparkContext, SparkConf} object WordCount { def main(args: Array[String]) { /* 建立Spark配置对象SparkConf,设置Spark程序运行时的配置信息, SetMaster 来设置程序要连接的Spark集群Master的URL,若是设定为local 则表明在本地运行, setAppName 设定应用程序名称,在程序运行监控界面可看到名称 */ val conf = new SparkConf().setMaster("local").setAppName("testRdd") val sc = new SparkContext(conf) // 等价与 val sc = new SparkContext("local","testRdd") /** * 建立SparkContext对象,SparkContext是全部程序功能的惟一入口,不管用Scala、Java、Python、R等都必需要有个SparkContext * SparkContext核心做用是初始化spark应用程序所需核心组件,包含DAGScheduler,TaskScheduler,SchedulerBackend * 还负责 spark 程序往master 注册程序等, 是整个应用程序最重要的一个对象, 经过传入conf 定制spark运行的具体参数跟配置信息 */ /** * 3.根据具体数据的来源(HDFS,HBase,Local,FS,DB,S3等)经过SparkContext来建立RDD; * RDD的建立基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其余的RDD操做; * 数据会被RDD划分红为一系列的Partitions,分配到每一个Partition的数据属于一个Task的处理范畴; */ /** * 4.对初始的RDD进行Transformation级别的处理,例如map,filter等高阶函数的变成,来进行具体的数据计算 * 4.1.将每一行的字符串拆分红单个单词 */ //对每一行的字符串进行拆分并把全部行的拆分结果经过flat合并成一个大的集合 val lines = sc.textFile("d://hello.txt") val words = lines.flatMap(_.split(",")) val pairs = words.map{word =>(word,1)} val wordCounts = pairs.reduceByKey(_ + _) wordCounts.foreach(pair => println(pair._1 + ":" + pair._2)) sc.stop() // data.flatMap(_.split(","))//下划线是占位符,flatMap是对行操做的方法,对读入的数据进行分割 // .collect()//将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操做,并返回结果到驱动程序 // .foreach(println)//循环打印 } }
scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i * i } res0: List[Int] = List(4, 16, 36) scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i+1 } res1: List[Int] = List(3, 5, 7) scala> List(1, 2, 3, 4, 5, 6) collect { case i => i+1 } res2: List[Int] = List(2, 3, 4, 5, 6, 7) scala> List(1, 2, 3, 4, 5, 6) collect { case i =>(i, i+1 )} res3: List[(Int, Int)] = List((1,2), (2,3), (3,4), (4,5), (5,6), (6,7)) scala> List(1, 2, 7, 4, 9, 6) collect { case i =>(i, i+1 )} res4: List[(Int, Int)] = List((1,2), (2,3), (7,8), (4,5), (9,10), (6,7)) scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i } <console>:1: error: ')' expected but ';' found. List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i } ^ <console>:1: error: ';' expected but ')' found. List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i } ^ scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i } <console>:1: error: ')' expected but ';' found. List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i } ^ <console>:1: error: ';' expected but ')' found. List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i } ^ scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i } <console>:8: error: reassignment to val List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i } ^ scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i+1;true}=> i * i } res6: List[Int] = List(1, 4, 9, 16, 25, 36)