zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

proxy.ex (9257B)


      1 defmodule DBConnection.Ownership.Proxy do
      2   @moduledoc false
      3 
      4   alias DBConnection.Holder
      5   use GenServer, restart: :temporary
      6 
      7   @time_unit 1000
      8   @ownership_timeout 120_000
      9   @queue_target 50
     10   @queue_interval 1000
     11 
     12   def start_link({caller, pool, pool_opts}) do
     13     GenServer.start_link(__MODULE__, {caller, pool, pool_opts}, [])
     14   end
     15 
     16   def stop(proxy, caller) do
     17     GenServer.cast(proxy, {:stop, caller})
     18   end
     19 
     20   # Callbacks
     21 
     22   def init({caller, pool, pool_opts}) do
     23     pool_opts =
     24       pool_opts
     25       |> Keyword.put(:timeout, :infinity)
     26       |> Keyword.delete(:deadline)
     27 
     28     owner_ref = Process.monitor(caller)
     29     ownership_timeout = Keyword.get(pool_opts, :ownership_timeout, @ownership_timeout)
     30     timeout = Keyword.get(pool_opts, :queue_target, @queue_target) * 2
     31     interval = Keyword.get(pool_opts, :queue_interval, @queue_interval)
     32 
     33     pre_checkin = Keyword.get(pool_opts, :pre_checkin, fn _, mod, state -> {:ok, mod, state} end)
     34     post_checkout = Keyword.get(pool_opts, :post_checkout, &{:ok, &1, &2})
     35 
     36     state = %{
     37       client: nil,
     38       timer: nil,
     39       holder: nil,
     40       timeout: timeout,
     41       interval: interval,
     42       poll: nil,
     43       owner: {caller, owner_ref},
     44       pool: pool,
     45       pool_ref: nil,
     46       pool_opts: pool_opts,
     47       queue: :queue.new(),
     48       mod: nil,
     49       pre_checkin: pre_checkin,
     50       post_checkout: post_checkout,
     51       ownership_timer: start_timer(caller, ownership_timeout)
     52     }
     53 
     54     now = System.monotonic_time(@time_unit)
     55     {:ok, start_poll(now, state)}
     56   end
     57 
     58   def handle_info({:DOWN, ref, _, pid, _reason}, %{owner: {_, ref}} = state) do
     59     down("owner #{inspect(pid)} exited", state)
     60   end
     61 
     62   def handle_info({:timeout, deadline, {_ref, holder, pid, len}}, %{holder: holder} = state) do
     63     if Holder.handle_deadline(holder, deadline) do
     64       message =
     65         "client #{inspect(pid)} timed out because " <>
     66           "it queued and checked out the connection for longer than #{len}ms"
     67 
     68       down(message, state)
     69     else
     70       {:noreply, state}
     71     end
     72   end
     73 
     74   def handle_info(
     75         {:timeout, timer, {__MODULE__, pid, timeout}},
     76         %{ownership_timer: timer} = state
     77       ) do
     78     message =
     79       "owner #{inspect(pid)} timed out because " <>
     80         "it owned the connection for longer than #{timeout}ms (set via the :ownership_timeout option)"
     81 
     82     # We don't invoke down because this is always a disconnect, even if there is no client.
     83     # On the other hand, those timeouts are unlikely to trigger, as it defaults to 2 mins.
     84     pool_disconnect(DBConnection.ConnectionError.exception(message), state)
     85   end
     86 
     87   def handle_info({:timeout, poll, time}, %{poll: poll} = state) do
     88     state = timeout(time, state)
     89     {:noreply, start_poll(time, state)}
     90   end
     91 
     92   def handle_info(
     93         {:db_connection, from, {:checkout, _caller, _now, _queue?}},
     94         %{holder: nil} = state
     95       ) do
     96     %{pool: pool, pool_opts: pool_opts, owner: {_, owner_ref}, post_checkout: post_checkout} =
     97       state
     98 
     99     case Holder.checkout(pool, [self()], pool_opts) do
    100       {:ok, pool_ref, original_mod, _idle_time, conn_state} ->
    101         case post_checkout.(original_mod, conn_state) do
    102           {:ok, conn_mod, conn_state} ->
    103             holder = Holder.new(self(), owner_ref, conn_mod, conn_state)
    104             state = %{state | pool_ref: pool_ref, holder: holder, mod: original_mod}
    105             checkout(from, state)
    106 
    107           {:disconnect, err, ^original_mod, _conn_state} ->
    108             Holder.disconnect(pool_ref, err)
    109             Holder.reply_error(from, err)
    110             {:stop, {:shutdown, err}, state}
    111         end
    112 
    113       {:error, err} ->
    114         Holder.reply_error(from, err)
    115         {:stop, {:shutdown, err}, state}
    116     end
    117   end
    118 
    119   def handle_info(
    120         {:db_connection, from, {:checkout, _caller, _now, _queue?}},
    121         %{client: nil} = state
    122       ) do
    123     checkout(from, state)
    124   end
    125 
    126   def handle_info({:db_connection, from, {:checkout, _caller, now, queue?}}, state) do
    127     if queue? do
    128       %{queue: queue} = state
    129       queue = :queue.in({now, from}, queue)
    130       {:noreply, %{state | queue: queue}}
    131     else
    132       message = "connection not available and queuing is disabled"
    133       err = DBConnection.ConnectionError.exception(message)
    134       Holder.reply_error(from, err)
    135       {:noreply, state}
    136     end
    137   end
    138 
    139   def handle_info(
    140         {:"ETS-TRANSFER", holder, _, {msg, ref, extra}},
    141         %{holder: holder, client: {_, ref, _}} = state
    142       ) do
    143     case msg do
    144       :checkin -> checkin(state)
    145       :disconnect -> pool_disconnect(extra, state)
    146       :stop -> pool_stop(extra, state)
    147     end
    148   end
    149 
    150   def handle_info({:"ETS-TRANSFER", holder, pid, ref}, %{holder: holder, owner: {_, ref}} = state) do
    151     down("client #{inspect(pid)} exited", state)
    152   end
    153 
    154   def handle_cast({:stop, caller}, %{owner: {owner, _}} = state) do
    155     message = "#{inspect(caller)} checked in the connection owned by #{inspect(owner)}"
    156 
    157     message =
    158       case pruned_stacktrace(caller) do
    159         [] ->
    160           message
    161 
    162         current_stack ->
    163           message <>
    164             "\n\n#{inspect(caller)} triggered the checkin at location:\n\n" <>
    165             Exception.format_stacktrace(current_stack)
    166       end
    167 
    168     down(message, state)
    169   end
    170 
    171   defp checkout({pid, ref} = from, %{holder: holder} = state) do
    172     if Holder.handle_checkout(holder, from, ref, nil) do
    173       {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}}
    174     else
    175       next(state)
    176     end
    177   end
    178 
    179   defp checkin(state) do
    180     next(%{state | client: nil})
    181   end
    182 
    183   defp next(%{queue: queue} = state) do
    184     case :queue.out(queue) do
    185       {{:value, {_, from}}, queue} ->
    186         checkout(from, %{state | queue: queue})
    187 
    188       {:empty, queue} ->
    189         {:noreply, %{state | queue: queue}}
    190     end
    191   end
    192 
    193   defp start_timer(_, :infinity), do: nil
    194 
    195   defp start_timer(pid, timeout) do
    196     :erlang.start_timer(timeout, self(), {__MODULE__, pid, timeout})
    197   end
    198 
    199   # It is down but never checked out from pool
    200   defp down(reason, %{holder: nil} = state) do
    201     {:stop, {:shutdown, reason}, state}
    202   end
    203 
    204   # If it is down but it has no client, checkin
    205   defp down(reason, %{client: nil} = state) do
    206     pool_checkin(reason, state)
    207   end
    208 
    209   # If it is down but it has a client, disconnect
    210   defp down(reason, %{client: {client, _, checkout_stack}} = state) do
    211     reason =
    212       case pruned_stacktrace(client) do
    213         [] ->
    214           reason
    215 
    216         current_stack ->
    217           reason <>
    218             """
    219             \n\nClient #{inspect(client)} is still using a connection from owner at location:
    220 
    221             #{Exception.format_stacktrace(current_stack)}
    222             The connection itself was checked out by #{inspect(client)} at location:
    223 
    224             #{Exception.format_stacktrace(checkout_stack)}
    225             """
    226       end
    227 
    228     err = DBConnection.ConnectionError.exception(reason)
    229     pool_disconnect(err, state)
    230   end
    231 
    232   ## Helpers
    233 
    234   defp pool_checkin(reason, state) do
    235     pool_done(reason, state, :checkin, fn pool_ref, _ -> Holder.checkin(pool_ref) end)
    236   end
    237 
    238   defp pool_disconnect(err, state) do
    239     pool_done(err, state, {:disconnect, err}, &Holder.disconnect/2)
    240   end
    241 
    242   defp pool_stop(err, state) do
    243     pool_done(err, state, {:stop, err}, &Holder.stop/2, &Holder.stop/2)
    244   end
    245 
    246   defp pool_done(err, state, op, done, stop_or_disconnect \\ &Holder.disconnect/2) do
    247     %{holder: holder, pool_ref: pool_ref, pre_checkin: pre_checkin, mod: original_mod} = state
    248 
    249     if holder do
    250       {conn_mod, conn_state} = Holder.delete(holder)
    251 
    252       case pre_checkin.(op, conn_mod, conn_state) do
    253         {:ok, ^original_mod, conn_state} ->
    254           Holder.put_state(pool_ref, conn_state)
    255           done.(pool_ref, err)
    256           {:stop, {:shutdown, err}, state}
    257 
    258         {:disconnect, err, ^original_mod, conn_state} ->
    259           Holder.put_state(pool_ref, conn_state)
    260           stop_or_disconnect.(pool_ref, err)
    261           {:stop, {:shutdown, err}, state}
    262       end
    263     else
    264       {:stop, {:shutdown, err}, state}
    265     end
    266   end
    267 
    268   defp start_poll(now, %{interval: interval} = state) do
    269     timeout = now + interval
    270     poll = :erlang.start_timer(timeout, self(), timeout, abs: true)
    271     %{state | poll: poll}
    272   end
    273 
    274   defp timeout(time, %{queue: queue, timeout: timeout} = state) do
    275     case :queue.out(queue) do
    276       {{:value, {sent, from}}, queue} when sent + timeout < time ->
    277         drop(time - sent, from)
    278         timeout(time, %{state | queue: queue})
    279 
    280       {_, _} ->
    281         state
    282     end
    283   end
    284 
    285   defp drop(delay, from) do
    286     message =
    287       "connection not available and request was dropped from queue after #{delay}ms. " <>
    288         "You can configure how long requests wait in the queue using :queue_target and " <>
    289         ":queue_interval. See DBConnection.start_link/2 for more information"
    290 
    291     err = DBConnection.ConnectionError.exception(message, :queue_timeout)
    292     Holder.reply_error(from, err)
    293   end
    294 
    295   @prune_modules [:gen, GenServer, DBConnection, DBConnection.Holder, DBConnection.Ownership]
    296 
    297   defp pruned_stacktrace(pid) do
    298     case Process.info(pid, :current_stacktrace) do
    299       {:current_stacktrace, stacktrace} ->
    300         Enum.drop_while(stacktrace, &match?({mod, _, _, _} when mod in @prune_modules, &1))
    301 
    302       _ ->
    303         []
    304     end
    305   end
    306 end