咱们曾经在Flume-NG中的Channel与Transaction关系(原创)这篇文章中说了channel和Transaction的关系,可是在source和sink中都会使用Transaction,那么Transaction的并发性如何?html
Transaction是介于channel和source、sink直接的一层缓存,为了安全性和可靠性source、sink不能直接访问channel,只能访问在他之上的Transaction,经过Transaction间接操做channel中的数据。缓存
这节咱们以memory channel和file channel来研究一下Flume-NG的Transaction并发性。安全
首先全部channel的父类都是BasicChannelSemantics,全部Transaction的父类都是BasicTransactionSemantics,mem channel中的Transaction是MemoryTransaction是内部私有类;file channel的Transaction是FileBackedTransaction。通常来讲本身定义channel须要实现本身的Transaction。多线程
咱们在看源码时发现FileBackedTransaction不容许take和put同时操做,在其doCommit和doRollback方法中都有限制使得,究其缘由是:。而mem channel的Transaction则没有诸多限制。并发
咱们在source和sink中见到的getTransaction()获取的Transaction是同一个吗?若是不是并发性是怎么保证的?第一个很明显不是同一个,试想若是都是同一个,那么不一样组件好比一个source和一个sink都会有Transaction.close操做,将会关闭事务,那关闭晚的还如何commit?咱们再来看下getTransaction()代码: ide
1 /** 2 * <p> 3 * Initializes the channel if it is not already, then checks to see 4 * if there is an open transaction for this thread, creating a new 5 * one via <code>createTransaction</code> if not. 6 * @return the current <code>Transaction</code> object for the 7 * calling thread 8 * </p> 9 */ 10 @Override 11 public Transaction getTransaction() { 12 13 if (!initialized) { 14 synchronized (this) { 15 if (!initialized) { 16 initialize(); 17 initialized = true; 18 } 19 } 20 } 21 22 BasicTransactionSemantics transaction = currentTransaction.get(); 23 if (transaction == null || transaction.getState().equals( 24 BasicTransactionSemantics.State.CLOSED)) { 25 transaction = createTransaction(); 26 currentTransaction.set(transaction); 27 } 28 return transaction; 29 }
上面咱们能够看出来,若是transaction还未初始化或者transaction的状态是CLOSED(就是执行了close()方法改了状态),说明须要经过createTransaction()新建一个Transaction,createTransaction()这个方法在子类中实现的。咱们来看看mem和file的createTransaction()方法的代码,先看mem的:高并发
1 @Override 2 protected BasicTransactionSemantics createTransaction() { 3 return new MemoryTransaction(transCapacity, channelCounter); 4 }
直接就返回了本身的Transaction对象,在看file的createTransaction()方法的代码:性能
1 @Override 2 protected BasicTransactionSemantics createTransaction() { 3 if(!open) { 4 String msg = "Channel closed " + channelNameDescriptor; 5 if(startupError != null) { 6 msg += ". Due to " + startupError.getClass().getName() + ": " + 7 startupError.getMessage(); 8 throw new IllegalStateException(msg, startupError); 9 } 10 throw new IllegalStateException(msg); 11 } 12 FileBackedTransaction trans = transactions.get(); 13 if(trans != null && !trans.isClosed()) { //在这保证put和take只能一个时刻有一个 14 Preconditions.checkState(false, 15 "Thread has transaction which is still open: " + 16 trans.getStateAsString() + channelNameDescriptor); 17 } 18 trans = new FileBackedTransaction(log, TransactionIDOracle.next(), 19 transactionCapacity, keepAlive, queueRemaining, getName(), 20 channelCounter); 21 transactions.set(trans); 22 return trans; 23 }
这个就比mem的复杂了点,毕竟代码多了很多。优化
在看上面的getTransaction()方法,若是已经建立了一个Transaction则会放入currentTransaction中,而后之后再调用getTransaction()就会经过currentTransaction返回currentTransaction.get(),这莫不是同一个Transaction吗?那就好像有点不对了,对吧,那究竟是怎么回事呢?this
关键在于currentTransaction这个东西,咱们看声明:private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>()是ThreadLocal的实例,可能有不少人不了解这个东西,其实我也不了解!!简单来讲:ThreadLocal使得各线程可以保持各自独立的一个对象,ThreadLocal并非一个Thread,而是Thread的局部变量,为解决多线程程序的并发问题提供了一种新的思路,详细请谷歌、百度之。ThreadLocal有一个ThreadLocalMap静态内部类,你能够简单理解为一个MAP,这个‘Map’为每一个线程复制一个变量的‘拷贝’存储其中,这个“Map”的key是当前线程的ID,value就是set的变量,而get方法会依据当前线程ID从ThreadLocalMap中获取对应的变量,我们这里就是Transaction。这下明白了吧,每一个source和sink都会有单独的线程来驱动的,因此都有各自的Transaction,是不一样的,所以也就能够并发了(针对memory channel)。
可是上面file的createTransaction()方法为何是那样的?由于咱们说了file的Transaction不能同时put和take(同一个Transaction通常只会作一个事就是put或者take),也就是不存在并发性的,因此在file channel中的transactions也设置为了private final ThreadLocal<FileBackedTransaction> transactions =new ThreadLocal<FileBackedTransaction>(),因为channel也是单独的线程驱使,因此这个transactions中始终只存在一对值就是file channel的线程ID和建立的Transaction,若是不一样sink或者source调用getTransaction()时会试图经过createTransaction()方法来建立新的Transaction可是file的createTransaction()方法因为已经有了一个Transaction,在其关闭以前是不会赞成 再次建立的,因此就只能等待这个Transaction关闭了,所以也就保证了put和take不会同时存在了。也就没有并发性了,性能天然大受影响。
那file channel为何不让put和take同时操做呢?这个问题很值得研究,一:put、take、commit、rollback都会获取log的共享锁,一旦获取其余就只能读,获取锁的目的就是这四个操做都要写入log文件;二,put操做并不会将写入log的event的指针放到queue中,而是在commit中才会放到queue中;3、take的操做会直接从queue中取数据,这时若是put已经commit就能够获取数据,若是没有则会返回null;4、因为四个操做都会获取log锁,致使实际上写达不到并发,并且这个log锁使得即便是写不一样的数据文件也不可能,由于只有这一个锁,不是每一个数据文件一个锁(数据文件的个数是动态的这个很差作);5、若take和put同时操做会使得可能交替执行获取锁,此时可能put没commit而queue中无数据,take获取锁以后也没什么意义并且也是轮流不是并行,只会下降put和take的性能,好比put和take各自单独只需1s便可,可是这样可能须要2s甚至更长时间(take一直在等待put的commit)才能完成。综上不让put和take同时操做比较合理。
可是有没有更好的方案能够提升file的性能呢?由于file是基于文件的性能不可能很高,更为合理的办法是合理提升并发性,能够优化的一个方案是put、take、commit、rollback单独以文件存放,并设置相应的多个锁,可是文件的动态变化以及put和put、take和take、commit和commit、rollback和rollback之间的并发性又难以实现了,彷佛只适合take和put的并发,这样貌似会使得file channel更复杂了,可是性能应该会提升一些,会不会得不偿失啊?
还有一个问题就是:file channel中的createTransaction()方法若是再次建立Transaction,而先前建立的并未关闭,会执行Preconditions.checkState(false,"Thread has transaction which is still open: " +trans.getStateAsString()+ channelNameDescriptor)会直接抛出异常,可是彷佛日志中没有相似的异常啊,并且进程也并未中断,可是显然使用了file channel的flume,sink和source能够正常运行,这是怎么搞得?