大数据相关面试题

kafka
1 什么是kafka
Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,以后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。html

2 为何要使用 kafka,为何要使用消息队列
缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间能够起到一个缓冲的做用,把消息暂存在kafka中,下游服务就能够按照本身的节奏进行慢慢处理。
解耦和扩展性:项目开始的时候,并不能肯定具体需求。消息队列能够做为一个接口层,解耦重要的业务流程。只须要遵照约定,针对数据编程便可获取扩展能力。
冗余:能够采用一对多的方式,一个生产者发布消息,能够被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
健壮性:消息队列能够堆积请求,因此消费端业务即便短期死掉,也不会影响主要业务的正常进行。
异步通讯:不少时候,用户不想也不须要当即处理消息。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。java

3.Kafka中的ISR、AR又表明什么?ISR的伸缩又指什么
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 全部副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。git

4.kafka中的broker 是干什么的
broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,而后进行业务处理,broker在中间起到一个代理保存消息的中转站。github

5.kafka中的 zookeeper 起到什么做用,能够不用zookeeper么
zookeeper 是一个分布式的协调组件,早期版本的kafka用zk作meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk自己的一些因素以及整个架构较大几率存在单点问题,新版本中逐渐弱化了zookeeper的做用。新的consumer使用了kafka内部的group coordination协议,也减小了对zookeeper的依赖,
可是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。数据库

6.kafka follower如何与leader同步数据
Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。彻底同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种状况下,若是leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower能够批量的从Leader复制数据,并且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提升复制性能,内部批量写磁盘,大幅减小了Follower与Leader的消息量差。apache

7.什么状况下一个 broker 会从 isr中踢出去
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每一个Partition都会有一个ISR,并且是由leader动态维护 ,若是一个follower比一个leader落后太多,或者超过必定时间未发起数据复制请求,则leader将其重ISR中移除 。编程

8.kafka 为何那么快
Cache Filesystem Cache PageCache缓存
顺序写 因为现代的操做系统提供了预读和写技术,磁盘的顺序写大多数状况下比随机写内存还要快。
Zero-copy 零拷技术减小拷贝次数
Batching of Messages 批量量处理。合并小的请求,而后以流的方式进行交互,直顶网络上限。
Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。小程序

9.kafka producer如何优化打入速度
增长线程
提升 batch.size
增长更多 producer 实例
增长 partition 数
设置 acks=-1 时,若是延迟增大:能够增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
跨数据中心的传输:增长 socket 缓冲区设置以及 OS tcp 缓冲区设置。vim

10.kafka producer 打数据,ack 为 0, 1, -1 的时候表明啥, 设置 -1 的时候,什么状况下,leader 会认为一条消息 commit了
1(默认) 数据发送到Kafka后,通过leader成功接收消息的的确认,就算是发送成功了。在这种状况下,若是leader宕机了,则会丢失数据。
0 生产者将数据发送出去就无论了,不去等待任何返回。这种状况下数据传输效率最高,可是数据可靠性确是最低的。
-1 producer须要等待ISR中的全部follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中全部Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。数组

11.kafka unclean 配置表明啥,会对 spark streaming 消费有什么影响
unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也能够参与选举,这样有可能就会丢数据,spark streaming在消费过程当中拿到的 end offset 会忽然变小,致使 spark streaming job挂掉。若是unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的状况,Kafka的可靠性就会下降;而若是unclean.leader.election.enable参数设置为false,Kafka的可用性就会下降。

12.若是leader crash时,ISR为空怎么办
kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
true(默认):容许不一样步副本成为leader,因为不一样步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的状况。
false:不容许不一样步副本成为leader,此时若是发生ISR列表为空,会一直等待旧leader恢复,下降了可用性。

13.kafka的message格式是什么样的
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。
当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,
好比是否压缩、压缩格式等等);若是magic的值为0,那么不存在attributes属性
body是由N个字节构成的一个消息体,包含了具体的key/value消息

