zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

connection_pool.ex (10794B)


      1 defmodule DBConnection.ConnectionPool do
      2   @moduledoc """
      3   The default connection pool.
      4 
      5   The queueing algorithm is based on [CoDel](https://queue.acm.org/appendices/codel.html).
      6   """
      7 
      8   use GenServer
      9   alias DBConnection.Holder
     10 
     11   @queue_target 50
     12   @queue_interval 1000
     13   @idle_interval 1000
     14   @time_unit 1000
     15 
     16   @doc false
     17   def start_link({mod, opts}) do
     18     GenServer.start_link(__MODULE__, {mod, opts}, start_opts(opts))
     19   end
     20 
     21   @doc false
     22   def child_spec(opts) do
     23     super(opts)
     24   end
     25 
     26   @doc false
     27   def checkout(pool, callers, opts) do
     28     Holder.checkout(pool, callers, opts)
     29   end
     30 
     31   @doc false
     32   def disconnect_all(pool, interval, _opts) do
     33     GenServer.call(pool, {:disconnect_all, interval}, :infinity)
     34   end
     35 
     36   ## GenServer api
     37 
     38   @impl true
     39   def init({mod, opts}) do
     40     DBConnection.register_as_pool(mod)
     41 
     42     queue = :ets.new(__MODULE__.Queue, [:protected, :ordered_set])
     43     ts = {System.monotonic_time(), 0}
     44     {:ok, _} = DBConnection.ConnectionPool.Pool.start_supervised(queue, mod, opts)
     45     target = Keyword.get(opts, :queue_target, @queue_target)
     46     interval = Keyword.get(opts, :queue_interval, @queue_interval)
     47     idle_interval = Keyword.get(opts, :idle_interval, @idle_interval)
     48     now_in_native = System.monotonic_time()
     49     now_in_ms = System.convert_time_unit(now_in_native, :native, @time_unit)
     50 
     51     codel = %{
     52       target: target,
     53       interval: interval,
     54       delay: 0,
     55       slow: false,
     56       next: now_in_ms,
     57       poll: nil,
     58       idle_interval: idle_interval,
     59       idle: nil
     60     }
     61 
     62     codel = start_idle(now_in_native, start_poll(now_in_ms, now_in_ms, codel))
     63     {:ok, {:busy, queue, codel, ts}}
     64   end
     65 
     66   @impl true
     67   def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do
     68     ts = {System.monotonic_time(), interval}
     69     {:reply, :ok, {type, queue, codel, ts}}
     70   end
     71 
     72   @impl true
     73   def handle_info(
     74         {:db_connection, from, {:checkout, _caller, now, queue?}},
     75         {:busy, queue, _, _} = busy
     76       ) do
     77     case queue? do
     78       true ->
     79         :ets.insert(queue, {{now, System.unique_integer(), from}})
     80         {:noreply, busy}
     81 
     82       false ->
     83         message = "connection not available and queuing is disabled"
     84         err = DBConnection.ConnectionError.exception(message)
     85         Holder.reply_error(from, err)
     86         {:noreply, busy}
     87     end
     88   end
     89 
     90   def handle_info(
     91         {:db_connection, from, {:checkout, _caller, _now, _queue?}} = checkout,
     92         {:ready, queue, _codel, _ts} = ready
     93       ) do
     94     case :ets.first(queue) do
     95       {queued_in_native, holder} = key ->
     96         Holder.handle_checkout(holder, from, queue, queued_in_native) and :ets.delete(queue, key)
     97         {:noreply, ready}
     98 
     99       :"$end_of_table" ->
    100         handle_info(checkout, put_elem(ready, 0, :busy))
    101     end
    102   end
    103 
    104   def handle_info({:"ETS-TRANSFER", holder, pid, queue}, {_, queue, _, _} = data) do
    105     message = "client #{inspect(pid)} exited"
    106     err = DBConnection.ConnectionError.exception(message: message, severity: :info)
    107     Holder.handle_disconnect(holder, err)
    108     {:noreply, data}
    109   end
    110 
    111   def handle_info({:"ETS-TRANSFER", holder, _, {msg, queue, extra}}, {_, queue, _, ts} = data) do
    112     case msg do
    113       :checkin ->
    114         owner = self()
    115 
    116         case :ets.info(holder, :owner) do
    117           ^owner ->
    118             {time, interval} = ts
    119 
    120             if Holder.maybe_disconnect(holder, time, interval) do
    121               {:noreply, data}
    122             else
    123               handle_checkin(holder, extra, data)
    124             end
    125 
    126           :undefined ->
    127             {:noreply, data}
    128         end
    129 
    130       :disconnect ->
    131         Holder.handle_disconnect(holder, extra)
    132         {:noreply, data}
    133 
    134       :stop ->
    135         Holder.handle_stop(holder, extra)
    136         {:noreply, data}
    137     end
    138   end
    139 
    140   def handle_info({:timeout, deadline, {queue, holder, pid, len}}, {_, queue, _, _} = data) do
    141     # Check that timeout refers to current holder (and not previous)
    142     if Holder.handle_deadline(holder, deadline) do
    143       message =
    144         "client #{inspect(pid)} timed out because " <>
    145           "it queued and checked out the connection for longer than #{len}ms"
    146 
    147       exc =
    148         case Process.info(pid, :current_stacktrace) do
    149           {:current_stacktrace, stacktrace} ->
    150             message <>
    151               "\n\n#{inspect(pid)} was at location:\n\n" <>
    152               Exception.format_stacktrace(stacktrace)
    153 
    154           _ ->
    155             message
    156         end
    157         |> DBConnection.ConnectionError.exception()
    158 
    159       Holder.handle_disconnect(holder, exc)
    160     end
    161 
    162     {:noreply, data}
    163   end
    164 
    165   def handle_info({:timeout, poll, {time, last_sent}}, {_, _, %{poll: poll}, _} = data) do
    166     {status, queue, codel, ts} = data
    167 
    168     # If no queue progress since last poll check queue
    169     case :ets.first(queue) do
    170       {sent, _, _} when sent <= last_sent and status == :busy ->
    171         delay = time - sent
    172         timeout(delay, time, queue, start_poll(time, sent, codel), ts)
    173 
    174       {sent, _, _} ->
    175         {:noreply, {status, queue, start_poll(time, sent, codel), ts}}
    176 
    177       _ ->
    178         {:noreply, {status, queue, start_poll(time, time, codel), ts}}
    179     end
    180   end
    181 
    182   def handle_info({:timeout, idle, past_in_native}, {_, _, %{idle: idle}, _} = data) do
    183     {status, queue, codel, ts} = data
    184     drop_idle(past_in_native, status, queue, codel, ts)
    185   end
    186 
    187   defp drop_idle(past_in_native, status, queue, codel, ts) do
    188     # If no queue progress since last idle check oldest connection
    189     case :ets.first(queue) do
    190       {queued_in_native, holder} = key
    191       when queued_in_native <= past_in_native and status == :ready ->
    192         :ets.delete(queue, key)
    193         Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder)
    194         drop_idle(past_in_native, status, queue, codel, ts)
    195 
    196       _ ->
    197         {:noreply, {status, queue, start_idle(System.monotonic_time(), codel), ts}}
    198     end
    199   end
    200 
    201   defp timeout(delay, time, queue, codel, ts) do
    202     case codel do
    203       %{delay: min_delay, next: next, target: target, interval: interval}
    204       when time >= next and min_delay > target ->
    205         codel = %{codel | slow: true, delay: delay, next: time + interval}
    206         drop_slow(time, target * 2, queue)
    207         {:noreply, {:busy, queue, codel, ts}}
    208 
    209       %{next: next, interval: interval} when time >= next ->
    210         codel = %{codel | slow: false, delay: delay, next: time + interval}
    211         {:noreply, {:busy, queue, codel, ts}}
    212 
    213       _ ->
    214         {:noreply, {:busy, queue, codel, ts}}
    215     end
    216   end
    217 
    218   defp drop_slow(time, timeout, queue) do
    219     min_sent = time - timeout
    220     match = {{:"$1", :_, :"$2"}}
    221     guards = [{:<, :"$1", min_sent}]
    222     select_slow = [{match, guards, [{{:"$1", :"$2"}}]}]
    223 
    224     for {sent, from} <- :ets.select(queue, select_slow) do
    225       drop(time - sent, from)
    226     end
    227 
    228     :ets.select_delete(queue, [{match, guards, [true]}])
    229   end
    230 
    231   defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do
    232     :ets.insert(queue, {{now_in_native, holder}})
    233     {:noreply, data}
    234   end
    235 
    236   defp handle_checkin(holder, now_in_native, {:busy, queue, codel, ts}) do
    237     now_in_ms = System.convert_time_unit(now_in_native, :native, @time_unit)
    238 
    239     case dequeue(now_in_ms, holder, queue, codel, ts) do
    240       {:busy, _, _, _} = busy ->
    241         {:noreply, busy}
    242 
    243       {:ready, _, _, _} = ready ->
    244         :ets.insert(queue, {{now_in_native, holder}})
    245         {:noreply, ready}
    246     end
    247   end
    248 
    249   defp dequeue(time, holder, queue, codel, ts) do
    250     case codel do
    251       %{next: next, delay: delay, target: target} when time >= next ->
    252         dequeue_first(time, delay > target, holder, queue, codel, ts)
    253 
    254       %{slow: false} ->
    255         dequeue_fast(time, holder, queue, codel, ts)
    256 
    257       %{slow: true, target: target} ->
    258         dequeue_slow(time, target * 2, holder, queue, codel, ts)
    259     end
    260   end
    261 
    262   defp dequeue_first(time, slow?, holder, queue, codel, ts) do
    263     %{interval: interval} = codel
    264     next = time + interval
    265 
    266     case :ets.first(queue) do
    267       {sent, _, from} = key ->
    268         :ets.delete(queue, key)
    269         delay = time - sent
    270         codel = %{codel | next: next, delay: delay, slow: slow?}
    271         go(delay, from, time, holder, queue, codel, ts)
    272 
    273       :"$end_of_table" ->
    274         codel = %{codel | next: next, delay: 0, slow: slow?}
    275         {:ready, queue, codel, ts}
    276     end
    277   end
    278 
    279   defp dequeue_fast(time, holder, queue, codel, ts) do
    280     case :ets.first(queue) do
    281       {sent, _, from} = key ->
    282         :ets.delete(queue, key)
    283         go(time - sent, from, time, holder, queue, codel, ts)
    284 
    285       :"$end_of_table" ->
    286         {:ready, queue, %{codel | delay: 0}, ts}
    287     end
    288   end
    289 
    290   defp dequeue_slow(time, timeout, holder, queue, codel, ts) do
    291     case :ets.first(queue) do
    292       {sent, _, from} = key when time - sent > timeout ->
    293         :ets.delete(queue, key)
    294         drop(time - sent, from)
    295         dequeue_slow(time, timeout, holder, queue, codel, ts)
    296 
    297       {sent, _, from} = key ->
    298         :ets.delete(queue, key)
    299         go(time - sent, from, time, holder, queue, codel, ts)
    300 
    301       :"$end_of_table" ->
    302         {:ready, queue, %{codel | delay: 0}, ts}
    303     end
    304   end
    305 
    306   defp go(delay, from, time, holder, queue, %{delay: min} = codel, ts) do
    307     case Holder.handle_checkout(holder, from, queue, 0) do
    308       true when delay < min ->
    309         {:busy, queue, %{codel | delay: delay}, ts}
    310 
    311       true ->
    312         {:busy, queue, codel, ts}
    313 
    314       false ->
    315         dequeue(time, holder, queue, codel, ts)
    316     end
    317   end
    318 
    319   defp drop(delay, from) do
    320     message = """
    321     connection not available and request was dropped from queue after #{delay}ms. \
    322     This means requests are coming in and your connection pool cannot serve them fast enough. \
    323     You can address this by:
    324 
    325       1. Ensuring your database is available and that you can connect to it
    326       2. Tracking down slow queries and making sure they are running fast enough
    327       3. Increasing the pool_size (although this increases resource consumption)
    328       4. Allowing requests to wait longer by increasing :queue_target and :queue_interval
    329 
    330     See DBConnection.start_link/2 for more information
    331     """
    332 
    333     err = DBConnection.ConnectionError.exception(message, :queue_timeout)
    334 
    335     Holder.reply_error(from, err)
    336   end
    337 
    338   defp start_opts(opts) do
    339     Keyword.take(opts, [:name, :spawn_opt])
    340   end
    341 
    342   defp start_poll(now, last_sent, %{interval: interval} = codel) do
    343     timeout = now + interval
    344     poll = :erlang.start_timer(timeout, self(), {timeout, last_sent}, abs: true)
    345     %{codel | poll: poll}
    346   end
    347 
    348   defp start_idle(now_in_native, %{idle_interval: interval} = codel) do
    349     timeout = System.convert_time_unit(now_in_native, :native, :millisecond) + interval
    350     idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true)
    351     %{codel | idle: idle}
    352   end
    353 end