zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

ranch.erl (16016B)


      1 %% Copyright (c) 2011-2018, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(ranch).
     16 
     17 -export([start_listener/5]).
     18 -export([start_listener/6]).
     19 -export([normalize_opts/1]).
     20 -export([stop_listener/1]).
     21 -export([suspend_listener/1]).
     22 -export([resume_listener/1]).
     23 -export([child_spec/5]).
     24 -export([child_spec/6]).
     25 -export([accept_ack/1]).
     26 -export([handshake/1]).
     27 -export([handshake/2]).
     28 -export([recv_proxy_header/2]).
     29 -export([remove_connection/1]).
     30 -export([get_status/1]).
     31 -export([get_addr/1]).
     32 -export([get_port/1]).
     33 -export([get_max_connections/1]).
     34 -export([set_max_connections/2]).
     35 -export([get_transport_options/1]).
     36 -export([set_transport_options/2]).
     37 -export([get_protocol_options/1]).
     38 -export([set_protocol_options/2]).
     39 -export([info/0]).
     40 -export([info/1]).
     41 -export([procs/2]).
     42 -export([wait_for_connections/3]).
     43 -export([wait_for_connections/4]).
     44 -export([filter_options/3]).
     45 -export([set_option_default/3]).
     46 -export([require/1]).
     47 -export([log/4]).
     48 
     49 -deprecated([start_listener/6, child_spec/6, accept_ack/1]).
     50 
%% Connection limit value: either a non-negative integer or the atom infinity.
-type max_conns() :: non_neg_integer() | infinity.
-export_type([max_conns/0]).

%% This type is deprecated and will be removed in Ranch 2.0.
%% It describes the legacy tuple form of the Ranch options; normalize_opts/1
%% moves these keys into the map form (ack_timeout becomes handshake_timeout).
-type opt() :: {ack_timeout, timeout()}
	| {connection_type, worker | supervisor}
	| {max_connections, max_conns()}
	| {num_acceptors, pos_integer()}
	| {shutdown, timeout() | brutal_kill}
	| {socket, any()}.
-export_type([opt/0]).

%% Transport options. The map form keeps the Ranch options next to the
%% socket_opts key; any other term is accepted as well and is converted
%% to the map form by normalize_opts/1.
-type opts() :: any() | #{
	connection_type => worker | supervisor,
	handshake_timeout => timeout(),
	max_connections => max_conns(),
	logger => module(),
	num_acceptors => pos_integer(),
	shutdown => timeout() | brutal_kill,
	socket => any(),
	socket_opts => any()
}.
-export_type([opts/0]).

