Flume-NG中Transaction并发性探究

  咱们曾经在Flume-NG中的Channel与Transaction关系(原创)这篇文章中说了channel和Transaction的关系,可是在source和sink中都会使用Transaction,那么Transaction的并发性如何?html

  Transaction是介于channel和source、sink直接的一层缓存,为了安全性和可靠性source、sink不能直接访问channel,只能访问在他之上的Transaction,经过Transaction间接操做channel中的数据。缓存

  这节咱们以memory channel和file channel来研究一下Flume-NG的Transaction并发性。安全

  首先全部channel的父类都是BasicChannelSemantics,全部Transaction的父类都是BasicTransactionSemantics,mem channel中的Transaction是MemoryTransaction是内部私有类;file channel的Transaction是FileBackedTransaction。通常来讲本身定义channel须要实现本身的Transaction。多线程

  咱们在看源码时发现FileBackedTransaction不容许take和put同时操做,在其doCommit和doRollback方法中都有限制使得,究其缘由是:。而mem channel的Transaction则没有诸多限制。并发

  咱们在source和sink中见到的getTransaction()获取的Transaction是同一个吗?若是不是并发性是怎么保证的?第一个很明显不是同一个,试想若是都是同一个,那么不一样组件好比一个source和一个sink都会有Transaction.close操做,将会关闭事务,那关闭晚的还如何commit?咱们再来看下getTransaction()代码:  ide

 1   /**
 2    * <p>
 3    * Initializes the channel if it is not already, then checks to see
 4    * if there is an open transaction for this thread, creating a new
 5    * one via <code>createTransaction</code> if not.
 6    * @return the current <code>Transaction</code> object for the
 7    *     calling thread
 8    * </p>
 9    */
10   @Override
11   public Transaction getTransaction() {
12 
13     if (!initialized) {
14       synchronized (this) {
15         if (!initialized) {
16           initialize();
17           initialized = true;
18         }
19       }
20     }
21 
22     BasicTransactionSemantics transaction = currentTransaction.get();
23     if (transaction == null || transaction.getState().equals(
24             BasicTransactionSemantics.State.CLOSED)) {
25       transaction = createTransaction();
26       currentTransaction.set(transaction);
27     }
28     return transaction;
29   }

  上面咱们能够看出来,若是transaction还未初始化或者transaction的状态是CLOSED(就是执行了close()方法改了状态),说明须要经过createTransaction()新建一个Transaction,createTransaction()这个方法在子类中实现的。咱们来看看mem和file的createTransaction()方法的代码,先看mem的:高并发

