ZStack源码剖析之核心库鉴赏——FlowChain

本文首发于泊浮目的专栏: https://segmentfault.com/blog...

前言

在ZStack(或者说产品化的IaaS软件)中的任务一般有很长的执行路径,错误可能发生在路径的任意一处。为了保证系统的正确性,需提供一种较为完善的回滚机制——在ZStack中,经过一个工做流引擎,ZStack的每个步骤都被包裹在独立的工做流中,能够在出错的时候回滚。此外,经过在配置文件中组装工做流的方式,关键的执行路径能够被配置,这使得架构的耦合度进一步下降。java

系统解耦合的手段除了以前文章所提到的分层、分割、分布等,还有一个重要手段是异步,业务之间的消息传递不是同步调用,而是将一个业务操做分红多个阶段,每一个阶段之间经过共享数据的方式异步执行进行协做。

这便是一种在业务设计原则中——流程可定义原则的具象化。接触过金融行业的同窗确定知道,不一样的保险理赔流程是不同的。而承保流程和理赔流程是分离的,在须要时进行关联,从而能够复用一些理赔流程,并提供一些个性化理赔流程。git

演示代码

就以建立VM为例,在ZStack中大体能够分如下几个步骤:github

<bean id="VmInstanceManager" class="org.zstack.compute.vm.VmInstanceManagerImpl">
        <property name="createVmWorkFlowElements">
            <list>
                <value>org.zstack.compute.vm.VmImageSelectBackupStorageFlow</value>
                <value>org.zstack.compute.vm.VmAllocateHostFlow</value>
                <value>org.zstack.compute.vm.VmAllocatePrimaryStorageFlow</value>
                <value>org.zstack.compute.vm.VmAllocateVolumeFlow</value>
                <value>org.zstack.compute.vm.VmAllocateNicFlow</value>
                <value>org.zstack.compute.vm.VmInstantiateResourcePreFlow</value>
                <value>org.zstack.compute.vm.VmCreateOnHypervisorFlow</value>
                <value>org.zstack.compute.vm.VmInstantiateResourcePostFlow</value>
            </list>
        </property>
<!--  还有不少,介于篇幅再也不列出 -->

能够说是代码即文档了。在这里,ZStack显式声明这些Flow在Spring XML中,这些属性将会被注入到createVmWorkFlowElements中。每个Flow都被拆成了一个个较小的单元,好处不只是将业务操做分红了多个阶段易于回滚,仍是能够有效复用这些Flow。这也是编程思想中“组合”的体现。spring

如何使用

除了这种配置型声明,还能够在代码中灵活的使用这些FlowChain。在这里,咱们将以Case来讲明这些FlowChain的用法,避免对ZStack业务逻辑不熟悉的读者看的一头雾水。编程

一共有两种可用的FlowChain:segmentfault

  • SimpleFlowChain
  • ShareFlowChain

SimpleFlowChain

咱们先来看一个Casepromise

@Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }

咱们能够看到,这就是一个工做流。完成一个工做流的时候(回调触发时)执行下一个工做流——由trigger.next触发。不只如此,还能够添加Rollback属性架构

@Test
    public void test() throws WorkFlowException {
        final int[] count = {0};

        new SimpleFlowChain()
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        chain.fail(null);
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .start();

        Assert.assertEquals(-1, count[0]);
    }

rollback由FlowTrigger的fail触发。这样咱们能够保证在发生一些错误的时候及时回滚,防止咱们的系统处于一个有脏数据的中间状态。同时,Map也能够用来在Flow之间传递上下文。异步

ShareFlowChain

public class TestShareFlow {
    int[] count = {0};
    boolean success;

    private void increase() {
        count[0]++;
    }

    private void decrease() {
        count[0]--;
    }

    private void expect(int ret) {
        Assert.assertEquals(count[0], ret);
    }

    @Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }


    @Before
    public void setUp() throws Exception {
        new BeanConstructor().build();
    }
}

比起SimpleFlowChain,ShareFlowChain则是一个Inner class,在相同的做用域里,传递数据变得更加的方便了。ide

它的实现

在ZStack中,FlowChain做为核心库,其实现也是很是的简单(能够直接参考SimpleFlowChainShareFlowChain),本质就是将任务放入List中,由内部方法进行迭代,在此基础上作了一系列操做。下面将开始分析它的源码。

从接口提及

public interface FlowChain {
    List<Flow> getFlows();

    FlowChain insert(Flow flow);

    FlowChain insert(int pos, Flow flow);

    FlowChain setFlowMarshaller(FlowMarshaller marshaller);

    FlowChain then(Flow flow);