%% Listener reference. Any term may be used.
-type ref() :: any().
-export_type([ref/0]).
     77 
     78 -spec start_listener(ref(), module(), opts(), module(), any())
     79 	-> supervisor:startchild_ret().
     80 start_listener(Ref, Transport, TransOpts0, Protocol, ProtoOpts)
     81 		when is_atom(Transport), is_atom(Protocol) ->
     82 	TransOpts = normalize_opts(TransOpts0),
     83 	_ = code:ensure_loaded(Transport),
     84 	case erlang:function_exported(Transport, name, 0) of
     85 		false ->
     86 			{error, badarg};
     87 		true ->
     88 			Res = supervisor:start_child(ranch_sup, child_spec(Ref,
     89 					Transport, TransOpts, Protocol, ProtoOpts)),
     90 			Socket = maps:get(socket, TransOpts, undefined),
     91 			case Res of
     92 				{ok, Pid} when Socket =/= undefined ->
     93 					%% Give ownership of the socket to ranch_acceptors_sup
     94 					%% to make sure the socket stays open as long as the
     95 					%% listener is alive. If the socket closes however there
     96 					%% will be no way to recover because we don't know how
     97 					%% to open it again.
     98 					Children = supervisor:which_children(Pid),
     99 					{_, AcceptorsSup, _, _}
    100 						= lists:keyfind(ranch_acceptors_sup, 1, Children),
    101 					Transport:controlling_process(Socket, AcceptorsSup);
    102 				_ ->
    103 					ok
    104 			end,
    105 			maybe_started(Res)
    106 	end.
    107 
    108 -spec start_listener(ref(), non_neg_integer(), module(), opts(), module(), any())
    109 	-> supervisor:startchild_ret().
    110 start_listener(Ref, NumAcceptors, Transport, TransOpts0, Protocol, ProtoOpts)
    111 		when is_integer(NumAcceptors), is_atom(Transport), is_atom(Protocol) ->
    112 	TransOpts = normalize_opts(TransOpts0),
    113 	start_listener(Ref, Transport, TransOpts#{num_acceptors => NumAcceptors},
    114 		Protocol, ProtoOpts).
    115 
    116 -spec normalize_opts(opts()) -> opts().
    117 normalize_opts(Map) when is_map(Map) ->
    118 	Map;
    119 normalize_opts(List0) when is_list(List0) ->
    120 	Map0 = #{},
    121 	{Map1, List1} = case take(ack_timeout, List0) of
    122 		{value, HandshakeTimeout, Tail0} ->
    123 			{Map0#{handshake_timeout => HandshakeTimeout}, Tail0};
    124 		false ->
    125 			{Map0, List0}
    126 	end,
    127 	{Map, List} = lists:foldl(fun(Key, {Map2, List2}) ->
    128 		case take(Key, List2) of
    129 			{value, ConnectionType, Tail2} ->
    130 				{Map2#{Key => ConnectionType}, Tail2};
    131 			false ->
    132 				{Map2, List2}
    133 		end
    134 	end, {Map1, List1}, [connection_type, max_connections, num_acceptors, shutdown, socket]),
    135 	if
    136 		Map =:= #{} ->
    137 			ok;
    138 		true ->
    139 			log(warning,
    140 				"Setting Ranch options together with socket options "
    141 				"is deprecated. Please use the new map syntax that allows "
    142 				"specifying socket options separately from other options.~n",
    143 				[], Map)
    144 	end,
    145 	case List of
    146 		[] -> Map;
    147 		_ -> Map#{socket_opts => List}
    148 	end;
    149 normalize_opts(Any) ->
    150 	#{socket_opts => Any}.
    151 
    152 take(Key, List) ->
    153 	take(Key, List, []).
    154 
    155 take(_, [], _) ->
    156 	false;
    157 take(Key, [{Key, Value}|Tail], Acc) ->
    158 	{value, Value, lists:reverse(Acc, Tail)};
    159 take(Key, [Value|Tail], Acc) ->
    160 	take(Key, Tail, [Value|Acc]).
    161 
    162 maybe_started({error, {{shutdown,
    163 		{failed_to_start_child, ranch_acceptors_sup,
    164 			{listen_error, _, Reason}}}, _}} = Error) ->
    165 	start_error(Reason, Error);
    166 maybe_started(Res) ->
    167 	Res.
    168 
    169 start_error(E=eaddrinuse, _) -> {error, E};
    170 start_error(E=eacces, _) -> {error, E};
    171 start_error(E=no_cert, _) -> {error, E};
    172 start_error(_, Error) -> Error.
    173 
    174 -spec stop_listener(ref()) -> ok | {error, not_found}.
    175 stop_listener(Ref) ->
    176 	case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of
    177 		ok ->
    178 			_ = supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}),
    179 			ranch_server:cleanup_listener_opts(Ref);
    180 		{error, Reason} ->
    181 			{error, Reason}
    182 	end.
    183 
    184 -spec suspend_listener(ref()) -> ok | {error, any()}.
    185 suspend_listener(Ref) ->
    186 	case get_status(Ref) of
    187 		running ->
    188 			ListenerSup = ranch_server:get_listener_sup(Ref),
    189 			ok = ranch_server:set_addr(Ref, {undefined, undefined}),
    190 			supervisor:terminate_child(ListenerSup, ranch_acceptors_sup);
    191 		suspended ->
    192 			ok
    193 	end.
    194 
    195 -spec resume_listener(ref()) -> ok | {error, any()}.
    196 resume_listener(Ref) ->
    197 	case get_status(Ref) of
    198 		running ->
    199 			ok;
    200 		suspended ->
    201 			ListenerSup = ranch_server:get_listener_sup(Ref),
    202 			Res = supervisor:restart_child(ListenerSup, ranch_acceptors_sup),
    203 			maybe_resumed(Res)
    204 	end.
    205 
    206 maybe_resumed(Error={error, {listen_error, _, Reason}}) ->
    207 	start_error(Reason, Error);
    208 maybe_resumed({ok, _}) ->
    209 	ok;
    210 maybe_resumed({ok, _, _}) ->
    211 	ok;
    212 maybe_resumed(Res) ->
    213 	Res.
    214 
    215 -spec child_spec(ref(), module(), opts(), module(), any())
    216 	-> supervisor:child_spec().
    217 child_spec(Ref, Transport, TransOpts0, Protocol, ProtoOpts) ->
    218 	TransOpts = normalize_opts(TransOpts0),
    219 	{{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [
    220 		Ref, Transport, TransOpts, Protocol, ProtoOpts
    221 	]}, permanent, infinity, supervisor, [ranch_listener_sup]}.
    222 
    223 -spec child_spec(ref(), non_neg_integer(), module(), opts(), module(), any())
    224 	-> supervisor:child_spec().
    225 child_spec(Ref, NumAcceptors, Transport, TransOpts0, Protocol, ProtoOpts)
    226 		when is_integer(NumAcceptors), is_atom(Transport), is_atom(Protocol) ->
    227 	TransOpts = normalize_opts(TransOpts0),
    228 	child_spec(Ref, Transport, TransOpts#{num_acceptors => NumAcceptors},
    229 		Protocol, ProtoOpts).
    230 
