spark杂记

1.须要加上转义字符
java.util.regex.PatternSyntaxException: Unclosed character class near index 0
java.util.regex.PatternSyntaxException: Unexpected internal error near index 1

2.kafka中数据还没来得及消费,数据就已经丢失或者过时了;就是kafka的topic的offset超过range了,多是maxratePerPartition的值设定小了 [https://blog.csdn.net/yxgxy270187133/article/details/53666760]
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {newsfeed-100-content-docidlog-1=103944288}


3.内存参数过小 --executor-memory 8G \  --driver-memory 8G \
 Application application_1547156777102_0243 failed 2 times due to AM Container for appattempt_1547156777102_0243_000002 exited with exitCode: -104
For more detailed output, check the application tracking page:https://host-10-31-4-246:26001/cluster/app/application_1547156777102_0243 Then click on links to logs of each attempt.
Diagnostics: Container [pid=5064,containerID=container_e62_1547156777102_0243_02_000001] is running beyond physical memory limits. Current usage: 4.6 GB of 4.5 GB physical memory used; 6.3 GB of 22.5 GB virtual memory used. Killing container.


4.方法调用在方法定义以后
forward reference extends over definition of value xxx

*******************************************************************
https://blog.csdn.net/appleyuchi/article/details/81633335
pom中的provided指的是编译须要,发布不须要,当咱们经过spark-submit提交时,spark会提供须要的streaming包,而Intellij是经过java提交的,在运行时依然须要streaming的包,因此须要去掉.
1.解决方案:本地运行时注销掉<scope>provided</scope>,reimport maven projects
java.lang.ClassNotFoundException: org.apache.spark.SparkConf

2.

[ERROR] E:\git3_commit2\Newsfeed\Newsfeed\src\main\scala\com\huawei\rcm\newsfeed
\textcontent\wordToVecDocUser.scala:206: error: No org.json4s.Formats found. Try
 to bring an instance of org.json4s.Formats in scope or use the org.json4s.Defau
ltFormats.
[INFO]     val str = write(map)

添加
implicit val formats: DefaultFormats = DefaultFormats

3.



https://stackoverflow.com/questions/30033043/hadoop-job-fails-resource-manager-doesnt-recognize-attemptid/30391973#30391973

*******************************************************************
如何在IDEA 中使用Git
https://www.cnblogs.com/zbw911/p/6206689.html

*******************************************************************
(1)链接zk
cd /opt/FIC70_client/ZooKeeper/zookeeper/bin
./zkCli.sh -timeout 5000 -r -server 172.16.16.159:24002 (中间以,分割)
./zkCli.sh -timeout 5000 -r -server 10.31.7.209:24002


(2)断点续传的kafka能力
读取kafka的topic offset失败,致使contenanalysis启动失败。 或者是读取的偏移量与实际的不符,故去kafka时获取offset失败,或者offset的值错误。 须要重建offset。
经过zookeeper查询对应组id下的topic是存在
zkCli.sh --server  10.73.80.4:24002   zookeeper的地址端口经过配置文件能够查询, cd $KAFKA_HOME/conf cat server.propertes | grep "^zookeeper.connect"
经过ls命令查询对应的grounpid下的topic是不是否存在,或者offset是否存在  ls /consumers/[grounpid]/offsets/   
若是不存在,能够考虑重建topic 或者手动添加topic偏移量
create /consumers/[grounpid]/offsets/[topic]/[partition]/[offsetvalue]

 (3)ls命令
在ZK上运行ls /consumers/对应的分组/offset/对应的topic,就能够看到此topic下的全部分区了
ls  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment

(4)删除zk目录
rmr  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment
deleteall  /consumers/Newsfeed.Entertainment.ContentAnalysis/offsets/newsfeed-100-contentdistribute-entertainment

(5)get命令
get /consumers/对应的分组/offset/对应的topic/对应的分区号,能够查询到该分区上记录的offset

(6)set命令
set /consumers/对应的分组/offset/对应的topic/对应的分区号 修改后的值(通常为0),便可完成对offset的修

*******************************************************************
Applicatiion: 应用程序
Driver: 表示main()函数,建立SparkContext
Executor:
Worker: 集群中能够运行Application代码的节点.在Standalone模式中指的是经过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点
Task:在Executor进程中执行任务的工做单元,多个Task组成一个Stage
Job:包含多个Task组成的并行计算,是由Action行为触发的

spark运行流程:
    (1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(能够是Standalone、Mesos或YARN)注册并申请运行Executor资源;
    (2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行状况将随着心跳发送到资源管理器上;
    (3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task
    (4)Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor
    (5)Task在Executor上运行,运行完毕释放全部资源

*******************************************************************
1.Spark和Hadoop
    Hadoop的两个核心模块:分布式存储模块HDFS,分布式计算模块Mapreduce
    Spark主要是在计算模块取代了Mapreduce,存储模块仍是基于hadoop的HDFS

2. RDD(Resilient Distributed Dataset)弹性分布式数据集
    Spark中的RDD是一个不可变的分布式对象集合,有五大特性:
①有一个分片列表。就是能被切分,和hadoop同样的,可以切分的数据才能并行计算。
②有一个函数计算每个分片,这里指的是下面会提到的compute函数。
③对其余的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并非全部的RDD都有依赖。
④可选:key-value型的RDD是根据哈希来分区的,相似于mapreduce当中的Paritioner接口,控制key分到哪一个reduce。
⑤可选:每个分片的优先计算位置(preferred locations),好比HDFS的block的所在位置应该是优先计算的位置。(存储的是一个表,能够将处理的分区“本地化”)

3.RDD两种建立方式:
    ①(经常使用)读取外部数据集:
        val rdd1=sc.textFile("/path/to/readme.md")
    ②在驱动程序中对另外一个集合并行化:
        val rdd2=sc.parallelize(List("apple","banana","orange"))
        注:通常在开发原型或测试时才使用

4.Spark程序或shell会话工做流程
    ①从外部数据建立出输入RDD;
    ②使用诸如filter()等这样的转化操做对RDD进行转化,以定义新的RDD;
    ③告诉Spark对须要被重用的中间结果RDD执行persist()操做;
    ④ 使用诸如first()等这样的行动操做来触发一次并行计算,Spark会对计算进行优化后再执行。

5.RDD操做:
    3.1获取RDD
    ①从共享的文件系统获取(如:HDFS)
    ②经过已存在的RDD转换 ③将已存在scala集合(只要是Seq对象)并行化,经过调用SparkContext的parallelize方法实现    ④改变现有RDD的之久性;RDD是懒散,短暂的。(RDD的固化:cache缓存至内错;save保存到分布式文件系统)
    

    3.2.转化操做(返回一个新的RDD)
    ① map(func)
       返回一个新的分布式数据集,由每一个原元素通过func函数转换后组成
    ②filter(func)
    返回一个新的数据集,由通过func函数后返回值为true的原元素组成
    ③flatMap(func) 相似于map,可是每个输入元素,会被映射为0到多个输出元素(所以,func函数的返回值是一个Seq,而不是单一元素)
    ④sample(withReplacement,  frac, seed)
    根据给定的随机种子seed,随机抽样出数量为frac的数据
    ⑤union(other)
    返回一个新的数据集,由原数据集和参数联合而成
    ⑥groupByKey([numTasks]) 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认状况下,使用8个并行任务进行分组,你能够传入numTask可选参数,根据数据量设置不一样数目的Task
    ⑦reduceByKey(func,  [numTasks])    在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一块儿。和groupbykey相似,任务的个数是能够经过第二个可选参数来配置的。
    ⑧join(otherDataset,  [numTasks]) 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每一个key中的全部元素都在一块儿的数据集
    ⑨groupWith(otherDataset,  [numTasks])
    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操做在其它框架,称为CoGroup
    ⑩cartesian(otherDataset)
    笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,全部元素交互进行笛卡尔积。

    3.3.行动操做(向驱动器程序返回结果或把结果写入外部系统的操做)
    ①reduce(func)     经过函数func汇集数据集中的全部元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保能够被正确的并发执行
    ②collect()     在Driver的程序中,以数组的形式,返回数据集的全部元素。这一般会在使用filter或者其它操做后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,极可能会让Driver程序OOM
    ③count()     返回数据集的元素个数
    ④take(n)     返回一个数组,由数据集的前n个元素组成。注意,这个操做目前并不是在多个节点上,并行执行,而是Driver程序所在机器,单机计算全部的元素(Gateway的内存压力会增大,须要谨慎使用)
    ⑤first()     返回数据集的第一个元素(相似于take(1)
    ⑥saveAsTextFile(path)     将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每一个元素的toString方法,并将它转换为文件中的一行文本
    ⑦saveAsSequenceFile(path)     将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式能够转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
    ⑧foreach(func)     在数据集的每个元素上,运行函数func。这一般用于更新一个累加器变量,或者和外部存储系统作交互

[注:惰性求值:RDD的转化操做是惰性求值的,即在被调用行动操做以前Spark不会开始计算]













经常使用术语:
①Application:用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码
②Driver: 运行Application的main函数并建立SparkContext,建立SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通讯,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,一般用SparkContext表明Driver
③Executor:
④Cluter Manager:
⑤Worker:
⑥Task:
⑦Job:
⑧Stage:
⑨DAGScheduler





html

相关文章
相关标签/搜索