14.kafka中consumer group 是什么概念
一样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不一样的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每一个group均可以拿到一样的全部数据,可是数据进入group后只能被其中的一个worker消费。group内的worker可使用多线程或多进程来实现,也能够将进程分散在多台机器上,worker的数量一般不超过partition的数量,且两者最好保持整数倍关系,由于Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

15.Kafka中的消息是否会丢失和重复消费?
要肯定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
一、消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可经过producer.type属性进行配置。Kafka经过配置request.required.acks属性来确认消息的生产:
0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的状况,下面分状况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等状况时,消息可能丢失;
(2)acks=一、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;

二、消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者本身维护offset等值,能够实现对Kafka的彻底控制;
High-level API:封装了对parition和offset的管理,使用简单;
若是使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时以前没消费成功的消息就“诡异”的消失了;
解决办法:
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower以后再确认消息发送成功;异步模式下,为防止缓冲区满,能够在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的惟一标识保存到外部介质中,每次消费时判断是否处理过便可。
消息重复消费及解决参考:https://www.javazhiyin.com/22910.html

16.为何Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操做都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,由于主写从读有 2 个很明 显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会致使主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 以后将主节点中 A 的值修改成 Y,那么在这个变动通知到从节点以前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。相似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程须要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费必定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它须要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

17.Kafka中是怎么体现消息顺序性的?
kafka每一个partition中的消息在写入时都是有序的,消费时,每一个partition只能被每个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。若是为了保证topic整个有序,那么将partition调整为1.

18.消费者提交消费位移时提交的是当前消费到的最新消息的offset仍是offset+1?
offset+1

19.kafka如何实现延迟队列?
Kafka并无使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操做的平均时间复杂度为O(nlog(n)),并不能知足Kafka的高性能要求,而基于时间轮能够将插入和删除操做的时间复杂度都降为O(1)。时间轮的应用并不是Kafka独有,其应用场景还有不少,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪迹。
底层使用数组实现,数组中的每一个元素能够存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.
Kafka中究竟是怎么推动时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推动时间轮。具体作法是对于每一个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操做,而DelayQueue专门负责时间推动的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只须要O(1)的时间复杂度。若是采用每秒定时推动,那么获取到第一个超时的任务列表时执行的200次推动中有199次属于“空推动”,而获取到第二个超时任务时有须要执行639次“空推动”,这样会无端空耗机器的性能资源,这里采用DelayQueue来辅助以少许空间换时间,从而作到了“精准推动”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel作最擅长的任务添加和删除操做,而用DelayQueue作最擅长的时间推动工做,相辅相成。
参考:https://blog.csdn.net/u013256816/article/details/80697456

20.Kafka中的事务是怎么实现的?
参考:https://blog.csdn.net/u013256816/article/details/89135417

21.Kafka中有那些地方须要选举?这些地方的选举策略又有哪些?
https://blog.csdn.net/yanshu2012/article/details/54894629

  1. 消息列队的特色:
    • 生产者消费者模式
    • 先进先出(FIFO)顺序保证
    • 可靠性保证
    – 本身不丢数据
    – 消费者不丢数据:“至少一次,严格一次”
    • 至少一次就是可能会有两次,会重
    • 严格一次机制就会负责一点

4.消息列队常见场景
• 系统之间解耦合
– queue模型
– publish-subscribe模型
• 峰值压力缓冲
• 异步通讯

5.kafka 的架构
• producer:消息生产者
• consumer:消息消费者
• broker:kafka集群的server,负责处理消息读、写请求,存储消息
• topic:消息队列/分类
• Queue里面有生产者消费者模型
• broker就是代理,在kafka cluster这一层这里,其实里面是有不少个broker
• topic就至关于queue
• 图里没有画其实还有zookeeper,这个架构里面有些元信息是存在zookeeper上面的,整个集群的管理也和zookeeper有很大的关系
• 一个topic分红多个partition
• 每一个partition内部消息强有序,其中的每一个消息都有一个序号叫offset
• 一个partition只对应一个broker,一个broker能够管多个partition
• 消息不通过内存缓冲,直接写入文件
• 根据时间策略删除,而不是消费完就删除
• producer本身决定往哪一个partition写消息,能够是轮询的负载均衡,或者是基于hash的partition策略