    FlowChain done(FlowDoneHandler handler);

    FlowChain error(FlowErrorHandler handler);

    FlowChain Finally(FlowFinallyHandler handler);

    FlowChain setData(Map data);

    FlowChain putData(Map.Entry... es);

    FlowChain setName(String name);

    void setProcessors(List<FlowChainProcessor> processors);

    Map getData();

    void start();

    FlowChain noRollback(boolean no);

    FlowChain allowEmptyFlow();
}

接口的名字很是的易懂,那么在这里就很少做解释了。FlowChain仅仅定义了一个Flow最小应有的行为。

//定义了Flow的回滚操做接口
public interface FlowRollback extends AsyncBackup {
    //回滚操做
    void rollback();
    //设置跳过回滚操做
    void skipRestRollbacks();
}
//定义了触发器的行为接口
public interface FlowTrigger extends AsyncBackup {
    //触发失败,调用errorHandle
    void fail(ErrorCode errorCode);
    //触发下一个flow
    void next();
    //setError后,在下次调用next的时才会调用errorHandle
    void setError(ErrorCode error);
}

源码解析

Flow

public interface Flow {
    void run(FlowTrigger trigger, Map data);

    void rollback(FlowRollback trigger, Map data);
}

Flow的定义其实很是的简单——一组方法。执行和对应的回滚,通常在ZStack中都以匿名内部类的方式传入。

Chain的用法

在以前的SimpleFlowChain的case中。咱们能够看到一系列的链式调用,大体以下:

new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();

then本质是往List<flow> flows里添加一个flow。

public SimpleFlowChain then(Flow flow) {
        flows.add(flow);
        return this;
    }

再来看看start

@Override
    public void start() {
        // 检测flow中是否设置了processors。通常用来打trace
        if (processors != null) {
            for (FlowChainProcessor p : processors) {
                p.processFlowChain(this);
            }
        }
        //若是flows为空可是以前在设置中容许为空,那么就直接直接done部分的逻辑。否则就报错
        if (flows.isEmpty() && allowEmptyFlow) {
            callDoneHandler();
            return;
        }

        if (flows.isEmpty()) {
            throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose");
        }
        //每一个flow必须有一个map,用来传递上下文
        if (data == null) {
            data = new HashMap<String, Object>();
        }
        //标记为已经开始
        isStart = true;
        //若是没有名字的话给flow 取一个名字,由于颇有多是匿名使用的flow
        if (name == null) {
            name = "anonymous-chain";
        }

        logger.debug(String.format("[FlowChain(%s): %s] starts", id, name));
        //打印trace,方便调试
        if (logger.isTraceEnabled()) {
            List<String> names = CollectionUtils.transformToList(flows, new Function<String, Flow>() {
                @Override
                public String call(Flow arg) {
                    return String.format("%s[%s]", arg.getClass(), getFlowName(arg));
                }
            });
            logger.trace(String.format("execution path:\n%s", StringUtils.join(names, " -->\n")));
        }
        //生成一个迭代器
        it = flows.iterator();
        //从it中获取一个不须要跳过的flow开始执行。若是没有获取到,就执行done逻辑
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // all flows are skipped
            callDoneHandler();
        } else {
            runFlow(flow);
        }
    }

再来看一下runFlow中的代码

private void runFlow(Flow flow) {
        try {
            //看报错信息就能够猜到在作什么防护措施了:若是一个transaction在一个flow中没有被关闭而跳到下一个flow时,会抛出异常。这个防护机制来自于一个实习生写的bug,当时被排查出来的时候花了很是大的力气——现象很是的诡异。因此如今被写在了这里。
            if (TransactionSynchronizationManager.isActualTransactionActive()) {
                String flowName = null;
                String flowClassName = null;
                if (currentFlow != null) {
                    flowName = getFlowName(currentFlow);
                    flowClassName = currentFlow.getClass().getName();
                }

                throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName));
            }
            //toRun就是一个当前要run的flow
            Flow toRun = null;
            if (flowMarshaller != null) {
            //flowMarshaller 其实是一个很是恶心的玩意儿。尤为在一些配置好掉的xml flow忽然由于一些条件而改变接下来执行的flow使人很无语...可是也提供了一些灵活性。
                toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(),
                        flow.getClass().getName(), this, data);
                if (toRun != null) {
                    logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]",
                            id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass()));
                }
            }
       
            if (toRun == null) {
                toRun = flow;
            }

            if (CoreGlobalProperty.PROFILER_WORKFLOW) {
                //对flow的监视。好比flow的执行时间等
                stopWatch.start(toRun);
            }

            currentFlow = toRun;

            String flowName = getFlowName(currentFlow);
            String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName);
            logger.debug(info);
            //在flow中还容许定义afterDone afterError afterFinal的行为。稍后将会介绍
            collectAfterRunnable(toRun);
            //终于到了run,这里就是调用者传入的行为来决定run中的逻辑
            toRun.run(this, data);
             //fail的逻辑稍后解析
        } catch (OperationFailureException oe) {
            String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : "";
            logger.warn(errInfo, oe);
            fail(oe.getErrorCode());
        } catch (FlowException fe) {
            String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : "";
            logger.warn(errInfo, fe);
            fail(fe.getErrorCode());
        } catch (Throwable t) {
            logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback",
                    id, name, flow.getClass().getName()), t);
            fail(errf.throwableToInternalError(t));
        }
    }

