proxy.ex (9257B)
1 defmodule DBConnection.Ownership.Proxy do 2 @moduledoc false 3 4 alias DBConnection.Holder 5 use GenServer, restart: :temporary 6 7 @time_unit 1000 8 @ownership_timeout 120_000 9 @queue_target 50 10 @queue_interval 1000 11 12 def start_link({caller, pool, pool_opts}) do 13 GenServer.start_link(__MODULE__, {caller, pool, pool_opts}, []) 14 end 15 16 def stop(proxy, caller) do 17 GenServer.cast(proxy, {:stop, caller}) 18 end 19 20 # Callbacks 21 22 def init({caller, pool, pool_opts}) do 23 pool_opts = 24 pool_opts 25 |> Keyword.put(:timeout, :infinity) 26 |> Keyword.delete(:deadline) 27 28 owner_ref = Process.monitor(caller) 29 ownership_timeout = Keyword.get(pool_opts, :ownership_timeout, @ownership_timeout) 30 timeout = Keyword.get(pool_opts, :queue_target, @queue_target) * 2 31 interval = Keyword.get(pool_opts, :queue_interval, @queue_interval) 32 33 pre_checkin = Keyword.get(pool_opts, :pre_checkin, fn _, mod, state -> {:ok, mod, state} end) 34 post_checkout = Keyword.get(pool_opts, :post_checkout, &{:ok, &1, &2}) 35 36 state = %{ 37 client: nil, 38 timer: nil, 39 holder: nil, 40 timeout: timeout, 41 interval: interval, 42 poll: nil, 43 owner: {caller, owner_ref}, 44 pool: pool, 45 pool_ref: nil, 46 pool_opts: pool_opts, 47 queue: :queue.new(), 48 mod: nil, 49 pre_checkin: pre_checkin, 50 post_checkout: post_checkout, 51 ownership_timer: start_timer(caller, ownership_timeout) 52 } 53 54 now = System.monotonic_time(@time_unit) 55 {:ok, start_poll(now, state)} 56 end 57 58 def handle_info({:DOWN, ref, _, pid, _reason}, %{owner: {_, ref}} = state) do 59 down("owner #{inspect(pid)} exited", state) 60 end 61 62 def handle_info({:timeout, deadline, {_ref, holder, pid, len}}, %{holder: holder} = state) do 63 if Holder.handle_deadline(holder, deadline) do 64 message = 65 "client #{inspect(pid)} timed out because " <> 66 "it queued and checked out the connection for longer than #{len}ms" 67 68 down(message, state) 69 else 70 {:noreply, state} 71 end 72 end 73 74 def handle_info( 75 {:timeout, timer, {__MODULE__, pid, timeout}}, 76 %{ownership_timer: timer} = state 77 ) do 78 message = 79 "owner #{inspect(pid)} timed out because " <> 80 "it owned the connection for longer than #{timeout}ms (set via the :ownership_timeout option)" 81 82 # We don't invoke down because this is always a disconnect, even if there is no client. 83 # On the other hand, those timeouts are unlikely to trigger, as it defaults to 2 mins. 84 pool_disconnect(DBConnection.ConnectionError.exception(message), state) 85 end 86 87 def handle_info({:timeout, poll, time}, %{poll: poll} = state) do 88 state = timeout(time, state) 89 {:noreply, start_poll(time, state)} 90 end 91 92 def handle_info( 93 {:db_connection, from, {:checkout, _caller, _now, _queue?}}, 94 %{holder: nil} = state 95 ) do 96 %{pool: pool, pool_opts: pool_opts, owner: {_, owner_ref}, post_checkout: post_checkout} = 97 state 98 99 case Holder.checkout(pool, [self()], pool_opts) do 100 {:ok, pool_ref, original_mod, _idle_time, conn_state} -> 101 case post_checkout.(original_mod, conn_state) do 102 {:ok, conn_mod, conn_state} -> 103 holder = Holder.new(self(), owner_ref, conn_mod, conn_state) 104 state = %{state | pool_ref: pool_ref, holder: holder, mod: original_mod} 105 checkout(from, state) 106 107 {:disconnect, err, ^original_mod, _conn_state} -> 108 Holder.disconnect(pool_ref, err) 109 Holder.reply_error(from, err) 110 {:stop, {:shutdown, err}, state} 111 end 112 113 {:error, err} -> 114 Holder.reply_error(from, err) 115 {:stop, {:shutdown, err}, state} 116 end 117 end 118 119 def handle_info( 120 {:db_connection, from, {:checkout, _caller, _now, _queue?}}, 121 %{client: nil} = state 122 ) do 123 checkout(from, state) 124 end 125 126 def handle_info({:db_connection, from, {:checkout, _caller, now, queue?}}, state) do 127 if queue? do 128 %{queue: queue} = state 129 queue = :queue.in({now, from}, queue) 130 {:noreply, %{state | queue: queue}} 131 else 132 message = "connection not available and queuing is disabled" 133 err = DBConnection.ConnectionError.exception(message) 134 Holder.reply_error(from, err) 135 {:noreply, state} 136 end 137 end 138 139 def handle_info( 140 {:"ETS-TRANSFER", holder, _, {msg, ref, extra}}, 141 %{holder: holder, client: {_, ref, _}} = state 142 ) do 143 case msg do 144 :checkin -> checkin(state) 145 :disconnect -> pool_disconnect(extra, state) 146 :stop -> pool_stop(extra, state) 147 end 148 end 149 150 def handle_info({:"ETS-TRANSFER", holder, pid, ref}, %{holder: holder, owner: {_, ref}} = state) do 151 down("client #{inspect(pid)} exited", state) 152 end 153 154 def handle_cast({:stop, caller}, %{owner: {owner, _}} = state) do 155 message = "#{inspect(caller)} checked in the connection owned by #{inspect(owner)}" 156 157 message = 158 case pruned_stacktrace(caller) do 159 [] -> 160 message 161 162 current_stack -> 163 message <> 164 "\n\n#{inspect(caller)} triggered the checkin at location:\n\n" <> 165 Exception.format_stacktrace(current_stack) 166 end 167 168 down(message, state) 169 end 170 171 defp checkout({pid, ref} = from, %{holder: holder} = state) do 172 if Holder.handle_checkout(holder, from, ref, nil) do 173 {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}} 174 else 175 next(state) 176 end 177 end 178 179 defp checkin(state) do 180 next(%{state | client: nil}) 181 end 182 183 defp next(%{queue: queue} = state) do 184 case :queue.out(queue) do 185 {{:value, {_, from}}, queue} -> 186 checkout(from, %{state | queue: queue}) 187 188 {:empty, queue} -> 189 {:noreply, %{state | queue: queue}} 190 end 191 end 192 193 defp start_timer(_, :infinity), do: nil 194 195 defp start_timer(pid, timeout) do 196 :erlang.start_timer(timeout, self(), {__MODULE__, pid, timeout}) 197 end 198 199 # It is down but never checked out from pool 200 defp down(reason, %{holder: nil} = state) do 201 {:stop, {:shutdown, reason}, state} 202 end 203 204 # If it is down but it has no client, checkin 205 defp down(reason, %{client: nil} = state) do 206 pool_checkin(reason, state) 207 end 208 209 # If it is down but it has a client, disconnect 210 defp down(reason, %{client: {client, _, checkout_stack}} = state) do 211 reason = 212 case pruned_stacktrace(client) do 213 [] -> 214 reason 215 216 current_stack -> 217 reason <> 218 """ 219 \n\nClient #{inspect(client)} is still using a connection from owner at location: 220 221 #{Exception.format_stacktrace(current_stack)} 222 The connection itself was checked out by #{inspect(client)} at location: 223 224 #{Exception.format_stacktrace(checkout_stack)} 225 """ 226 end 227 228 err = DBConnection.ConnectionError.exception(reason) 229 pool_disconnect(err, state) 230 end 231 232 ## Helpers 233 234 defp pool_checkin(reason, state) do 235 pool_done(reason, state, :checkin, fn pool_ref, _ -> Holder.checkin(pool_ref) end) 236 end 237 238 defp pool_disconnect(err, state) do 239 pool_done(err, state, {:disconnect, err}, &Holder.disconnect/2) 240 end 241 242 defp pool_stop(err, state) do 243 pool_done(err, state, {:stop, err}, &Holder.stop/2, &Holder.stop/2) 244 end 245 246 defp pool_done(err, state, op, done, stop_or_disconnect \\ &Holder.disconnect/2) do 247 %{holder: holder, pool_ref: pool_ref, pre_checkin: pre_checkin, mod: original_mod} = state 248 249 if holder do 250 {conn_mod, conn_state} = Holder.delete(holder) 251 252 case pre_checkin.(op, conn_mod, conn_state) do 253 {:ok, ^original_mod, conn_state} -> 254 Holder.put_state(pool_ref, conn_state) 255 done.(pool_ref, err) 256 {:stop, {:shutdown, err}, state} 257 258 {:disconnect, err, ^original_mod, conn_state} -> 259 Holder.put_state(pool_ref, conn_state) 260 stop_or_disconnect.(pool_ref, err) 261 {:stop, {:shutdown, err}, state} 262 end 263 else 264 {:stop, {:shutdown, err}, state} 265 end 266 end 267 268 defp start_poll(now, %{interval: interval} = state) do 269 timeout = now + interval 270 poll = :erlang.start_timer(timeout, self(), timeout, abs: true) 271 %{state | poll: poll} 272 end 273 274 defp timeout(time, %{queue: queue, timeout: timeout} = state) do 275 case :queue.out(queue) do 276 {{:value, {sent, from}}, queue} when sent + timeout < time -> 277 drop(time - sent, from) 278 timeout(time, %{state | queue: queue}) 279 280 {_, _} -> 281 state 282 end 283 end 284 285 defp drop(delay, from) do 286 message = 287 "connection not available and request was dropped from queue after #{delay}ms. " <> 288 "You can configure how long requests wait in the queue using :queue_target and " <> 289 ":queue_interval. See DBConnection.start_link/2 for more information" 290 291 err = DBConnection.ConnectionError.exception(message, :queue_timeout) 292 Holder.reply_error(from, err) 293 end 294 295 @prune_modules [:gen, GenServer, DBConnection, DBConnection.Holder, DBConnection.Ownership] 296 297 defp pruned_stacktrace(pid) do 298 case Process.info(pid, :current_stacktrace) do 299 {:current_stacktrace, stacktrace} -> 300 Enum.drop_while(stacktrace, &match?({mod, _, _, _} when mod in @prune_modules, &1)) 301 302 _ -> 303 [] 304 end 305 end 306 end