• 接下来咱们看kafka是怎么生产消息,消费消息,和怎么存储消息的,来看它精髓的地方
• kafka里面的消息是有topic来组织的,简单的咱们能够想象为一个队列,一个队列就是一个topic,而后它把每一个topic又分为不少个partition,这个是为了作并行的,在每一个partition里面是有序的,至关于有序的队列,其中每一个消息都有个序号,好比0到12,从前面读日后面写,
• 一个partition对应一个broker,一个broker能够管多个partition,好比说,topic有6个partition,有两个broker,那每一个broker就管3个partition
• 这个partition能够很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,kafka和不少消息系统不同,不少消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过时这样一个概念,这个模型带来了不少个好处,这个咱们后面再讨论一下
•这里producer本身决定往哪一个partition里面去写,这里有一些的策略,譬如若是hash就不用多个partition之间去join数据了
kafka 的消息存储和生产消费模型
• consumer本身维护消费到哪一个offset
• 每一个consumer都有对应的group
• group内是queue消费模型
– 各个consumer消费不一样的partition
– 所以一个消息在group内只消费一次
• group间是publish-subscribe消费模型
– 各个group各自独立消费,互不影响
– 所以一个消息只被每一个group消费一次

kafka 有哪些特 点
• 消息系统的特色:生产者消费者模型,FIFO
• 高性能:单节点支持上千个客户端,百MB/s吞吐
• 持久性:消息直接持久化在普通磁盘上且性能好
• 分布式:数据副本冗余、流量负载均衡、可扩展
• 很灵活:消息长时间持久化+Client维护消费状态
• 消息系统基本的特色是保证了,有基本的生产者消费者模型,partition内部是FIFO的,partition之间呢不是FIFO的,固然咱们能够把topic设为一个partition,这样就是严格的FIFO
• 接近网卡的极限
• 直接写到磁盘里面去,就是直接append到磁盘里面去,这样的好处是直接持久化,数据不会丢,第二个好处是顺序写,而后消费数据也是顺序的读,因此持久化的同时还能保证顺序,比较好,由于磁盘顺序读比较好
• 分布式,数据副本,也就是同一份数据能够到不一样的broker上面去,也就是当一份数据,磁盘坏掉的时候,数据不会丢失,好比3个副本,就是在3个机器磁盘都坏掉的状况下数据才会丢,在大量使用状况下看这样是很是好的,负载均衡,可扩展,在线扩展,不须要停服务的
• 消费方式很是灵活,第一缘由是消息持久化时间跨度比较长,一天或者一星期等,第二消费状态本身维护消费到哪一个地方了,Queue的模型,发布订阅(广播)的模型,还有回滚的模型

• ZeroMQ是一个socket的通讯库,它是以库的形式提供的,因此说你须要写程序来实现消息系统,它只管内存和通讯那一块,持久化也得本身写,仍是那句话它是用来实现消息队列的一个库,其实在storm里面呢,storm0.9以前,那些spout和bolt,bolt和bolt之间那些底层的通讯就是由ZeroMQ来通讯的,它并非一个消息队列,就是一个通讯库,在0.9以后呢,由于license的缘由,ZeroMQ就由Netty取代了,Netty自己就是一个网络通讯库嘛,因此说更合适是在通讯库这一层,不该该是MessageQueue这一层

• Kafka,的亮点,天生是分布式的,不须要你在上层作分布式的工做,另外有较长时间持久化,前面基本消费就干掉了,另外在长时间持久化下性能还比较高,顺序读和顺序写,另外还经过sendFile这样0拷贝的技术直接从文件拷贝到网络,减小内存的拷贝,还有批量读批量写来提升网络读取文件的性能,最后一点是比较轻量和灵活