1 @Override
2   protected BasicTransactionSemantics createTransaction() {
3     return new MemoryTransaction(transCapacity, channelCounter);
4   }

  直接就返回了本身的Transaction对象,在看file的createTransaction()方法的代码:性能

 1 @Override
 2   protected BasicTransactionSemantics createTransaction() {
 3     if(!open) {
 4       String msg = "Channel closed " + channelNameDescriptor;
 5       if(startupError != null) {
 6         msg += ". Due to " + startupError.getClass().getName() + ": " +
 7             startupError.getMessage();
 8         throw new IllegalStateException(msg, startupError);
 9       }
10       throw new IllegalStateException(msg);
11     }
12     FileBackedTransaction trans = transactions.get();
13     if(trans != null && !trans.isClosed()) {        //在这保证put和take只能一个时刻有一个
14       Preconditions.checkState(false,
15           "Thread has transaction which is still open: " +
16               trans.getStateAsString()  + channelNameDescriptor);
17     }
18     trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
19         transactionCapacity, keepAlive, queueRemaining, getName(),
20         channelCounter);
21     transactions.set(trans);
22     return trans;
23   }

  这个就比mem的复杂了点,毕竟代码多了很多。优化

  在看上面的getTransaction()方法,若是已经建立了一个Transaction则会放入currentTransaction中,而后之后再调用getTransaction()就会经过currentTransaction返回currentTransaction.get(),这莫不是同一个Transaction吗?那就好像有点不对了,对吧,那究竟是怎么回事呢?this

  关键在于currentTransaction这个东西,咱们看声明:private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>()是ThreadLocal的实例,可能有不少人不了解这个东西,其实我也不了解!!简单来讲:ThreadLocal使得各线程可以保持各自独立的一个对象,ThreadLocal并非一个Thread,而是Thread的局部变量,为解决多线程程序的并发问题提供了一种新的思路,详细请谷歌、百度之。ThreadLocal有一个ThreadLocalMap静态内部类,你能够简单理解为一个MAP,这个‘Map’为每一个线程复制一个变量的‘拷贝’存储其中,这个“Map”的key是当前线程的ID,value就是set的变量,而get方法会依据当前线程ID从ThreadLocalMap中获取对应的变量,我们这里就是Transaction。这下明白了吧,每一个source和sink都会有单独的线程来驱动的,因此都有各自的Transaction,是不一样的,所以也就能够并发了(针对memory channel)。

  可是上面file的createTransaction()方法为何是那样的?由于咱们说了file的Transaction不能同时put和take(同一个Transaction通常只会作一个事就是put或者take),也就是不存在并发性的,因此在file channel中的transactions也设置为了private final ThreadLocal<FileBackedTransaction> transactions =new ThreadLocal<FileBackedTransaction>(),因为channel也是单独的线程驱使,因此这个transactions中始终只存在一对值就是file channel的线程ID和建立的Transaction,若是不一样sink或者source调用getTransaction()时会试图经过createTransaction()方法来建立新的Transaction可是file的createTransaction()方法因为已经有了一个Transaction,在其关闭以前是不会赞成 再次建立的,因此就只能等待这个Transaction关闭了,所以也就保证了put和take不会同时存在了。也就没有并发性了,性能天然大受影响。

  那file channel为何不让put和take同时操做呢?这个问题很值得研究,一:put、take、commit、rollback都会获取log的共享锁,一旦获取其余就只能读,获取锁的目的就是这四个操做都要写入log文件;二,put操做并不会将写入log的event的指针放到queue中,而是在commit中才会放到queue中;3、take的操做会直接从queue中取数据,这时若是put已经commit就能够获取数据,若是没有则会返回null;4、因为四个操做都会获取log锁,致使实际上写达不到并发,并且这个log锁使得即便是写不一样的数据文件也不可能,由于只有这一个锁,不是每一个数据文件一个锁(数据文件的个数是动态的这个很差作);5、若take和put同时操做会使得可能交替执行获取锁,此时可能put没commit而queue中无数据,take获取锁以后也没什么意义并且也是轮流不是并行,只会下降put和take的性能,好比put和take各自单独只需1s便可,可是这样可能须要2s甚至更长时间(take一直在等待put的commit)才能完成。综上不让put和take同时操做比较合理。

  可是有没有更好的方案能够提升file的性能呢?由于file是基于文件的性能不可能很高,更为合理的办法是合理提升并发性,能够优化的一个方案是put、take、commit、rollback单独以文件存放,并设置相应的多个锁,可是文件的动态变化以及put和put、take和take、commit和commit、rollback和rollback之间的并发性又难以实现了,彷佛只适合take和put的并发,这样貌似会使得file channel更复杂了,可是性能应该会提升一些,会不会得不偿失啊?

 

  还有一个问题就是:file channel中的createTransaction()方法若是再次建立Transaction,而先前建立的并未关闭,会执行Preconditions.checkState(false,"Thread has transaction which is still open: " +trans.getStateAsString()+ channelNameDescriptor)会直接抛出异常,可是彷佛日志中没有相似的异常啊,并且进程也并未中断,可是显然使用了file channel的flume,sink和source能够正常运行,这是怎么搞得?

相关文章
相关标签/搜索