大数据学习——spark笔记

变量的定义

val a: Int = 1
var b = 2

方法和函数

区别:函数能够做为参数传递给方法
方法:
        def test(arg: Int): Int=>Int ={
            方法体
        }
        val fun = (test _: Int =>(Int=>Int))=>函数体

逻辑执行语句

val a = if(条件){

执行逻辑
返回值
}else{
执行逻辑
}

while(条件){
执行逻辑
}

val arr = Array(1,2,3,4,5)  
for(i <- 0 to arr.length ){
    arr(i)
}

for(i <- arr){
    i
}

集合操做

Array ArrayBuffer List ListBuffer set Map tuple

val arr = Array(1,2,3,4,5)
    arr(0)
    arr += 9
val arrb = ArrayBuffer(1,2,3,4,5)
    arrb(0)
val list = List(1,2,3,4)

val tuple = (1,"string")
tuple._1

val map = Map("a"->1)
val map = Map(("a",1))

类(重要)

类的主构造器:主构造器里面的变量会被执行,方法会被加载,调用的方法会被执行
calss Test(){
    val int = 1
    def test(){
    }

    …………
    …………
    …………
    test

}

辅助构造器:重载

extends with

集合的高级操做(重要)

map:将集合中的变量循环出来作操做
flatMap:将集合中的参数压循环出来作操做
val arr = Array("hello tom","hello lilei","hello hanmeimei")
map:(hello tom),(hello lilei),(hello hanmeimei) 
flatMap:(hello tom hello lilei hello hanmeimei)
filter:过滤想要的元素
groupBy:按照key进行分组,分组以后value合并到Array
mapValues:针对kv类型的数据,只对value进行操做
sortBy:针对某个元素进行排序
val arr Array("hello tom","hello lilei")
val result =  arr.flatMap(x => x.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortBy(_._2).recerse
val result = arr.flatMap(_.split(" ")).map((_,1)),reduceByKey(_+_).sortBy(_._2,true)

高级特性

高阶函数:把函数做为参数传递给方法或者函数,函数在函数式编程中是第一位的。
    map(函数)

隐式转换(PreDef):对类的加强,Int类没有to这个方法,而后再RichInt类中包含这个方法,咱们只须要在某个地方将Int转换成RichInt,而后在用的地方import就ok了
class RichFile(file: File){
def read(file:File):String={
    Source.fromFile(file.getPath).mkString
}

}
object RichFile{

implict def file2RichFile(file:File)=RichFile(file)
}

object Test{
def main(args:Array[String]){
    import RichFile.file2RichFile
    val file = new File("c://words.txt").read
}

}


柯里化:将原来接收多个参数的方法或者函数,编程接收一个一个的方法或者函数,返回的是函数

    def test(a:Int)(b:Int)(c:Int){
        a+b+c
    }
    val fun = def(1) _

actor 并发编程的接口(很是重要)

actor:用消息传递的方式实现了并发编程,写起来像线程,玩起来像socket
AKKA:actorSystem actOf

spark(what、how、why、use、运维<源码的理解>)

课程目标

一、知道spark是干啥的
二、会安装spark
三、会写spark程序(scala、python、R、java)

什么是spark?

内存迭代式计算,利用DAG有向无环图
特别很是快:在硬盘快mr10x,在内存,落你一条街100x
易用性:代码写的少,能够用n中语言写,你mr就一种
通用性:我集成了core、sql、streaming、MLlib、graphx,能交互
无处不在:数据源多种(hdfs、hbase、mysql、文件),计算平台多种(standalone、YARN、mesos)

how1(部署)

一、下载安装包
二、上传包
三、解压
四、重命名
五、修改环境变量
六、修改配置文件(重要,去官方文档看(别人的帖子,例如:www.wangsenfeng.com)、全部集群跑不起来都在这,经过log文件查看)
七、下发(scp)
八、修改其余机器的配置(可选)
九、格式化(可选)
十、启动集群(注意依赖关系)

启动

方式1:
    standalone-单master:
                        java_home、masterip、masterport、hadoopconf
方式2:
    standalone-多master:
                        java_home、masterport、hadoopconf、zookeeper

运行shell

运行spark-shell的两种方式:
一、直接运行spark-shell
    单机经过多线程跑任务,只在运行spark-shell的地方运行一个进程叫sparksubmit
二、运行spark-shell --master spark://master1:7077
    将任务运行在集群中,在运行spark-shell的机器上运行sparksubmit进程,运行executor在worker上

用api开发spark代码

一、建立项目
二、到pom.xml(在day01中)
三、建立scala类
    import org.apache.spark.SparkContext //一切任务的起源,全部的计算的开头。(上下文)
    import org.apache.spark.SparkConf   //spark的配置信息,至关于mr当中的那个conf,他会覆盖掉默认的配置文件(若是你进行了配置),他的主要做用,这只app的名字,设置时运行本地模式仍是集群模式
四、写代码(参考官方文档)
    若是是在windows上运行,设置setMaster("local[n]")
    若是是线上运行,把setMaster("local[n]")去掉,或者setMaster("spark://master1:7077")(不建议)
    注意两个关键词:transformation,action

提交任务到集群

一、打jar包,去掉setMaster
二、将jar上传到linux
三、执行命令 
    spark-submit \
    --master spark://master1:7077 \
    --executor-memory 512M \
    --total-executor-cores 2 \
    --class org.apache.spark.WordCount \
    xxx.jar \
    in  \   
    out \

用 python开发spark程序

一、开发python的程序
二、运行在集群,用spark-submit

用R开发spark

一、先安装R
    yum –y install gcc gcc-c++,
    yum –y install gcc-gfortran
    yum –y install readline-devel
    yum –y install libXt-devel
    yum –y install fonts-chinese tcl tcl-devel tclx tk tk-devel
    yum -y install epel-release
    vim /etc/yum.repos.d/epel.repo
    将
    #baseurl
    mirrorlist
    改为
    baseurl
    #mirrorlist
    yum -y install R 安装R语言

    二、而后按照官网的玩

    单机启动
     sparkR

    启动standalone
    sparkR --master spark://master1:7077

    启动yarn
    sparkR --master yarn-client 

    从hive读数据等
     sparkR --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar

    集群提交
    spark-submit examples/src/main/r/dataframe.R

    三、监控
    http://master1ha:4040/

思考问题

一、什么是RDD
二、什么是stage
三、什么是DAG

随堂问题

一、老师好,刚刚那个mr的container,是由resourceManager建立好,而后序列化后,再给NodeManager那些来反序列化的吗?
答:是由resourcemanager建立好序列化发给applicationMaster,而后applicationMaster找nodemanager去启动资源
二、老师,刚才那个执行结果分红两个文件,它的分区机制是将不一样的单词进行hash 吗?
答:是的,hash分区
三、在集群上,R运行须要安装R,Python文件运行,须要安装Python么?
答:须要安装,linux默认帮咱们安装了python

复习

什么spark?

内存迭代式计算,每一个算子将计算结果保存在内存中,其余算子,读取这个结果,继续计算
4个特性:快(10x、100x),易用性(代码优美、能够用4种语言开发\依赖外部数据源(hdfs、本地文件、kafka、flume、mysql))、
通用性(cores、sql、streaming、MLlib、graphx,交互使用)、随便那个平台均可以跑(standalone、yarn、mesos)

搭建spark

一主多从:
        一、下载安装包(依赖的hadoop的版本,source是下载源码的)
        二、上传到集群
        三、解压
        四、重命名(版本更新不须要修改环境变量)
        五、修改环境变量(root)
        六、修改配置文件(spark-env.sh:JAVA_HOME,master_ip,master_port,hadoop_conf_dir、java_opts(-D);slaves(从的域名))
        七、下发(scp)
        八、启动集群(start-all.sh;start-master.sh;start-slave.sh master的地址)
        九、spark的协议:spark://master:7077
        十、浏览器端口:master:8080
        十一、R语言的浏览器任务查看:masterR:4040

多主多从:多加了zookeeper调度(选举机制)

命令行

一、spark-shell:在当台机器上启动一个进程sparksubmit,经过多线程的方式模拟集群
二、spark-shell --master spark://master1:7077:启动的事集群版shell,任务会提交到集群运行,
    在当前的机器启动的集成sparksubmit,在丛集器启动的集成叫xxxxexecutorbackend
    默认没有加从机器的cores和memory参数,会在每台丛集器启动一个executor进程,
    若是加了--total-executor-cores n会启动n个executor进程

命令行版的wordcount

注意:在sparkshell中帮咱们默认加载了SparkContext,并命名为sc;也帮咱们建立了SparkConf,而且设置了appname(“sparkshell”),而且设置了setmaster(“local/spark://...”)
sc.textFile("file:///... ; hdfs://...").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).saveAsTextFile("hdfs://...")

spark的api操做

一、scala
二、python
    #!/usr/bin/python
    from pyspark import SparkConf , SparkContext
    sc.textFile("hdfs://...").flatMap(lambda x: x.split(" ")).map(lambda y:(y,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://...")
三、R
    ......
四、java
    ......

RDD

目标

一、什么是RDD?
二、RDD的建立方式和依赖关系
三、DAG有向无环图的意义
四、掌握划分stage的过程
五、掌握RDD的全部操做!!!!

什么是RDD?

RDD(Resilient Distributed Datasets )定义为弹性的分布式数据集,包含了只读的、分区的、分布式计算的概念;RDD是个类
一、一个数据分区的列表(hdfs的全部数据块的位置信息,保存在我RDD类成员变量Array中)
二、保存了在数据块上的计算方法,这个计算方法会应用到每个数据块上
三、一个对于其余RDD的依赖,是一个集合,spark就是经过这种依赖关系,像流水线同样处理咱们的数据,
    当某个分区的数据计算失败,只须要根据流水线的信息,从新计算这一个分区的数据便可,不须要计算所有数据
四、分区方式(partitioner),决定RDD数据来源的分区和数据计算后的分区:hashpartitioner;rangepartitioner
五、位置相关性(hdfs)

如何建立RDD

一、经过序列化集合的方式建立RDD(parallelize,makeRDD)
二、经过读取外部的数据源(testFile)
三、经过其余的rdd作transformation操做转换成新的RDD

RDD的两钟算子:
一、transformation:
    经过算法对RDD进程转换,延迟加载的一个处理数据及的方法:
    map flatMap reduceByKey 
二、Action:
    触发整个job进行计算的算子
    collect top first saveAsTextFile

广播(broadcast)变量

:其普遍用于广播Map Side Join中的小表,以及广播大变量等场景。这些数据集合在单节点内存可以容纳,不须要像RDD那样在节点之间打散存储。
Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算能够复用。相比Hadoop的distributed cache,广播的内容能够跨做业共享。
Broadcast的底层实现采用了BT机制

ipLocation

一、广播变量
二、ip转long(分金定穴循八卦,toolong插棍左八圈)
三、二分法查找:(上下循环寻上下,左移右移寻中间)
四、分区存数据库(foreachPartition)

做业:

一、把全部的算子运行一遍
二、把iplocation的思想理解,代码运行
三、有富余时间的状况下,敲一个iplocation就好了

问题

一、每一个stege是做为一个任务总体,序列化后发送给一台机器反序列话执行吗?里面包含的多个RDD是串联起来工做的吗?
答:是的
二、MapReduce中MRappmaster,启动mapTask的时候,那个map类实例是否是已经序列化而且被包含在ResourceManager的任务队列中的任务对象中?
答:是的
三、老师,只有对于于key-value的RDD,才会有Partitioner,怎么理解??
答:kv型数据的RDD按照Key进行分组操做,非kv的数据不须要分组操做,由于没有响应的算子提供
四、讲解RDD的时候,能够把跟综进源码的路径加在笔记里吗?但愿能够在阅读源码的基础上理解RDD
答:经过crtl+shift+R打开源码RDD.scala就能查看了
五、还有那个分片的工做,是任务提交以前就作好了吧,MapReduce的job.split文件好像就是在任务提交以前就在客户端经过fileinputformat已经分好了,而后再发送到HDFS上
答:对的,咱们的分片也是作好了以后发送任务
六、getPartitions方法在整个运行过程当中总共会调用几回? 数据都是分开运行的吗? 若是是分开运行的,那只须要在第一个MapRDD调用一次。请问这样理解对吗?
答:getPartitions是在任务开始以前调用一次,拿出分区的地址进行分发任务

复习

一、什么是RDD
    一个分区的列表(FileSplit),决定读取的文件在哪
    一个应用在每一个分区上的算子
    一个对其余RDD的依赖集合
    可选:一个决定数据存储时的分区方式
    可选:若是在yarn上运行,决定数据本地运行的方式,移动数据不如移动计算
二、如何建立RDD
    一、经过序列化集合的方式(makeRDD、parallelize)
    二、经过读取文件的方式
    三、经过其余的RDD进行transformation转换而来
三、RDD的算子
    transformation:(懒加载)
    map、flatMap、filter、mapPartition、groupByKey、reduceByKey、union、intersaction、distinct、aggregateByKey
    Action:(触发任务的进行)
    top、take、first、count、collect、foreach、savaAsTextFile、reduce

四、iplocaltion:(ip的热力图)
    一、广播变量:共享的内存,只读的,只能追加的
    二、ip转long:分金定穴循八卦、toolong插棍左八圈
    三、二分法查找:上下循环寻上下,左移右移寻中间
    四、foreachPartition:对每一个分区的数据进行操做,能够在分区操做的时候建立外部连接(jedis、mysql、hbase)

目标:

一、掌握RDD的stage划分
二、掌握DAG概念
三、学会使用如何建立RDD的缓存
四、学会使用如何建立RDD的checkpoint

RDD的依赖关系

宽依赖:依赖的RDD产生的数据不仅是给我用的。父RDD不仅包含一个子RDD的数据(多对对),非独生子女
窄依赖:依赖的RDD产生的数据只给我本身。父RDD只包含一个子RDD的数据(一对1、一对多)。独生子女
Lineage(血统):RDD只支持粗粒度转换,即在大量记录上执行的单个操做。将建立RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。
        RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它能够根据这些信息来从新运算和恢复丢失的数据分区。

找依赖关系划分stage的目的

一、如何经过stage的划分设置缓存
    一、在窄依赖想设置缓存的时候,用cache
    二、在宽依赖想设置缓存的话,用checkpoint

如何设置cache和checkpoint

cache:
        someRDD.cache():将缓存放到内存中
        someRDD.persist(StorageLevel.MEMORY_AND_DISK):根据本身的须要设置缓存的位置(内存和硬盘)

checkPoint:能够吧RDD计算后的数据存储在本地的磁盘,也能够是hdfs
        sc.setCheckpointDir("hdfs://master1:9000/ceshi/checkpoint")//设置checkpoint的路径
        someRDD.checkpoin()

何时设置缓存,何时设置checkpoint

遇到宽依赖设置checkpoint,窄依赖想缓存的话设置cache

cache 和 checkpoint的区别?

cache只是缓存数据,不改变RDD的依赖关系
checkpoint是生成了一个新的RDD,后面的RDD依赖的关系已经改变。

checkpoint--》cache--》重算

四个案例

一、pv:点击率
二、uv:在线用户数
三、topk:微博热门词汇
四、moblelocation:统计家庭位置和工做位置

什么是spark-sql

至关于hive

书写代码的两种模式

datafream:spark-sql本身的语法

sql:spark-sql集成sql的语法
一、经过sc加载任意类型数据
二、建立case class Person(id:Int , name:String , age:Int)(表结构)
三、将数据添加到表结构中map
四、注册表
五、经过sqlContext.sql()

spark-sql的api

两种模式(表的schema加载的两种模式)
一、经过case class的方式加载表结构
二、经过StructType去本身定义表结构

做业

一、moblelocation回去运行一遍,若是有富余时间敲几遍
二、把sparl-sql的命令行和代码敲一遍

复习

RDD的依赖关系

一、宽依赖(多对多)
二、窄依赖(一对一 和 多对一)

经过宽依赖和窄依赖划分stage

一、遇到宽依赖,宽依赖到上一个宽依赖之间的全部窄依赖是一个stage
二、stage之间有包含关系

划分stage的目的

一、用来划分task
二、用来指导什么地方须要设置什么样的缓存(cache、checkpoint)

如何设置缓存

一、someRDD.cache()
二、someRDD.persist(StorageLeavel.MEMORY_AND_DISK_2)
三、sc.setCheckPointDir("hdfs://...")
    someRDD.checkpoint()

DAG

一个任务组成的流水线就是DAG(DAGscheduler)
DAG能够划分红n个stage
stage对应n个RDD
把stage封装成Task(stage),把task分发下去(TaskScheduler)

PV UV topK

pv:点击率
sc.textFile("hdfs://..").map(("pv",1)).reduceByKey(_+_).saveAsTextFile("hdfs://...")

uv:在线用户量:经过ip去重,按照(“uv”,1)

topK:微博热门词汇
    top谁--》wordcount--》排序--》take(正序)top(倒叙)

环比的pv uv

网站分析的文档

mobileLocation(家庭位置、工做位置)

一、先将数据进行清洗(家庭、工做)
二、针对家庭和工做进行重复数据收集
三、分别对家庭和工做作计算(尾-时间,时间-头)
四、数据去重
五、转转转(手机号,(基站id,时间total))-》join(基站id)找坐标

spark-sql

==hive

操做的两种方式

一、datafream
    一、建立SqlContext(sc)
    二、经过sc读取数据
    三、经过case class或者是structType建立表结构
    四、将数据加载到表结构中(Person或者Row)
    五、隐式转换sqlContext.implict._
    六、将RDD转换为DF//show
    七、注册成表
    八、sqlContext.sql("").show // write.

目标

一、利用spark-sql从mysql中读写数据
二、spark-sql能不能集成hive使用
三、练习
一、spar-streaming(对比storm)
二、flume+spark-streaming
三、kafka+spark-streaming

如何从mysql中读数据

一、必须有mysql的driver(上传mysql的jar包)
二、加载mysql包(spark-shell --master spark://master1:7077 --jars mysql.jar --driver-class-path mysql.jar)
三、读取数据的时候,设置(sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.56.204/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"dept","user"->"root","password"->"root")).load())
四、mysql中的表结构会读吗?(有帮咱们加载表结构)

往mysql中写数据

一、须要mysql的jar包
二、sc读数据
三、datefream.write.mode("append"/"overwrite").jdbc("url","table",properties(user,password))

hive on spark-SQL

一、安装hive,修改元数据库,加上hive-site.xml(mysql链接)
二、将hive-site.xml文件拷贝到集群的conf下
三、强mysql.jar拷贝到spark的lib下
四、执行:sqlContext.sql("select * from table1")
                                            .show()  
                                            .write.mode("append").jdbc()    
                                            .foreachPartition(it => {
                                                一、初始化链接
                                                it.map(x =>{
                                                二、写数据到存储层
                                                })
                                                三、关链接
                                            })

什么是spark-streaming?

spark流失处理的框架,可以很容易的构建容错、高可用的计算模型
特色:一、易用;二、容错;三、集成;

spark-streaming和spark的批处理有什么关系?

spark-streaming是小批量的RDD处理方式

spark-streaming的应用

从tcp的client中读取数据,进行汇总操做
还以从flume中读取数据
    poll:ip地址以flume为主
    push:IP地址以streaming为主

还能够从kafka中读取数据