• 消费状态谁来维护Client vs.Server

• 有人可能会说kafka写磁盘,会不会是瓶颈,其实不会并且是很是好的,为何是很是好的,由于kafka写磁盘是顺序的,因此不断的往前产生,不断的日后写,kafka还用了sendFile的0拷贝技术,提升速度,并且还用到了批量读写,一批批往里写,64K为单位,100K为单位,每一次网络传输量不会特别小,RTT(RTT:Round-TripTime往返时间)的开销就会微不足道,对文件的操做不会是很小的IO,也会是比较大块的IO
storm+kafka 有什么好 处
• 知足获取输入。产生输出数据的基本需求
• kafka的分布式、产生输出数据的基本需求
• kafka的分布式、高性能和storm吻合
• pub-sub模型可让多个storm业务共享输入数据
• kafka灵活消费的模式能配合storm实现不丢不重(exactly-once)的处理模型
• exactly-once,精准一次,这种模型在不少时候也是颇有用的
理解零拷 贝
• 从WIKI的定义中,咱们看到“零拷贝”是指计算机操做的过程当中,CPU不须要为数据在内存之间的拷贝消耗资源。而它一般是指计算机在网络上发送文件时,不须要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。
• Non-Zero Copy方式:

Zero Copy方式:

从上图中能够清楚的看到,Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提升了系统的总体性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也经过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝。

Storm简介(一)
Storm是 Twitter开源的一个分布式的实时计算系统,用于数据的实时分析,持续计算,分布式RPC等等。
官网地址http://storm-project.net/
源码地址:https:/github.com/nathanmarz/storm
实时计算须要解决一些什么问题
最显而易见的就是实时推荐系统,好比咱们在淘宝等电商购物网站去买东西,咱们会在网页旁边或者底端看到与本身所须要商品相关的系列产品。这就是使用相似 storn实时计算去作的,咱们很是熟悉的 Hadoop只是作离线的数据分析,没法作到实时分析计算。
好比车流量实时的计算,天天咱们北京市的交通状况很是的拥挤,咱们能够利用stom为咱们实时计算每个路段的拥挤度等相关路况信息。
再好比咱们很是熟悉的股票,那么股票系统也是一种实时计算的机制,利用stom彻底能够实现。
Storm简介(二)
实现一个实时计算系统
低延迟:都说了是实时计算系统了,延迟是必定要低的。高性能:可使用几台普通的服务器创建环境,结余成本
分布式:Stom很是适合于分布式场景,大数据的实时计算;你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。咱们所说的是单机搞不定的状况。
可扩展:伴随着业务的发展,咱们的数据量、计算量可能会愈来愈大,因此但愿这个系统是可扩展的。
容错:这是分布式系统中通用问题,一个节点挂了不能影响个人应用,Storm能够轻松作到在节点挂了的时候实现任务转移,而且在节点重启的时候(也就是从新投入生产环境时,自动平衡任务)
可靠性:可靠的消息处理。Storm保证每一个消息至少能获得一次完整处理。任务失败时,它会负责从消息源重试消息。
快速:系统的设计保证了消息能获得快速的处理,使用 ZeroMQ做为其底层消息队列。
本地模式:Storm有一个“本地模式”,能够在处理过程当中彻底模拟stom集群。这让你能够快速进行开发和单元测试
Storm体系结构(一)
首先咱们拿 Hadoop和stom进行一个简单的对比:

storm是一个开源的分布式实时计算系统,能够简单、可靠的处理大量的数据流。Storm有不少使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。Stom支持水平扩展,具备高容错性,保证每一个消息都会获得处理,并且处理速度很快(在一个小集群中,每一个结点每秒能够处理数以百万计的消息)。Storm的部署和运维都很便捷,并且更为重要的是可使用任意编程询言来开发应用。
Storm体系结构(二)
Storm架构结构图

