connection.ex (12001B)
defmodule DBConnection.ConnectionError do
  defexception [:message, severity: :error, reason: :error]

  @moduledoc """
  The raised exception might include the reason which would be useful
  to programmatically determine what was causing the error.
  """

  # Builds the exception from `message` via the default `exception/1` and then
  # overrides the `:reason` field (which otherwise defaults to `:error`).
  @doc false
  def exception(message, reason) do
    message
    |> exception()
    |> Map.replace!(:reason, reason)
  end
end

defmodule DBConnection.Connection do
  @moduledoc false

  # Process that owns a single driver connection. The `client` field of the
  # process state tracks who currently holds the connection:
  #
  #   * `:closed`                - not connected
  #   * `{ref, :connect}`        - connected, waiting for the self-cast that
  #                                finishes setup
  #   * `{ref, :after_connect}`  - the `after_connect` task is running
  #   * `{ref, :pool}`           - handed over to the pool
  #
  # Other shapes (`nil`, `{nil, :after_connect}`, `{ref, nil}`) are transient
  # values set right before a disconnect or a pool update.

  use Connection
  require Logger
  alias DBConnection.Backoff
  alias DBConnection.Holder

  # Default for the `:after_connect_timeout` option, in milliseconds.
  @timeout 15_000

  # Starts the connection process. Only `:debug` and `:spawn_opt` are forwarded
  # as start options; the full `opts` are kept in the init argument.
  @doc false
  def start_link(mod, opts, pool, tag) do
    start_opts = Keyword.take(opts, [:debug, :spawn_opt])
    Connection.start_link(__MODULE__, {mod, opts, pool, tag}, start_opts)
  end

  # Child spec whose start function is `start_link/4`; `child_opts` override
  # the base spec through `Supervisor.child_spec/2`.
  @doc false
  def child_spec(mod, opts, pool, tag, child_opts) do
    Supervisor.child_spec(
      %{id: __MODULE__, start: {__MODULE__, :start_link, [mod, opts, pool, tag]}},
      child_opts
    )
  end

  # Asynchronously asks the connection process `pid` to disconnect with `err`.
  # The request is only honoured if `ref` matches the current client reference
  # (see the `handle_cast/2` clauses below).
  @doc false
  def disconnect({pid, ref}, err, state) do
    Connection.cast(pid, {:disconnect, ref, err, state})
  end

  # Asynchronously asks the connection process `pid` to stop with `err`.
  @doc false
  def stop({pid, ref}, err, state) do
    Connection.cast(pid, {:stop, ref, err, state})
  end

  # Asynchronously asks the connection process `pid` to ping the database.
  @doc false
  def ping({pid, ref}, state) do
    Connection.cast(pid, {:ping, ref, state})
  end

  ## Connection API

  # Builds the initial process state and immediately requests a connect
  # attempt by returning `{:connect, :init, s}`.
  @doc false
  def init({mod, opts, pool, tag}) do
    s = %{
      mod: mod,
      opts: opts,
      state: nil,
      client: :closed,
      pool: pool,
      tag: tag,
      timer: nil,
      backoff: Backoff.new(opts),
      connection_listeners: Keyword.get(opts, :connection_listeners, []),
      after_connect: Keyword.get(opts, :after_connect),
      after_connect_timeout: Keyword.get(opts, :after_connect_timeout, @timeout)
    }

    {:connect, :init, s}
  end

  # Calls `mod.connect/1` with the (possibly reconfigured) options.
  #
  #   * On success the process casts to itself (`:after_connect` when an
  #     `after_connect` hook is configured, `:connected` otherwise) so the
  #     remaining setup runs in `handle_cast/2`.
  #   * On `{:error, err}` the error is raised when `backoff` is `nil`,
  #     otherwise it is logged and a backoff is scheduled.
  #   * Exceptions raised by `mod.connect/1` are re-raised, sanitized unless
  #     `:show_sensitive_data_on_connection_error` is set.
  @doc false
  def connect(_, s) do
    %{mod: mod, opts: opts, backoff: backoff, after_connect: after_connect} = s

    try do
      apply(mod, :connect, [connect_opts(opts)])
    rescue
      e ->
        {e, stack} = maybe_sanitize_exception(e, __STACKTRACE__, opts)
        reraise e, stack
    else
      {:ok, state} when after_connect != nil ->
        ref = make_ref()
        Connection.cast(self(), {:after_connect, ref})
        {:ok, %{s | state: state, client: {ref, :connect}}}

      {:ok, state} ->
        backoff = backoff && Backoff.reset(backoff)
        ref = make_ref()
        Connection.cast(self(), {:connected, ref})
        {:ok, %{s | state: state, client: {ref, :connect}, backoff: backoff}}

      {:error, err} when is_nil(backoff) ->
        raise err

      {:error, err} ->
        Logger.error(
          fn ->
            [
              inspect(mod),
              ?\s,
              ?(,
              inspect(self()),
              ") failed to connect: "
              | Exception.format_banner(:error, err, [])
            ]
          end,
          crash_reason: {err, []}
        )

        {timeout, backoff} = Backoff.backoff(backoff)
        {:backoff, timeout, %{s | backoff: backoff}}
    end
  end

  # Returns `{exception, stacktrace}` to re-raise. Unless the caller opted in
  # with `:show_sensitive_data_on_connection_error`, the original exception is
  # replaced by a generic `RuntimeError` and the stacktrace is stripped of
  # call arguments.
  defp maybe_sanitize_exception(e, stack, opts) do
    if Keyword.get(opts, :show_sensitive_data_on_connection_error, false) do
      {e, stack}
    else
      message =
        "connect raised #{inspect(e.__struct__)} exception#{sanitized_message(e)}. " <>
          "The exception details are hidden, as they may contain sensitive data such as " <>
          "database credentials. You may set :show_sensitive_data_on_connection_error " <>
          "to true when starting your connection if you wish to see all of the details"

      {RuntimeError.exception(message), cleanup_stacktrace(stack)}
    end
  end

  # For a `KeyError` the message is included after clearing `:term`, so the
  # looked-up container is not rendered; every other exception contributes
  # nothing to the sanitized message.
  defp sanitized_message(%KeyError{} = e), do: ": #{Exception.message(%{e | term: nil})}"
  defp sanitized_message(_), do: ""

  # Tears the connection down: optionally logs, demonitors the client, cancels
  # the timer, calls `mod.disconnect/2`, notifies listeners, then decides what
  # to do next based on the client that was active *before* the reset.
  @doc false
  def disconnect({log, err}, %{mod: mod} = s) do
    if log == :log do
      # A ConnectionError carries its own log severity; anything else is
      # logged as an error.
      severity =
        case err do
          %DBConnection.ConnectionError{severity: severity} -> severity
          _ -> :error
        end

      Logger.log(severity, fn ->
        [
          inspect(mod),
          ?\s,
          ?(,
          inspect(self()),
          ") disconnected: " | Exception.format_banner(:error, err, [])
        ]
      end)

      :ok
    end

    %{state: state, client: client, timer: timer, backoff: backoff} = s
    demonitor(client)
    cancel_timer(timer)
    :ok = apply(mod, :disconnect, [err, state])
    s = %{s | state: nil, client: :closed, timer: nil}

    notify_connection_listeners({:disconnected, self()}, s)

    case client do
      # No backoff configured: stop instead of reconnecting.
      _ when backoff == nil ->
        {:stop, {:shutdown, err}, s}

      # Disconnected while in the after_connect phase: back off first.
      {_, :after_connect} ->
        {timeout, backoff} = Backoff.backoff(backoff)
        {:backoff, timeout, %{s | backoff: backoff}}

      # Otherwise reconnect right away.
      _ ->
        {:connect, :disconnect, s}
    end
  end

  # Ping request; only handled while the pool holds the connection under the
  # same `ref`.
  @doc false
  def handle_cast({:ping, ref, state}, %{client: {ref, :pool}, mod: mod} = s) do
    case apply(mod, :ping, [state]) do
      {:ok, state} ->
        pool_update(state, s)

      {:disconnect, err, state} ->
        {:disconnect, {:log, err}, %{s | state: state}}
    end
  end

  # Disconnect request from the current client (matching `ref`).
  def handle_cast({:disconnect, ref, err, state}, %{client: {ref, _}} = s) do
    {:disconnect, {:log, err}, %{s | state: state}}
  end

  # Stop request from the current client; the stop reason carries this
  # process' current stacktrace.
  def handle_cast({:stop, ref, err, state}, %{client: {ref, _}} = s) do
    {_, stack} = :erlang.process_info(self(), :current_stacktrace)
    {:stop, {err, stack}, %{s | state: state}}
  end

  # Disconnect/stop request whose `ref` does not match the current client:
  # ignored.
  def handle_cast({tag, _, _, _}, s) when tag in [:disconnect, :stop] do
    handle_timeout(s)
  end

  # Self-cast sent by `connect/2` when an `after_connect` hook is configured:
  # notifies listeners, checks the connection out and runs the hook through
  # `DBConnection.Task.run_child/4`, guarded by a timer.
  def handle_cast({:after_connect, ref}, %{client: {ref, :connect}} = s) do
    %{
      mod: mod,
      state: state,
      after_connect: after_connect,
      after_connect_timeout: timeout,
      opts: opts
    } = s

    notify_connection_listeners({:connected, self()}, s)

    case apply(mod, :checkout, [state]) do
      {:ok, state} ->
        opts = [timeout: timeout] ++ opts
        {pid, ref} = DBConnection.Task.run_child(mod, state, after_connect, opts)
        timer = start_timer(pid, timeout)
        s = %{s | client: {ref, :after_connect}, timer: timer, state: state}
        {:noreply, s}

      {:disconnect, err, state} ->
        {:disconnect, {:log, err}, %{s | state: state}}
    end
  end

  # `:after_connect` cast with a non-matching `ref` or client state: ignored.
  def handle_cast({:after_connect, _}, s) do
    {:noreply, s}
  end

  # Self-cast sent by `connect/2` when there is no `after_connect` hook:
  # notifies listeners, checks the connection out and hands it to the pool.
  def handle_cast({:connected, ref}, %{client: {ref, :connect}} = s) do
    %{mod: mod, state: state} = s

    notify_connection_listeners({:connected, self()}, s)

    case apply(mod, :checkout, [state]) do
      {:ok, state} ->
        pool_update(state, s)

      {:disconnect, err, state} ->
        {:disconnect, {:log, err}, %{s | state: state}}
    end
  end

  # `:connected` cast with a non-matching `ref` or client state: ignored.
  def handle_cast({:connected, _}, s) do
    {:noreply, s}
  end

  # The monitored after_connect client went down. The reference is cleared
  # from `client` before disconnecting; the exit is logged unless it was a
  # normal/shutdown exit (see `down_log/1`).
  @doc false
  def handle_info({:DOWN, ref, _, pid, reason}, %{client: {ref, :after_connect}} = s) do
    message = "client #{inspect(pid)} exited: " <> Exception.format_exit(reason)
    err = DBConnection.ConnectionError.exception(message)
    {:disconnect, {down_log(reason), err}, %{s | client: {nil, :after_connect}}}
  end

  # A client monitored through the second element of `client` went down.
  def handle_info({:DOWN, mon, _, pid, reason}, %{client: {ref, mon}} = s) do
    message = "client #{inspect(pid)} exited: " <> Exception.format_exit(reason)
    err = DBConnection.ConnectionError.exception(message)
    {:disconnect, {down_log(reason), err}, %{s | client: {ref, nil}}}
  end

  # The timer started by `start_timer/2` fired: disconnect with an error that
  # includes the client's current stacktrace when it can still be fetched.
  def handle_info({:timeout, timer, {__MODULE__, pid, timeout}}, %{timer: timer} = s)
      when is_reference(timer) do
    message =
      "client #{inspect(pid)} timed out because it checked out " <>
        "the connection for longer than #{timeout}ms"

    exc =
      case Process.info(pid, :current_stacktrace) do
        {:current_stacktrace, stacktrace} ->
          message <>
            "\n\n#{inspect(pid)} was at location:\n\n" <>
            Exception.format_stacktrace(stacktrace)

        _ ->
          message
      end
      |> DBConnection.ConnectionError.exception()

    {:disconnect, {:log, exc}, %{s | timer: nil}}
  end

  # The holder table was transferred back to this process while the
  # after_connect client was active. The second element returned by
  # `Holder.delete/1` is used as the connection state (presumably the state
  # stored in the holder - confirm against `DBConnection.Holder`).
  def handle_info(
        {:"ETS-TRANSFER", holder, _pid, {msg, ref, extra}},
        %{client: {ref, :after_connect}, timer: timer} = s
      ) do
    {_, state} = Holder.delete(holder)
    cancel_timer(timer)
    s = %{s | timer: nil}

    case msg do
      :checkin -> handle_checkin(state, s)
      :disconnect -> handle_cast({:disconnect, ref, extra, state}, s)
      :stop -> handle_cast({:stop, ref, extra, state}, s)
    end
  end

  # Any other message is logged at info level and otherwise ignored.
  def handle_info(msg, %{mod: mod} = s) do
    Logger.info(fn ->
      [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
    end)

    handle_timeout(s)
  end

  # Status formatting while closed: there is no driver state to show, so only
  # the callback module is reported.
  @doc false
  def format_status(info, [_, %{client: :closed, mod: mod}]) do
    case info do
      :normal -> [{:data, [{~c"Module", mod}]}]
      :terminate -> mod
    end
  end

  # Status formatting while connected: delegates to `mod.format_status/2`
  # when the driver exports it, otherwise reports the raw driver state.
  def format_status(info, [pdict, %{mod: mod, state: state}]) do
    case function_exported?(mod, :format_status, 2) do
      true when info == :normal ->
        normal_status(mod, pdict, state)

      false when info == :normal ->
        normal_status_default(mod, state)

      true when info == :terminate ->
        {mod, terminate_status(mod, pdict, state)}

      false when info == :terminate ->
        {mod, state}
    end
  end

  ## Helpers

  # Applies the optional `:configure` hook (an MFA with `opts` prepended to
  # its arguments, or a 1-arity function) to obtain the connect options.
  defp connect_opts(opts) do
    case Keyword.get(opts, :configure) do
      {mod, fun, args} ->
        apply(mod, fun, [opts | args])

      fun when is_function(fun, 1) ->
        fun.(opts)

      nil ->
        opts
    end
  end

  # Normal and shutdown exits of a client are not logged on disconnect.
  defp down_log(:normal), do: :nolog
  defp down_log(:shutdown), do: :nolog
  defp down_log({:shutdown, _}), do: :nolog
  defp down_log(_), do: :log

  defp handle_timeout(s), do: {:noreply, s}

  # Removes whichever element of the client tuple is a monitor reference and
  # flushes any pending :DOWN message; a no-op for clients without one.
  defp demonitor({_, mon}) when is_reference(mon) do
    Process.demonitor(mon, [:flush])
  end

  defp demonitor({mon, :after_connect}) when is_reference(mon) do
    Process.demonitor(mon, [:flush])
  end

  defp demonitor({_, _}), do: true
  defp demonitor(nil), do: true

  # No timer is started for an infinite timeout.
  defp start_timer(_, :infinity), do: nil

  defp start_timer(pid, timeout) do
    :erlang.start_timer(timeout, self(), {__MODULE__, pid, timeout})
  end

  defp cancel_timer(nil), do: :ok

  # When cancellation reports `false` the timeout message is expected to be
  # in the mailbox already, so it is flushed.
  defp cancel_timer(timer) do
    case :erlang.cancel_timer(timer) do
      false -> flush_timer(timer)
      _ -> :ok
    end
  end

  # Removes the already-delivered timeout message for `timer`; raises if no
  # such message is in the mailbox.
  defp flush_timer(timer) do
    receive do
      {:timeout, ^timer, {__MODULE__, _, _}} ->
        :ok
    after
      0 ->
        raise ArgumentError, "timer #{inspect(timer)} does not exist"
    end
  end

  # The after_connect client checked the connection back in: reset the
  # backoff (if any), drop the monitor and hand the connection to the pool.
  defp handle_checkin(state, s) do
    %{backoff: backoff, client: client} = s
    backoff = backoff && Backoff.reset(backoff)
    demonitor(client)
    pool_update(state, %{s | client: nil, backoff: backoff})
  end

  # Hands the connection state to the pool through `Holder.update/4`. On
  # success the process hibernates with the pool as client; on `:error` it
  # stops with `{:shutdown, :no_more_pool}`.
  defp pool_update(state, %{pool: pool, tag: tag, mod: mod} = s) do
    case Holder.update(pool, tag, mod, state) do
      {:ok, ref} ->
        {:noreply, %{s | client: {ref, :pool}, state: state}, :hibernate}

      :error ->
        {:stop, {:shutdown, :no_more_pool}, s}
    end
  end

  # Driver-provided status; falls back to the default if the callback
  # raises, throws or exits.
  defp normal_status(mod, pdict, state) do
    try do
      mod.format_status(:normal, [pdict, state])
    catch
      _, _ ->
        normal_status_default(mod, state)
    else
      status ->
        status
    end
  end

  defp normal_status_default(mod, state) do
    [{:data, [{~c"Module", mod}, {~c"State", state}]}]
  end

  # Driver-provided terminate status; falls back to the raw state if the
  # callback raises, throws or exits.
  defp terminate_status(mod, pdict, state) do
    try do
      mod.format_status(:terminate, [pdict, state])
    catch
      _, _ ->
        state
    else
      status ->
        status
    end
  end

  # Replaces the argument list of the top stack frame with its arity so call
  # arguments (which may hold credentials) are not re-raised.
  defp cleanup_stacktrace(stack) do
    case stack do
      [{_, _, arity, _} | _rest] = stacktrace when is_integer(arity) ->
        stacktrace

      [{mod, fun, args, info} | rest] when is_list(args) ->
        [{mod, fun, length(args), info} | rest]

      # Empty or unexpected stacktrace: this runs inside the `rescue` of
      # `connect/2`, so a non-matching clause would raise a CaseClauseError
      # that masks the sanitized error and embeds the raw stack term in its
      # message. Drop the stacktrace instead.
      _ ->
        []
    end
  end

  # Sends `message` to every process listed in `:connection_listeners`.
  defp notify_connection_listeners(message, %{} = state) do
    %{connection_listeners: connection_listeners} = state

    Enum.each(connection_listeners, &send(&1, message))
  end
end