ranch_conns_sup.erl (11538B)
1 %% Copyright (c) 2011-2018, Loïc Hoguin <essen@ninenines.eu> 2 %% 3 %% Permission to use, copy, modify, and/or distribute this software for any 4 %% purpose with or without fee is hereby granted, provided that the above 5 %% copyright notice and this permission notice appear in all copies. 6 %% 7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 14 15 %% Make sure to never reload this module outside a release upgrade, 16 %% as calling l(ranch_conns_sup) twice will kill the process and all 17 %% the currently open connections. 18 -module(ranch_conns_sup). 19 20 %% API. 21 -export([start_link/3]). 22 -export([start_protocol/2]). 23 -export([active_connections/1]). 24 25 %% Supervisor internals. 26 -export([init/4]). 27 -export([system_continue/3]). 28 -export([system_terminate/4]). 29 -export([system_code_change/4]). 30 31 -type conn_type() :: worker | supervisor. 32 -type shutdown() :: brutal_kill | timeout(). 33 34 -record(state, { 35 parent = undefined :: pid(), 36 ref :: ranch:ref(), 37 conn_type :: conn_type(), 38 shutdown :: shutdown(), 39 transport = undefined :: module(), 40 protocol = undefined :: module(), 41 opts :: any(), 42 handshake_timeout :: timeout(), 43 max_conns = undefined :: ranch:max_conns(), 44 logger = undefined :: module() 45 }). 46 47 %% API. 48 49 -spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}. 50 start_link(Ref, Transport, Protocol) -> 51 proc_lib:start_link(?MODULE, init, 52 [self(), Ref, Transport, Protocol]). 53 54 %% We can safely assume we are on the same node as the supervisor. 55 %% 56 %% We can also safely avoid having a monitor and a timeout here 57 %% because only three things can happen: 58 %% * The supervisor died; rest_for_one strategy killed all acceptors 59 %% so this very calling process is going to di-- 60 %% * There's too many connections, the supervisor will resume the 61 %% acceptor only when we get below the limit again. 62 %% * The supervisor is overloaded, there's either too many acceptors 63 %% or the max_connections limit is too large. It's better if we 64 %% don't keep accepting connections because this leaves 65 %% more room for the situation to be resolved. 66 %% 67 %% We do not need the reply, we only need the ok from the supervisor 68 %% to continue. The supervisor sends its own pid when the acceptor can 69 %% continue. 70 -spec start_protocol(pid(), inet:socket()) -> ok. 71 start_protocol(SupPid, Socket) -> 72 SupPid ! {?MODULE, start_protocol, self(), Socket}, 73 receive SupPid -> ok end. 74 75 %% We can't make the above assumptions here. This function might be 76 %% called from anywhere. 77 -spec active_connections(pid()) -> non_neg_integer(). 78 active_connections(SupPid) -> 79 Tag = erlang:monitor(process, SupPid), 80 catch erlang:send(SupPid, {?MODULE, active_connections, self(), Tag}, 81 [noconnect]), 82 receive 83 {Tag, Ret} -> 84 erlang:demonitor(Tag, [flush]), 85 Ret; 86 {'DOWN', Tag, _, _, noconnection} -> 87 exit({nodedown, node(SupPid)}); 88 {'DOWN', Tag, _, _, Reason} -> 89 exit(Reason) 90 after 5000 -> 91 erlang:demonitor(Tag, [flush]), 92 exit(timeout) 93 end. 94 95 %% Supervisor internals. 96 97 -spec init(pid(), ranch:ref(), module(), module()) -> no_return(). 98 init(Parent, Ref, Transport, Protocol) -> 99 process_flag(trap_exit, true), 100 ok = ranch_server:set_connections_sup(Ref, self()), 101 MaxConns = ranch_server:get_max_connections(Ref), 102 TransOpts = ranch_server:get_transport_options(Ref), 103 ConnType = maps:get(connection_type, TransOpts, worker), 104 Shutdown = maps:get(shutdown, TransOpts, 5000), 105 HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), 106 Logger = maps:get(logger, TransOpts, error_logger), 107 ProtoOpts = ranch_server:get_protocol_options(Ref), 108 ok = proc_lib:init_ack(Parent, {ok, self()}), 109 loop(#state{parent=Parent, ref=Ref, conn_type=ConnType, 110 shutdown=Shutdown, transport=Transport, protocol=Protocol, 111 opts=ProtoOpts, handshake_timeout=HandshakeTimeout, 112 max_conns=MaxConns, logger=Logger}, 0, 0, []). 113 114 loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, 115 transport=Transport, protocol=Protocol, opts=Opts, 116 max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> 117 receive 118 {?MODULE, start_protocol, To, Socket} -> 119 try Protocol:start_link(Ref, Socket, Transport, Opts) of 120 {ok, Pid} -> 121 handshake(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid); 122 {ok, SupPid, ProtocolPid} when ConnType =:= supervisor -> 123 handshake(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid); 124 Ret -> 125 To ! self(), 126 ranch:log(error, 127 "Ranch listener ~p connection process start failure; " 128 "~p:start_link/4 returned: ~999999p~n", 129 [Ref, Protocol, Ret], Logger), 130 Transport:close(Socket), 131 loop(State, CurConns, NbChildren, Sleepers) 132 catch Class:Reason -> 133 To ! self(), 134 ranch:log(error, 135 "Ranch listener ~p connection process start failure; " 136 "~p:start_link/4 crashed with reason: ~p:~999999p~n", 137 [Ref, Protocol, Class, Reason], Logger), 138 loop(State, CurConns, NbChildren, Sleepers) 139 end; 140 {?MODULE, active_connections, To, Tag} -> 141 To ! {Tag, CurConns}, 142 loop(State, CurConns, NbChildren, Sleepers); 143 %% Remove a connection from the count of connections. 144 {remove_connection, Ref, Pid} -> 145 case put(Pid, removed) of 146 active -> 147 loop(State, CurConns - 1, NbChildren, Sleepers); 148 remove -> 149 loop(State, CurConns, NbChildren, Sleepers); 150 undefined -> 151 _ = erase(Pid), 152 loop(State, CurConns, NbChildren, Sleepers) 153 end; 154 %% Upgrade the max number of connections allowed concurrently. 155 %% We resume all sleeping acceptors if this number increases. 156 {set_max_conns, MaxConns2} when MaxConns2 > MaxConns -> 157 _ = [To ! self() || To <- Sleepers], 158 loop(State#state{max_conns=MaxConns2}, 159 CurConns, NbChildren, []); 160 {set_max_conns, MaxConns2} -> 161 loop(State#state{max_conns=MaxConns2}, 162 CurConns, NbChildren, Sleepers); 163 %% Upgrade the protocol options. 164 {set_opts, Opts2} -> 165 loop(State#state{opts=Opts2}, 166 CurConns, NbChildren, Sleepers); 167 {'EXIT', Parent, Reason} -> 168 terminate(State, Reason, NbChildren); 169 {'EXIT', Pid, Reason} when Sleepers =:= [] -> 170 case erase(Pid) of 171 active -> 172 report_error(Logger, Ref, Protocol, Pid, Reason), 173 loop(State, CurConns - 1, NbChildren - 1, Sleepers); 174 removed -> 175 report_error(Logger, Ref, Protocol, Pid, Reason), 176 loop(State, CurConns, NbChildren - 1, Sleepers); 177 undefined -> 178 loop(State, CurConns, NbChildren, Sleepers) 179 end; 180 %% Resume a sleeping acceptor if needed. 181 {'EXIT', Pid, Reason} -> 182 case erase(Pid) of 183 active when CurConns > MaxConns -> 184 report_error(Logger, Ref, Protocol, Pid, Reason), 185 loop(State, CurConns - 1, NbChildren - 1, Sleepers); 186 active -> 187 report_error(Logger, Ref, Protocol, Pid, Reason), 188 [To|Sleepers2] = Sleepers, 189 To ! self(), 190 loop(State, CurConns - 1, NbChildren - 1, Sleepers2); 191 removed -> 192 report_error(Logger, Ref, Protocol, Pid, Reason), 193 loop(State, CurConns, NbChildren - 1, Sleepers); 194 undefined -> 195 loop(State, CurConns, NbChildren, Sleepers) 196 end; 197 {system, From, Request} -> 198 sys:handle_system_msg(Request, From, Parent, ?MODULE, [], 199 {State, CurConns, NbChildren, Sleepers}); 200 %% Calls from the supervisor module. 201 {'$gen_call', {To, Tag}, which_children} -> 202 Children = [{Protocol, Pid, ConnType, [Protocol]} 203 || {Pid, Type} <- get(), 204 Type =:= active orelse Type =:= removed], 205 To ! {Tag, Children}, 206 loop(State, CurConns, NbChildren, Sleepers); 207 {'$gen_call', {To, Tag}, count_children} -> 208 Counts = case ConnType of 209 worker -> [{supervisors, 0}, {workers, NbChildren}]; 210 supervisor -> [{supervisors, NbChildren}, {workers, 0}] 211 end, 212 Counts2 = [{specs, 1}, {active, NbChildren}|Counts], 213 To ! {Tag, Counts2}, 214 loop(State, CurConns, NbChildren, Sleepers); 215 {'$gen_call', {To, Tag}, _} -> 216 To ! {Tag, {error, ?MODULE}}, 217 loop(State, CurConns, NbChildren, Sleepers); 218 Msg -> 219 ranch:log(error, 220 "Ranch listener ~p received unexpected message ~p~n", 221 [Ref, Msg], Logger), 222 loop(State, CurConns, NbChildren, Sleepers) 223 end. 224 225 handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout, 226 max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> 227 case Transport:controlling_process(Socket, ProtocolPid) of 228 ok -> 229 ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout}, 230 put(SupPid, active), 231 CurConns2 = CurConns + 1, 232 if CurConns2 < MaxConns -> 233 To ! self(), 234 loop(State, CurConns2, NbChildren + 1, Sleepers); 235 true -> 236 loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) 237 end; 238 {error, _} -> 239 Transport:close(Socket), 240 %% Only kill the supervised pid, because the connection's pid, 241 %% when different, is supposed to be sitting under it and linked. 242 exit(SupPid, kill), 243 To ! self(), 244 loop(State, CurConns, NbChildren, Sleepers) 245 end. 246 247 -spec terminate(#state{}, any(), non_neg_integer()) -> no_return(). 248 terminate(#state{shutdown=brutal_kill}, Reason, _) -> 249 kill_children(get_keys(active)), 250 kill_children(get_keys(removed)), 251 exit(Reason); 252 %% Attempt to gracefully shutdown all children. 253 terminate(#state{shutdown=Shutdown}, Reason, NbChildren) -> 254 shutdown_children(get_keys(active)), 255 shutdown_children(get_keys(removed)), 256 _ = if 257 Shutdown =:= infinity -> 258 ok; 259 true -> 260 erlang:send_after(Shutdown, self(), kill) 261 end, 262 wait_children(NbChildren), 263 exit(Reason). 264 265 %% Kill all children and then exit. We unlink first to avoid 266 %% getting a message for each child getting killed. 267 kill_children(Pids) -> 268 _ = [begin 269 unlink(P), 270 exit(P, kill) 271 end || P <- Pids], 272 ok. 273 274 %% Monitor processes so we can know which ones have shutdown 275 %% before the timeout. Unlink so we avoid receiving an extra 276 %% message. Then send a shutdown exit signal. 277 shutdown_children(Pids) -> 278 _ = [begin 279 monitor(process, P), 280 unlink(P), 281 exit(P, shutdown) 282 end || P <- Pids], 283 ok. 284 285 wait_children(0) -> 286 ok; 287 wait_children(NbChildren) -> 288 receive 289 {'DOWN', _, process, Pid, _} -> 290 case erase(Pid) of 291 active -> wait_children(NbChildren - 1); 292 removed -> wait_children(NbChildren - 1); 293 _ -> wait_children(NbChildren) 294 end; 295 kill -> 296 Active = get_keys(active), 297 _ = [exit(P, kill) || P <- Active], 298 Removed = get_keys(removed), 299 _ = [exit(P, kill) || P <- Removed], 300 ok 301 end. 302 303 system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) -> 304 loop(State, CurConns, NbChildren, Sleepers). 305 306 -spec system_terminate(any(), _, _, _) -> no_return(). 307 system_terminate(Reason, _, _, {State, _, NbChildren, _}) -> 308 terminate(State, Reason, NbChildren). 309 310 system_code_change(Misc, _, _, _) -> 311 {ok, Misc}. 312 313 %% We use ~999999p here instead of ~w because the latter doesn't 314 %% support printable strings. 315 report_error(_, _, _, _, normal) -> 316 ok; 317 report_error(_, _, _, _, shutdown) -> 318 ok; 319 report_error(_, _, _, _, {shutdown, _}) -> 320 ok; 321 report_error(Logger, Ref, Protocol, Pid, Reason) -> 322 ranch:log(error, 323 "Ranch listener ~p had connection process started with " 324 "~p:start_link/4 at ~p exit with reason: ~999999p~n", 325 [Ref, Protocol, Pid, Reason], Logger).