Erlang源码阅读笔记之proc_lib 上篇

in 编程
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9

概述

在erlang otp源码中,随处可见proc_lib的身影,可以发现,在otp中spawn一个进程,都不是通过erlang:spawn函数,而是通过proc_lib:spawn。那通过这俩库spawn出的进程有啥区别呢?我们要理解erlang otp其它组件的源码,必须要先去了解proc_lib做了什么事情。

官方文档对proc_lib的解释是这样的:

Functions for asynchronous and synchronous start of processes adhering to the OTP design principles.

就是说proc_lib提供符合OTP设计原则的同步或异步进程启动函数。关于OTP设计原则,后续会单开一篇进行讲述,它大致描述了一系列的代码组织标准,包括进程、模块以及项目目录组织结构等等。通过proc_lib启动的进程,会容易符合这种设计原则的要求。

proc_lib开放的API很多,但基本可以分成spawn、start、hibernate、init_ack、init_p、format、initial_call、stop这几组,下面我们一组一组的来看。

spawn 组

spawn组的函数有spawn/1, spawn_link/1, spawn/2, spawn_link/2, spawn/3, spawn_link/3, spawn/4, spawn_link/4, spawn_opt/2, spawn_opt/3, spawn_opt/4, spawn_opt/5

先看最为简单的spawn/1:

-spec spawn(Fun) -> pid() when
      Fun :: function().

spawn(F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(?MODULE, init_p, [Parent,Ancestors,F]).

逻辑如下:

  1. 通过get_my_name函数获取当前进程的注册名。
  2. 通过get_ancestors()函数获取进程祖先列表
  3. 通过原生的erlang:spawn/3创建一个新的进程,以init_p函数作为新的进程逻辑,当前进程名、祖先进程列表和要执行的目标函数作为init_p的执行参数。

get_my_name实现的细节:

get_my_name() ->
    case proc_info(self(),registered_name) of
	{registered_name,Name} -> Name;
	_                      -> self()
    end.

...

proc_info(Pid,Item) when node(Pid) =:= node() ->
    process_info(Pid,Item);
proc_info(Pid,Item) ->
    case lists:member(node(Pid),nodes()) of
	true ->
	    check(rpc:call(node(Pid), erlang, process_info, [Pid, Item]));
	_ ->
	    hidden
    end.

get_my_name()根据进程是本地还是远程,从process_info返回进程注册名称。
process_info是个很有用的函数,process_info/1可以返回指定进程的全部信息:

1> Pid = spawn(fun() -> receive hehe -> hehe end end).
<0.35.0>
2> process_info(Pid).
[{current_function,{prim_eval,'receive',2}},
 {initial_call,{erlang,apply,2}},
 {status,waiting},
 {message_queue_len,0},
 {messages,[]},
 {links,[]},
 {dictionary,[]},
 {trap_exit,false},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.26.0>},
 {total_heap_size,233},
 {heap_size,233},
 {stack_size,9},
 {reductions,17},
 {garbage_collection,[{min_bin_vheap_size,46422},
                      {min_heap_size,233},
                      {fullsweep_after,65535},
                      {minor_gcs,0}]},
 {suspending,[]}]
3> Pid ! hehe.
hehe
4> process_info(Pid).
undefined

而通过process_info/2可以获得某个段的信息。

get_ancestors的实现细节:

get_ancestors() ->
    case get('$ancestors') of
	A when is_list(A) -> A;
	_                 -> []
    end.

本地版的很简单,直接读进程字典并判断类型是否正确即可。另外还有一个远程版本:

get_ancestors(Pid) ->
    case get_dictionary(Pid,'$ancestors') of
	{'$ancestors',Ancestors} ->
	    {ancestors,Ancestors};
	_ ->
	    {ancestors,[]}
    end.

...

get_dictionary(Pid,Tag) ->
    case get_process_info(Pid,dictionary) of
	{dictionary,Dict} ->
	    case lists:keysearch(Tag,1,Dict) of
		{value,Value} -> Value;
		_             -> undefined
	    end;
	_ ->
	    undefined
    end.

...

