写在前面的话,笔者第一次阅读框架源码,因此可能有些地方理解错误或者没有详细解释,若是在阅读过程发现错误很欢迎在文章下面评论指出。文章后续会陆续更新,能够关注或者收藏,转发请先私信我,谢谢。对了,笔者看的是2.2.1这个版本缓存
JStorm是一个分布式的实时计算引擎,是阿里巴巴根据storm的流处理模型进行重写的一个框架,支持相同的逻辑模型(也就是拓扑结构),而后底层的实现却大有不一样。不过本文并非打算对两个框架进行比较,接下来我会从源码的角度上来解析JStorm是如何工做的。
做为第一个篇章,笔者先来介绍下nimbus以及它启动的时候作了什么。JStorm的主节点上运行着nimbus的守护进程,这个进程主要负责与ZK通讯,分发代码,给集群中的从节点分配任务,监视集群状态等等。此外nimbus须要维护的全部状态都会存储在ZK中,JStorm为了减小对ZK的访问次数作了一些缓存,这个后续代码分析会说到。以上是nimbus功能的简介,接下来咱们从源码的角度看看Nimbus到底作了什么。首先在Nimbus启动的时候:框架
//设置主线程因为未捕获异常而忽然停止时调用的默认程序 Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler()); //加载集群的配置信息 Map config = Utils.readStormConfig(); //这下面这个方法内部注释掉了,笔者暂时没有太在乎,后续再补充 JStormServerUtils.startTaobaoJvmMonitor(); //建立一个NimbusServer实例 NimbusServer instance = new NimbusServer(); //建立一个默认的nimbus启动类 INimbus iNimbus = new DefaultInimbus(); //开始进行实际的初始化 instance.launchServer(config, iNimbus);
其实在DefaultUncaughtExceptionHandler
中也并无太多的处理操做,简单判断是不是内存溢出,而后正常关闭,不然就是异常直接抛出而后中断。读取配置的过程就不详细讲解了。NimbusServer
这个类主要封装了一些用于操做Nimbus的成员变量和方法,Nimbus的启动操做基本都是定义在这个类内的(上述代码就是这个类中的main方法所定义的)。
最重要的方法是launchServer
,接下来就详细的解说这个方法的做用,首先来看下launchServer
这个方法内部的代码:tcp
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { //判断配置模式是否正确 StormConfig.validate_distributed_mode(conf); createPid(conf); //设置退出时的操做 initShutdownHook(); //这个方法在默认实现中没有任何操做 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); //建立NimbusData对象 data = createNimbusData(conf, inimbus); //这个方法主要负责处理当nimbus线程称为leader线程以后的操做 initFollowerThread(conf); int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); //若是集群是运行在yarn上,也须要作一些初始化操做。 initContainerHBThread(conf); serviceHandler = new ServiceHandler(data); //thrift是一个分布式的RPC框架 initThrift(conf); } catch (Throwable e) { if (e instanceof OutOfMemoryError) { LOG.error("Halting due to Out Of Memory Error..."); } LOG.error("Fail to run nimbus ", e); } finally { cleanup(); } LOG.info("Quit nimbus"); }
只是判断配置信息中的一个字段名为“storm.cluster.mode”是不是“distributed”,本地模式下是“local”。分布式
initShutdownHook
添加退出的时候一些操做,包括设置参数提醒集群要退出,清除nimbus存储下的一些工做线程(负责处理通讯,分发代码,心跳的一系列守护线程),关闭打开的各类资源等。ide
createNimbusData
这个方法用于建立一个NimbusData
的对象,这个对象封装了Nimbus与ZK通讯的一些成员变量。下面会在每一个方法内部逐渐讲到NimbusData
的一些成员变量以及他们的做用。首先来看看NimbusData
的构造方法。函数
public NimbusData(final Map conf, INimbus inimbus) throws Exception { this.conf = conf; //两个方法分别处理打开的文件流和blob传输流 createFileHandler(); mkBlobCacheMap(); this.nimbusHostPortInfo = NimbusInfo.fromConf(conf); this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo); this.isLaunchedCleaner = false; this.isLaunchedMonitor = false; this.submittedCount = new AtomicInteger(0); this.stormClusterState = Cluster.mk_storm_cluster_state(conf); createCache(); this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>(); //建立一个调度线程池,默认大小为12 this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM); this.statusTransition = new StatusTransition(this); this.startTime = TimeUtils.current_time_secs(); this.inimubs = inimbus; localMode = StormConfig.local_mode(conf); this.metricCache = new JStormMetricCache(conf, this.stormClusterState); this.clusterName = ConfigExtension.getClusterName(conf); pendingSubmitTopologies = new TimeCacheMap<String, Object>(JStormUtils.MIN_10); topologyTaskTimeout = new ConcurrentHashMap<String, Integer>(); tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>(); this.metricsReporter = new JStormMetricsReporter(this); this.metricRunnable = ClusterMetricsRunnable.mkInstance(this); String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf); this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass); if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) { String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN); nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string); } else { nimbusNotify = null; } }
3.1 createFileHandler:在这方法内部,实现了一个匿名的内部类ExpiredCallback
,在其内部实现了一个方法叫expire
,利用回调的方式来关闭Channel
或者BufferFileInputStream
实例对象。ui
public void createFileHandler() { ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() { @Override public void expire(Object key, Object val) { try { LOG.info("Close file " + String.valueOf(key)); if (val != null) { if (val instanceof Channel) { Channel channel = (Channel) val; channel.close(); } else if (val instanceof BufferFileInputStream) { BufferFileInputStream is = (BufferFileInputStream) val; is.close(); } } } catch (IOException e) { LOG.error(e.getMessage(), e); } } }; //获取超时时间 int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30); uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); }
而后初始化NimbusData
的两个成员变量uploaders
和downloaders
,这两个分别维护须要上传的通道和须要下载的通道。TimeCacheMap
这个类的主要实现逻辑是在其构造函数内部启动一个守护线程。首先建立一个缓冲区,只要系统不关闭,则在守护线程内部不断的缓冲区获取对象,在对象不为空的状况下调用回调函数的expire方法,并执行相应的操做,这里具体传进来的expire
方法是关闭Channel
或者BufferFileInputStream
。
3.2. mkBlobCacheMap
:和上述的方法很是相似,也是申明一个匿名内部类,而后初始化几个成员变量。代码几乎和上个方法同样就不浪费拌面去贴了。这里expire方法中要关闭的是两个流AtomicOutputStream
和BufferInputStream
,blobUploaders
和blobDownloaders
分别存放着上传和下载所打开的流。blobListers
存放上传和下载的数据。
3.3. 初始化几个成员变量,包括NimbusInfo(包含了主机名,端口和标志是不是leader),BlobStore(用来存储blob数据的,使用键值存储,阿里提供了两个不一样的blob存储方式,一种是本地文件系统存储,一种的hdfs存储,两种方式的区别在于,因为本地文件存储并不能保证一致性,因此须要ZK介入来保证,这是JStorm的默认配置。若是使用hdfs来存储,则不须要ZK介入,由于hdfs能保证一致性和正确性),StormClusterState(存储整个集群的状态,这个是从ZK上获取的),为了不屡次向ZK通讯,还须要设置缓存信息,任务的心跳信息等等。
3.4. 初始化好metrics相关的报告线程和监听线程。this
initFollowerThread
4.1. 方法首先初始化一个回调函数,这是当一个nimbus成为leader以后就会调用的一个用于初始化一系列变量的方法,包括拓扑如何在集群上分配,拓扑状态更新,清除函数,还有监控线程等。后续会有新的篇章来介绍这个init方法,这里先放这个方法的源码。线程
private void init(Map conf) throws Exception { data.init(); NimbusUtils.cleanupCorruptTopologies(data); //拓扑分配 initTopologyAssign(); //状态更新 initTopologyStatus(); //清除函数 initCleaner(conf); initMetricRunnable(); if (!data.isLocalMode()) { initMonitor(conf); //mkRefreshConfThread(data); } }
4.2. 初始化一个Runnable的子类,在构造方法中,首先判断集群并非使用本地模式,而后更新ZK上的节点信息(将nimbus注册到ZK上)。而后经过ZK获取集群的状态信息,毕竟nimbus是须要维护整个集群的。紧接着判断是否存在leader,两次都没法选举出leader以后,则将ZK上的nimbus信息删除并退出。若是blobstore使用的是本地文件模式(有本文模式还有hdfs模式两种)还须要添加一个回调函数,这个回调函数执行的操做是,当这个nimbus不是leader的时候,对blob进行同步。此外还须要将那些active的blob存到ZK中,而将死掉的进行清除(缘由前文3.3也说到过,本地模式存储没法保证一致性,因此须要ZK进行维护,而hdfs自带容错机制,能保证数据的一致性)。
4.3. 设置该线程为守护线程,并启动这个线程。run方法首先判断当前保存在ZK上的集群中是否有leader,若是没有则选举当前nimbus为leader线程。若是有了leader线程,则须要判断是否跟当前的nimbus相同,若是不相同则中止当前的nimbus,毕竟已经有leader存在了。若是是相同的,则须要判断本地的状态中,若是尚未设置为leader,代表当前nimbus尚未进行初始化,则先设置nimbus为leader而后回调函数进行初始化,也就是调用init(conf)
方法。
获取一个端口(默认的端口是7621)用于构建HttpServer
实例对象。能够用于处理和接受tcp链接,启动一个新的线程进行httpserver的监听。(主要做用或者说在哪里用到尚且不明确)。code
initContainerHBThread
这个方法的主要做用是得知是否能在资源管理器(yarn)上运行jstorm集群,若是能够的话,则须要建立一个新的线程用于处理。(其实这里使用容器的目的是能够在一个物理集群上运行多个不同的逻辑集群甚至多个JStorm集群,能动态调整逻辑集群分到的资源,此外,资源管理器能提供很是强的可扩展性)。容器线程会被添加到NimbusServer
中,后续使用到的时候再详细讲解。这个容器线程也是守护线程,且立刻就会启动,这个线程的run方法里面包含两个处理:
6.1. handleWriteDir
:这个方法的主要做用是清除掉容器上的过时心跳信息,准确的说,若是JStorm集群容器目录下的心跳信息大于10,则须要清除(从最老的开始)。
6.2. handlReadDir
:这里主要是用于维护本地是否能接受到集群上的hb信息,若是屡次超时则要抛出异常。
initThrift
thrift是JStorm使用的一个分布式RPC框架。笔者后续再添加相应的源码解析。