Storm体系结构(三)
Nimbus主节点:
主节点一般运行一个后台程序—Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很相似亍 Hadoop中的 Job Tracker.
Supervisor工做节点:
工做节点一样会运行一个后台程序—supervisor,用于收听工做指派并基于要求运行工做进程。每一个工做节点都是 topology中一个子集的实现。而Nimbus和 Supervisor之间的协调则经过 Zookeeper系统戒者集群。
Zookeeper
Zookeeper是完成 Supervisor和 Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装到stom中的“topology”.topology则是一组由 Spouts(数据源)和Bots(数据操做)经过 Stream Groupings运行链接的图。下面对出现的术语进行更深入的解析。
Topology(拓扑)
storm中运行的一个实时应用程序,由于各个组件间的消息流动造成逻辑上的一个拓扑结构。一个 topology是 spouts和bos组成的图,经过 stream groupings将图中的 spouts和bots链接起来,以下图:

Storm Hello World (一)

Storm Hello world(二)
咱们首先回顾下storm的组件,安装这个流程去写咱们的java代码:

Storm Hello world(三)
首先编写咱们的数据源类:Spout。可使用俩种方式:
继承 BaseRichSpout类
实现 IRichSpout接口
重点须要几个方法进行重写或实现:open、nextTuple、declareOutputFields
继续编写咱们的数据处理类:Bolt。可使用俩种方式:
继承 BaseBasicBolt类
实现 IRichBolt接口
重点须要几个方法进行重写或实现:execute、declareOutputFields
最后咱们编写主函数(Topology)去进行提交一个任务。
在使用 Topology的时候,Storm框架为咱们提供了俩种模式:本地模式和集群模式
本地模式:(无需stom集群,直接在jaa中便可运行,通常用于测试和开发阶段)执行运行main函数便可。
集群模式:(须要Stom集群,把实现的java程序打包,而后 Topology进行提交)须要把应用打成jar,使用stom命令把 Topology提交到集群中去
Storm Hello World(四)
提交topology命令:storm jar storm01.jar bhz.topology.PWTopology1
查看任务命令:storm list
另外俩个 supervisor节点jps显示:
最后咱们能够看下俩个工做节点的 usr/local/temp下的文件信息是否有内容
Storm APl
Topology(拓扑)
Stream grouping(流分组、数据的分发方式)
Spout(喷口、消息源)
Bolt(螺栓、处理器)
Worker(工做进程)
Executor(执行器、Task的线程)
Task(具体的执行任务)
Configuration(配置)
Storm拓扑配置(一)
工做进程、并行度、任务数设置:
咱们首先设置了2个工做进程(也就是2个jvm)
而后咱们设置了 spout的并行度为2(产生2个执行器和2个任务)
第一个bolt的并行度为2而且指定任务数为4(产生2个执行器和4个任务)
第二个bolt的并行度为6(产生6个执行器和6个任务)
所以:该拓扑程序共有俩个工做进程(worker),2+2+6=10个执行器
(executor),2+4+6=12个任务(task)。每一个工做进程能够领取到12/2=6个任务。默认状况下一个执行器执行一个任务,但若是指定了任务的数目。则任务会平均分配到执行器中。
Storm什么是拓扑?(二)
咱们在使用storm进行流式计算的时候,都必需要在Main函数里面创建所谓的“拓扑”,拓扑是什么?
拓扑是一个有向图的计算。(也就是说在计算的过程当中是有流向的去处理业务逻辑,节点之间的链接显示数据该如何进入下一个节点,他们是进行链接传递的)
拓扑运行很简单,只须要使用 storm命令,把一个jar提交给 nimbus节点,numbus就会把任务分配给具体的子节点(supervisor)去工做。
咱们建立拓扑很是简单:
第一,构建 TopologyBuilder对象
第二,设置 Spout(喷口)数据源对象(能够设置多个)
第三,设置Bolt(螺栓)数据处理对象(能够设置多个)
第四,构建 Config对象
第五,提交拓扑
Storm流分组(一)
Stream Grouping:为每一个bolt指定应该接受哪一个流做为输入,流分组定义了如何在bolt的任务直接进行分发。

