mnesia里的lamport clock

mnesia使用"wait-die"机制预防死锁,"wait-die"是基于时间戳的,mnesia采用了lamport clock算法做为"wait-die"机制的时间戳。 java

Lamport clock是解决分布式系统中事件发生时序的一种方式。 Lamport定义了一个关系叫作happens-before,记为-> 。a->b意味着全部的进程都赞成事件a发生在事件b以前。在如下两种状况下,能够很容易的获得这个关系: node

1)若是事件a和事件b是同一个进程中的而且事件a发生在事件b前面,那么a->b 算法

2)若是进程A发送一条消息m给进程B,a表明进程A发送消息m的事件,b表明进程B接收消息m的事件,那么a->b 网络

Lamport logical clock算法大体实现为: app

每一个进程Pi维护一个本地计数器Ci,至关于logical clock,按照如下规则更新Ci: 分布式

1)每次执行一个事件(例如经过网络发送消息,或者将消息交换给应用层,或者其余的一些内部事件 )以前,将Ci加1 oop

2)当进程Pi发送消息m给Pj的时候,在消息m上附上Ci code

3)当接收进程Pj接收到进程Pi发送的消息时,更新本身的Cj = max{Cj,Ci} 进程

Lamport论文连接:http://www.stanford.edu/class/cs240/readings/lamport.pdf 事件

===========================================================

在mnesia中充当这个计数器的就是事务ID,事务ID统一由mnesia_tm进程负责初始化和更新。mnesia_locker进程就是根据比较事务ID的大小来执行"wait-die"机制和防止饿死现象的。

allowed_to_be_queued(WaitForTid, Tid) ->
    case get(pid_sort_order) of
	undefined -> WaitForTid > Tid;
	r9b_plain ->
	    cmp_tid(true, WaitForTid, Tid) =:= 1;
	standard  ->
	    cmp_tid(false, WaitForTid, Tid) =:= 1
    end.
mnesia主要在如下几种状况下更新事务ID

1)本节点请求开始新事务时

在执行一个新事务前,会向mnesia_tm进程获取一个事务ID和临时存储空间,mnesia_tm收到请求后会将事务ID计数器加1并赋给该事务。

mnesia_tm.erl:

doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
    receive
    ...
    {From, start_outer} ->
        case catch ?ets_new_table(mnesia_trans_store, [bag, public]) of
        Etab ->
            tmlink(From),
            C = mnesia_recover:incr_trans_tid_serial(),
            ?ets_insert(Etab, {nodes, node()}),
            Tid = #tid{pid = tmpid(From), counter = C},
            ...

mnesia_recover.erl:

incr_trans_tid_serial() ->
    ?ets_update_counter(mnesia_decision, serial, 1).
2)参与其余节点发起的事务结束时

事务涉及多个节点时,事务发起者会将带有本节点事务ID的信息告诉全部参与的节点。当参与节点处理事务结束时,无论是正常结束仍是异常结束,都会更新本节点的事务计数器。

mnesia_tm.erl:

transaction_terminated(Tid)  ->
    mnesia_checkpoint:tm_exit_pending(Tid),
    Pid = Tid#tid.pid,
    if
	node(Pid) == node() ->
	    unlink(Pid);
	true ->  %% Do the Lamport thing here
	    mnesia_recover:sync_trans_tid_serial(Tid)
    end.

mnesia_recover.erl:

sync_trans_tid_serial(ThatCounter) when is_integer(ThatCounter) ->
    ThisCounter = trans_tid_serial(),
    if
	ThatCounter > ThisCounter ->
	    set_trans_tid_serial(ThatCounter + 1);
	true ->
	    ignore
    end;
sync_trans_tid_serial(Tid) ->
    sync_trans_tid_serial(Tid#tid.counter).

set_trans_tid_serial(Val) ->
    ?ets_insert(mnesia_decision, {trans_tid, serial, Val}).
其余状况:例如,mnesia启动进行dump时、事务过程当中出现异常,向其余节点询问事务结果时也都会进行事务计数器的更新。 
相关文章
相关标签/搜索