%% Deprecated (see the -deprecated attribute of this module).
%% Performs handshake/1 and discards the returned socket.
-spec accept_ack(ref()) -> ok.
accept_ack(Ref) ->
	{ok, _} = handshake(Ref),
	ok.
    235 
%% Same as handshake/2 with an empty list of handshake options.
-spec handshake(ref()) -> {ok, ranch_transport:socket()}.
handshake(Ref) ->
	handshake(Ref, []).
    239 
    240 -spec handshake(ref(), any()) -> {ok, ranch_transport:socket()}.
    241 handshake(Ref, Opts) ->
    242 	receive {handshake, Ref, Transport, CSocket, HandshakeTimeout} ->
    243 		case Transport:handshake(CSocket, Opts, HandshakeTimeout) of
    244 			OK = {ok, _} ->
    245 				OK;
    246 			%% Garbage was most likely sent to the socket, don't error out.
    247 			{error, {tls_alert, _}} ->
    248 				ok = Transport:close(CSocket),
    249 				exit(normal);
    250 			%% Socket most likely stopped responding, don't error out.
    251 			{error, Reason} when Reason =:= timeout; Reason =:= closed ->
    252 				ok = Transport:close(CSocket),
    253 				exit(normal);
    254 			{error, Reason} ->
    255 				ok = Transport:close(CSocket),
    256 				error(Reason)
    257 		end
    258 	end.
    259 
%% Unlike handshake/2 this function always returns errors because
%% the communication between the proxy and the server is expected
%% to be reliable. If there is a problem while receiving the proxy
%% header, we probably want to know about it.
%%
%% The handshake message is taken out of the mailbox to learn the
%% transport and socket, then sent back to self() so that a later
%% handshake/1,2 call can still receive it.
-spec recv_proxy_header(ref(), timeout())
	-> {ok, ranch_proxy_header:proxy_info()}
	| {error, closed | atom()}
	| {error, protocol_error, atom()}.
