holder.ex (13031B)
defmodule DBConnection.Holder do
  @moduledoc false

  # A "holder" is a public ETS table containing exactly one `conn` record.
  # Ownership of the table is passed between the connection process, the pool
  # and the caller with `:ets.give_away/3`; whoever owns the table "holds" the
  # connection. This module groups the functions used on each side of that
  # hand-off.

  require Record

  # Default for the `:queue` checkout option.
  @queue true
  # Default for the `:timeout` checkout option.
  @timeout 15000
  # Unit passed to `System.monotonic_time/1` (parts per second, i.e. milliseconds).
  @time_unit 1000

  # The single row stored in a holder table. The table uses the default key
  # position, so the record tag `:conn` is the key used for every lookup.
  #
  # `conn(:field)` expands to the zero-based index of the field in the record
  # tuple, while ETS element positions are one-based; hence the recurring
  # `conn(:field) + 1` throughout this module.
  Record.defrecord(:conn, [:connection, :module, :state, :lock, :ts, deadline: nil, status: :ok])

  # Handle returned to the caller on a successful checkout.
  Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock])

  @type t :: :ets.tid()
  @type checkin_time :: non_neg_integer() | nil

  ## Holder API

  # Creates a holder owned by the calling process, storing `mod` and `state`
  # together with the caller's pid and the creation timestamp, and registers
  # `pool` as heir with `ref` as the heir data. Returns the table id.
  @spec new(pid, reference, module, term) :: t
  def new(pool, ref, mod, state) do
    # Insert before setting heir so that pool can't receive empty table
    holder = :ets.new(__MODULE__, [:public, :ordered_set])

    conn = conn(connection: self(), module: mod, state: state, ts: System.monotonic_time())
    true = :ets.insert_new(holder, conn)

    :ets.setopts(holder, {:heir, pool, ref})
    holder
  end

  # Creates a fresh holder (see `new/4`) and immediately gives it away to
  # `pool` tagged as a `:checkin`. Returns `{:ok, holder}`, or `:error` when
  # the give-away raises `ArgumentError`.
  @spec update(pid, reference, module, term) :: {:ok, t} | :error
  def update(pool, ref, mod, state) do
    holder = new(pool, ref, mod, state)

    try do
      :ets.give_away(holder, pool, {:checkin, ref, System.monotonic_time()})
      {:ok, holder}
    rescue
      ArgumentError -> :error
    end
  end

  # Reads the callback module and state out of the holder, deletes the table
  # and returns `{module, state}`.
  @spec delete(t) :: {module, term}
  def delete(holder) do
    [conn(module: module, state: state)] = :ets.lookup(holder, :conn)
    :ets.delete(holder)
    {module, state}
  end

  ## Pool API (invoked by caller)

  # Checks a connection out of `pool`.
  #
  # Options read here: `:queue` (default `true`), `:timeout` (default 15000)
  # and `:deadline` (see `abs_timeout/2`).
  #
  # Returns `{:ok, pool_ref, module, checkin_time, state}` or `{:error, exception}`.
  # A `DBConnection.ConnectionError` on the direct checkout path additionally
  # emits a `[:db_connection, :connection_error]` telemetry event. A
  # `{:redirect, caller, proxy}` reply is followed by checking out from `proxy`
  # on behalf of `caller`. Exits when the pool cannot be reached (see the
  # private `checkout/5`) or goes down while waiting.
  @callback checkout(pool :: GenServer.server(), [pid], opts :: Keyword.t()) ::
              {:ok, pool_ref :: any, module, checkin_time, state :: any}
              | {:error, Exception.t()}
  def checkout(pool, callers, opts) do
    queue? = Keyword.get(opts, :queue, @queue)
    now = System.monotonic_time(@time_unit)
    timeout = abs_timeout(now, opts)

    case checkout(pool, callers, queue?, now, timeout) do
      {:ok, _, _, _, _} = ok ->
        ok

      {:error, %DBConnection.ConnectionError{} = connection_error} = error ->
        :telemetry.execute(
          [:db_connection, :connection_error],
          %{count: 1},
          %{
            error: connection_error,
            opts: opts
          }
        )

        error

      {:error, _} = error ->
        error

      {:redirect, caller, proxy} ->
        case checkout(proxy, [caller], opts) do
          {:ok, _, _, _, _} = ok ->
            ok

          {:error, %DBConnection.ConnectionError{message: message} = exception} ->
            {:error,
             %{
               exception
               | message:
                   "could not checkout the connection owned by #{inspect(caller)}. " <>
                     "When using the sandbox, connections are shared, so this may imply " <>
                     "another process is using a connection. Reason: #{message}"
             }}

          {:error, _} = error ->
            error
        end

      {:exit, reason} ->
        exit({reason, {__MODULE__, :checkout, [pool, opts]}})
    end
  end

  # Returns the holder to the pool, clearing the lock and reporting the
  # check-in time.
  @spec checkin(pool_ref :: any) :: :ok
  def checkin(pool_ref) do
    # Note we may call checkin after a disconnect/stop. For this reason, we choose
    # to not change the status on checkin but strictly speaking nobody can access
    # the holder after disconnect/stop unless they store a copy of %DBConnection{}.
    # Note status can't be :aborted as aborted is always reverted at the end of a
    # transaction.
    done(pool_ref, [{conn(:lock) + 1, nil}], :checkin, System.monotonic_time())
  end

  # Marks the connection status as `:error` and hands the holder back to the
  # pool tagged `:disconnect`, carrying `err`.
  @spec disconnect(pool_ref :: any, err :: Exception.t()) :: :ok
  def disconnect(pool_ref, err) do
    done(pool_ref, [{conn(:status) + 1, :error}], :disconnect, err)
  end

  # Same as `disconnect/2` but tagged `:stop`.
  @spec stop(pool_ref :: any, err :: Exception.t()) :: :ok
  def stop(pool_ref, err) do
    done(pool_ref, [{conn(:status) + 1, :error}], :stop, err)
  end

  # Invokes `fun` on the connection's callback module with `args ++ [opts, state]`.
  # Refused with a `:disconnect` tuple when the holder is gone, its status is
  # `:error`, or its status is `:aborted`.
  @spec handle(pool_ref :: any, fun :: atom, args :: [term], Keyword.t()) :: tuple
  def handle(pool_ref, fun, args, opts) do
    handle_or_cleanup(:handle, pool_ref, fun, args, opts)
  end

  # Like `handle/4`, except that an `:aborted` status does not prevent the call.
  @spec cleanup(pool_ref :: any, fun :: atom, args :: [term], Keyword.t()) :: tuple
  def cleanup(pool_ref, fun, args, opts) do
    handle_or_cleanup(:cleanup, pool_ref, fun, args, opts)
  end

  # Shared implementation of `handle/4` and `cleanup/4`. Clause order matters:
  # the lock check runs first, then `:error`, then `:aborted` (only for
  # `:handle`), and only then is the callback applied.
  defp handle_or_cleanup(type, pool_ref, fun, args, opts) do
    pool_ref(holder: holder, lock: lock) = pool_ref

    try do
      :ets.lookup(holder, :conn)
    rescue
      # The table no longer exists or is otherwise inaccessible.
      ArgumentError ->
        closed_disconnect()
    else
      # The lock stored in the holder is not the one in this pool_ref.
      [conn(lock: conn_lock)] when conn_lock != lock ->
        raise "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}"

      [conn(status: :error)] ->
        closed_disconnect()

      [conn(status: :aborted)] when type != :cleanup ->
        msg = "transaction rolling back"
        {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}

      [conn(module: module, state: state)] ->
        holder_apply(holder, module, fun, args ++ [opts, state])
    end
  end

  # Disconnect tuple returned when the holder can no longer be used.
  defp closed_disconnect do
    msg = "connection is closed because of an error, disconnect or timeout"
    {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}
  end

  ## Pool state helpers API (invoked by callers)

  # Overwrites the connection state stored in the holder. Unlike
  # `put_status/2`, an `ArgumentError` from ETS is not rescued here.
  @spec put_state(pool_ref :: any, term) :: :ok
  def put_state(pool_ref(holder: holder), state) do
    :ets.update_element(holder, :conn, [{conn(:state) + 1, state}])
    :ok
  end

  # Returns whether the stored status equals `status`; `false` when the lookup
  # raises `ArgumentError`.
  @spec status?(pool_ref :: any, :ok | :aborted) :: boolean()
  def status?(pool_ref(holder: holder), status) do
    :ets.lookup_element(holder, :conn, conn(:status) + 1) == status
  rescue
    ArgumentError -> false
  end

  # Stores `status` in the holder. Returns the result of
  # `:ets.update_element/3`, or `false` when it raises `ArgumentError`.
  @spec put_status(pool_ref :: any, :ok | :aborted) :: boolean()
  def put_status(pool_ref(holder: holder), status) do
    :ets.update_element(holder, :conn, [{conn(:status) + 1, status}])
  rescue
    ArgumentError -> false
  end

  ## Pool callbacks (invoked by pools)

  # Replies to a waiting checkout with a redirect; handled by the
  # `{:redirect, caller, proxy}` branch of `checkout/3`.
  @spec reply_redirect({pid, reference}, pid | :shared | :auto, GenServer.server()) :: :ok
  def reply_redirect(from, caller, redirect) do
    GenServer.reply(from, {:redirect, caller, redirect})
    :ok
  end

  # Replies to a waiting checkout with `{:error, exception}`.
  @spec reply_error({pid, reference}, Exception.t()) :: :ok
  def reply_error(from, exception) do
    GenServer.reply(from, {:error, exception})
    :ok
  end

  # Gives the holder to the process waiting in `checkout_call/5`.
  #
  # Returns the result of `:ets.give_away/3` on success. If it raises
  # `ArgumentError` while the receiver is dead and this process still owns the
  # holder, returns `false`. In every other failure case the error is re-raised
  # with a diagnostic message.
  @spec handle_checkout(t, {pid, reference}, reference, checkin_time) :: boolean
  def handle_checkout(holder, {pid, mref}, ref, checkin_time) do
    :ets.give_away(holder, pid, {mref, ref, checkin_time})
  rescue
    ArgumentError ->
      if Process.alive?(pid) or :ets.info(holder, :owner) != self() do
        raise ArgumentError, no_holder(holder, pid)
      else
        false
      end
  end

  # Returns `true` only when the deadline timer stored in the holder is exactly
  # `deadline`; `false` when it differs or the lookup raises `ArgumentError`.
  @spec handle_deadline(t, reference) :: boolean
  def handle_deadline(holder, deadline) do
    :ets.lookup_element(holder, :conn, conn(:deadline) + 1)
  rescue
    ArgumentError -> false
  else
    ^deadline -> true
    _ -> false
  end

  # Asks the connection process to ping using the stored state, then deletes
  # the holder. Raises `ArgumentError` with a diagnostic message when the
  # lookup fails.
  @spec handle_ping(t) :: true
  def handle_ping(holder) do
    :ets.lookup(holder, :conn)
  rescue
    ArgumentError ->
      raise ArgumentError, no_holder(holder, nil)
  else
    [conn(connection: conn, state: state)] ->
      DBConnection.Connection.ping({conn, holder}, state)
      :ets.delete(holder)
      true
  end

  # Deletes the holder and asks the connection process to disconnect with `err`.
  # Returns `false` when the holder lookup raises `ArgumentError`.
  @spec handle_disconnect(t, Exception.t()) :: boolean
  def handle_disconnect(holder, err) do
    handle_done(holder, &DBConnection.Connection.disconnect/3, err)
  end

  # Deletes the holder and asks the connection process to stop with `err`.
  # Returns `false` when the holder lookup raises `ArgumentError`.
  @spec handle_stop(t, term) :: boolean
  def handle_stop(holder, err) do
    handle_done(holder, &DBConnection.Connection.stop/3, err)
  end

  # Disconnects the connection if it was created before `start`.
  #
  # Holders whose `ts` is at or after `start` are left alone. With an
  # `interval` of 0 an older connection is disconnected immediately; otherwise
  # only once the monotonic clock passes `start` plus a per-connection offset
  # in `0..interval-1` derived from hashing the connection pid.
  # NOTE(review): the offset presumably staggers disconnects over the interval
  # — confirm against the caller.
  #
  # Any exception raised while inspecting the holder yields `false`.
  @spec maybe_disconnect(t, integer, non_neg_integer) :: boolean()
  def maybe_disconnect(holder, start, interval) do
    ts = :ets.lookup_element(holder, :conn, conn(:ts) + 1)

    cond do
      ts >= start ->
        false

      interval == 0 ->
        true

      true ->
        pid = :ets.lookup_element(holder, :conn, conn(:connection) + 1)
        System.monotonic_time() > :erlang.phash2(pid, interval) + start
    end
  rescue
    _ -> false
  else
    true ->
      opts = [message: "disconnect_all requested", severity: :info]
      handle_disconnect(holder, DBConnection.ConnectionError.exception(opts))

    false ->
      false
  end

  ## Private

  # Resolves `pool` and performs the checkout only when it is a live process on
  # the local node; otherwise returns an `{:exit, reason}` tuple for
  # `checkout/3` to turn into an exit.
  defp checkout(pool, callers, queue?, start, timeout) do
    case GenServer.whereis(pool) do
      pid when node(pid) == node() ->
        checkout_call(pid, callers, queue?, start, timeout)

      pid when node(pid) != node() ->
        {:exit, {:badnode, node(pid)}}

      {_name, node} ->
        {:exit, {:badnode, node}}

      nil ->
        {:exit, :noproc}
    end
  end

  # Sends the checkout request and waits for one of: the holder arriving via an
  # ETS transfer, a direct reply, or the pool going down. The monitor reference
  # doubles as the request tag and as the lock written into the holder.
  defp checkout_call(pid, callers, queue?, start, timeout) do
    lock = Process.monitor(pid)
    send(pid, {:db_connection, {self(), lock}, {:checkout, callers, start, queue?}})

    receive do
      {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time}} ->
        Process.demonitor(lock, [:flush])
        {deadline, ops} = start_deadline(timeout, pool, ref, holder, start)
        :ets.update_element(holder, :conn, [{conn(:lock) + 1, lock} | ops])

        pool_ref =
          pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder, lock: lock)

        checkout_result(holder, pool_ref, checkin_time)

      {^lock, reply} ->
        Process.demonitor(lock, [:flush])
        reply

      {:DOWN, ^lock, _, _, reason} ->
        {:exit, reason}
    end
  end

  # Builds the successful checkout reply from the holder contents, or an error
  # when the holder has already disappeared.
  defp checkout_result(holder, pool_ref, checkin_time) do
    :ets.lookup(holder, :conn)
  rescue
    ArgumentError ->
      # The deadline could hit and be handled by the pool before the connection is used
      msg = "connection not available because deadline reached while in queue"
      {:error, DBConnection.ConnectionError.exception(msg)}
  else
    [conn(module: mod, state: state)] ->
      {:ok, pool_ref, mod, checkin_time, state}
  end

  # Builds the diagnostic message used when a holder cannot be looked up or
  # transferred. `maybe_pid` is the intended receiver, or `nil` for a lookup.
  defp no_holder(holder, maybe_pid) do
    reason =
      case :ets.info(holder, :owner) do
        :undefined -> "does not exist"
        ^maybe_pid -> "is being given to its current owner"
        owner when owner != self() -> "does not belong to the giving process"
        _ -> "could not be given away"
      end

    call_reason =
      if maybe_pid do
        "Error happened when attempting to transfer to #{inspect(maybe_pid)} " <>
          "(alive: #{Process.alive?(maybe_pid)})"
      else
        "Error happened when looking up connection"
      end

    """
    #{inspect(__MODULE__)} #{inspect(holder)} #{reason}, pool inconsistent.
    #{call_reason}.

    SELF: #{inspect(self())}
    ETS INFO: #{inspect(:ets.info(holder))}

    Please report at https://github.com/elixir-ecto/db_connection/issues
    """
  end

  # Applies the callback and, when it returns a tuple, stores the tuple's last
  # element back into the holder as the new connection state.
  #
  # Anything thrown, raised or exited by the callback is returned as
  # `{:catch, kind, reason, stacktrace}` instead of propagating.
  defp holder_apply(holder, module, fun, args) do
    try do
      apply(module, fun, args)
    catch
      kind, reason ->
        {:catch, kind, reason, __STACKTRACE__}
    else
      result when is_tuple(result) ->
        state = :erlang.element(:erlang.tuple_size(result), result)

        try do
          :ets.update_element(holder, :conn, {conn(:state) + 1, state})
          result
        rescue
          # The holder vanished while the callback was running.
          ArgumentError ->
            augment_disconnect(result)
        end

      # If it is not a tuple, we just return it as is so we raise bad return.
      result ->
        result
    end
  end

  # Appends a note to a `:disconnect` result carrying a
  # `DBConnection.ConnectionError`; every other result passes through untouched.
  defp augment_disconnect({:disconnect, %DBConnection.ConnectionError{} = err, state}) do
    %{message: message} = err

    message =
      message <>
        " (the connection was closed by the pool, " <>
        "possibly due to a timeout or because the pool has been terminated)"

    {:disconnect, %{err | message: message}, state}
  end

  defp augment_disconnect(result), do: result

  # Cancels the deadline timer, clears it from the holder together with `ops`,
  # and gives the holder back to the pool as `{tag, ref, info}`. Always returns
  # `:ok`; an `ArgumentError` from either ETS call is deliberately ignored.
  defp done(pool_ref, ops, tag, info) do
    pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder) = pool_ref
    cancel_deadline(deadline)

    try do
      :ets.update_element(holder, :conn, [{conn(:deadline) + 1, nil} | ops])
      :ets.give_away(holder, pool, {tag, ref, info})
    rescue
      ArgumentError -> :ok
    else
      true -> :ok
    end
  end

  # Cancels the deadline timer, deletes the holder and calls `stop` (a 3-arity
  # function) with `{pid, holder}`, `err` and the stored state. Returns `true`,
  # or `false` when the holder lookup raises `ArgumentError`.
  defp handle_done(holder, stop, err) do
    :ets.lookup(holder, :conn)
  rescue
    ArgumentError ->
      false
  else
    [conn(connection: pid, deadline: deadline, state: state)] ->
      cancel_deadline(deadline)
      :ets.delete(holder)
      stop.({pid, holder}, err, state)
      true
  end

  # Turns the `:timeout` and `:deadline` options into one absolute monotonic
  # time in `@time_unit`, or `nil` for no deadline.
  #
  # `:deadline` is `nil` when not given. In Erlang term ordering numbers sort
  # before atoms, so `min/2` then returns `now + timeout`.
  defp abs_timeout(now, opts) do
    case Keyword.get(opts, :timeout, @timeout) do
      :infinity -> Keyword.get(opts, :deadline)
      timeout -> min(now + timeout, Keyword.get(opts, :deadline))
    end
  end

  # Starts the deadline timer for a checked-out connection. Returns the timer
  # reference (or `nil`) together with the ETS update operations that record it
  # in the holder.
  defp start_deadline(nil, _, _, _, _) do
    {nil, []}
  end

  defp start_deadline(timeout, pid, ref, holder, start) do
    # `abs: true` makes `timeout` an absolute point in time rather than a delay.
    # The timer message is delivered to `pid` and carries the interval between
    # `start` and the deadline as `timeout - start`.
    deadline =
      :erlang.start_timer(timeout, pid, {ref, holder, self(), timeout - start}, abs: true)

    {deadline, [{conn(:deadline) + 1, deadline}]}
  end

  # Cancels a deadline timer without waiting for the result; a `nil` deadline
  # is a no-op.
  defp cancel_deadline(nil) do
    :ok
  end

  defp cancel_deadline(deadline) do
    :erlang.cancel_timer(deadline, async: true, info: false)
  end
end