get_process_info(Pid, Tag) ->
 translate_process_info(Tag, catch proc_info(Pid, Tag)).

translate_process_info(registered_name, []) ->
  {registered_name, []};
translate_process_info(_ , {'EXIT', _}) ->
  undefined;
translate_process_info(_, Result) ->
  Result.

这一长串其实就做了一件事情,从进程字典中读出'$ancestors'这个属性,但因为涉及到远程进程的访问,无法直接使用get,所以需要通过proc_info这个函数(process_info是可以返回进程字典内容的),另外translate_process_info对proc_info返回的结果作了包装,包括异常发生的情况。

最后,我们重点来看init_p这个函数,init_p里面所包含的逻辑才是proc_lib真正的对外出售内容 —— 符合OTP设计原则的进程。init_p的实现:

init_p(Parent, Ancestors, Fun) when is_function(Fun) ->
    put('$ancestors', [Parent|Ancestors]),
    Mfa = erlang:fun_info_mfa(Fun),
    put('$initial_call', Mfa),
    try
	Fun()
    catch
	Class:Reason ->
	    exit_p(Class, Reason, erlang:get_stacktrace())
    end.

逻辑如下:

  1. 将Parent进程合并入Ancestors列表并加入到进程字典中。
  2. 获取目标函数的MFA信息(MFA即Module、Function、Args,我们在erlang中会到处看到这个缩写)
  3. 将MFA信息也保存到进程字典中。
  4. 在try catch中运行目标函数。

我们看到init_p为进程增加了更多元信息以及提供了一个错误处理框架,目标函数所发生的异常都会由exit_p来处理。exit_p的实现:

exit_p(Class, Reason, Stacktrace) ->
    case get('$initial_call') of
	{M,F,A} when is_atom(M), is_atom(F), is_integer(A) ->
	    MFA = {M,F,make_dummy_args(A, [])},
	    crash_report(Class, Reason, MFA, Stacktrace),
	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace);
	_ ->
	    %% The process dictionary has been cleared or
	    %% possibly modified.
	    crash_report(Class, Reason, [], Stacktrace),
	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace)
    end.

exit_reason(error, Reason, Stacktrace) ->
    {Reason, Stacktrace};
exit_reason(exit, Reason, _Stacktrace) ->
    Reason;
exit_reason(throw, Reason, Stacktrace) ->
    {{nocatch, Reason}, Stacktrace}.

exit_p做了两件事情,一是调用crash_report生成错误报告,二是通过exit_reason函数重新对异常原因进行标准化包装,然后再次抛出。

crash_report(exit, normal, _, _)       -> ok;
crash_report(exit, shutdown, _, _)     -> ok;
crash_report(exit, {shutdown,_}, _, _) -> ok;
crash_report(Class, Reason, StartF, Stacktrace) ->
    OwnReport = my_info(Class, Reason, StartF, Stacktrace),
    LinkReport = linked_info(self()),
    Rep = [OwnReport,LinkReport],
    error_logger:error_report(crash_report, Rep).

以上分析就是proc_lib:spawn/1所做的主要工作了,spawn/2也是大同小异,只不过增加了Node参数:

spawn(Node, F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,F]).

spawn/3和spawn/4所调用的init_p有些差别,目标函数是通过apply调用的:

spawn(M,F,A) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(?MODULE, init_p, [Parent,Ancestors,M,F,A]).

spawn(Node, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,M,F,A]).

...

init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    put('$ancestors', [Parent|Ancestors]),
    put('$initial_call', trans_init(M, F, A)),
    init_p_do_apply(M, F, A).

init_p_do_apply(M, F, A) ->
    try
	apply(M, F, A) 
    catch
	Class:Reason ->
	    exit_p(Class, Reason, erlang:get_stacktrace())
    end.

另外spawn_link做的事情也一样,只不过是通过erlang:spawn_link函数来创建进程的,在当前进程和新创建的进程之间建立了一个link关系:

