connection_pool.ex (10794B)
defmodule DBConnection.ConnectionPool do
  @moduledoc """
  The default connection pool.

  The queueing algorithm is based on [CoDel](https://queue.acm.org/appendices/codel.html).
  """

  use GenServer
  alias DBConnection.Holder

  # Defaults for the :queue_target, :queue_interval and :idle_interval options
  # read in init/1.
  @queue_target 50
  @queue_interval 1000
  @idle_interval 1000
  # Unit (parts per second) that queue timestamps are converted to.
  @time_unit 1000

  # Process state is `{status, queue, codel, ts}`:
  #
  #   * `status` - `:busy` or `:ready`
  #   * `queue`  - ETS `:ordered_set`. While `:busy` it holds waiting clients as
  #     `{{sent, unique_integer, from}}`; while `:ready` it holds checked-in
  #     connections as `{{checkin_time_in_native, holder}}`
  #   * `codel`  - map with the queue settings (`:target`, `:interval`,
  #     `:idle_interval`), the running values (`:delay`, `:slow`, `:next`) and
  #     the current `:poll` / `:idle` timer references
  #   * `ts`     - `{time, interval}` last recorded by `disconnect_all/3` and
  #     handed to `Holder.maybe_disconnect/3`

  # Starts the pool process; only :name and :spawn_opt are forwarded to GenServer.
  @doc false
  def start_link({_mod, opts} = arg), do: GenServer.start_link(__MODULE__, arg, start_opts(opts))

  # Overridden only so the generated child_spec/1 is hidden from the docs.
  @doc false
  def child_spec(opts), do: super(opts)

  # Checkout is delegated entirely to the holder.
  @doc false
  def checkout(pool, callers, opts), do: Holder.checkout(pool, callers, opts)

  # Synchronously (no call timeout) records a new `{now, interval}` marker in the pool.
  @doc false
  def disconnect_all(pool, interval, _opts) do
    GenServer.call(pool, {:disconnect_all, interval}, :infinity)
  end

  ## GenServer api

  @impl true
  def init({mod, opts}) do
    DBConnection.register_as_pool(mod)

    queue = :ets.new(__MODULE__.Queue, [:protected, :ordered_set])
    ts = {System.monotonic_time(), 0}
    {:ok, _} = DBConnection.ConnectionPool.Pool.start_supervised(queue, mod, opts)

    queue_target = Keyword.get(opts, :queue_target, @queue_target)
    queue_interval = Keyword.get(opts, :queue_interval, @queue_interval)
    idle_interval = Keyword.get(opts, :idle_interval, @idle_interval)

    started_in_native = System.monotonic_time()
    started_in_ms = System.convert_time_unit(started_in_native, :native, @time_unit)

    codel = %{
      target: queue_target,
      interval: queue_interval,
      delay: 0,
      slow: false,
      next: started_in_ms,
      poll: nil,
      idle_interval: idle_interval,
      idle: nil
    }

    # Arm the poll timer first, then the idle timer.
    codel = start_poll(started_in_ms, started_in_ms, codel)
    codel = start_idle(started_in_native, codel)

    # The pool starts out :busy with an empty queue.
    {:ok, {:busy, queue, codel, ts}}
  end

  @impl true
  def handle_call({:disconnect_all, interval}, _from, {status, queue, codel, _ts}) do
    {:reply, :ok, {status, queue, codel, {System.monotonic_time(), interval}}}
  end

  # Checkout request while :busy: either queue the client or reject it right away.
  @impl true
  def handle_info(
        {:db_connection, from, {:checkout, _caller, now, queue?}},
        {:busy, queue, _, _} = busy
      ) do
    case queue? do
      true ->
        # The unique integer keeps keys distinct for requests with the same timestamp.
        :ets.insert(queue, {{now, System.unique_integer(), from}})

      false ->
        reason =
          DBConnection.ConnectionError.exception(
            "connection not available and queuing is disabled"
          )

        Holder.reply_error(from, reason)
    end

    {:noreply, busy}
  end

  # Checkout request while :ready: hand over the first queued connection, or
  # switch to :busy and re-handle the request when the queue is empty.
  def handle_info(
        {:db_connection, from, {:checkout, _caller, _now, _queue?}} = checkout,
        {:ready, queue, _codel, _ts} = ready
      ) do
    case :ets.first(queue) do
      {checkin_time_in_native, holder} = key ->
        handed_over? = Holder.handle_checkout(holder, from, queue, checkin_time_in_native)
        # The entry is only removed when the holder was actually handed over.
        handed_over? and :ets.delete(queue, key)
        {:noreply, ready}

      :"$end_of_table" ->
        handle_info(checkout, put_elem(ready, 0, :busy))
    end
  end

  # A holder table arrived carrying the queue itself as transfer data: this is
  # reported as the client having exited.
  def handle_info({:"ETS-TRANSFER", holder, pid, queue}, {_, queue, _, _} = data) do
    reason =
      DBConnection.ConnectionError.exception(
        message: "client #{inspect(pid)} exited",
        severity: :info
      )

    Holder.handle_disconnect(holder, reason)
    {:noreply, data}
  end

  # A holder table arrived with an explicit instruction from the client.
  def handle_info({:"ETS-TRANSFER", holder, _, {msg, queue, extra}}, {_, queue, _, _} = data) do
    case msg do
      :checkin ->
        accept_checkin(holder, extra, data)

      :disconnect ->
        Holder.handle_disconnect(holder, extra)
        {:noreply, data}

      :stop ->
        Holder.handle_stop(holder, extra)
        {:noreply, data}
    end
  end

  # Deadline timer for a checked-out connection.
  def handle_info({:timeout, deadline, {queue, holder, pid, len}}, {_, queue, _, _} = data) do
    # Check that timeout refers to current holder (and not previous)
    if Holder.handle_deadline(holder, deadline) do
      summary =
        "client #{inspect(pid)} timed out because " <>
          "it queued and checked out the connection for longer than #{len}ms"

      # Append the client's current stacktrace when it can still be read.
      text =
        case Process.info(pid, :current_stacktrace) do
          {:current_stacktrace, stacktrace} ->
            summary <>
              "\n\n#{inspect(pid)} was at location:\n\n" <>
              Exception.format_stacktrace(stacktrace)

          _ ->
            summary
        end

      Holder.handle_disconnect(holder, DBConnection.ConnectionError.exception(text))
    end

    {:noreply, data}
  end

  # Poll timer; only the timer currently stored in codel.poll matches.
  def handle_info(
        {:timeout, poll, {time, last_sent}},
        {status, queue, %{poll: poll} = codel, ts}
      ) do
    # If no queue progress since last poll check queue
    case :ets.first(queue) do
      {sent, _, _} when sent <= last_sent and status == :busy ->
        timeout(time - sent, time, queue, start_poll(time, sent, codel), ts)

      {sent, _, _} ->
        {:noreply, {status, queue, start_poll(time, sent, codel), ts}}

      _ ->
        {:noreply, {status, queue, start_poll(time, time, codel), ts}}
    end
  end

  # Idle timer; only the timer currently stored in codel.idle matches.
  def handle_info(
        {:timeout, idle, past_in_native},
        {status, queue, %{idle: idle} = codel, ts}
      ) do
    drop_idle(past_in_native, status, queue, codel, ts)
  end

  # Handles a :checkin transfer. The connection is only considered when this
  # process still owns the holder table; nothing happens if the table is gone.
  defp accept_checkin(holder, checkin_time_in_native, {_, _, _, ts} = data) do
    owner = self()

    case :ets.info(holder, :owner) do
      ^owner ->
        {time, interval} = ts

        # A truthy result means the connection is not put back into the queue here.
        if Holder.maybe_disconnect(holder, time, interval) do
          {:noreply, data}
        else
          handle_checkin(holder, checkin_time_in_native, data)
        end

      :undefined ->
        {:noreply, data}
    end
  end

  # While :ready, takes every connection queued at or before `past_in_native`
  # out of the queue and disconnects or pings it, then re-arms the idle timer.
  defp drop_idle(past_in_native, status, queue, codel, ts) do
    # If no queue progress since last idle check oldest connection
    case :ets.first(queue) do
      {queued_in_native, holder} = key
      when queued_in_native <= past_in_native and status == :ready ->
        :ets.delete(queue, key)
        Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder)
        drop_idle(past_in_native, status, queue, codel, ts)

      _ ->
        {:noreply, {status, queue, start_idle(System.monotonic_time(), codel), ts}}
    end
  end

  # Called from the poll timer when the head of the queue made no progress.
  # Once per interval the recorded delay is compared with the target: above it
  # the pool turns slow and drops requests older than twice the target.
  defp timeout(delay, time, queue, codel, ts) do
    updated =
      case codel do
        %{delay: min_delay, next: next, target: target, interval: interval}
        when time >= next and min_delay > target ->
          slowed = %{codel | slow: true, delay: delay, next: time + interval}
          drop_slow(time, target * 2, queue)
          slowed

        %{next: next, interval: interval} when time >= next ->
          %{codel | slow: false, delay: delay, next: time + interval}

        _ ->
          codel
      end

    {:noreply, {:busy, queue, updated, ts}}
  end

  # Replies with an error to every queued request sent before `time - timeout`
  # and then removes those entries from the queue.
  defp drop_slow(time, timeout, queue) do
    cutoff = time - timeout
    pattern = {{:"$1", :_, :"$2"}}
    conditions = [{:<, :"$1", cutoff}]

    queue
    |> :ets.select([{pattern, conditions, [{{:"$1", :"$2"}}]}])
    |> Enum.each(fn {sent, from} -> drop(time - sent, from) end)

    :ets.select_delete(queue, [{pattern, conditions, [true]}])
  end

  # While :ready a returning connection simply joins the queue.
  defp handle_checkin(holder, now_in_native, {:ready, queue, _, _} = data) do
    :ets.insert(queue, {{now_in_native, holder}})
    {:noreply, data}
  end

  # While :busy a returning connection is offered to the waiting clients; it is
  # only queued when no client took it.
  defp handle_checkin(holder, now_in_native, {:busy, queue, codel, ts}) do
    now_in_ms = System.convert_time_unit(now_in_native, :native, @time_unit)
    next_state = dequeue(now_in_ms, holder, queue, codel, ts)

    case next_state do
      {:busy, _, _, _} ->
        :ok

      {:ready, _, _, _} ->
        :ets.insert(queue, {{now_in_native, holder}})
    end

    {:noreply, next_state}
  end

  # Picks the dequeue strategy: start a new interval when `next` has been
  # reached, otherwise follow the current slow / not-slow mode.
  defp dequeue(time, holder, queue, codel, ts) do
    case codel do
      %{next: next, delay: delay, target: target} when time >= next ->
        dequeue_first(time, delay > target, holder, queue, codel, ts)

      %{slow: false} ->
        dequeue_fast(time, holder, queue, codel, ts)

      %{slow: true, target: target} ->
        dequeue_slow(time, target * 2, holder, queue, codel, ts)
    end
  end

  # First dequeue of an interval: schedules the next interval and resets the
  # recorded delay to that of the request being served (0 when none is waiting).
  defp dequeue_first(time, slow?, holder, queue, codel, ts) do
    %{interval: interval} = codel
    upcoming = time + interval

    case :ets.first(queue) do
      {sent, _, from} = key ->
        :ets.delete(queue, key)
        waited = time - sent
        refreshed = %{codel | next: upcoming, delay: waited, slow: slow?}
        go(waited, from, time, holder, queue, refreshed, ts)

      :"$end_of_table" ->
        {:ready, queue, %{codel | next: upcoming, delay: 0, slow: slow?}, ts}
    end
  end

  # Not slow: serve the oldest waiting request, whatever its age.
  defp dequeue_fast(time, holder, queue, codel, ts) do
    case :ets.first(queue) do
      {sent, _, from} = key ->
        :ets.delete(queue, key)
        go(time - sent, from, time, holder, queue, codel, ts)

      :"$end_of_table" ->
        {:ready, queue, %{codel | delay: 0}, ts}
    end
  end

  # Slow: drop requests that waited longer than `timeout`, then serve the first
  # one that did not.
  defp dequeue_slow(time, timeout, holder, queue, codel, ts) do
    case :ets.first(queue) do
      {sent, _, from} = key when time - sent > timeout ->
        :ets.delete(queue, key)
        drop(time - sent, from)
        dequeue_slow(time, timeout, holder, queue, codel, ts)

      {sent, _, from} = key ->
        :ets.delete(queue, key)
        go(time - sent, from, time, holder, queue, codel, ts)

      :"$end_of_table" ->
        {:ready, queue, %{codel | delay: 0}, ts}
    end
  end

  # Tries to give the holder to `from`. On success the smaller of the request's
  # delay and the recorded delay is kept; on failure the next request is tried.
  defp go(delay, from, time, holder, queue, %{delay: min_delay} = codel, ts) do
    case Holder.handle_checkout(holder, from, queue, 0) do
      true when delay < min_delay ->
        {:busy, queue, %{codel | delay: delay}, ts}

      true ->
        {:busy, queue, codel, ts}

      false ->
        dequeue(time, holder, queue, codel, ts)
    end
  end

  # Replies to a queued client with a :queue_timeout connection error.
  defp drop(delay, from) do
    message = """
    connection not available and request was dropped from queue after #{delay}ms. \
    This means requests are coming in and your connection pool cannot serve them fast enough. \
    You can address this by:

      1. Ensuring your database is available and that you can connect to it
      2. Tracking down slow queries and making sure they are running fast enough
      3. Increasing the pool_size (although this increases resource consumption)
      4. Allowing requests to wait longer by increasing :queue_target and :queue_interval

    See DBConnection.start_link/2 for more information
    """

    reason = DBConnection.ConnectionError.exception(message, :queue_timeout)

    Holder.reply_error(from, reason)
  end

  defp start_opts(opts), do: Keyword.take(opts, [:name, :spawn_opt])

  # Arms the poll timer one interval after `now`. The timer message carries the
  # firing time and the `sent` value observed at the head of the queue.
  defp start_poll(now, last_sent, %{interval: interval} = codel) do
    fire_at = now + interval
    timer = :erlang.start_timer(fire_at, self(), {fire_at, last_sent}, abs: true)
    %{codel | poll: timer}
  end

  # Arms the idle timer one idle interval after `now_in_native`. The firing time
  # is expressed in milliseconds; the message carries the native start time.
  defp start_idle(now_in_native, %{idle_interval: interval} = codel) do
    fire_at = System.convert_time_unit(now_in_native, :native, :millisecond) + interval
    timer = :erlang.start_timer(fire_at, self(), now_in_native, abs: true)
    %{codel | idle: timer}
  end
end