Storm流分组(二)☆☆☆
Shuffle Grouping随机分组:保证每一个bot接收到的 tuple数目相同
Fields Grouping按字段分组:好比按 userid来分组,具备一样 userid的 tuple会被分到相同的Bots,而不一样的 userid则会被分配到不一样的Bolts。
All Grouping广播发送:对于每个 tuple,全部的Bots都会收到。
Global Grouping:全局分组:这个 tuple被分配到stom中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping无分组:假设你不关心流式如何分组的煤科院使用这种方式,目前这种分组和随机分组是同样的效果,不一样的是Stom会把这个Bolt放到Bolt的订阅者的同一个线程中执行。
Direct Grouping直接分组:这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。只有被声明为 Direct stream的消息流能够声明这种分组方法并且这种消息tupe必须使用 emitDirect方法来发射。消息处理者能够经过TopologyContext来获取处理它的消息的 taskid(Outputcollector.emit,方法也会返回 taskid)
本地分组:若是目标bo在同一工做进程存在一个或多个任务,元祖会随机分配给执行任务,不然该分组方式与随机分组方式是同样的。
storm流分组(三)

Storm WorldCount
咱们以一个统计单词的小程序来讲明问题。(storm02)

上面的示意图中有4个组件,分别为一个 spout和3个bolt,当数据源 spout.取得数据(能够是一个句子,里面包含多个单词)之后,发送给 SolitBolt进行切分,而后由 CountBolt进行统计结果,最终由ReportBolt记录结果
Storm Spout的可靠性(一)
Spout是 Storm数据流的入口,在设计拓扑时,一件很重要的事情就是须要考虑消息的可靠性,若是消息不能被处理而丢失是很严重的问题。
咱们继续做实验,以一个传递消息而且实时处理的例子,来讲明这个问题。
新建 maven项目(storm03)
经过示例咱们知道,若是在第一个bolt处理的时候出现异常,咱们可让整个数据进行重发,可是若是在第二个bolt处理的时候出现了异常,那么咱们也会让对应的整个spout里的数据重发,这样就会出现事务的问题,咱们就须要进行判断或者是进行记录
若是是数据入库的话,能够与原ID进行比对。
将一批数据定义惟一的ID入库(幂等性判断事物)
若是是事务的话在编写代码时,尽可能就不要进行拆分 tuple
或者使用 storm的 Trident框架
Storm Spout的可靠性(三)
下图是 spout处理可靠性的示意图:当 spout发送一个消息时,分配给俩个bolt分别处理,那么在最后一个bolt接受的时候会作异或运算

RPC介绍

