Erlang Process input queue

http://www.cnblogs.com/me-sa/archive/2011/11/05/erlang0012.htmlhtml

     Erlang进程有本身的消息队列来保存接收到的消息,新接收到的消息放在队列的尾部。Erlang的接收原语receive就是用来从消息队列中选择性提取消息的。receive提取消息的过程是:从消息队列的第一条消息开始匹配,若是有一条消息能够匹配上就从消息队列中移除,并执行相应的消息处理逻辑。若是没有模式能够匹配消息队列中的消息,这条消息就会保留在消息队列中。若是检查到消息队列中的最后一条消息尚未找到能够匹配的消息,进程就会阻塞直到接收到下一条消息再一次出发提取过程。shell

    咱们能不能直观的看到这个过程呢?Erlang对运行时信息的提取提供了很好的支持,咱们要查看的是一个进程在运行时的信息,使用的方法:erlang:process_info/1 .这个方法接收的参数就是进程的PID,返回的数据结果:cookie

[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},
{links,[]},{dictionary,[]},
{trap_exit,false},{error_handler,error_handler},
{priority,normal},{group_leader,<0.29.0>},
{total_heap_size,233},{heap_size,233},
{stack_size,1},{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]oop

    上面erlang:process_info(Pid).的执行结果中,红色标出就是消息队列中的消息数量,蓝色标出的就是消息队列的内容。下面咱们就经过一系列的Demo来理解Erlang进程消息队列的处理机制.测试

 

 消息队列堆积的状况优化

      咱们先把进行测试的脚手架代码准备好,逻辑很简单若是接收到abc消息就输出一下而后继续接收消息,这经过尾递归loop方法就能够实现。ui

-module(looper).this

-compile(export_all).spa

loop() ->
     receive 
        abc -> 
                  io:format("Receive abc. ~n "),
                  loop();
        stop-> 
                  io:format("stop"),
                  stop
      end.    rest

 

    模拟消息堆积的方法很简单,咱们不停向这个进程发送没法匹配的消息就能够了,而后咱们查看进程的运行时状态,下面是shell中执行的结果,你们看注释:

(demo@192.168.1.123)1> Pid= spawn(looper,loop,[]).    %%启动进程返回进程PID
<0.38.0>
(demo@192.168.1.123)2> Pid!abc.                              %%向进程发送abc消息
Receive abc.                                                              %% abc消息正常处理
abc
(demo@192.168.1.123)4> Pid!abcd.                            %%向进程发送消息abcd
abcd
(demo@192.168.1.123)5> Pid!abcde.                            %%进程发送消息abcde
abcde
(demo@192.168.1.123)6> Pid!abcdef.
abcdef
(demo@192.168.1.123)7> Pid!abcdefg.
abcdefg
(demo@192.168.1.123)9> erlang:process_info(Pid).      %%查看进程状态
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},              %%这里能看到积压的消息
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> Pid!haha.            %%再发送一条垃圾消息haha
haha
(demo@192.168.1.123)11> erlang:process_info(Pid).
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,5},
{messages,[abcd,abcde,abcdef,abcdefg,haha]},   %%能够看到haha消息被放在了消息队列的队尾
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12>

 

按照优先级接收消息

 

下面的代码范例来自LYSE,能够看到首先是处理高优先级的消息,若是高优先级的消息处理完毕以后,处理低优先级的消息.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
important() ->
     receive
         {Priority, Message} when Priority > 10 ->
             [Message | important()]
     after 0 ->
         normal()
     end.
  
normal() ->
     receive
         {_, Message} ->
             [Message | normal()]
     after 0 ->
         []
     end.

  

 


  定时清理堆积的垃圾消息

   堆积的垃圾消息会慢慢吃掉内存,并且堆积的消息在Selective Receive过程当中会不断地被遍历检查,成为负担,咱们如今就添加一个定时清理堆积消息的逻辑:

-module(looper).

-compile(export_all).

      
%% P= spawn(looper,loop2,[]).      
%% erlang:process_info(P).
%%
loop2() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  loop2();
        stop-> 
                  io:format("stop"),
                  stop
         after 15000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,         
        
                    io:format("clear . ~n "),
                    loop2()
      end. 

  作法也很简单,添加一个超时,超时以后用一个能够接收任意消息(Any)的receive代码段来从消息队列中提取一条消息.为了留出足够的时间来输入命令,咱们把超时时间定为15000(15s).好了,启动shell从新来过,你们仍是看我添加的注释:

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop2,[]).  %使用loop2建立进程
<0.38.0>
(demo@192.168.1.123)2> P!abcd.                    %快速输入下面的命令向进程发送没法匹配的垃圾进程
abcd
(demo@192.168.1.123)3> P!abcde.
abcde
(demo@192.168.1.123)4> P!abcdef.
abcdef
(demo@192.168.1.123)5> P!abcdefg.
abcdefg
(demo@192.168.1.123)6> P!abcdefgg.
abcdefgg
(demo@192.168.1.123)7> erlang:process_info(P).Receive abcd      %咱们输入完erlang:process_info(P).的时候刚好遇到了清理逻辑执行
(demo@192.168.1.123)7> clear .
(demo@192.168.1.123)7>
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcde,abcdef,abcdefg,abcdefgg]}, %%除了已经被移除掉的abcd 其它垃圾消息还堆积在进程中
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,39},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)8> Receive abcde
(demo@192.168.1.123)8> clear .
(demo@192.168.1.123)8> erlang:process_info(P). %上面又执行了一次清理逻辑,咱们再次查看进程信息
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,3},
{messages,[abcdef,abcdefg,abcdefgg]},  %%看到了吧,又少了一条消息垃圾消息abcde
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,69},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)9> Receive abcdef
(demo@192.168.1.123)9> clear .
(demo@192.168.1.123)9> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,99},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> P!abc.
Receive abc.
abc
(demo@192.168.1.123)11> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,115},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12> Receive abcdefg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> Receive abcdefgg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},                   %好了,执行到这里消息队列已经清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,175},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)13>

 

 从消息队列中提取消息的慢镜头

  receive原语执行过程当中,遇到匹配的消息,提取消息进行处理的过程稍纵即逝,咱们如今经过添加sleep,来看看这个过程的慢镜头.注意下面代码的修改

