zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

holder.ex (13031B)


      1 defmodule DBConnection.Holder do
      2   @moduledoc false
      3   require Record
      4 
      5   @queue true
      6   @timeout 15000
      7   @time_unit 1000
      8 
      9   Record.defrecord(:conn, [:connection, :module, :state, :lock, :ts, deadline: nil, status: :ok])
     10   Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock])
     11 
     12   @type t :: :ets.tid()
     13   @type checkin_time :: non_neg_integer() | nil
     14 
     15   ## Holder API
     16 
     17   @spec new(pid, reference, module, term) :: t
     18   def new(pool, ref, mod, state) do
     19     # Insert before setting heir so that pool can't receive empty table
     20     holder = :ets.new(__MODULE__, [:public, :ordered_set])
     21 
     22     conn = conn(connection: self(), module: mod, state: state, ts: System.monotonic_time())
     23     true = :ets.insert_new(holder, conn)
     24 
     25     :ets.setopts(holder, {:heir, pool, ref})
     26     holder
     27   end
     28 
     29   @spec update(pid, reference, module, term) :: {:ok, t} | :error
     30   def update(pool, ref, mod, state) do
     31     holder = new(pool, ref, mod, state)
     32 
     33     try do
     34       :ets.give_away(holder, pool, {:checkin, ref, System.monotonic_time()})
     35       {:ok, holder}
     36     rescue
     37       ArgumentError -> :error
     38     end
     39   end
     40 
     41   @spec delete(t) :: {module, term}
     42   def delete(holder) do
     43     [conn(module: module, state: state)] = :ets.lookup(holder, :conn)
     44     :ets.delete(holder)
     45     {module, state}
     46   end
     47 
     48   ## Pool API (invoked by caller)
     49 
     50   @callback checkout(pool :: GenServer.server(), [pid], opts :: Keyword.t()) ::
     51               {:ok, pool_ref :: any, module, checkin_time, state :: any}
     52               | {:error, Exception.t()}
     53   def checkout(pool, callers, opts) do
     54     queue? = Keyword.get(opts, :queue, @queue)
     55     now = System.monotonic_time(@time_unit)
     56     timeout = abs_timeout(now, opts)
     57 
     58     case checkout(pool, callers, queue?, now, timeout) do
     59       {:ok, _, _, _, _} = ok ->
     60         ok
     61 
     62       {:error, %DBConnection.ConnectionError{} = connection_error} = error ->
     63         :telemetry.execute(
     64           [:db_connection, :connection_error],
     65           %{count: 1},
     66           %{
     67             error: connection_error,
     68             opts: opts
     69           }
     70         )
     71 
     72         error
     73 
     74       {:error, _} = error ->
     75         error
     76 
     77       {:redirect, caller, proxy} ->
     78         case checkout(proxy, [caller], opts) do
     79           {:ok, _, _, _, _} = ok ->
     80             ok
     81 
     82           {:error, %DBConnection.ConnectionError{message: message} = exception} ->
     83             {:error,
     84              %{
     85                exception
     86                | message:
     87                    "could not checkout the connection owned by #{inspect(caller)}. " <>
     88                      "When using the sandbox, connections are shared, so this may imply " <>
     89                      "another process is using a connection. Reason: #{message}"
     90              }}
     91 
     92           {:error, _} = error ->
     93             error
     94         end
     95 
     96       {:exit, reason} ->
     97         exit({reason, {__MODULE__, :checkout, [pool, opts]}})
     98     end
     99   end
    100 
    101   @spec checkin(pool_ref :: any) :: :ok
    102   def checkin(pool_ref) do
    103     # Note we may call checkin after a disconnect/stop. For this reason, we choose
    104     # to not change the status on checkin but strictly speaking nobody can access
    105     # the holder after disconnect/stop unless they store a copy of %DBConnection{}.
    106     # Note status can't be :aborted as aborted is always reverted at the end of a
    107     # transaction.
    108     done(pool_ref, [{conn(:lock) + 1, nil}], :checkin, System.monotonic_time())
    109   end
    110 
    111   @spec disconnect(pool_ref :: any, err :: Exception.t()) :: :ok
    112   def disconnect(pool_ref, err) do
    113     done(pool_ref, [{conn(:status) + 1, :error}], :disconnect, err)
    114   end
    115 
    116   @spec stop(pool_ref :: any, err :: Exception.t()) :: :ok
    117   def stop(pool_ref, err) do
    118     done(pool_ref, [{conn(:status) + 1, :error}], :stop, err)
    119   end
    120 
    121   @spec handle(pool_ref :: any, fun :: atom, args :: [term], Keyword.t()) :: tuple
    122   def handle(pool_ref, fun, args, opts) do
    123     handle_or_cleanup(:handle, pool_ref, fun, args, opts)
    124   end
    125 
    126   @spec cleanup(pool_ref :: any, fun :: atom, args :: [term], Keyword.t()) :: tuple
    127   def cleanup(pool_ref, fun, args, opts) do
    128     handle_or_cleanup(:cleanup, pool_ref, fun, args, opts)
    129   end
    130 
    131   defp handle_or_cleanup(type, pool_ref, fun, args, opts) do
    132     pool_ref(holder: holder, lock: lock) = pool_ref
    133 
    134     try do
    135       :ets.lookup(holder, :conn)
    136     rescue
    137       ArgumentError ->
    138         msg = "connection is closed because of an error, disconnect or timeout"
    139         {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}
    140     else
    141       [conn(lock: conn_lock)] when conn_lock != lock ->
    142         raise "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}"
    143 
    144       [conn(status: :error)] ->
    145         msg = "connection is closed because of an error, disconnect or timeout"
    146         {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}
    147 
    148       [conn(status: :aborted)] when type != :cleanup ->
    149         msg = "transaction rolling back"
    150         {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}
    151 
    152       [conn(module: module, state: state)] ->
    153         holder_apply(holder, module, fun, args ++ [opts, state])
    154     end
    155   end
    156 
    157   ## Pool state helpers API (invoked by callers)
    158 
    159   @spec put_state(pool_ref :: any, term) :: :ok
    160   def put_state(pool_ref(holder: sink_holder), state) do
    161     :ets.update_element(sink_holder, :conn, [{conn(:state) + 1, state}])
    162     :ok
    163   end
    164 
    165   @spec status?(pool_ref :: any, :ok | :aborted) :: boolean()
    166   def status?(pool_ref(holder: holder), status) do
    167     try do
    168       :ets.lookup_element(holder, :conn, conn(:status) + 1) == status
    169     rescue
    170       ArgumentError -> false
    171     end
    172   end
    173 
    174   @spec put_status(pool_ref :: any, :ok | :aborted) :: boolean()
    175   def put_status(pool_ref(holder: holder), status) do
    176     try do
    177       :ets.update_element(holder, :conn, [{conn(:status) + 1, status}])
    178     rescue
    179       ArgumentError -> false
    180     end
    181   end
    182 
    183   ## Pool callbacks (invoked by pools)
    184 
    185   @spec reply_redirect({pid, reference}, pid | :shared | :auto, GenServer.server()) :: :ok
    186   def reply_redirect(from, caller, redirect) do
    187     GenServer.reply(from, {:redirect, caller, redirect})
    188     :ok
    189   end
    190 
    191   @spec reply_error({pid, reference}, Exception.t()) :: :ok
    192   def reply_error(from, exception) do
    193     GenServer.reply(from, {:error, exception})
    194     :ok
    195   end
    196 
    197   @spec handle_checkout(t, {pid, reference}, reference, checkin_time) :: boolean
    198   def handle_checkout(holder, {pid, mref}, ref, checkin_time) do
    199     :ets.give_away(holder, pid, {mref, ref, checkin_time})
    200   rescue
    201     ArgumentError ->
    202       if Process.alive?(pid) or :ets.info(holder, :owner) != self() do
    203         raise ArgumentError, no_holder(holder, pid)
    204       else
    205         false
    206       end
    207   end
    208 
    209   @spec handle_deadline(t, reference) :: boolean
    210   def handle_deadline(holder, deadline) do
    211     :ets.lookup_element(holder, :conn, conn(:deadline) + 1)
    212   rescue
    213     ArgumentError -> false
    214   else
    215     ^deadline -> true
    216     _ -> false
    217   end
    218 
    219   @spec handle_ping(t) :: true
    220   def handle_ping(holder) do
    221     :ets.lookup(holder, :conn)
    222   rescue
    223     ArgumentError ->
    224       raise ArgumentError, no_holder(holder, nil)
    225   else
    226     [conn(connection: conn, state: state)] ->
    227       DBConnection.Connection.ping({conn, holder}, state)
    228       :ets.delete(holder)
    229       true
    230   end
    231 
    232   @spec handle_disconnect(t, Exception.t()) :: boolean
    233   def handle_disconnect(holder, err) do
    234     handle_done(holder, &DBConnection.Connection.disconnect/3, err)
    235   end
    236 
    237   @spec handle_stop(t, term) :: boolean
    238   def handle_stop(holder, err) do
    239     handle_done(holder, &DBConnection.Connection.stop/3, err)
    240   end
    241 
    242   @spec maybe_disconnect(t, integer, non_neg_integer) :: boolean()
    243   def maybe_disconnect(holder, start, interval) do
    244     ts = :ets.lookup_element(holder, :conn, conn(:ts) + 1)
    245 
    246     cond do
    247       ts >= start ->
    248         false
    249 
    250       interval == 0 ->
    251         true
    252 
    253       true ->
    254         pid = :ets.lookup_element(holder, :conn, conn(:connection) + 1)
    255         System.monotonic_time() > :erlang.phash2(pid, interval) + start
    256     end
    257   rescue
    258     _ -> false
    259   else
    260     true ->
    261       opts = [message: "disconnect_all requested", severity: :info]
    262       handle_disconnect(holder, DBConnection.ConnectionError.exception(opts))
    263 
    264     false ->
    265       false
    266   end
    267 
    268   ## Private
    269 
    270   defp checkout(pool, callers, queue?, start, timeout) do
    271     case GenServer.whereis(pool) do
    272       pid when node(pid) == node() ->
    273         checkout_call(pid, callers, queue?, start, timeout)
    274 
    275       pid when node(pid) != node() ->
    276         {:exit, {:badnode, node(pid)}}
    277 
    278       {_name, node} ->
    279         {:exit, {:badnode, node}}
    280 
    281       nil ->
    282         {:exit, :noproc}
    283     end
    284   end
    285 
    286   defp checkout_call(pid, callers, queue?, start, timeout) do
    287     lock = Process.monitor(pid)
    288     send(pid, {:db_connection, {self(), lock}, {:checkout, callers, start, queue?}})
    289 
    290     receive do
    291       {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time}} ->
    292         Process.demonitor(lock, [:flush])
    293         {deadline, ops} = start_deadline(timeout, pool, ref, holder, start)
    294         :ets.update_element(holder, :conn, [{conn(:lock) + 1, lock} | ops])
    295 
    296         pool_ref =
    297           pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder, lock: lock)
    298 
    299         checkout_result(holder, pool_ref, checkin_time)
    300 
    301       {^lock, reply} ->
    302         Process.demonitor(lock, [:flush])
    303         reply
    304 
    305       {:DOWN, ^lock, _, _, reason} ->
    306         {:exit, reason}
    307     end
    308   end
    309 
    310   defp checkout_result(holder, pool_ref, checkin_time) do
    311     try do
    312       :ets.lookup(holder, :conn)
    313     rescue
    314       ArgumentError ->
    315         # Deadline could hit and be handled pool before using connection
    316         msg = "connection not available because deadline reached while in queue"
    317         {:error, DBConnection.ConnectionError.exception(msg)}
    318     else
    319       [conn(module: mod, state: state)] ->
    320         {:ok, pool_ref, mod, checkin_time, state}
    321     end
    322   end
    323 
    324   defp no_holder(holder, maybe_pid) do
    325     reason =
    326       case :ets.info(holder, :owner) do
    327         :undefined -> "does not exist"
    328         ^maybe_pid -> "is being given to its current owner"
    329         owner when owner != self() -> "does not belong to the giving process"
    330         _ -> "could not be given away"
    331       end
    332 
    333     call_reason =
    334       if maybe_pid do
    335         "Error happened when attempting to transfer to #{inspect(maybe_pid)} " <>
    336           "(alive: #{Process.alive?(maybe_pid)})"
    337       else
    338         "Error happened when looking up connection"
    339       end
    340 
    341     """
    342     #{inspect(__MODULE__)} #{inspect(holder)} #{reason}, pool inconsistent.
    343     #{call_reason}.
    344 
    345     SELF: #{inspect(self())}
    346     ETS INFO: #{inspect(:ets.info(holder))}
    347 
    348     Please report at https://github.com/elixir-ecto/db_connection/issues"
    349     """
    350   end
    351 
    352   defp holder_apply(holder, module, fun, args) do
    353     try do
    354       apply(module, fun, args)
    355     catch
    356       kind, reason ->
    357         {:catch, kind, reason, __STACKTRACE__}
    358     else
    359       result when is_tuple(result) ->
    360         state = :erlang.element(:erlang.tuple_size(result), result)
    361 
    362         try do
    363           :ets.update_element(holder, :conn, {conn(:state) + 1, state})
    364           result
    365         rescue
    366           ArgumentError ->
    367             augment_disconnect(result)
    368         end
    369 
    370       # If it is not a tuple, we just return it as is so we raise bad return.
    371       result ->
    372         result
    373     end
    374   end
    375 
    376   defp augment_disconnect({:disconnect, %DBConnection.ConnectionError{} = err, state}) do
    377     %{message: message} = err
    378 
    379     message =
    380       message <>
    381         " (the connection was closed by the pool, " <>
    382         "possibly due to a timeout or because the pool has been terminated)"
    383 
    384     {:disconnect, %{err | message: message}, state}
    385   end
    386 
    387   defp augment_disconnect(result), do: result
    388 
    389   defp done(pool_ref, ops, tag, info) do
    390     pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder) = pool_ref
    391     cancel_deadline(deadline)
    392 
    393     try do
    394       :ets.update_element(holder, :conn, [{conn(:deadline) + 1, nil} | ops])
    395       :ets.give_away(holder, pool, {tag, ref, info})
    396     rescue
    397       ArgumentError -> :ok
    398     else
    399       true -> :ok
    400     end
    401   end
    402 
    403   defp handle_done(holder, stop, err) do
    404     :ets.lookup(holder, :conn)
    405   rescue
    406     ArgumentError ->
    407       false
    408   else
    409     [conn(connection: pid, deadline: deadline, state: state)] ->
    410       cancel_deadline(deadline)
    411       :ets.delete(holder)
    412       stop.({pid, holder}, err, state)
    413       true
    414   end
    415 
    416   defp abs_timeout(now, opts) do
    417     case Keyword.get(opts, :timeout, @timeout) do
    418       :infinity -> Keyword.get(opts, :deadline)
    419       timeout -> min(now + timeout, Keyword.get(opts, :deadline))
    420     end
    421   end
    422 
    423   defp start_deadline(nil, _, _, _, _) do
    424     {nil, []}
    425   end
    426 
    427   defp start_deadline(timeout, pid, ref, holder, start) do
    428     deadline =
    429       :erlang.start_timer(timeout, pid, {ref, holder, self(), timeout - start}, abs: true)
    430 
    431     {deadline, [{conn(:deadline) + 1, deadline}]}
    432   end
    433 
    434   defp cancel_deadline(nil) do
    435     :ok
    436   end
    437 
    438   defp cancel_deadline(deadline) do
    439     :erlang.cancel_timer(deadline, async: true, info: false)
    440   end
    441 end