调用客户端句柄;执行传送参数
调用本地系统内核发送网络
消息消息传送到远程主机
服务器句柄获得消息并取得参数
执行远程过程
执行的过程将结果返回服务器句柄
服务器句柄返回结果,调用远程系统内核
消息传回本地主机
客户句柄由内核接收消息
客户接收句柄返回的数据
Storm DRPC介绍
分布式RPc(distributed RPc,DRPc)
Storm里面引入DRPC主要是利用 storm的实时计算能力来并行化cPU密集型(CPU intensive)的计算任务。DRPc的 storm topology以函数的参数流做为输入,而把这些函数调用的返回值做为 topology的输出流。
DRPc其实不能算是 storm自己的一个特性,它是经过组合stom的原语stream、spout、bolt、topology而成的一种模式(pattern)。原本应该把DRPc单独打成一个包的,可是DRPC实在是太有用了,因此咱们把它和storm捆绑在一块儿。
Distributed RPC是经过一个”DRPC Server”来实现
DRPC Server的总体工做过程以下:
1接收一个RPC请求
2)发送请求到 storm topology
3)从 storm topology接收结果。
4)把结果发回给等待的客户端。
Storm DRPC配置和示例
Storm提供了一个称做 LinearDRPCTopologyBuilder的 Topology builder,它把实现DRPc的几乎全部步骤都自简化了。
相关代码地址https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/BasicDRPCTopology.java
实现DRPC步骤:(新建 maven项目,storm04)
1须要修改配置文件内容为(分别修改每台机器配置):
vim /usr/local/apache-storm-0.9.2/conf/storm.yaml
drpc.servers:
-"192.168.1.114"
2须要启动stom的drpc:服务,命令:storm drpc&
3把相应的 topology代码上传到stom服务器上
storm jar storm04.jar bhz.drpc1.BasicDRPCTopology exc
4在本地调用远程 topology便可。
Storm DRPC实例场景
咱们继续看下一个示例:
主要使用 storm的并行计算能力来进行,咱们在微博、论坛进行转发帖子的时候,是对u进行转发,分析给粉丝(关注个人人),那么每个人的粉丝(关注者可能会有重复的状况),这个例子就是统计一下帖子(ur)的转发人数。
相关代码地址:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java实现步骤以下:第一,获取当前转发帖子的人。第二,获取当前人的粉丝(关注者)。第三,进行粉丝去重。第四,统计人数第五,最后使用drpc远程调用 topology返回执行结果。Storm Trident介绍Trident是在stom基础上,一个以实时计算为目标的高度抽象。它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力。若是你对Pig和 Cascading这种高级批处理工具很了解的话,那么应该很容易理解 Trident,由于他们之间不少的概念和思想都是相似的。Trident提供了 joins,aggregations,grouping,functions,以及 filters等能力。除此以外,Trident还提供了一些与门的原语,从而在基于数据库戒者其余存储的前提下来应付有状态的递增式处理。Trident也提供致性(consistent)、有且仅有一次(exactly-once)等语义,这使得咱们在使用 trident toplogy时变得容易。咱们首先熟悉下 Trident的概念:"Stream"是 Trident中的核心数据模型,它被当作一系列的 batch来处理。在Storm集群的节点之间,一个 stream被划分红不少 partition(分区),对流的操做(operation)是在每一个 partition上并行执行的。state Query、partition Persist.、poe(filter、partitionAggregate、对每一个 partition的局部操做包括:function新建 maven工程(storm05)Storm Trident FunctionStorm Trident FilterStorm Trident projectionStorm Trident operationStorm Trident aggregateBatch和 Scout与 Transactiona(一)Trident提供了下面的语义来实现有且有一次被处理的目标一、Tuples是被分红小的集合(一组 tuple被称为一个 batch)被批量处理的。二、每一批 tuples被给定一个惟1D做为事务ID(txid),当这一个 batch被重发时,tid不变。三、batch和 batch之间的状态更新时严格顺序的。好比说 batch3的状态的更新必需要等到 batch2的状态更新成功以后才能够进行。有了这些定义,你的状态实现能够检测到当前 batch是否之前处理过,并根据不一样的状况进行不一样的处理,这个处理取决于你的输入 spout。有三种不一样类型的能够容错的 sqout:一、non-transactional(无事务支持的 spout)二、transactional(事务支持的 spout)三、opaque transactional(不透明事务支持的 spout)Batch和 Scout与 Transactiona(二)transactional sqout实现一、重发操做:二、重发结果:opaque transactional sqout实现实现ITridentspout接口最通用的AP能够支持 transactional or opaque transactional语义实现IBatchSpou接口:一个 non-transactional spout实现IPArtitioned Tridentspout接口:一个 transactional spout实现IOpaquePartitioned Tridentspout接口:一个opaque transactional spoutStorm与 KafKaKafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。这种动做(网页浏览,搜索和其余用户的行动是在现代网络上的许多社会功能的一个关键因素。这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。对于像 Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过 Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群机来提供实时的消费。

相关文章
相关标签/搜索