zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

ranch_server.erl (8534B)


      1 %% Copyright (c) 2012-2018, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(ranch_server).
     16 -behaviour(gen_server).
     17 
     18 %% API.
     19 -export([start_link/0]).
     20 -export([set_new_listener_opts/5]).
     21 -export([cleanup_listener_opts/1]).
     22 -export([set_connections_sup/2]).
     23 -export([get_connections_sup/1]).
     24 -export([get_connections_sups/0]).
     25 -export([set_listener_sup/2]).
     26 -export([get_listener_sup/1]).
     27 -export([get_listener_sups/0]).
     28 -export([set_addr/2]).
     29 -export([get_addr/1]).
     30 -export([set_max_connections/2]).
     31 -export([get_max_connections/1]).
     32 -export([set_transport_options/2]).
     33 -export([get_transport_options/1]).
     34 -export([set_protocol_options/2]).
     35 -export([get_protocol_options/1]).
     36 -export([get_listener_start_args/1]).
     37 -export([count_connections/1]).
     38 
     39 %% gen_server.
     40 -export([init/1]).
     41 -export([handle_call/3]).
     42 -export([handle_cast/2]).
     43 -export([handle_info/2]).
     44 -export([terminate/2]).
     45 -export([code_change/3]).
     46 
     47 -define(TAB, ?MODULE).
     48 
     49 -type monitors() :: [{{reference(), pid()}, any()}].
     50 -record(state, {
     51 	monitors = [] :: monitors()
     52 }).
     53 
     54 %% API.
     55 
     56 -spec start_link() -> {ok, pid()}.
     57 start_link() ->
     58 	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     59 
     60 -spec set_new_listener_opts(ranch:ref(), ranch:max_conns(), any(), any(), [any()]) -> ok.
     61 set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, StartArgs) ->
     62 	gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs}).
     63 
     64 -spec cleanup_listener_opts(ranch:ref()) -> ok.
     65 cleanup_listener_opts(Ref) ->
     66 	_ = ets:delete(?TAB, {addr, Ref}),
     67 	_ = ets:delete(?TAB, {max_conns, Ref}),
     68 	_ = ets:delete(?TAB, {trans_opts, Ref}),
     69 	_ = ets:delete(?TAB, {proto_opts, Ref}),
     70 	_ = ets:delete(?TAB, {listener_start_args, Ref}),
     71 	%% We also remove the pid of the connections supervisor.
     72 	%% Depending on the timing, it might already have been deleted
     73 	%% when we handled the monitor DOWN message. However, in some
     74 	%% cases when calling stop_listener followed by get_connections_sup,
     75 	%% we could end up with the pid still being returned, when we
     76 	%% expected a crash (because the listener was stopped).
     77 	%% Deleting it explictly here removes any possible confusion.
     78 	_ = ets:delete(?TAB, {conns_sup, Ref}),
     79 	%% Ditto for the listener supervisor.
     80 	_ = ets:delete(?TAB, {listener_sup, Ref}),
     81 	ok.
     82 
     83 -spec set_connections_sup(ranch:ref(), pid()) -> ok.
     84 set_connections_sup(Ref, Pid) ->
     85 	gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}).
     86 
     87 -spec get_connections_sup(ranch:ref()) -> pid().
     88 get_connections_sup(Ref) ->
     89 	ets:lookup_element(?TAB, {conns_sup, Ref}, 2).
     90 
     91 -spec get_connections_sups() -> [{ranch:ref(), pid()}].
     92 get_connections_sups() ->
     93 	[{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})].
     94 
     95 -spec set_listener_sup(ranch:ref(), pid()) -> ok.
     96 set_listener_sup(Ref, Pid) ->
     97 	gen_server:call(?MODULE, {set_listener_sup, Ref, Pid}).
     98 
     99 -spec get_listener_sup(ranch:ref()) -> pid().
    100 get_listener_sup(Ref) ->
    101 	ets:lookup_element(?TAB, {listener_sup, Ref}, 2).
    102 
    103 -spec get_listener_sups() -> [{ranch:ref(), pid()}].
    104 get_listener_sups() ->
    105 	[{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})].
    106 
    107 -spec set_addr(ranch:ref(), {inet:ip_address(), inet:port_number()} | {undefined, undefined}) -> ok.
    108 set_addr(Ref, Addr) ->
    109 	gen_server:call(?MODULE, {set_addr, Ref, Addr}).
    110 
    111 -spec get_addr(ranch:ref()) -> {inet:ip_address(), inet:port_number()} | {undefined, undefined}.
    112 get_addr(Ref) ->
    113 	ets:lookup_element(?TAB, {addr, Ref}, 2).
    114 
    115 -spec set_max_connections(ranch:ref(), ranch:max_conns()) -> ok.
    116 set_max_connections(Ref, MaxConnections) ->
    117 	gen_server:call(?MODULE, {set_max_conns, Ref, MaxConnections}).
    118 
    119 -spec get_max_connections(ranch:ref()) -> ranch:max_conns().
    120 get_max_connections(Ref) ->
    121 	ets:lookup_element(?TAB, {max_conns, Ref}, 2).
    122 
    123 -spec set_transport_options(ranch:ref(), any()) -> ok.
    124 set_transport_options(Ref, TransOpts) ->
    125 	gen_server:call(?MODULE, {set_trans_opts, Ref, TransOpts}).
    126 
    127 -spec get_transport_options(ranch:ref()) -> any().
    128 get_transport_options(Ref) ->
    129 	ets:lookup_element(?TAB, {trans_opts, Ref}, 2).
    130 
    131 -spec set_protocol_options(ranch:ref(), any()) -> ok.
    132 set_protocol_options(Ref, ProtoOpts) ->
    133 	gen_server:call(?MODULE, {set_proto_opts, Ref, ProtoOpts}).
    134 
    135 -spec get_protocol_options(ranch:ref()) -> any().
    136 get_protocol_options(Ref) ->
    137 	ets:lookup_element(?TAB, {proto_opts, Ref}, 2).
    138 
    139 -spec get_listener_start_args(ranch:ref()) -> [any()].
    140 get_listener_start_args(Ref) ->
    141 	ets:lookup_element(?TAB, {listener_start_args, Ref}, 2).
    142 
    143 -spec count_connections(ranch:ref()) -> non_neg_integer().
    144 count_connections(Ref) ->
    145 	ranch_conns_sup:active_connections(get_connections_sup(Ref)).
    146 
    147 %% gen_server.
    148 
    149 init([]) ->
    150 	ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref}} ||
    151 		[Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})],
    152 	ListenerMonitors = [{{erlang:monitor(process, Pid), Pid}, {listener_sup, Ref}} ||
    153 		[Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})],
    154 	{ok, #state{monitors=ConnMonitors++ListenerMonitors}}.
    155 
    156 handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs}, _, State) ->
    157 	ets:insert_new(?TAB, {{max_conns, Ref}, MaxConns}),
    158 	ets:insert_new(?TAB, {{trans_opts, Ref}, TransOpts}),
    159 	ets:insert_new(?TAB, {{proto_opts, Ref}, ProtoOpts}),
    160 	ets:insert_new(?TAB, {{listener_start_args, Ref}, StartArgs}),
    161 	{reply, ok, State};
    162 handle_call({set_connections_sup, Ref, Pid}, _, State0) ->
    163 	State = set_monitored_process({conns_sup, Ref}, Pid, State0),
    164 	{reply, ok, State};
    165 handle_call({set_listener_sup, Ref, Pid}, _, State0) ->
    166 	State = set_monitored_process({listener_sup, Ref}, Pid, State0),
    167 	{reply, ok, State};
    168 handle_call({set_addr, Ref, Addr}, _, State) ->
    169 	true = ets:insert(?TAB, {{addr, Ref}, Addr}),
    170 	{reply, ok, State};
    171 handle_call({set_max_conns, Ref, MaxConns}, _, State) ->
    172 	ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
    173 	ConnsSup = get_connections_sup(Ref),
    174 	ConnsSup ! {set_max_conns, MaxConns},
    175 	{reply, ok, State};
    176 handle_call({set_trans_opts, Ref, Opts}, _, State) ->
    177 	ets:insert(?TAB, {{trans_opts, Ref}, Opts}),
    178 	{reply, ok, State};
    179 handle_call({set_proto_opts, Ref, Opts}, _, State) ->
    180 	ets:insert(?TAB, {{proto_opts, Ref}, Opts}),
    181 	ConnsSup = get_connections_sup(Ref),
    182 	ConnsSup ! {set_opts, Opts},
    183 	{reply, ok, State};
    184 handle_call(_Request, _From, State) ->
    185 	{reply, ignore, State}.
    186 
    187 handle_cast(_Request, State) ->
    188 	{noreply, State}.
    189 
    190 handle_info({'DOWN', MonitorRef, process, Pid, Reason},
    191 		State=#state{monitors=Monitors}) ->
    192 	{_, TypeRef} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
    193 	ok = case {TypeRef, Reason} of
    194 		{{listener_sup, Ref}, normal} ->
    195 			cleanup_listener_opts(Ref);
    196 		{{listener_sup, Ref}, shutdown} ->
    197 			cleanup_listener_opts(Ref);
    198 		{{listener_sup, Ref}, {shutdown, _}} ->
    199 			cleanup_listener_opts(Ref);
    200 		_ ->
    201 			_ = ets:delete(?TAB, TypeRef),
    202 			ok
    203 	end,
    204 	Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors),
    205 	{noreply, State#state{monitors=Monitors2}};
    206 handle_info(_Info, State) ->
    207 	{noreply, State}.
    208 
    209 terminate(_Reason, _State) ->
    210 	ok.
    211 
    212 code_change(_OldVsn, State, _Extra) ->
    213 	{ok, State}.
    214 
    215 %% Internal.
    216 
    217 set_monitored_process(Key, Pid, State=#state{monitors=Monitors0}) ->
    218 	%% First we cleanup the monitor if a residual one exists.
    219 	%% This can happen during crashes when the restart is faster
    220 	%% than the cleanup.
    221 	Monitors = case lists:keytake(Key, 2, Monitors0) of
    222 		false ->
    223 			Monitors0;
    224 		{value, {{OldMonitorRef, _}, _}, Monitors1} ->
    225 			true = erlang:demonitor(OldMonitorRef, [flush]),
    226 			Monitors1
    227 	end,
    228 	%% Then we unconditionally insert in the ets table.
    229 	%% If residual data is there, it will be overwritten.
    230 	true = ets:insert(?TAB, {Key, Pid}),
    231 	%% Finally we start monitoring this new process.
    232 	MonitorRef = erlang:monitor(process, Pid),
    233 	State#state{monitors=[{{MonitorRef, Pid}, Key}|Monitors]}.