zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

manager.ex (10437B)


      1 defmodule DBConnection.Ownership.Manager do
      2   @moduledoc false
      3   use GenServer
      4   require Logger
      5   alias DBConnection.Ownership.Proxy
      6 
      7   @timeout 5_000
      8 
      9   @callback start_link({module, opts :: Keyword.t()}) ::
     10               GenServer.on_start()
     11   def start_link({module, opts}) do
     12     {owner_opts, pool_opts} = Keyword.split(opts, [:name])
     13     GenServer.start_link(__MODULE__, {module, owner_opts, pool_opts}, owner_opts)
     14   end
     15 
     16   @callback disconnect_all(GenServer.server(), non_neg_integer, Keyword.t()) :: :ok
     17   def disconnect_all(pool, interval, opts) do
     18     inner_pool = GenServer.call(pool, :pool, :infinity)
     19     DBConnection.ConnectionPool.disconnect_all(inner_pool, interval, opts)
     20   end
     21 
     22   @spec proxy_for(callers :: [pid], Keyword.t()) :: {caller :: pid, proxy :: pid} | nil
     23   def proxy_for(callers, opts) do
     24     case Keyword.fetch(opts, :name) do
     25       {:ok, name} ->
     26         Enum.find_value(callers, &List.first(:ets.lookup(name, &1)))
     27 
     28       :error ->
     29         nil
     30     end
     31   end
     32 
     33   @spec checkout(GenServer.server(), Keyword.t()) ::
     34           {:ok, pid} | {:already, :owner | :allowed}
     35   def checkout(manager, opts) do
     36     GenServer.call(manager, {:checkout, opts}, :infinity)
     37   end
     38 
     39   @spec checkin(GenServer.server(), Keyword.t()) ::
     40           :ok | :not_owner | :not_found
     41   def checkin(manager, opts) do
     42     timeout = Keyword.get(opts, :timeout, @timeout)
     43     GenServer.call(manager, :checkin, timeout)
     44   end
     45 
     46   @spec mode(GenServer.server(), :auto | :manual | {:shared, pid}, Keyword.t()) ::
     47           :ok | :already_shared | :not_owner | :not_found
     48   def mode(manager, mode, opts)
     49       when mode in [:auto, :manual]
     50       when elem(mode, 0) == :shared and is_pid(elem(mode, 1)) do
     51     timeout = Keyword.get(opts, :timeout, @timeout)
     52     GenServer.call(manager, {:mode, mode}, timeout)
     53   end
     54 
     55   @spec allow(GenServer.server(), parent :: pid, allow :: pid, Keyword.t()) ::
     56           :ok | {:already, :owner | :allowed} | :not_found
     57   def allow(manager, parent, allow, opts) do
     58     timeout = Keyword.get(opts, :timeout, @timeout)
     59     GenServer.call(manager, {:allow, parent, allow}, timeout)
     60   end
     61 
     62   ## Callbacks
     63 
     64   def init({module, owner_opts, pool_opts}) do
     65     DBConnection.register_as_pool(module)
     66 
     67     ets =
     68       case Keyword.fetch(owner_opts, :name) do
     69         {:ok, name} when is_atom(name) ->
     70           :ets.new(name, [:set, :named_table, :protected, read_concurrency: true])
     71 
     72         _ ->
     73           nil
     74       end
     75 
     76     # We can only start the connection pool directly because
     77     # neither the pool's GenServer nor the manager trap exits.
     78     # Otherwise we would need a supervisor plus a watcher process.
     79     pool_opts = Keyword.delete(pool_opts, :pool)
     80     {:ok, pool} = DBConnection.start_link(module, pool_opts)
     81 
     82     log = Keyword.get(pool_opts, :ownership_log, nil)
     83     mode = Keyword.get(pool_opts, :ownership_mode, :auto)
     84     checkout_opts = Keyword.take(pool_opts, [:ownership_timeout, :queue_target, :queue_interval])
     85 
     86     {:ok,
     87      %{
     88        pool: pool,
     89        checkouts: %{},
     90        owners: %{},
     91        checkout_opts: checkout_opts,
     92        mode: mode,
     93        mode_ref: nil,
     94        ets: ets,
     95        log: log
     96      }}
     97   end
     98 
     99   def handle_call(:pool, _from, %{pool: pool} = state) do
    100     {:reply, pool, state}
    101   end
    102 
    103   def handle_call({:mode, {:shared, shared}}, {caller, _}, %{mode: {:shared, current}} = state) do
    104     cond do
    105       shared == current ->
    106         {:reply, :ok, state}
    107 
    108       Process.alive?(current) ->
    109         {:reply, :already_shared, state}
    110 
    111       true ->
    112         share_and_reply(state, shared, caller)
    113     end
    114   end
    115 
    116   def handle_call({:mode, {:shared, shared}}, {caller, _}, state) do
    117     share_and_reply(state, shared, caller)
    118   end
    119 
    120   def handle_call({:mode, mode}, _from, %{mode: mode} = state) do
    121     {:reply, :ok, state}
    122   end
    123 
    124   def handle_call({:mode, mode}, {caller, _}, state) do
    125     state = proxy_checkin_all_except(state, [], caller)
    126     {:reply, :ok, %{state | mode: mode, mode_ref: nil}}
    127   end
    128 
    129   def handle_call(:checkin, {caller, _}, state) do
    130     {reply, state} = proxy_checkin(state, caller, caller)
    131     {:reply, reply, state}
    132   end
    133 
    134   def handle_call({:allow, caller, allow}, _from, %{checkouts: checkouts} = state) do
    135     if kind = already_checked_out(checkouts, allow) do
    136       {:reply, {:already, kind}, state}
    137     else
    138       case Map.get(checkouts, caller, :not_found) do
    139         {:owner, ref, proxy} ->
    140           {:reply, :ok, owner_allow(state, allow, ref, proxy)}
    141 
    142         {:allowed, ref, proxy} ->
    143           {:reply, :ok, owner_allow(state, allow, ref, proxy)}
    144 
    145         :not_found ->
    146           {:reply, :not_found, state}
    147       end
    148     end
    149   end
    150 
    151   def handle_call({:checkout, opts}, {caller, _}, %{checkouts: checkouts} = state) do
    152     if kind = already_checked_out(checkouts, caller) do
    153       {:reply, {:already, kind}, state}
    154     else
    155       {proxy, state} = proxy_checkout(state, caller, opts)
    156       {:reply, {:ok, proxy}, state}
    157     end
    158   end
    159 
    160   def handle_info({:db_connection, from, {:checkout, callers, _now, queue?}}, state) do
    161     %{checkouts: checkouts, mode: mode, checkout_opts: checkout_opts} = state
    162     caller = find_caller(callers, checkouts, mode)
    163 
    164     case Map.get(checkouts, caller, :not_found) do
    165       {status, _ref, proxy} when status in [:owner, :allowed] ->
    166         DBConnection.Holder.reply_redirect(from, caller, proxy)
    167         {:noreply, state}
    168 
    169       :not_found when mode == :auto ->
    170         {proxy, state} = proxy_checkout(state, caller, [queue: queue?] ++ checkout_opts)
    171         DBConnection.Holder.reply_redirect(from, caller, proxy)
    172         {:noreply, state}
    173 
    174       :not_found when mode == :manual ->
    175         not_found(from)
    176         {:noreply, state}
    177 
    178       :not_found ->
    179         {:shared, shared} = mode
    180         {:owner, _ref, proxy} = Map.fetch!(checkouts, shared)
    181         DBConnection.Holder.reply_redirect(from, shared, proxy)
    182         {:noreply, state}
    183     end
    184   end
    185 
    186   def handle_info({:DOWN, ref, _, _, _}, state) do
    187     {:noreply, state |> owner_down(ref) |> unshare(ref)}
    188   end
    189 
    190   def handle_info(_msg, state) do
    191     {:noreply, state}
    192   end
    193 
    194   defp already_checked_out(checkouts, pid) do
    195     case Map.get(checkouts, pid, :not_found) do
    196       {:owner, _, _} -> :owner
    197       {:allowed, _, _} -> :allowed
    198       :not_found -> nil
    199     end
    200   end
    201 
    202   defp proxy_checkout(state, caller, opts) do
    203     %{pool: pool, checkouts: checkouts, owners: owners, ets: ets, log: log} = state
    204 
    205     {:ok, proxy} =
    206       DynamicSupervisor.start_child(
    207         DBConnection.Ownership.Supervisor,
    208         {DBConnection.Ownership.Proxy, {caller, pool, opts}}
    209       )
    210 
    211     log && Logger.log(log, fn -> [inspect(caller), " owns proxy " | inspect(proxy)] end)
    212     ref = Process.monitor(proxy)
    213     checkouts = Map.put(checkouts, caller, {:owner, ref, proxy})
    214     owners = Map.put(owners, ref, {proxy, caller, []})
    215     ets && :ets.insert(ets, {caller, proxy})
    216     {proxy, %{state | checkouts: checkouts, owners: owners}}
    217   end
    218 
    219   defp proxy_checkin(state, maybe_owner, caller) do
    220     case get_and_update_in(state.checkouts, &Map.pop(&1, maybe_owner, :not_found)) do
    221       {{:owner, ref, proxy}, state} ->
    222         Proxy.stop(proxy, caller)
    223         {:ok, state |> owner_down(ref) |> unshare(ref)}
    224 
    225       {{:allowed, _, _}, _} ->
    226         {:not_owner, state}
    227 
    228       {:not_found, _} ->
    229         {:not_found, state}
    230     end
    231   end
    232 
    233   defp proxy_checkin_all_except(state, except, caller) do
    234     Enum.reduce(state.checkouts, state, fn {pid, _}, state ->
    235       if pid in except do
    236         state
    237       else
    238         {_, state} = proxy_checkin(state, pid, caller)
    239         state
    240       end
    241     end)
    242   end
    243 
    244   defp owner_allow(%{ets: ets, log: log} = state, allow, ref, proxy) do
    245     log && Logger.log(log, fn -> [inspect(allow), " allowed on proxy " | inspect(proxy)] end)
    246     state = put_in(state.checkouts[allow], {:allowed, ref, proxy})
    247 
    248     state =
    249       update_in(state.owners[ref], fn {proxy, caller, allowed} ->
    250         {proxy, caller, [allow | List.delete(allowed, allow)]}
    251       end)
    252 
    253     ets && :ets.insert(ets, {allow, proxy})
    254     state
    255   end
    256 
    257   defp owner_down(%{ets: ets, log: log} = state, ref) do
    258     case get_and_update_in(state.owners, &Map.pop(&1, ref)) do
    259       {{proxy, caller, allowed}, state} ->
    260         Process.demonitor(ref, [:flush])
    261         entries = [caller | allowed]
    262 
    263         log &&
    264           Logger.log(log, fn ->
    265             [Enum.map_join(entries, ", ", &inspect/1), " lose proxy " | inspect(proxy)]
    266           end)
    267 
    268         ets && Enum.each(entries, &:ets.delete(ets, &1))
    269         update_in(state.checkouts, &Map.drop(&1, entries))
    270 
    271       {nil, state} ->
    272         state
    273     end
    274   end
    275 
    276   defp share_and_reply(%{checkouts: checkouts} = state, shared, caller) do
    277     case Map.get(checkouts, shared, :not_found) do
    278       {:owner, ref, _} ->
    279         state = proxy_checkin_all_except(state, [shared], caller)
    280         {:reply, :ok, %{state | mode: {:shared, shared}, mode_ref: ref}}
    281 
    282       {:allowed, _, _} ->
    283         {:reply, :not_owner, state}
    284 
    285       :not_found ->
    286         {:reply, :not_found, state}
    287     end
    288   end
    289 
    290   defp unshare(%{mode_ref: ref} = state, ref) do
    291     %{state | mode: :manual, mode_ref: nil}
    292   end
    293 
    294   defp unshare(state, _ref) do
    295     state
    296   end
    297 
    298   defp find_caller(callers, checkouts, :manual) do
    299     Enum.find(callers, &Map.has_key?(checkouts, &1)) || hd(callers)
    300   end
    301 
    302   defp find_caller([caller | _], _checkouts, _mode) do
    303     caller
    304   end
    305 
    306   defp not_found({pid, _} = from) do
    307     msg = """
    308     cannot find ownership process for #{inspect(pid)}.
    309 
    310     When using ownership, you must manage connections in one
    311     of the four ways:
    312 
    313     * By explicitly checking out a connection
    314     * By explicitly allowing a spawned process
    315     * By running the pool in shared mode
    316     * By using :caller option with allowed process
    317 
    318     The first two options require every new process to explicitly
    319     check a connection out or be allowed by calling checkout or
    320     allow respectively.
    321 
    322     The third option requires a {:shared, pid} mode to be set.
    323     If using shared mode in tests, make sure your tests are not
    324     async.
    325 
    326     The fourth option requires [caller: pid] to be used when
    327     checking out a connection from the pool. The caller process
    328     should already be allowed on a connection.
    329 
    330     If you are reading this error, it means you have not done one
    331     of the steps above or that the owner process has crashed.
    332     """
    333 
    334     DBConnection.Holder.reply_error(from, DBConnection.OwnershipError.exception(msg))
    335   end
    336 end