recv_proxy_header(Ref, Timeout) ->
	receive HandshakeState={handshake, Ref, Transport, CSocket, _} ->
		self() ! HandshakeState,
		Transport:recv_proxy_header(CSocket, Timeout)
	end.
    273 
    274 -spec remove_connection(ref()) -> ok.
    275 remove_connection(Ref) ->
    276 	ConnsSup = ranch_server:get_connections_sup(Ref),
    277 	ConnsSup ! {remove_connection, Ref, self()},
    278 	ok.
    279 
    280 -spec get_status(ref()) -> running | suspended.
    281 get_status(Ref) ->
    282 	ListenerSup = ranch_server:get_listener_sup(Ref),
    283 	Children = supervisor:which_children(ListenerSup),
    284 	case lists:keyfind(ranch_acceptors_sup, 1, Children) of
    285 		{_, undefined, _, _} ->
    286 			suspended;
    287 		_ ->
    288 			running
    289 	end.
    290 
%% Return the address recorded for the listener, as stored in
%% ranch_server. suspend_listener/1 resets it to {undefined, undefined}.
-spec get_addr(ref()) -> {inet:ip_address(), inet:port_number()} | {undefined, undefined}.
get_addr(Ref) ->
	ranch_server:get_addr(Ref).
    294 
%% Return the port part of get_addr/1.
-spec get_port(ref()) -> inet:port_number() | undefined.
get_port(Ref) ->
	{_, Port} = get_addr(Ref),
	Port.
    299 
%% Return the max connections value stored in ranch_server for the listener.
-spec get_max_connections(ref()) -> max_conns().
get_max_connections(Ref) ->
	ranch_server:get_max_connections(Ref).
    303 
%% Store a new max connections value for the listener via ranch_server.
-spec set_max_connections(ref(), max_conns()) -> ok.
set_max_connections(Ref, MaxConnections) ->
	ranch_server:set_max_connections(Ref, MaxConnections).
    307 
%% Return the transport options stored in ranch_server for the listener.
-spec get_transport_options(ref()) -> any().
get_transport_options(Ref) ->
	ranch_server:get_transport_options(Ref).
    311 
    312 -spec set_transport_options(ref(), opts()) -> ok | {error, running}.
    313 set_transport_options(Ref, TransOpts0) ->
    314 	TransOpts = normalize_opts(TransOpts0),
    315 	case get_status(Ref) of
    316 		suspended ->
    317 			ok = ranch_server:set_transport_options(Ref, TransOpts);
    318 		running ->
    319 			{error, running}
    320 	end.
    321 
%% Return the protocol options stored in ranch_server for the listener.
-spec get_protocol_options(ref()) -> opts().
get_protocol_options(Ref) ->
	ranch_server:get_protocol_options(Ref).
    325 
%% Store new protocol options for the listener via ranch_server.
%% Unlike set_transport_options/2 there is no status check here.
-spec set_protocol_options(ref(), any()) -> ok.
set_protocol_options(Ref, Opts) ->
	ranch_server:set_protocol_options(Ref, Opts).
    329 
    330 -spec info() -> [{any(), [{atom(), any()}]}].
    331 info() ->
    332 	[{Ref, listener_info(Ref, Pid)}
    333 		|| {Ref, Pid} <- ranch_server:get_listener_sups()].
    334 
    335 -spec info(ref()) -> [{atom(), any()}].
    336 info(Ref) ->
    337 	Pid = ranch_server:get_listener_sup(Ref),
    338 	listener_info(Ref, Pid).
    339 
    340 listener_info(Ref, Pid) ->
    341 	[_, Transport, _, Protocol, _] = ranch_server:get_listener_start_args(Ref),
    342 	ConnsSup = ranch_server:get_connections_sup(Ref),
    343 	Status = get_status(Ref),
    344 	{IP, Port} = get_addr(Ref),
    345 	MaxConns = get_max_connections(Ref),
    346 	TransOpts = ranch_server:get_transport_options(Ref),
    347 	ProtoOpts = get_protocol_options(Ref),
    348 	[
    349 		{pid, Pid},
    350 		{status, Status},
    351 		{ip, IP},
    352 		{port, Port},
    353 		{max_connections, MaxConns},
    354 		{active_connections, ranch_conns_sup:active_connections(ConnsSup)},
    355 		{all_connections, proplists:get_value(active, supervisor:count_children(ConnsSup))},
    356 		{transport, Transport},
    357 		{transport_options, TransOpts},
    358 		{protocol, Protocol},
    359 		{protocol_options, ProtoOpts}
    360 	].
    361 