spawn_link(F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn_link(?MODULE, init_p, [Parent,Ancestors,F]).

关于spawn_opt,逻辑也一样,重点在于可以传递一些创建进程的控制参数,这里并不准备去研究这些参数,后续会专门拿出一篇来讲述。

start组

前面说过,proc_lib包含的是同步和异步的进程启动API,spawn组的函数无疑都是异步的,而start组提供的都是同步的。相对于spawn,start组提供的函数要少一些:start/3, start/4, start/5, start_link/3, start_link/4, start_link/5

先看具有代表性的start/4的实现:

start(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->
    PidRef = spawn_mon(M, F, A),
    sync_wait_mon(PidRef, Timeout).

...

spawn_mon(M,F,A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn_monitor(?MODULE, init_p, [Parent,Ancestors,M,F,A]).

...

sync_wait_mon({Pid, Ref}, Timeout) ->
    receive
	{ack, Pid, Return} ->
	    erlang:demonitor(Ref, [flush]),
	    Return;
	{'DOWN', Ref, _Type, Pid, Reason} ->
	    {error, Reason};
	{'EXIT', Pid, Reason} -> %% link as spawn_opt?
	    erlang:demonitor(Ref, [flush]),
	    {error, Reason}
    after Timeout ->
	    erlang:demonitor(Ref, [flush]),
	    exit(Pid, kill),
	    flush(Pid),
	    {error, timeout}
    end.

可以看到,start/4的工作分为两个过程,首先是基于init_p创建新进程,并于当前进程创建monitor的关系;接下来会同步等待新进程反馈的信息,分ack、'DOWN'、'EXIT'、超时四种情况,并做了不同的处理。需要注意的是,erlang:demonitor函数可以取消进程的监控关系,如果超时,会强制杀掉目标进程,超时有一个细节就是flush函数:

flush(Pid) ->
    receive
	{'EXIT', Pid, _} ->
	    true
    after 0 ->
	    true
    end.

这个函数有什么用途?因为当发生超时后,在我们显式调用demonitor函数结束之前,函数可能已经向监控进程发出了exit消息,这条消息就会积攒在当前进程的邮箱里得不到消费,flush可以清空邮箱中的这些消息,指定了after 0的receive语句会率先将邮箱里所有的消息进行匹配后立即返回而不会阻塞。让我们再复习一下receive ... after的执行规则:

  1. 如果包含after,进入receive语句时会先启动一个定时器。
  2. 取出邮箱里面的第一个消息,并尝试同Pattern1、Pattern2等模式匹配,如果匹配成功,系统会从邮箱删除这个消息,并执行模式后面的表达式。
  3. 如果receive里的所有模式都不匹配邮箱的第一个消息,系统会从邮箱中移除这个消息并把它放入一个保存队列,然后继续尝试邮箱里的第二个消息,这一过程会不断重复,直到发现匹配消息或者邮箱里的所有消息都检查过了为止。
  4. 如果邮箱里的所有消息都不匹配,进程就会被挂起并重新调度,直到新的消息进入邮箱才会继续执行。新消息到达后,保存队列里的所有消息不会重新匹配,只有新消息才会进行匹配。
  5. 一旦某个消息匹配成功,保存队列里的所有消息就会按照到达进程的顺序重新进入邮箱,如果设置了定时器,就会清除它。
  6. 如果定时器在我们等待消息时到期了,系统就会执行after后的表达式,并把所有保存的消息按照它们的到达进程的顺序重新放回邮箱。

其它start的实现也是这两个步骤,只不过参数重载有差异。再来看start_link的实现,这里选取的是start_link/4:

start_link(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->
    Pid = ?MODULE:spawn_link(M, F, A),
    sync_wait(Pid, Timeout).

...

sync_wait(Pid, Timeout) ->
    receive
	{ack, Pid, Return} ->
	    Return;
	{'EXIT', Pid, Reason} ->
	    {error, Reason}
    after Timeout ->
	    unlink(Pid),
	    exit(Pid, kill),
	    flush(Pid),
	    {error, timeout}
    end.

也是分为两个步骤,重点是sync_link的实现,相对比于monitor,没有了'DOWN'的情况。

关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9
扫一扫关注公众号添加购物返利助手,领红包
Comments are closed.

推荐使用阿里云服务器

超多优惠券

服务器最低一折,一年不到100!

朕已阅去看看