fail

@Override
    public void fail(ErrorCode errorCode) {
        isFailCalled = true;
        setErrorCode(errorCode);
        //放入Stack中,以后Rollback会根据Stack中的flow顺序来
        rollBackFlows.push(currentFlow);
        //rollback会对this.rollBackFlows中flow按照顺序调用rollback
        rollback();
    }

FlowTrigger

//定义了触发器的行为接口
public interface FlowTrigger extends AsyncBackup {
    //触发失败,调用errorHandle
    void fail(ErrorCode errorCode);
    //触发下一个flow
    void next();
    //setError后,在下次调用next的时才会调用errorHandle
    void setError(ErrorCode error);
}

以前已经看过fail的代码。接下来来看看nextsetError

@Override
    public void next() {
        //若是flow没有run起来的状况下,是不能调用next的
        if (!isStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()",
                            id, name));
        }
        //当rollback开始的时候也不容许next
        if (isRollbackStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] rollback has started, you can't call next()", id, name));
        }
        //将当前flow的push进rollback用的stack
        rollBackFlows.push(currentFlow);

        logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow)));
        //获取下一个flow。在这里才是真正意义上的next
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }
    }

能够看一下getFirstNotSkippedFlow,本质上是利用了迭代器的特性。

private Flow getFirstNotSkippedFlow() {
        Flow flow = null;
        while (it.hasNext()) {
            flow = it.next();
            if (!isSkipFlow(flow)) {
                break;
            }
        }

        return flow;
    }

接下来是setError

@Override
    public void setError(ErrorCode error) {
        setErrorCode(error);
    }

//往下看
    private void setErrorCode(ErrorCode errorCode) {
        this.errorCode = errorCode;
    }

根据以前的next逻辑:

if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }

咱们能够大体猜测到,若是在next的时候当前error不为空,则调用错误handle。这样在setError后还能够作一些事情。

不管是调用errorHandle仍是doneHandle,都会调用finalHandle。finalHandle也容许用户定义这部分的逻辑,使flow更加的灵活。

更好的选择

因为该库是为ZStack定制而生,故此有一些防护性判断,源码显得略为verbose。若是有同窗对此感兴趣,想将其应用到本身的系统中,笔者推荐使用:jdeferred

Java Deferred/Promise library similar to JQuery

因为JavaScript 中的代码都是异步调用的。简单说,它的思想是,每个异步任务返回一个Promise对象,该对象有一个then方法,容许指定回调函数。

在这里列出几个较为简单的示范,或者有兴趣的读者也能够参考这里

import org.jdeferred.DeferredManager;
import org.jdeferred.Promise;
import org.jdeferred.impl.DefaultDeferredManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;


public class deferSimpleTest {

    private static int var = 0;
    final DeferredManager dm = new DefaultDeferredManager();

    @After
    public void cleanUp() {
        var = 0;
    }


    @Test
    public void test() {
        Promise p1 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        Promise p2 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        dm.when(p1, p2).done(Void -> var += 1);
        Assert.assertEquals(5, var);
    }

    @Test
    public void test2() {
        final DeferredManager dm = new DefaultDeferredManager();

        Promise promise = dm.when(() -> {
                var += 1;
            }).then(result -> {
                var += 1;
            });

        dm.when(promise).done(Void -> var += 1);
        Assert.assertEquals(3, var);
    }

    @Test
    public void testBadCallback() {
        Promise promise = dm.when(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        dm.when(promise).done(Void -> {
                    var += 1;
                    throw new RuntimeException("this exception is expected");
                }
        ).fail(Void -> {
            System.out.print("fail!");
            var -= 1;
        });
        Assert.assertEquals(0, var);

    }
}

若是你在使用Java8,那么也能够经过CompletableFuture来获得“相似”的支持。

相关文章
相关标签/搜索