%% Return the pids of the acceptor or connection processes of a
%% listener, i.e. the children of its ranch_acceptors_sup or
%% ranch_conns_sup supervisor respectively.
-spec procs(ref(), acceptors | connections) -> [pid()].
procs(Ref, acceptors) ->
	procs1(Ref, ranch_acceptors_sup);
procs(Ref, connections) ->
	procs1(Ref, ranch_conns_sup).
    367 
    368 procs1(Ref, Sup) ->
    369 	ListenerSup = ranch_server:get_listener_sup(Ref),
    370 	{_, SupPid, _, _} = lists:keyfind(Sup, 1,
    371 		supervisor:which_children(ListenerSup)),
    372 	try
    373 		[Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)]
    374 	catch exit:{noproc, _} when Sup =:= ranch_acceptors_sup ->
    375 		[]
    376 	end.
    377 
%% Same as wait_for_connections/4 with a polling interval of 1000
%% (passed to timer:sleep/1, hence milliseconds).
-spec wait_for_connections
	(ref(), '>' | '>=' | '==' | '=<', non_neg_integer()) -> ok;
	(ref(), '<', pos_integer()) -> ok.
wait_for_connections(Ref, Op, NumConns) ->
	wait_for_connections(Ref, Op, NumConns, 1000).
    383 
%% Block until the comparison "current connections Op NumConns" holds,
%% checking again every Interval. All three arguments are validated
%% first; an invalid one raises error(badarg).
-spec wait_for_connections
	(ref(), '>' | '>=' | '==' | '=<', non_neg_integer(), non_neg_integer()) -> ok;
	(ref(), '<', pos_integer(), non_neg_integer()) -> ok.
