zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_children.erl (6290B)


      1 %% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_children).
     16 
     17 -export([init/0]).
     18 -export([up/4]).
     19 -export([down/2]).
     20 -export([shutdown/2]).
     21 -export([shutdown_timeout/3]).
     22 -export([terminate/1]).
     23 -export([handle_supervisor_call/4]).
     24 
     25 -record(child, {
     26 	pid :: pid(),
     27 	streamid :: cowboy_stream:streamid() | undefined,
     28 	shutdown :: timeout(),
     29 	timer = undefined :: undefined | reference()
     30 }).
     31 
     32 -type children() :: [#child{}].
     33 -export_type([children/0]).
     34 
     35 -spec init() -> [].
     36 init() ->
     37 	[].
     38 
     39 -spec up(Children, pid(), cowboy_stream:streamid(), timeout())
     40 	-> Children when Children::children().
     41 up(Children, Pid, StreamID, Shutdown) ->
     42 	[#child{
     43 		pid=Pid,
     44 		streamid=StreamID,
     45 		shutdown=Shutdown
     46 	}|Children].
     47 
     48 -spec down(Children, pid())
     49 	-> {ok, cowboy_stream:streamid() | undefined, Children} | error
     50 	when Children::children().
     51 down(Children0, Pid) ->
     52 	case lists:keytake(Pid, #child.pid, Children0) of
     53 		{value, #child{streamid=StreamID, timer=Ref}, Children} ->
     54 			_ = case Ref of
     55 				undefined -> ok;
     56 				_ -> erlang:cancel_timer(Ref, [{async, true}, {info, false}])
     57 			end,
     58 			{ok, StreamID, Children};
     59 		false ->
     60 			error
     61 	end.
     62 
     63 %% We ask the processes to shutdown first. This gives
     64 %% a chance to processes that are trapping exits to
     65 %% shut down gracefully. Others will exit immediately.
     66 %%
     67 %% @todo We currently fire one timer per process being
     68 %% shut down. This is probably not the most efficient.
     69 %% A more efficient solution could be to maintain a
     70 %% single timer and decrease the shutdown time of all
     71 %% processes when it fires. This is however much more
     72 %% complex, and there aren't that many processes that
     73 %% will need to be shutdown through this function, so
     74 %% this is left for later.
     75 -spec shutdown(Children, cowboy_stream:streamid())
     76 	-> Children when Children::children().
     77 shutdown(Children0, StreamID) ->
     78 	[
     79 		case Child of
     80 			#child{pid=Pid, streamid=StreamID, shutdown=Shutdown} ->
     81 				exit(Pid, shutdown),
     82 				Ref = erlang:start_timer(Shutdown, self(), {shutdown, Pid}),
     83 				Child#child{streamid=undefined, timer=Ref};
     84 			_ ->
     85 				Child
     86 		end
     87 	|| Child <- Children0].
     88 
     89 -spec shutdown_timeout(children(), reference(), pid()) -> ok.
     90 shutdown_timeout(Children, Ref, Pid) ->
     91 	case lists:keyfind(Pid, #child.pid, Children) of
     92 		#child{timer=Ref} ->
     93 			exit(Pid, kill),
     94 			ok;
     95 		_ ->
     96 			ok
     97 	end.
     98 
     99 -spec terminate(children()) -> ok.
    100 terminate(Children) ->
    101 	%% For each child, either ask for it to shut down,
    102 	%% or cancel its shutdown timer if it already is.
    103 	%%
    104 	%% We do not need to flush stray timeout messages out because
    105 	%% we are either terminating or switching protocols,
    106 	%% and in the latter case we flush all messages.
    107 	_ = [case TRef of
    108 		undefined -> exit(Pid, shutdown);
    109 		_ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
    110 	end || #child{pid=Pid, timer=TRef} <- Children],
    111 	before_terminate_loop(Children).
    112 
    113 before_terminate_loop([]) ->
    114 	ok;
    115 before_terminate_loop(Children) ->
    116 	%% Find the longest shutdown time.
    117 	Time = longest_shutdown_time(Children, 0),
    118 	%% We delay the creation of the timer if one of the
    119 	%% processes has an infinity shutdown value.
    120 	TRef = case Time of
    121 		infinity -> undefined;
    122 		_ -> erlang:start_timer(Time, self(), terminate)
    123 	end,
    124 	%% Loop until that time or until all children are dead.
    125 	terminate_loop(Children, TRef).
    126 
    127 terminate_loop([], TRef) ->
    128 	%% Don't forget to cancel the timer, if any!
    129 	case TRef of
    130 		undefined ->
    131 			ok;
    132 		_ ->
    133 			_ = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
    134 			ok
    135 	end;
    136 terminate_loop(Children, TRef) ->
    137 	receive
    138 		{'EXIT', Pid, _} when TRef =:= undefined ->
    139 			{value, #child{shutdown=Shutdown}, Children1}
    140 				= lists:keytake(Pid, #child.pid, Children),
    141 			%% We delayed the creation of the timer. If a process with
    142 			%% infinity shutdown just ended, we might have to start that timer.
    143 			case Shutdown of
    144 				infinity -> before_terminate_loop(Children1);
    145 				_ -> terminate_loop(Children1, TRef)
    146 			end;
    147 		{'EXIT', Pid, _} ->
    148 			terminate_loop(lists:keydelete(Pid, #child.pid, Children), TRef);
    149 		{timeout, TRef, terminate} ->
    150 			%% Brutally kill any remaining children.
    151 			_ = [exit(Pid, kill) || #child{pid=Pid} <- Children],
    152 			ok
    153 	end.
    154 
    155 longest_shutdown_time([], Time) ->
    156 	Time;
    157 longest_shutdown_time([#child{shutdown=ChildTime}|Tail], Time) when ChildTime > Time ->
    158 	longest_shutdown_time(Tail, ChildTime);
    159 longest_shutdown_time([_|Tail], Time) ->
    160 	longest_shutdown_time(Tail, Time).
    161 
    162 -spec handle_supervisor_call(any(), {pid(), any()}, children(), module()) -> ok.
    163 handle_supervisor_call(which_children, {From, Tag}, Children, Module) ->
    164 	From ! {Tag, which_children(Children, Module)},
    165 	ok;
    166 handle_supervisor_call(count_children, {From, Tag}, Children, _) ->
    167 	From ! {Tag, count_children(Children)},
    168 	ok;
    169 %% We disable start_child since only incoming requests
    170 %% end up creating a new process.
    171 handle_supervisor_call({start_child, _}, {From, Tag}, _, _) ->
    172 	From ! {Tag, {error, start_child_disabled}},
    173 	ok;
    174 %% All other calls refer to children. We act in a similar way
    175 %% to a simple_one_for_one so we never find those.
    176 handle_supervisor_call(_, {From, Tag}, _, _) ->
    177 	From ! {Tag, {error, not_found}},
    178 	ok.
    179 
    180 -spec which_children(children(), module()) -> [{module(), pid(), worker, [module()]}].
    181 which_children(Children, Module) ->
    182 	[{Module, Pid, worker, [Module]} || #child{pid=Pid} <- Children].
    183 
    184 -spec count_children(children()) -> [{atom(), non_neg_integer()}].
    185 count_children(Children) ->
    186 	Count = length(Children),
    187 	[
    188 		{specs, 1},
    189 		{active, Count},
    190 		{supervisors, 0},
    191 		{workers, Count}
    192 	].