开发工做中,常常会碰到进程池或者线程池,或者其它的资源池。在这里,用erlang实现一个简单的进程池。api
erlang进程是很是轻量级的,这个进程池的主要目的是用一种通用的方式去管理和限制系统中运行的资源占用。当运行的工做者进程数量达到上限,进程池还能够把任务放到队列中,只要进程资源被释放,排队的任务就能得到运行,不然任务只能阻塞。async
这是进程池的监督树:atom
ppool_supersup监督着全部的进程池。一个进程池由ppool_sup、ppool_serv和worker_sup监督的工做者进程池组成。ppool_serv提供对外的进程池调用api,ppool_sup负责监督单个进程池。线程
下面是实现代码。3d
%% ppool_supersup -module(ppool_supersup). -behavior(supervisor). -export([start_link/0, stop/0, start_pool/3, stop_pool/1]). -export([init/1]). start_link() -> supervisor:start_link({local, ppool}, ?MODULE, []). stop() -> case whereis(ppool) of P when is_pid(P) -> exit(P, kill); _ -> ok end. init([]) -> MaxRestart = 6, MaxTime = 3600, {ok, {{one_for_one, MaxRestart, MaxTime}, []}}. start_pool(Name, Limit, MFA) -> %%每一个进程池的最大终止时间设置为10500,这个值并无什么特殊意义,只是保证足够大,全部子进程都有足够的时间终止。若是实在不知道设置为多大,能够试试infinity。 ChildSpec = {Name, {ppool_sup, start_link, [Name, Limit, MFA]}, permanent, 10500, supervisor, [ppool_sup]}, supervisor:start_child(ppool, ChildSpec). stop_pool(Name) -> supervisor:terminate_child(ppool, Name), supervisor:delete_child(ppool, Name).
%% ppool_sup -module(ppool_sup). -export([start_link/3, init/1]). -behavior(supervisor). start_link(Name, Limit, MFA) -> supervisor:start_link(?MODULE, {Name, Limit, MFA}). init({Name, Limit, MFA}) -> MaxRestart = 1, MaxTime = 3600, {ok, {{one_for_all, MaxRestart, MaxTime}, [{serv, {ppool_serv, start_link, [Name, Limit, self(), MFA]}, permanent, 5000, worker, [ppool_serv]}]}}.
%% ppool_worker_sup -module(ppool_worker_sup). -export([start_link/1, init/1]). -behavior(supervisor). start_link(MFA = {_, _, _}) -> supervisor:start_link(?MODULE, MFA). init({M, F, A}) -> MaxRestart = 5, MaxTime = 3600, {ok, {{simple_one_for_one, MaxRestart, MaxTime}, [{ppool_worker, {M, F, A}, temporary, 5000, worker, [M]}]}}.
ppool_serv是最复杂的一个模块了。由于ppool_serv对外提供接口,它须要能联系到worker_sup。若是由ppool_sup同时启动ppool_serv和worker_sup,存在乱序的风险,除非都注册进程名。但erlang中对于原子的使用必定要慎重,能少用就少用。因此在这儿,由ppool_serv动态添加worker_sup到ppool_sup。code
ppool_serv提供了三种添加任务的方式:orm
%% ppool_serv -module(ppool_serv). -behavior(gen_server). -export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -define(SPEC(MFA), {worker_sup, {ppool_worker_sup, start_link, [MFA]}, permanent, 10000, supervisor, [ppool_woker_sup]}). -record(state, {limit = 0, sup, refs, queue = queue:new()}). start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). run(Name, Args) -> gen_server:call(Name, {run, Args}). sync_queue(Name, Args) -> gen_server:call(Name, {sync, Args}, infinity). async_queue(Name, Args) -> gen_server:cast(Name, {async, Args}). stop(Name) -> gen_server:call(Name, stop). init({Limit, MFA, Sup}) -> self() ! {start_worker_supervisor, Sup, MFA}, {ok, #state{limit = Limit, refs = gb_sets:empty()}}. handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs = Refs}) -> case gb_sets:is_element(Ref, Refs) of true -> handle_down_worker(Ref, S); false -> {noreply, S} end; handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) -> {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)), link(Pid), {noreply, S#state{sup = Pid}}; handle_info(_Msg, State) -> {noreply, State}. handle_call({run, Args}, _From, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {reply, {ok, Pid}, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}}; handle_call({run, _Args}, _From, S = #state{limit = N}) when N =< 0 -> {reply, noalloc, S}; handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; handle_call({sync, Args}, From, S = #state{queue=Q}) -> {noreply, S#state{queue=queue:in({From, Args}, Q)}}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(_Msg, _From, State) -> {noreply, State}. handle_cast({async, Args}, S=#state{limit=N, sup=Sup, refs=R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {noreply, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; handle_cast({async, Args}, S=#state{limit=N, queue=Q}) when N =< 0 -> {noreply, S#state{queue=queue:in(Args,Q)}}; handle_cast(_Msg, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, _State) -> ok. handle_down_worker(Ref, S = #state{limit = L, sup = Sup, refs = Refs}) -> case queue:out(S#state.queue) of {{value, {From, Args}}, Q} -> {ok, Pid} = supervisor:start_child(Sup, Args), NewRef = erlang:monitor(process, Pid), NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)), gen_server:reply(From, {ok, Pid}), {noreply, S#state{refs = NewRefs, queue = Q}}; {{value, Args}, Q} -> {ok, Pid} = supervisor:start_child(Sup, Args), NewRef = erlang:monitor(process, Pid), NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)), {noreply, S#state{refs = NewRefs, queue = Q}}; {empty, _} -> {noreply, S#state{limit = L + 1, refs = gb_sets:delete(Ref, Refs)}} end.