wait_for_connections(Ref, Op, NumConns, Interval) ->
	validate_op(Op, NumConns),
	validate_num_conns(NumConns),
	validate_interval(Interval),
	wait_for_connections_loop(Ref, Op, NumConns, Interval).
    392 
    393 validate_op('>', _) -> ok;
    394 validate_op('>=', _) -> ok;
    395 validate_op('==', _) -> ok;
    396 validate_op('=<', _) -> ok;
    397 validate_op('<', NumConns) when NumConns > 0 -> ok;
    398 validate_op(_, _) -> error(badarg).
    399 
    400 validate_num_conns(NumConns) when is_integer(NumConns), NumConns >= 0 -> ok;
    401 validate_num_conns(_) -> error(badarg).
    402 
    403 validate_interval(Interval) when is_integer(Interval), Interval >= 0 -> ok;
    404 validate_interval(_) -> error(badarg).
    405 
    406 wait_for_connections_loop(Ref, Op, NumConns, Interval) ->
    407 	CurConns = try
    408 		ConnsSup = ranch_server:get_connections_sup(Ref),
    409 		proplists:get_value(active, supervisor:count_children(ConnsSup))
    410 	catch _:_ ->
    411 		0
    412 	end,
    413 	case erlang:Op(CurConns, NumConns) of
    414 		true ->
    415 			ok;
    416 		false when Interval =:= 0 ->
    417 			wait_for_connections_loop(Ref, Op, NumConns, Interval);
    418 		false ->
    419 			timer:sleep(Interval),
    420 			wait_for_connections_loop(Ref, Op, NumConns, Interval)
    421 	end.
    422 
    423 -spec filter_options([inet | inet6 | {atom(), any()} | {raw, any(), any(), any()}],
    424 	[atom()], Acc) -> Acc when Acc :: [any()].
    425 filter_options(UserOptions, DisallowedKeys, DefaultOptions) ->
    426 	AllowedOptions = filter_user_options(UserOptions, DisallowedKeys),
    427 	lists:foldl(fun merge_options/2, DefaultOptions, AllowedOptions).
    428 
    429 %% 2-tuple options.
    430 filter_user_options([Opt = {Key, _}|Tail], DisallowedKeys) ->
    431 	case lists:member(Key, DisallowedKeys) of
    432 		false ->
    433 			[Opt|filter_user_options(Tail, DisallowedKeys)];
    434 		true ->
    435 			filter_options_warning(Opt),
    436 			filter_user_options(Tail, DisallowedKeys)
    437 	end;
    438 %% Special option forms.
    439 filter_user_options([inet|Tail], DisallowedKeys) ->
    440 	[inet|filter_user_options(Tail, DisallowedKeys)];
    441 filter_user_options([inet6|Tail], DisallowedKeys) ->
    442 	[inet6|filter_user_options(Tail, DisallowedKeys)];
    443 filter_user_options([Opt = {raw, _, _, _}|Tail], DisallowedKeys) ->
    444 	[Opt|filter_user_options(Tail, DisallowedKeys)];
    445 filter_user_options([Opt|Tail], DisallowedKeys) ->
    446 	filter_options_warning(Opt),
    447 	filter_user_options(Tail, DisallowedKeys);
    448 filter_user_options([], _) ->
    449 	[].
    450 
    451 filter_options_warning(Opt) ->
    452 	Logger = case get(logger) of
    453 		undefined -> error_logger;
    454 		Logger0 -> Logger0
    455 	end,
    456 	log(warning,
    457 		"Transport option ~p unknown or invalid.~n",
    458 		[Opt], Logger).
    459 
    460 merge_options({Key, _} = Option, OptionList) ->
    461 	lists:keystore(Key, 1, OptionList, Option);
    462 merge_options(Option, OptionList) ->
    463 	[Option|OptionList].
    464 
    465 -spec set_option_default(Opts, atom(), any())
    466 	-> Opts when Opts :: [{atom(), any()}].
    467 set_option_default(Opts, Key, Value) ->
    468 	case lists:keymember(Key, 1, Opts) of
    469 		true -> Opts;
    470 		false -> [{Key, Value}|Opts]
    471 	end.
    472 
    473 -spec require([atom()]) -> ok.
    474 require([]) ->
    475 	ok;
    476 require([App|Tail]) ->
    477 	case application:start(App) of
    478 		ok -> ok;
    479 		{error, {already_started, App}} -> ok
    480 	end,
    481 	require(Tail).
    482 
    483 -spec log(logger:level(), io:format(), list(), module() | #{logger => module()}) -> ok.
    484 log(Level, Format, Args, Logger) when is_atom(Logger) ->
    485 	log(Level, Format, Args, #{logger => Logger});
    486 log(Level, Format, Args, #{logger := Logger})
    487 		when Logger =/= error_logger ->
    488 	_ = Logger:Level(Format, Args),
    489 	ok;
    490 %% We use error_logger by default. Because error_logger does
    491 %% not have all the levels we accept we have to do some
    492 %% mapping to error_logger functions.
    493 log(Level, Format, Args, _) ->
    494 	Function = case Level of
    495 		emergency -> error_msg;
    496 		alert -> error_msg;
    497 		critical -> error_msg;
    498 		error -> error_msg;
    499 		warning -> warning_msg;
    500 		notice -> warning_msg;
    501 		info -> info_msg;
    502 		debug -> info_msg
    503 	end,
    504 	error_logger:Function(Format, Args).