在本篇中,咱们就来根据一个案例,看看如何去设计一个拓扑, 如何分解问题以适应Storm架构,同时对Storm拓扑内部的并行机制会有一个基本的了解。前端
本章代码都在:java
git@github.com:zyzdisciple/storm_study.gitgit
项目下的 user_behavior包下。github
有这样一种场景,在前端存在会话,咱们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其余信息, 咱们须要作的事情是当接收到消息以后,根据SessionId判断是否属于同一消息, 若是是的话将内容拼接, 若是结束标识为 true, 表示会话已结束,则存入数据库或其余地方, 若是不为true, 则等待, 在1分钟后 仍是没有收到消息, 则存入数据库。数据库
咱们对消息数据并无太严格的要求,由于数据自己是用户的行为, 如一个点击,浏览事件等等其余行为信息,在这里咱们用三位数的数值,来表示用户的行为类型。这就是案例的基本需求。apache
数据的输入格式为:json
{sessionId:1, content:1,isEnd:true, timeMillis:1, userId:1}安全
这些基本的信息中,就包含着某用户在某时间段的行为,而咱们事实上须要作的就是,了解用户在一段时间内的操做行为(在这里过时时间是1分钟,指的是系统自身的过时时间,并不是用户的操做间隔),将其存入数据库,供给其余的系统使用。网络
咱们须要关注的核心点在如下几个地方:session
数据具备时效性,所以咱们必然须要尽量快的推入数据库,不然的话用Hadoop或Spark作批处理比Storm更合适。 因此,速度对咱们而言,很重要。
可靠数据源,不可靠数据源, 这一样也是须要考虑的一点,对于可靠数据源而言,咱们必须保证数据不丢失,当拓扑挂掉以后,或是有些数据遗失以后,可以从数据源的节点再次获取。 但咱们这里的是不可靠数据源,由于用户的几条数据丢失,对咱们而言并无太大差异。 因此在趋势性预测方面, 数据的可靠性并无那么重要。
因此须要把握数据源的相关特性,与业务相对应, 作出速度, 安全等其余方面的取舍。
这也是咱们在一开始就须要关注的另外一个核心点,以下:
{sessionId:1,content:001,002,005, isEnd:true, endTime:2, userId:1}
sessionId,userId不用多说, content须要将对应的标识转换成三位数,而后拼接存储,以“,”间隔, isEnd表示收到最后一条数据时的 end标识, endTime,表示收到最后一条数据的时间。
在拓扑设计中,处理逻辑自己并非第一件该关注的地方, 不管未来是存入数据又或是其余方式,须要了解数据源的特性,对数据的时效性安全性,性能等方面的考虑,以及未来的扩展性,可能须要优化的地方都须要放在考虑范围内。
尽管有一种说法是,不要过早优化。但那种优化,细节并非性能的决定因素,而大数据,任何一点细节均可能会致使性能的巨大误差, 重构设计又是一件很难的事情。 所以提前考虑优化并非没有必要的事情。
在拓扑设计时,须要:
肯定输入数据,怎么把它表示成元组
肯定输出重点数据,怎么表示成元组
肯定中间部分,补充完善数据处理方式。
因此在开始代码以前,最好已经肯定了每一步究竟该干什么。
咱们接收到的是字符串,又须要保存各个数据的相应属性,所以在一开始的Spout中,须要将对应的数据转换成Java对象,下发下去,同时若是传入数据有误,则直接跳过,不进行处理。另一点须要作的是, 将content转成3位有效数字。
在Spout中主要工做就是对接数据源,同时它自身也是整个系统的数据源,须要作相应的数据处理,而不包含其余业务逻辑。
当接收到对象以后,须要按照时间间隔进行分组,提取出当前数据所在的时间间隔下发到下一层级便可。
按时间收取到数据以后,再进行处理,按照以前所提到的要求进一步处理数据。
写出。
在其中的 bolt及Spout中须要注意的一点问题是, 全部的可能抛出异常的地方都要被处理掉, 即便是 RuntimeException, 缘由稍后会提到。
spout的核心功能就是读取数据,对数据进行基本的数据处理,做为拓扑的数据源,下发数据。
那么做为数据源,须要作的工做有哪些?在平时作接口的过程当中,咱们向前端返回的数据必定是具备必定的数据格式, 且不容变动的格式,可以提供前端所需的全部信息才行, 最好数据自己已经通过必定的处理, 对待错误数据等其余问题,并不直接向前端返回数据。
因此数据的基本处理, 校验, 转换成对象才是基本需求。
一样,为了简化处理, 这里采起了从文件中读取数据的方式。
代码以下:
import com.google.gson.Gson; import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class FileReaderSpout extends BaseRichSpout { private static final long serialVersionUID = 2970891499685374549L; private static final Logger logger = LoggerFactory.getLogger(FileReaderSpout.class); private static Gson gson; private BufferedReader br; private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; gson = new Gson(); try { br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt")); } catch (FileNotFoundException e) { e.printStackTrace(); } } @Override public void nextTuple() { try { String line = br.readLine(); if (line != null && !line.isEmpty()) { line = line.trim(); MessageInfo info = gson.fromJson(line, MessageInfo.class); //只发送有效数据,无效数据跳过 /*在这里, 咱们是逐行发送, 若是有条件的话, 也就是说一次能取出 * 多条数据的状况下, 所有一次发送是比较好的选择. * */ if (isValid(info, line)) { completingMessage(info); collector.emit(new Values(info)); } } } catch (Exception e) { /*在catch语句中,咱们通常会选择打印log日志,表示为何出错,并不作其余处理, * 即便数据格式存在问题依然须要可以正常执行下去,保证拓扑不中断。 * 在这里能够进一步将Exception细分,对不一样的Exception打印日志不一样。 * */ e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_INFO)); } @Override public void close() { if (br != null) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 补全基本数据, 设置默认值 * @param info */ private void completingMessage(MessageInfo info) { try { info.setContent(formatMessage(info.getContent())); } catch (Exception e) { logger.warn("内容转换失败:" + info); } if (info.getEndTime() == null) { info.setEndTime(System.currentTimeMillis()); } } /** * 格式化内容,将content转为3位 * @param content * @return */ private String formatMessage(String content) { if (NumberUtils.isCreatable(content)) { StringBuilder sb = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY); //从第一位不是0的数值开始 boolean skipZero = false; int currentValue; try { for (String c : content.trim().split("")) { //已经跳过数值为0的字符, 若是不为0将 skipZero设为true //若是是+ - x等字符, 会抛出异常 currentValue = Integer.valueOf(c); if (skipZero || (currentValue != 0 && (skipZero = true))) { sb.append(currentValue); } } } catch (Exception e) { throw new NumberFormatException(); } //补全字符串 StringBuilder result = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY); for (int i = 0, L = sb.length(); i < BehaviorConstants.CONTENT_CAPACITY - L; i++) { result.append(0); } result.append(sb); return result.toString(); } else { return BehaviorConstants.UNKNOWN_CONTENT; } } /** * message校验 * @return */ private boolean isValid(MessageInfo info, String source) { if (info == null) { logger.warn("数据格式转换失败, 字符串为:" + source); } else if (StringUtils.isBlank(info.getSessionId())) { logger.warn("缺失sessionId, 字符串为:" + source); } else if (StringUtils.isBlank(info.getUserId())) { logger.warn("缺失userId, 字符串为:" + source); } else { return true; } return false; } }
代码并无什么太多值得注意的地方, 更多的则是数据前期处理方面。
主要功能是划分数据时间组, 为了测试起见, 能够将timeOut时间进一步调小.
import com.storm.demo.user_behavior.BehaviorConstants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class TimeIntervalBolt extends BaseRichBolt { private static final long serialVersionUID = 5632264737187544663L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 是按照接收到的时间来进行分组. * @param input */ @Override public void execute(Tuple input) { //判断当前的时间区间. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, input.getValueByField(BehaviorConstants.FIELD_INFO))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_TIME_GROUP, BehaviorConstants.FIELD_INFO)); } }
初步代码以下:
import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.GroupKey; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class ContentStitchingBolt extends BaseRichBolt { private static final long serialVersionUID = -4684689172584740403L; private OutputCollector collector; /** * 存储每一个时间段的数据, 因为每一个时间段可能有多组session数据, 所以放入list中处理. */ private Map<GroupKey, MessageInfo> collectMap; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; collectMap = new HashMap<>(); } @Override public void execute(Tuple input) { //在这里, 首先获取数据, 若是自己为true, 且在分组中不存在对应的数据, 则能够直接发送. MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP); GroupKey key = new GroupKey(info.getSessionId(), timeGroup); //根据sessionId, group双重判断, 存储 MessageInfo messageInfo = collectMap.get(key); //更新info的数据 updateNewInfoWithPreMessage(info, messageInfo); //若是不存在且已结束, 直接发送数据便可 if (info.getEnd()) { collector.emit(new Values(info)); //发送后须要移除相关数据 collectMap.remove(key); } else { collectMap.put(key, info); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_INFO)); } /** * 用之前的messageContent 更改当前message * @param newInfo * @param preInfo */ private void updateNewInfoWithPreMessage(MessageInfo newInfo, MessageInfo preInfo) { if (preInfo != null) { StringBuilder sb = new StringBuilder(preInfo.getContent()); sb.append(",").append(newInfo.getContent()); newInfo.setContent(sb.toString()); } } }
其中GroupKey以下:
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; /** * 以sessionId 和 group作为分组的主键, 重写equals方法 * * @author zyzdisciple * @date 2019/4/7 */ public class GroupKey { private String sessionId; private Long timeGroup; public GroupKey(String sessionId, Long timeGroup) { this.sessionId = sessionId; this.timeGroup = timeGroup; } public String getSessionId() { return sessionId; } public void setSessionId(String sessionId) { this.sessionId = sessionId; } public Long getTimeGroup() { return timeGroup; } public void setTimeGroup(Long timeGroup) { this.timeGroup = timeGroup; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GroupKey groupKey = (GroupKey) o; return new EqualsBuilder() .append(sessionId, groupKey.sessionId) .append(timeGroup, groupKey.timeGroup) .isEquals(); } @Override public int hashCode() { return new HashCodeBuilder(17, 37) .append(sessionId) .append(timeGroup) .toHashCode(); } }
咱们发现对于GroupKey并无序列化操做, 由于这里是不须要的, 在emit中发送的数据须要被序列化(并不许确, 但目前能够这样理解).
然而,仍然存在一个问题, 咱们是已经将数据存储进去, 而且发送, 可是, end 标识为false的数据呢? 咱们在这里并无进行处理, 仅仅是存入Map中, 这固然是不合适的, 有一种通用的处理办法, 咱们并不采用 简单的Map进行存储, 而是 采用另外一种, TimeCacheMap, 有兴趣的能够本身了解一下, 设置定时时间, 过时后能够自行处理, 并不只限于Storm中使用, 在任何代码中均可以使用.
若是尝试以后会发现它是过时的, 须要用另外一个 RotatingMap进行实现, 须要自行在循环中调用方法. 循环须要sleep, 周期为 timeOut / (桶的个数 - 1); 原理与 TimeCacheMap一致.
但在这里并不打算介绍这两种方法, 而是Storm自身的定时机制, Tick, 其原理就在于, Storm会每隔固定时间发送一条系统消息给对应的Bolt, Spout ,甚至全部的均可以. 当咱们每次接收到系统消息时, 则表示当前已经通过一个周期, 依然没有收到的数据, 须要进行过时处理.
它的处理机制能够自行定义, 比较灵活, 实现代码以下:
@Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS); return conf; }
仅仅须要在对应的Bolt, Spout中重写以上方法便可. 若是须要全局发送, 则须要在Topology的main方法中, 加入:
Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS);
配置便可.
经过这种方式, 咱们能够作到每隔60秒让系统发送一条消息给bolt, 还需加入以下代码, 更改execute方法便可:
@Override public void execute(Tuple input) { if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) { //在这里, 首先获取数据, 若是自己为true, 且在分组中不存在对应的数据, 则能够直接发送. MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP); GroupKey key = new GroupKey(info.getSessionId(), timeGroup); //根据sessionId, group双重判断, 存储 MessageInfo messageInfo = collectMap.get(key); //更新info的数据 updateNewInfoWithPreMessage(info, messageInfo); //若是不存在且已结束, 直接发送数据便可 if (info.getEnd()) { collector.emit(new Values(info)); //发送后须要移除相关数据 collectMap.remove(key); } else { collectMap.put(key, info); } } else { final Long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); Iterator<Map.Entry<GroupKey, MessageInfo>> iterator = collectMap.entrySet().iterator(); while(iterator.hasNext()) { Map.Entry<GroupKey, MessageInfo> entry = iterator.next(); if (entry.getKey().getTimeGroup() < timeGroup) { collector.emit(new Values(entry.getValue())); iterator.remove(); } } } }
经过这种方式就实现了周期性校验数据, 发送.
但依然要注意到的一点是, 这种发送并不许确, 并不是是严格的按照时间发送, 而是将发送至bolt的系统消息 与 其余消息 加入队列, 排队发送, 等待前一次的执行完成.
线程安全
而当咱们采起Map的时候, 经常须要考虑到的一个问题则是线程安全, 但在Storm中, 咱们通常不须要考虑这个问题, 由于每个bolt实例都是在线程中按序执行,所以不怎么须要考虑线程安全.
可是前提是, 不要再bolt中存在 static 变量用以保存数据, 又或是在bolt中开启新的线程, 致使线程安全问题. 当出现这两种状况时, 就须要考虑线程安全了.
固然Storm中已经提供了这种Tick机制的bolt:
package org.apache.storm.topology.base; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; /** * This class is based on BaseRichBolt, but is aware of tick tuple. */ public abstract class BaseTickTupleAwareRichBolt extends BaseRichBolt { @Override public void execute(final Tuple tuple) { if (TupleUtils.isTick(tuple)) { onTickTuple(tuple); } else { process(tuple); } } /** * 当接收到系统消息时, 执行的方法 */ protected void onTickTuple(final Tuple tuple) { } /** * 其余消息执行的方法 */ protected abstract void process(final Tuple tuple); }
当咱们继承BaseTickTupleAwareRichBolt以作定时任务时, 须要重写 onTickTuple, process, getComponentConfiguration 便可.
而这种方式, 即tick 与 timeInterval结合的方式, 便是处理过时的一种比较好的方式.
特别是timeInterval, 经过时间分组, 是一种比较巧妙的方式.
固然,依然存在相应的问题, 因此最好的方式多是时间窗口, 在这里暂时就以这种方式来实现.
因此是在消息信息中自身已经携带了时间信息是最好的方式.
这个依然没有太多须要注意的地方, 在cleanUp中须要关闭流, 便可.
import com.google.gson.Gson; import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class MessageWriterBolt extends BaseRichBolt { private static final long serialVersionUID = 5411259920685235771L; private static final Logger logger = LoggerFactory.getLogger(MessageWriterBolt.class); private PrintWriter pw; private static Gson gson; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { try { pw = new PrintWriter(new FileWriter("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data_write.txt")); gson = new Gson(); } catch (IOException e) { logger.error("文件有误,保存失败"); } } @Override public void execute(Tuple input) { MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); try { String jsonMessage = gson.toJson(info, MessageInfo.class); pw.println(info); pw.flush(); } catch (Exception e) { logger.warn("格式转换失败:" + info); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void cleanup() { if (pw != null) { pw.close(); } } }
排在首位的是分流策略的选取, 在前一篇文章中提到了三种, shuffle, global, field 三种方式. 那么来整合一下吧:
import com.storm.demo.user_behavior.bolt.ContentStitchingBolt; import com.storm.demo.user_behavior.bolt.MessageWriterBolt; import com.storm.demo.user_behavior.bolt.TimeIntervalBolt; import com.storm.demo.user_behavior.spout.FileReaderSpout; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; /** * @author zyzdisciple * @date 2019/4/7 */ public class UserBehaviorTopology { private static final String STREAM_SPOUT = "spout"; private static final String STREAM_TIME_INTERVAL_BOLT = "time-bolt"; private static final String STREAM_CONTENT_BOLT = "content-stitching-bolt"; private static final String STREAM_FILE_WRITER_BOLT = "file-writer-bolt"; private static final String TOPOLOGY_NAME = "user-behavior-topology"; private static final long TEN_SECONDS = 1000L * 10; public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(STREAM_SPOUT, new FileReaderSpout()); builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt()).shuffleGrouping(STREAM_SPOUT); //这里必然要使用field 保证同一group的数据发送到同一个bolt中. builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt()) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP)); builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt()).shuffleGrouping(STREAM_CONTENT_BOLT); Config conf = new Config(); //当设置为true时, 且log level定义为info时, 会在控制台输出发射元组内容 conf.setDebug(true); StormTopology topology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, topology); //停留几秒后关闭拓扑,不然会永久运行下去 Utils.sleep(TEN_SECONDS); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }
那么仅仅这样就能够了吗? 固然不行, 咱们会发现, 在整个拓扑中, 依然是采用串行化的方式去处理数据, 对资源得不到充分的利用, 也是不可以支撑起大数据的.
因此在这里就须要提到另外一个东西, 即拓扑的并行化
咱们能够先来看看代码, 在UserBehaviorTopology 中的main 方法作以下更改:
builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4);
有什么区别呢? 在最后一位参数加了个4.
咱们须要知道的一点是: 不管是 Spout 仍是 bolt 最终都会回归到虚拟机, 或物理上, 在一个JVM进程中去运行相关的代码, 虽然咱们自身没法分配, 决定哪个bolt在哪个jvm中去执行, 可是咱们能够决定同时用多少个线程来运行相关的bolt, spout, 进行并行化处理, 大大提高速度, 而在这里, 咱们则是指定了4个线程去执行spout. 也就是 excutor
须要注意:不管有多少个jvm在运行spout, 其总的执行spout的线程数,肯定为4
那么在第一环进行了扩展, 能够支持更大批量的数据处理, 但这并不够, 系统的总体性能必然是取决于最低点的.
builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4); builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt(), 2).shuffleGrouping(STREAM_SPOUT); builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP)); builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt(), 4).shuffleGrouping(STREAM_CONTENT_BOLT);
在这里考虑到Spout 与 ContentStitchingBolt的任务功能比较复杂, 所以给了较多的executor去执行.
这里的线程数分配 并无严格的要求, 节点功能简单, 能够适当减小执行器, 节点功能复杂能够适当增长.
而在前面提到过, 异常处理问题, 在每个bolt 或Spout中的异常都须要被处理掉, 咱们知道, 节点是运行在线程中的, 而且并不必定在某个jvm中运行多少个bolt, spout, 当抛出运行时异常或未被cathc掉的异常 会致使程序异常终止, 在这种状况下, 可能会牵连其余的节点也异常终止, 致使topology挂掉, 所以必须处理.
而在上面提到的jvm也被称做 workNode, worker.
执行线程则是 executor
另外每个线程中可能会运行一个或多个节点(spout bolt实例), 这样的实例就是task, 即任务.
这是并行性相关的三个概念.
一个worker对应一个jvm进程, 其中包含多个线程, 每一个线程中含有数目不定(默认是一个)的task, 也即spout bolt实例.
而设定bolt则是:
builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8) .setNumTasks(64) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields (BehaviorConstants.FIELD_TIME_GROUP));
经过这种方式, setNumTasks 定义了64个实例. 每一个线程中存在8个实例, 这8个实例采起循环执行, 而非并行执行的方式.
那么为何要用这种方式呢? 每一个线程中多建立8个实例串行执行,效率会有所提升吗? 并不会.
那么意义在哪里呢?
Storm中有一个比较有趣的功能叫作, Rebalance, 支持在拓扑运行时动态调整 work 和 executor的数量. 但却并不支持调整 task的数量.
而且要求 executor <= task 数量, 若是超出, 则executor 会与 task保持一致. 那么task数量为何不能调整呢? 浅显的例子是, 对于 fieldGrouping而言, 要求 field一致的数据, 最终都流向同一个 bolt实例, 也就是同一个task中, 若是在rebalance过程当中, 建立了新的bolt, 就会出现问题.
能够经过这样两个方面来作进一步优化:
分解为功能组件的优化方式
将每个功能细分, 分解到不一样的节点中进行处理, 这样知足了单一职责原则, 在未来调整扩展时带来极大的方便.
基于重分配的方式进行设计
咱们须要在拓扑中尽量的减小重分配的次数, 由于每一次执行分配, 咱们须要进行相应的序列化, 反序列化, 以及网络传输种种开销.
分配的节点越多, 咱们须要越多的实例, 对物理虚拟资源的开销也都会相应的变大.
所以, 咱们须要在尽量知足组件优化的同时, 减小重分配次数.
想到这里不只产生了一点点问题:
那不是在一个单独的Spout中所有计算, 将功能定义为各类方法, 就是最大程度的知足了最少重分配吗?
这里就会存在一个问题, 计算资源的分配, 并行处理多个计算显然要比流式处理来的更快, 咱们将不一样的计算任务分配到各个节点, 最大化利用物理资源.
我想这大概是Storm的设计初衷.
因此其核心就在于将计算资源分配, 同时若是可以最少程度的减小网络传输, 最好不过了. 而这一点, 也正是Spark的实现方式.
那么在咱们的项目中又该如何调整呢?
答案在于调整 TimeIntervalBolt, 一方面它自己也是数据处理的一部分, 另外一方面, 与其为了这一操做加入没必要要的网络传输序列化等, 不如在前一步直接进行处理, 减小一个节点.
实现以下:
if (isValid(info, line)) { completingMessage(info); //判断当前的时间区间. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, info)); }
删除TimeInterval, 加入Spout中, 并作相应字段,分流调整.
说到底, 这自己仍是一件全凭借我的理解的问题, 合并时须要考虑这样几个点:
性能影响, 即合并与拆分 带来的开销, 利弊权衡.
语义化问题,不然会为未来扩展带来没必要要的问题.
在本篇中经过一个更加真实的案例, 从头设计了一个拓扑, 有如下知识点:
如何设计一个拓扑, 咱们须要考虑其数据源的特性, 在一开始就肯定最终的输出格式, 以及进一步一点点肯定中间的操做步骤.
Tick 机制, 在Storm中实现相关的定时处理.
并行度, worker, executor, task, 对这些有了基本的了解.
bolt的线程安全问题, 有了基本了解
拓扑的设计方法, 基于功能, 和基于重分配的两种方式.