-module(looper).

-compile(export_all).

     

%% P= spawn(looper,loop3,[]).      
%% erlang:process_info(P).
%%      
loop3() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  timer:sleep(10000),
                  io:format("sleep after receive abc done. ~n "),
                  loop3();
        stop-> 
                  io:format("stop"),
                  stop
         after 25000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,                 
                    io:format("clear . ~n "),
                    loop3()
      end.                  

下面的shell中,咱们向进程发送了一批能够正常处理的abc消息,可是因为处理逻辑中的sleep,消息提取会被拖慢,这个时间咱们能够执行process_info

 

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop3,[]).
<0.38.0>
(demo@192.168.1.123)2> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,9},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)3> P!abc.  
Receive abc.
abc
(demo@192.168.1.123)4> P!abc.
abc
(demo@192.168.1.123)5> P!abc.
abc
(demo@192.168.1.123)6> P!abc.
abc
(demo@192.168.1.123)7> P!abc.
abc
(demo@192.168.1.123)8> P!abc.
abc
(demo@192.168.1.123)9> P!abc.
abc
(demo@192.168.1.123)10> P!abc.
abc
(demo@192.168.1.123)11> P!abcd.
abcd
(demo@192.168.1.123)12> P!abcdd.
abcdd
(demo@192.168.1.123)13> erlang:process_info(P).sleep after receive abc done.
(demo@192.168.1.123)13> Receive abc.
(demo@192.168.1.123)13>
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)14> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,6},
{messages,[abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,131},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abcd
(demo@192.168.1.123)16> clear .
(demo@192.168.1.123)16> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,1},
{messages,[abcdd]},  %执行到这里只有一条垃圾数据堆积了
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,305},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)17> Receive abcdd
(demo@192.168.1.123)17> clear .
(demo@192.168.1.123)17> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},  %%消息队列已经清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,335},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)18>

  日出而做子夜还,偶得周末半日闲,各位周末愉快!

 

2012-4-28 17:45补充

Process Data 
For each process there will be at least one =proc_stack and one =proc_heap tag followed by the raw memory information for the stack and heap of the process. 
For each process there will also be a =proc_messages tag if the process' message queue is non-empty and a =proc_dictionary tag if the process' dictionary (the put/2 and get/1 thing) is non-empty.

The raw memory information can be decoded by the Crashdump Viewer tool. You will then be able to see the stack dump, the message queue (if any) and the dictionary (if any). The stack dump is a dump of the Erlang process stack. Most of the live data (i.e., variables currently in use) are placed on the stack; thus this can be quite interesting. One has to "guess" what's what, but as the information is symbolic, thorough reading of this information can be very useful. As an example we can find the state variable of the Erlang primitive loader on line (5) in the example below:

(1) 3cac44 Return addr 0x13BF58 (<terminate process normally>) 
(2) y(0) 
["/view/siri_r10_dev/clearcase/otp/erts/lib/kernel/ebin","/view/siri_r10_dev/ 
(3) clearcase/otp/erts/lib/stdlib/ebin"] 
(4) y(1) <0.1.0> 
(5) y(2) 
{state,[],none,#Fun<erl_prim_loader.6.7085890>,undefined,#Fun<erl_prim_loader.7.900
0327>,#Fun<erl_prim_loader.8.116480692>,#Port<0.2>,infinity,#Fun<erl_prim_loader.9.
10708760>} 
(6) y(3) infinity 
When interpreting the data for a process, it is helpful to know that anonymous function objects (funs) are given a name constructed from the name of the function in which they are created, and a number (starting with 
0) indicating the number of that fun within that function.

 

2012-08-30 16:23 更新

 
      若是须要按照优先级收发消息,能够使用二叉堆(min_heap)或者gb_trees,接收到的消息填充到这样的结构里面(把优先级数值放在第一个key用来排序).使用的时候只须要检索最小或者最大值就能够了.大部分状况下这种方法均可以实现按优先级接受消息,但收到大量高优先级消息的状况就会变慢;从R14A开始,Erlang编译器有一个优化减小了消息接收时进程之间的反复通讯,在消息通讯之初会建立一个reference附加在往来的消息中,这样在reference建立成功以前自动过滤掉全部不包含这个Reference特征的消息.
 
复制代码
%% optimized in R14A
optimized(Pid) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, hello},
    receive
        {Pid, Ref, Msg} ->
            io:format("~p~n", [Msg])
    end.
复制代码
相关文章
相关标签/搜索