zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

ranch_conns_sup.erl (11538B)


      1 %% Copyright (c) 2011-2018, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 %% Make sure to never reload this module outside a release upgrade,
     16 %% as calling l(ranch_conns_sup) twice will kill the process and all
     17 %% the currently open connections.
     18 -module(ranch_conns_sup).
     19 
     20 %% API.
     21 -export([start_link/3]).
     22 -export([start_protocol/2]).
     23 -export([active_connections/1]).
     24 
     25 %% Supervisor internals.
     26 -export([init/4]).
     27 -export([system_continue/3]).
     28 -export([system_terminate/4]).
     29 -export([system_code_change/4]).
     30 
%% Kind of process each connection is reported as by which_children
%% and count_children.
-type conn_type() :: worker | supervisor.
%% How children are stopped on termination: killed outright, or sent
%% a shutdown exit signal and given this long before being killed.
-type shutdown() :: brutal_kill | timeout().

%% Configuration carried through the receive loop. Every field is
%% filled in by init/4; only max_conns and opts are replaced later,
%% by the set_max_conns and set_opts messages.
-record(state, {
	%% Process that started this supervisor. Its exit triggers terminate/3.
	parent = undefined :: pid(),
	%% Listener reference; used in log messages and handshake messages.
	ref :: ranch:ref(),
	%% Taken from the connection_type transport option (default worker).
	conn_type :: conn_type(),
	%% Taken from the shutdown transport option (default 5000).
	shutdown :: shutdown(),
	%% Transport module; must provide controlling_process/2 and close/1.
	transport = undefined :: module(),
	%% Protocol module; must provide start_link/4.
	protocol = undefined :: module(),
	%% Protocol options passed as last argument to Protocol:start_link/4.
	opts :: any(),
	%% Forwarded to the connection process in the handshake message.
	handshake_timeout :: timeout(),
	%% Once the number of counted connections reaches this value,
	%% acceptors are put to sleep instead of being resumed.
	max_conns = undefined :: ranch:max_conns(),
	%% Module handed to ranch:log/4 (default error_logger).
	logger = undefined :: module()
}).
     46 
     47 %% API.
     48 
     49 -spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}.
     50 start_link(Ref, Transport, Protocol) ->
     51 	proc_lib:start_link(?MODULE, init,
     52 		[self(), Ref, Transport, Protocol]).
     53 
     54 %% We can safely assume we are on the same node as the supervisor.
     55 %%
     56 %% We can also safely avoid having a monitor and a timeout here
     57 %% because only three things can happen:
     58 %%  *  The supervisor died; rest_for_one strategy killed all acceptors
     59 %%     so this very calling process is going to di--
     60 %%  *  There's too many connections, the supervisor will resume the
     61 %%     acceptor only when we get below the limit again.
     62 %%  *  The supervisor is overloaded, there's either too many acceptors
     63 %%     or the max_connections limit is too large. It's better if we
     64 %%     don't keep accepting connections because this leaves
     65 %%     more room for the situation to be resolved.
     66 %%
     67 %% We do not need the reply, we only need the ok from the supervisor
     68 %% to continue. The supervisor sends its own pid when the acceptor can
     69 %% continue.
     70 -spec start_protocol(pid(), inet:socket()) -> ok.
     71 start_protocol(SupPid, Socket) ->
     72 	SupPid ! {?MODULE, start_protocol, self(), Socket},
     73 	receive SupPid -> ok end.
     74 
     75 %% We can't make the above assumptions here. This function might be
     76 %% called from anywhere.
     77 -spec active_connections(pid()) -> non_neg_integer().
     78 active_connections(SupPid) ->
     79 	Tag = erlang:monitor(process, SupPid),
     80 	catch erlang:send(SupPid, {?MODULE, active_connections, self(), Tag},
     81 		[noconnect]),
     82 	receive
     83 		{Tag, Ret} ->
     84 			erlang:demonitor(Tag, [flush]),
     85 			Ret;
     86 		{'DOWN', Tag, _, _, noconnection} ->
     87 			exit({nodedown, node(SupPid)});
     88 		{'DOWN', Tag, _, _, Reason} ->
     89 			exit(Reason)
     90 	after 5000 ->
     91 		erlang:demonitor(Tag, [flush]),
     92 		exit(timeout)
     93 	end.
     94 
     95 %% Supervisor internals.
     96 
     97 -spec init(pid(), ranch:ref(), module(), module()) -> no_return().
     98 init(Parent, Ref, Transport, Protocol) ->
     99 	process_flag(trap_exit, true),
    100 	ok = ranch_server:set_connections_sup(Ref, self()),
    101 	MaxConns = ranch_server:get_max_connections(Ref),
    102 	TransOpts = ranch_server:get_transport_options(Ref),
    103 	ConnType = maps:get(connection_type, TransOpts, worker),
    104 	Shutdown = maps:get(shutdown, TransOpts, 5000),
    105 	HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
    106 	Logger = maps:get(logger, TransOpts, error_logger),
    107 	ProtoOpts = ranch_server:get_protocol_options(Ref),
    108 	ok = proc_lib:init_ack(Parent, {ok, self()}),
    109 	loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
    110 		shutdown=Shutdown, transport=Transport, protocol=Protocol,
    111 		opts=ProtoOpts, handshake_timeout=HandshakeTimeout,
    112 		max_conns=MaxConns, logger=Logger}, 0, 0, []).
    113 
    114 loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
    115 		transport=Transport, protocol=Protocol, opts=Opts,
    116 		max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
    117 	receive
    118 		{?MODULE, start_protocol, To, Socket} ->
    119 			try Protocol:start_link(Ref, Socket, Transport, Opts) of
    120 				{ok, Pid} ->
    121 					handshake(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);
    122 				{ok, SupPid, ProtocolPid} when ConnType =:= supervisor ->
    123 					handshake(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);
    124 				Ret ->
    125 					To ! self(),
    126 					ranch:log(error,
    127 						"Ranch listener ~p connection process start failure; "
    128 						"~p:start_link/4 returned: ~999999p~n",
    129 						[Ref, Protocol, Ret], Logger),
    130 					Transport:close(Socket),
    131 					loop(State, CurConns, NbChildren, Sleepers)
    132 			catch Class:Reason ->
    133 				To ! self(),
    134 				ranch:log(error,
    135 					"Ranch listener ~p connection process start failure; "
    136 					"~p:start_link/4 crashed with reason: ~p:~999999p~n",
    137 					[Ref, Protocol, Class, Reason], Logger),
    138 				loop(State, CurConns, NbChildren, Sleepers)
    139 			end;
    140 		{?MODULE, active_connections, To, Tag} ->
    141 			To ! {Tag, CurConns},
    142 			loop(State, CurConns, NbChildren, Sleepers);
    143 		%% Remove a connection from the count of connections.
    144 		{remove_connection, Ref, Pid} ->
    145 			case put(Pid, removed) of
    146 				active ->
    147 					loop(State, CurConns - 1, NbChildren, Sleepers);
    148 				remove ->
    149 					loop(State, CurConns, NbChildren, Sleepers);
    150 				undefined ->
    151 					_ = erase(Pid),
    152 					loop(State, CurConns, NbChildren, Sleepers)
    153 			end;
    154 		%% Upgrade the max number of connections allowed concurrently.
    155 		%% We resume all sleeping acceptors if this number increases.
    156 		{set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
    157 			_ = [To ! self() || To <- Sleepers],
    158 			loop(State#state{max_conns=MaxConns2},
    159 				CurConns, NbChildren, []);
    160 		{set_max_conns, MaxConns2} ->
    161 			loop(State#state{max_conns=MaxConns2},
    162 				CurConns, NbChildren, Sleepers);
    163 		%% Upgrade the protocol options.
    164 		{set_opts, Opts2} ->
    165 			loop(State#state{opts=Opts2},
    166 				CurConns, NbChildren, Sleepers);
    167 		{'EXIT', Parent, Reason} ->
    168 			terminate(State, Reason, NbChildren);
    169 		{'EXIT', Pid, Reason} when Sleepers =:= [] ->
    170 			case erase(Pid) of
    171 				active ->
    172 					report_error(Logger, Ref, Protocol, Pid, Reason),
    173 					loop(State, CurConns - 1, NbChildren - 1, Sleepers);
    174 				removed ->
    175 					report_error(Logger, Ref, Protocol, Pid, Reason),
    176 					loop(State, CurConns, NbChildren - 1, Sleepers);
    177 				undefined ->
    178 					loop(State, CurConns, NbChildren, Sleepers)
    179 			end;
    180 		%% Resume a sleeping acceptor if needed.
    181 		{'EXIT', Pid, Reason} ->
    182 			case erase(Pid) of
    183 				active when CurConns > MaxConns ->
    184 					report_error(Logger, Ref, Protocol, Pid, Reason),
    185 					loop(State, CurConns - 1, NbChildren - 1, Sleepers);
    186 				active ->
    187 					report_error(Logger, Ref, Protocol, Pid, Reason),
    188 					[To|Sleepers2] = Sleepers,
    189 					To ! self(),
    190 					loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
    191 				removed ->
    192 					report_error(Logger, Ref, Protocol, Pid, Reason),
    193 					loop(State, CurConns, NbChildren - 1, Sleepers);
    194 				undefined ->
    195 					loop(State, CurConns, NbChildren, Sleepers)
    196 			end;
    197 		{system, From, Request} ->
    198 			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
    199 				{State, CurConns, NbChildren, Sleepers});
    200 		%% Calls from the supervisor module.
    201 		{'$gen_call', {To, Tag}, which_children} ->
    202 			Children = [{Protocol, Pid, ConnType, [Protocol]}
    203 				|| {Pid, Type} <- get(),
    204 				Type =:= active orelse Type =:= removed],
    205 			To ! {Tag, Children},
    206 			loop(State, CurConns, NbChildren, Sleepers);
    207 		{'$gen_call', {To, Tag}, count_children} ->
    208 			Counts = case ConnType of
    209 				worker -> [{supervisors, 0}, {workers, NbChildren}];
    210 				supervisor -> [{supervisors, NbChildren}, {workers, 0}]
    211 			end,
    212 			Counts2 = [{specs, 1}, {active, NbChildren}|Counts],
    213 			To ! {Tag, Counts2},
    214 			loop(State, CurConns, NbChildren, Sleepers);
    215 		{'$gen_call', {To, Tag}, _} ->
    216 			To ! {Tag, {error, ?MODULE}},
    217 			loop(State, CurConns, NbChildren, Sleepers);
    218 		Msg ->
    219 			ranch:log(error,
    220 				"Ranch listener ~p received unexpected message ~p~n",
    221 				[Ref, Msg], Logger),
    222 			loop(State, CurConns, NbChildren, Sleepers)
    223 	end.
    224 
    225 handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
    226 		max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
    227 	case Transport:controlling_process(Socket, ProtocolPid) of
    228 		ok ->
    229 			ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
    230 			put(SupPid, active),
    231 			CurConns2 = CurConns + 1,
    232 			if CurConns2 < MaxConns ->
    233 					To ! self(),
    234 					loop(State, CurConns2, NbChildren + 1, Sleepers);
    235 				true ->
    236 					loop(State, CurConns2, NbChildren + 1, [To|Sleepers])
    237 			end;
    238 		{error, _} ->
    239 			Transport:close(Socket),
    240 			%% Only kill the supervised pid, because the connection's pid,
    241 			%% when different, is supposed to be sitting under it and linked.
    242 			exit(SupPid, kill),
    243 			To ! self(),
    244 			loop(State, CurConns, NbChildren, Sleepers)
    245 	end.
    246 
    247 -spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
    248 terminate(#state{shutdown=brutal_kill}, Reason, _) ->
    249 	kill_children(get_keys(active)),
    250 	kill_children(get_keys(removed)),
    251 	exit(Reason);
    252 %% Attempt to gracefully shutdown all children.
    253 terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
    254 	shutdown_children(get_keys(active)),
    255 	shutdown_children(get_keys(removed)),
    256 	_ = if
    257 		Shutdown =:= infinity ->
    258 			ok;
    259 		true ->
    260 			erlang:send_after(Shutdown, self(), kill)
    261 	end,
    262 	wait_children(NbChildren),
    263 	exit(Reason).
    264 
    265 %% Kill all children and then exit. We unlink first to avoid
    266 %% getting a message for each child getting killed.
    267 kill_children(Pids) ->
    268 	_ = [begin
    269 		unlink(P),
    270 		exit(P, kill)
    271 	end || P <- Pids],
    272 	ok.
    273 
    274 %% Monitor processes so we can know which ones have shutdown
    275 %% before the timeout. Unlink so we avoid receiving an extra
    276 %% message. Then send a shutdown exit signal.
    277 shutdown_children(Pids) ->
    278 	_ = [begin
    279 		monitor(process, P),
    280 		unlink(P),
    281 		exit(P, shutdown)
    282 	end || P <- Pids],
    283 	ok.
    284 
    285 wait_children(0) ->
    286 	ok;
    287 wait_children(NbChildren) ->
    288 	receive
    289         {'DOWN', _, process, Pid, _} ->
    290 			case erase(Pid) of
    291 				active -> wait_children(NbChildren - 1);
    292 				removed -> wait_children(NbChildren - 1);
    293 				_ -> wait_children(NbChildren)
    294 			end;
    295 		kill ->
    296 			Active = get_keys(active),
    297 			_ = [exit(P, kill) || P <- Active],
    298 			Removed = get_keys(removed),
    299 			_ = [exit(P, kill) || P <- Removed],
    300 			ok
    301 	end.
    302 
    303 system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
    304 	loop(State, CurConns, NbChildren, Sleepers).
    305 
    306 -spec system_terminate(any(), _, _, _) -> no_return().
    307 system_terminate(Reason, _, _, {State, _, NbChildren, _}) ->
    308 	terminate(State, Reason, NbChildren).
    309 
    310 system_code_change(Misc, _, _, _) ->
    311 	{ok, Misc}.
    312 
    313 %% We use ~999999p here instead of ~w because the latter doesn't
    314 %% support printable strings.
    315 report_error(_, _, _, _, normal) ->
    316 	ok;
    317 report_error(_, _, _, _, shutdown) ->
    318 	ok;
    319 report_error(_, _, _, _, {shutdown, _}) ->
    320 	ok;
    321 report_error(Logger, Ref, Protocol, Pid, Reason) ->
    322 	ranch:log(error,
    323 		"Ranch listener ~p had connection process started with "
    324 		"~p:start_link/4 at ~p exit with reason: ~999999p~n",
    325 		[Ref, Protocol, Pid, Reason], Logger).