manager.ex (10437B)
1 defmodule DBConnection.Ownership.Manager do 2 @moduledoc false 3 use GenServer 4 require Logger 5 alias DBConnection.Ownership.Proxy 6 7 @timeout 5_000 8 9 @callback start_link({module, opts :: Keyword.t()}) :: 10 GenServer.on_start() 11 def start_link({module, opts}) do 12 {owner_opts, pool_opts} = Keyword.split(opts, [:name]) 13 GenServer.start_link(__MODULE__, {module, owner_opts, pool_opts}, owner_opts) 14 end 15 16 @callback disconnect_all(GenServer.server(), non_neg_integer, Keyword.t()) :: :ok 17 def disconnect_all(pool, interval, opts) do 18 inner_pool = GenServer.call(pool, :pool, :infinity) 19 DBConnection.ConnectionPool.disconnect_all(inner_pool, interval, opts) 20 end 21 22 @spec proxy_for(callers :: [pid], Keyword.t()) :: {caller :: pid, proxy :: pid} | nil 23 def proxy_for(callers, opts) do 24 case Keyword.fetch(opts, :name) do 25 {:ok, name} -> 26 Enum.find_value(callers, &List.first(:ets.lookup(name, &1))) 27 28 :error -> 29 nil 30 end 31 end 32 33 @spec checkout(GenServer.server(), Keyword.t()) :: 34 {:ok, pid} | {:already, :owner | :allowed} 35 def checkout(manager, opts) do 36 GenServer.call(manager, {:checkout, opts}, :infinity) 37 end 38 39 @spec checkin(GenServer.server(), Keyword.t()) :: 40 :ok | :not_owner | :not_found 41 def checkin(manager, opts) do 42 timeout = Keyword.get(opts, :timeout, @timeout) 43 GenServer.call(manager, :checkin, timeout) 44 end 45 46 @spec mode(GenServer.server(), :auto | :manual | {:shared, pid}, Keyword.t()) :: 47 :ok | :already_shared | :not_owner | :not_found 48 def mode(manager, mode, opts) 49 when mode in [:auto, :manual] 50 when elem(mode, 0) == :shared and is_pid(elem(mode, 1)) do 51 timeout = Keyword.get(opts, :timeout, @timeout) 52 GenServer.call(manager, {:mode, mode}, timeout) 53 end 54 55 @spec allow(GenServer.server(), parent :: pid, allow :: pid, Keyword.t()) :: 56 :ok | {:already, :owner | :allowed} | :not_found 57 def allow(manager, parent, allow, opts) do 58 timeout = Keyword.get(opts, :timeout, @timeout) 59 GenServer.call(manager, {:allow, parent, allow}, timeout) 60 end 61 62 ## Callbacks 63 64 def init({module, owner_opts, pool_opts}) do 65 DBConnection.register_as_pool(module) 66 67 ets = 68 case Keyword.fetch(owner_opts, :name) do 69 {:ok, name} when is_atom(name) -> 70 :ets.new(name, [:set, :named_table, :protected, read_concurrency: true]) 71 72 _ -> 73 nil 74 end 75 76 # We can only start the connection pool directly because 77 # neither the pool's GenServer nor the manager trap exits. 78 # Otherwise we would need a supervisor plus a watcher process. 79 pool_opts = Keyword.delete(pool_opts, :pool) 80 {:ok, pool} = DBConnection.start_link(module, pool_opts) 81 82 log = Keyword.get(pool_opts, :ownership_log, nil) 83 mode = Keyword.get(pool_opts, :ownership_mode, :auto) 84 checkout_opts = Keyword.take(pool_opts, [:ownership_timeout, :queue_target, :queue_interval]) 85 86 {:ok, 87 %{ 88 pool: pool, 89 checkouts: %{}, 90 owners: %{}, 91 checkout_opts: checkout_opts, 92 mode: mode, 93 mode_ref: nil, 94 ets: ets, 95 log: log 96 }} 97 end 98 99 def handle_call(:pool, _from, %{pool: pool} = state) do 100 {:reply, pool, state} 101 end 102 103 def handle_call({:mode, {:shared, shared}}, {caller, _}, %{mode: {:shared, current}} = state) do 104 cond do 105 shared == current -> 106 {:reply, :ok, state} 107 108 Process.alive?(current) -> 109 {:reply, :already_shared, state} 110 111 true -> 112 share_and_reply(state, shared, caller) 113 end 114 end 115 116 def handle_call({:mode, {:shared, shared}}, {caller, _}, state) do 117 share_and_reply(state, shared, caller) 118 end 119 120 def handle_call({:mode, mode}, _from, %{mode: mode} = state) do 121 {:reply, :ok, state} 122 end 123 124 def handle_call({:mode, mode}, {caller, _}, state) do 125 state = proxy_checkin_all_except(state, [], caller) 126 {:reply, :ok, %{state | mode: mode, mode_ref: nil}} 127 end 128 129 def handle_call(:checkin, {caller, _}, state) do 130 {reply, state} = proxy_checkin(state, caller, caller) 131 {:reply, reply, state} 132 end 133 134 def handle_call({:allow, caller, allow}, _from, %{checkouts: checkouts} = state) do 135 if kind = already_checked_out(checkouts, allow) do 136 {:reply, {:already, kind}, state} 137 else 138 case Map.get(checkouts, caller, :not_found) do 139 {:owner, ref, proxy} -> 140 {:reply, :ok, owner_allow(state, allow, ref, proxy)} 141 142 {:allowed, ref, proxy} -> 143 {:reply, :ok, owner_allow(state, allow, ref, proxy)} 144 145 :not_found -> 146 {:reply, :not_found, state} 147 end 148 end 149 end 150 151 def handle_call({:checkout, opts}, {caller, _}, %{checkouts: checkouts} = state) do 152 if kind = already_checked_out(checkouts, caller) do 153 {:reply, {:already, kind}, state} 154 else 155 {proxy, state} = proxy_checkout(state, caller, opts) 156 {:reply, {:ok, proxy}, state} 157 end 158 end 159 160 def handle_info({:db_connection, from, {:checkout, callers, _now, queue?}}, state) do 161 %{checkouts: checkouts, mode: mode, checkout_opts: checkout_opts} = state 162 caller = find_caller(callers, checkouts, mode) 163 164 case Map.get(checkouts, caller, :not_found) do 165 {status, _ref, proxy} when status in [:owner, :allowed] -> 166 DBConnection.Holder.reply_redirect(from, caller, proxy) 167 {:noreply, state} 168 169 :not_found when mode == :auto -> 170 {proxy, state} = proxy_checkout(state, caller, [queue: queue?] ++ checkout_opts) 171 DBConnection.Holder.reply_redirect(from, caller, proxy) 172 {:noreply, state} 173 174 :not_found when mode == :manual -> 175 not_found(from) 176 {:noreply, state} 177 178 :not_found -> 179 {:shared, shared} = mode 180 {:owner, _ref, proxy} = Map.fetch!(checkouts, shared) 181 DBConnection.Holder.reply_redirect(from, shared, proxy) 182 {:noreply, state} 183 end 184 end 185 186 def handle_info({:DOWN, ref, _, _, _}, state) do 187 {:noreply, state |> owner_down(ref) |> unshare(ref)} 188 end 189 190 def handle_info(_msg, state) do 191 {:noreply, state} 192 end 193 194 defp already_checked_out(checkouts, pid) do 195 case Map.get(checkouts, pid, :not_found) do 196 {:owner, _, _} -> :owner 197 {:allowed, _, _} -> :allowed 198 :not_found -> nil 199 end 200 end 201 202 defp proxy_checkout(state, caller, opts) do 203 %{pool: pool, checkouts: checkouts, owners: owners, ets: ets, log: log} = state 204 205 {:ok, proxy} = 206 DynamicSupervisor.start_child( 207 DBConnection.Ownership.Supervisor, 208 {DBConnection.Ownership.Proxy, {caller, pool, opts}} 209 ) 210 211 log && Logger.log(log, fn -> [inspect(caller), " owns proxy " | inspect(proxy)] end) 212 ref = Process.monitor(proxy) 213 checkouts = Map.put(checkouts, caller, {:owner, ref, proxy}) 214 owners = Map.put(owners, ref, {proxy, caller, []}) 215 ets && :ets.insert(ets, {caller, proxy}) 216 {proxy, %{state | checkouts: checkouts, owners: owners}} 217 end 218 219 defp proxy_checkin(state, maybe_owner, caller) do 220 case get_and_update_in(state.checkouts, &Map.pop(&1, maybe_owner, :not_found)) do 221 {{:owner, ref, proxy}, state} -> 222 Proxy.stop(proxy, caller) 223 {:ok, state |> owner_down(ref) |> unshare(ref)} 224 225 {{:allowed, _, _}, _} -> 226 {:not_owner, state} 227 228 {:not_found, _} -> 229 {:not_found, state} 230 end 231 end 232 233 defp proxy_checkin_all_except(state, except, caller) do 234 Enum.reduce(state.checkouts, state, fn {pid, _}, state -> 235 if pid in except do 236 state 237 else 238 {_, state} = proxy_checkin(state, pid, caller) 239 state 240 end 241 end) 242 end 243 244 defp owner_allow(%{ets: ets, log: log} = state, allow, ref, proxy) do 245 log && Logger.log(log, fn -> [inspect(allow), " allowed on proxy " | inspect(proxy)] end) 246 state = put_in(state.checkouts[allow], {:allowed, ref, proxy}) 247 248 state = 249 update_in(state.owners[ref], fn {proxy, caller, allowed} -> 250 {proxy, caller, [allow | List.delete(allowed, allow)]} 251 end) 252 253 ets && :ets.insert(ets, {allow, proxy}) 254 state 255 end 256 257 defp owner_down(%{ets: ets, log: log} = state, ref) do 258 case get_and_update_in(state.owners, &Map.pop(&1, ref)) do 259 {{proxy, caller, allowed}, state} -> 260 Process.demonitor(ref, [:flush]) 261 entries = [caller | allowed] 262 263 log && 264 Logger.log(log, fn -> 265 [Enum.map_join(entries, ", ", &inspect/1), " lose proxy " | inspect(proxy)] 266 end) 267 268 ets && Enum.each(entries, &:ets.delete(ets, &1)) 269 update_in(state.checkouts, &Map.drop(&1, entries)) 270 271 {nil, state} -> 272 state 273 end 274 end 275 276 defp share_and_reply(%{checkouts: checkouts} = state, shared, caller) do 277 case Map.get(checkouts, shared, :not_found) do 278 {:owner, ref, _} -> 279 state = proxy_checkin_all_except(state, [shared], caller) 280 {:reply, :ok, %{state | mode: {:shared, shared}, mode_ref: ref}} 281 282 {:allowed, _, _} -> 283 {:reply, :not_owner, state} 284 285 :not_found -> 286 {:reply, :not_found, state} 287 end 288 end 289 290 defp unshare(%{mode_ref: ref} = state, ref) do 291 %{state | mode: :manual, mode_ref: nil} 292 end 293 294 defp unshare(state, _ref) do 295 state 296 end 297 298 defp find_caller(callers, checkouts, :manual) do 299 Enum.find(callers, &Map.has_key?(checkouts, &1)) || hd(callers) 300 end 301 302 defp find_caller([caller | _], _checkouts, _mode) do 303 caller 304 end 305 306 defp not_found({pid, _} = from) do 307 msg = """ 308 cannot find ownership process for #{inspect(pid)}. 309 310 When using ownership, you must manage connections in one 311 of the four ways: 312 313 * By explicitly checking out a connection 314 * By explicitly allowing a spawned process 315 * By running the pool in shared mode 316 * By using :caller option with allowed process 317 318 The first two options require every new process to explicitly 319 check a connection out or be allowed by calling checkout or 320 allow respectively. 321 322 The third option requires a {:shared, pid} mode to be set. 323 If using shared mode in tests, make sure your tests are not 324 async. 325 326 The fourth option requires [caller: pid] to be used when 327 checking out a connection from the pool. The caller process 328 should already be allowed on a connection. 329 330 If you are reading this error, it means you have not done one 331 of the steps above or that the owner process has crashed. 332 """ 333 334 DBConnection.Holder.reply_error(from, DBConnection.OwnershipError.exception(msg)) 335 end 336 end