zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

connection.ex (12001B)


      1 defmodule DBConnection.ConnectionError do
      2   defexception [:message, severity: :error, reason: :error]
      3 
      4   @moduledoc """
      5   The raised exception might include the reason which would be useful
      6   to programmatically determine what was causing the error.
      7   """
      8 
      9   @doc false
     10   def exception(message, reason) do
     11     message
     12     |> exception()
     13     |> Map.replace!(:reason, reason)
     14   end
     15 end
     16 
     17 defmodule DBConnection.Connection do
     18   @moduledoc false
     19 
     20   use Connection
     21   require Logger
     22   alias DBConnection.Backoff
     23   alias DBConnection.Holder
     24 
     25   @timeout 15_000
     26 
     27   @doc false
     28   def start_link(mod, opts, pool, tag) do
     29     start_opts = Keyword.take(opts, [:debug, :spawn_opt])
     30     Connection.start_link(__MODULE__, {mod, opts, pool, tag}, start_opts)
     31   end
     32 
     33   @doc false
     34   def child_spec(mod, opts, pool, tag, child_opts) do
     35     Supervisor.child_spec(
     36       %{id: __MODULE__, start: {__MODULE__, :start_link, [mod, opts, pool, tag]}},
     37       child_opts
     38     )
     39   end
     40 
     41   @doc false
     42   def disconnect({pid, ref}, err, state) do
     43     Connection.cast(pid, {:disconnect, ref, err, state})
     44   end
     45 
     46   @doc false
     47   def stop({pid, ref}, err, state) do
     48     Connection.cast(pid, {:stop, ref, err, state})
     49   end
     50 
     51   @doc false
     52   def ping({pid, ref}, state) do
     53     Connection.cast(pid, {:ping, ref, state})
     54   end
     55 
     56   ## Connection API
     57 
     58   @doc false
     59   def init({mod, opts, pool, tag}) do
     60     s = %{
     61       mod: mod,
     62       opts: opts,
     63       state: nil,
     64       client: :closed,
     65       pool: pool,
     66       tag: tag,
     67       timer: nil,
     68       backoff: Backoff.new(opts),
     69       connection_listeners: Keyword.get(opts, :connection_listeners, []),
     70       after_connect: Keyword.get(opts, :after_connect),
     71       after_connect_timeout: Keyword.get(opts, :after_connect_timeout, @timeout)
     72     }
     73 
     74     {:connect, :init, s}
     75   end
     76 
     77   @doc false
     78   def connect(_, s) do
     79     %{mod: mod, opts: opts, backoff: backoff, after_connect: after_connect} = s
     80 
     81     try do
     82       apply(mod, :connect, [connect_opts(opts)])
     83     rescue
     84       e ->
     85         {e, stack} = maybe_sanitize_exception(e, __STACKTRACE__, opts)
     86         reraise e, stack
     87     else
     88       {:ok, state} when after_connect != nil ->
     89         ref = make_ref()
     90         Connection.cast(self(), {:after_connect, ref})
     91         {:ok, %{s | state: state, client: {ref, :connect}}}
     92 
     93       {:ok, state} ->
     94         backoff = backoff && Backoff.reset(backoff)
     95         ref = make_ref()
     96         Connection.cast(self(), {:connected, ref})
     97         {:ok, %{s | state: state, client: {ref, :connect}, backoff: backoff}}
     98 
     99       {:error, err} when is_nil(backoff) ->
    100         raise err
    101 
    102       {:error, err} ->
    103         Logger.error(
    104           fn ->
    105             [
    106               inspect(mod),
    107               ?\s,
    108               ?(,
    109               inspect(self()),
    110               ") failed to connect: "
    111               | Exception.format_banner(:error, err, [])
    112             ]
    113           end,
    114           crash_reason: {err, []}
    115         )
    116 
    117         {timeout, backoff} = Backoff.backoff(backoff)
    118         {:backoff, timeout, %{s | backoff: backoff}}
    119     end
    120   end
    121 
    122   defp maybe_sanitize_exception(e, stack, opts) do
    123     if Keyword.get(opts, :show_sensitive_data_on_connection_error, false) do
    124       {e, stack}
    125     else
    126       message =
    127         "connect raised #{inspect(e.__struct__)} exception#{sanitized_message(e)}. " <>
    128           "The exception details are hidden, as they may contain sensitive data such as " <>
    129           "database credentials. You may set :show_sensitive_data_on_connection_error " <>
    130           "to true when starting your connection if you wish to see all of the details"
    131 
    132       {RuntimeError.exception(message), cleanup_stacktrace(stack)}
    133     end
    134   end
    135 
    136   defp sanitized_message(%KeyError{} = e), do: ": #{Exception.message(%{e | term: nil})}"
    137   defp sanitized_message(_), do: ""
    138 
    139   @doc false
    140   def disconnect({log, err}, %{mod: mod} = s) do
    141     if log == :log do
    142       severity =
    143         case err do
    144           %DBConnection.ConnectionError{severity: severity} -> severity
    145           _ -> :error
    146         end
    147 
    148       Logger.log(severity, fn ->
    149         [
    150           inspect(mod),
    151           ?\s,
    152           ?(,
    153           inspect(self()),
    154           ") disconnected: " | Exception.format_banner(:error, err, [])
    155         ]
    156       end)
    157 
    158       :ok
    159     end
    160 
    161     %{state: state, client: client, timer: timer, backoff: backoff} = s
    162     demonitor(client)
    163     cancel_timer(timer)
    164     :ok = apply(mod, :disconnect, [err, state])
    165     s = %{s | state: nil, client: :closed, timer: nil}
    166 
    167     notify_connection_listeners({:disconnected, self()}, s)
    168 
    169     case client do
    170       _ when backoff == nil ->
    171         {:stop, {:shutdown, err}, s}
    172 
    173       {_, :after_connect} ->
    174         {timeout, backoff} = Backoff.backoff(backoff)
    175         {:backoff, timeout, %{s | backoff: backoff}}
    176 
    177       _ ->
    178         {:connect, :disconnect, s}
    179     end
    180   end
    181 
    182   @doc false
    183   def handle_cast({:ping, ref, state}, %{client: {ref, :pool}, mod: mod} = s) do
    184     case apply(mod, :ping, [state]) do
    185       {:ok, state} ->
    186         pool_update(state, s)
    187 
    188       {:disconnect, err, state} ->
    189         {:disconnect, {:log, err}, %{s | state: state}}
    190     end
    191   end
    192 
    193   def handle_cast({:disconnect, ref, err, state}, %{client: {ref, _}} = s) do
    194     {:disconnect, {:log, err}, %{s | state: state}}
    195   end
    196 
    197   def handle_cast({:stop, ref, err, state}, %{client: {ref, _}} = s) do
    198     {_, stack} = :erlang.process_info(self(), :current_stacktrace)
    199     {:stop, {err, stack}, %{s | state: state}}
    200   end
    201 
    202   def handle_cast({tag, _, _, _}, s) when tag in [:disconnect, :stop] do
    203     handle_timeout(s)
    204   end
    205 
    206   def handle_cast({:after_connect, ref}, %{client: {ref, :connect}} = s) do
    207     %{
    208       mod: mod,
    209       state: state,
    210       after_connect: after_connect,
    211       after_connect_timeout: timeout,
    212       opts: opts
    213     } = s
    214 
    215     notify_connection_listeners({:connected, self()}, s)
    216 
    217     case apply(mod, :checkout, [state]) do
    218       {:ok, state} ->
    219         opts = [timeout: timeout] ++ opts
    220         {pid, ref} = DBConnection.Task.run_child(mod, state, after_connect, opts)
    221         timer = start_timer(pid, timeout)
    222         s = %{s | client: {ref, :after_connect}, timer: timer, state: state}
    223         {:noreply, s}
    224 
    225       {:disconnect, err, state} ->
    226         {:disconnect, {:log, err}, %{s | state: state}}
    227     end
    228   end
    229 
    230   def handle_cast({:after_connect, _}, s) do
    231     {:noreply, s}
    232   end
    233 
    234   def handle_cast({:connected, ref}, %{client: {ref, :connect}} = s) do
    235     %{mod: mod, state: state} = s
    236 
    237     notify_connection_listeners({:connected, self()}, s)
    238 
    239     case apply(mod, :checkout, [state]) do
    240       {:ok, state} ->
    241         pool_update(state, s)
    242 
    243       {:disconnect, err, state} ->
    244         {:disconnect, {:log, err}, %{s | state: state}}
    245     end
    246   end
    247 
    248   def handle_cast({:connected, _}, s) do
    249     {:noreply, s}
    250   end
    251 
    252   @doc false
    253   def handle_info({:DOWN, ref, _, pid, reason}, %{client: {ref, :after_connect}} = s) do
    254     message = "client #{inspect(pid)} exited: " <> Exception.format_exit(reason)
    255     err = DBConnection.ConnectionError.exception(message)
    256     {:disconnect, {down_log(reason), err}, %{s | client: {nil, :after_connect}}}
    257   end
    258 
    259   def handle_info({:DOWN, mon, _, pid, reason}, %{client: {ref, mon}} = s) do
    260     message = "client #{inspect(pid)} exited: " <> Exception.format_exit(reason)
    261     err = DBConnection.ConnectionError.exception(message)
    262     {:disconnect, {down_log(reason), err}, %{s | client: {ref, nil}}}
    263   end
    264 
    265   def handle_info({:timeout, timer, {__MODULE__, pid, timeout}}, %{timer: timer} = s)
    266       when is_reference(timer) do
    267     message =
    268       "client #{inspect(pid)} timed out because it checked out " <>
    269         "the connection for longer than #{timeout}ms"
    270 
    271     exc =
    272       case Process.info(pid, :current_stacktrace) do
    273         {:current_stacktrace, stacktrace} ->
    274           message <>
    275             "\n\n#{inspect(pid)} was at location:\n\n" <>
    276             Exception.format_stacktrace(stacktrace)
    277 
    278         _ ->
    279           message
    280       end
    281       |> DBConnection.ConnectionError.exception()
    282 
    283     {:disconnect, {:log, exc}, %{s | timer: nil}}
    284   end
    285 
    286   def handle_info(
    287         {:"ETS-TRANSFER", holder, _pid, {msg, ref, extra}},
    288         %{client: {ref, :after_connect}, timer: timer} = s
    289       ) do
    290     {_, state} = Holder.delete(holder)
    291     cancel_timer(timer)
    292     s = %{s | timer: nil}
    293 
    294     case msg do
    295       :checkin -> handle_checkin(state, s)
    296       :disconnect -> handle_cast({:disconnect, ref, extra, state}, s)
    297       :stop -> handle_cast({:stop, ref, extra, state}, s)
    298     end
    299   end
    300 
    301   def handle_info(msg, %{mod: mod} = s) do
    302     Logger.info(fn ->
    303       [inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
    304     end)
    305 
    306     handle_timeout(s)
    307   end
    308 
    309   @doc false
    310   def format_status(info, [_, %{client: :closed, mod: mod}]) do
    311     case info do
    312       :normal -> [{:data, [{'Module', mod}]}]
    313       :terminate -> mod
    314     end
    315   end
    316 
    317   def format_status(info, [pdict, %{mod: mod, state: state}]) do
    318     case function_exported?(mod, :format_status, 2) do
    319       true when info == :normal ->
    320         normal_status(mod, pdict, state)
    321 
    322       false when info == :normal ->
    323         normal_status_default(mod, state)
    324 
    325       true when info == :terminate ->
    326         {mod, terminate_status(mod, pdict, state)}
    327 
    328       false when info == :terminate ->
    329         {mod, state}
    330     end
    331   end
    332 
    333   ## Helpers
    334 
    335   defp connect_opts(opts) do
    336     case Keyword.get(opts, :configure) do
    337       {mod, fun, args} ->
    338         apply(mod, fun, [opts | args])
    339 
    340       fun when is_function(fun, 1) ->
    341         fun.(opts)
    342 
    343       nil ->
    344         opts
    345     end
    346   end
    347 
    348   defp down_log(:normal), do: :nolog
    349   defp down_log(:shutdown), do: :nolog
    350   defp down_log({:shutdown, _}), do: :nolog
    351   defp down_log(_), do: :log
    352 
    353   defp handle_timeout(s), do: {:noreply, s}
    354 
    355   defp demonitor({_, mon}) when is_reference(mon) do
    356     Process.demonitor(mon, [:flush])
    357   end
    358 
    359   defp demonitor({mon, :after_connect}) when is_reference(mon) do
    360     Process.demonitor(mon, [:flush])
    361   end
    362 
    363   defp demonitor({_, _}), do: true
    364   defp demonitor(nil), do: true
    365 
    366   defp start_timer(_, :infinity), do: nil
    367 
    368   defp start_timer(pid, timeout) do
    369     :erlang.start_timer(timeout, self(), {__MODULE__, pid, timeout})
    370   end
    371 
    372   defp cancel_timer(nil), do: :ok
    373 
    374   defp cancel_timer(timer) do
    375     case :erlang.cancel_timer(timer) do
    376       false -> flush_timer(timer)
    377       _ -> :ok
    378     end
    379   end
    380 
    381   defp flush_timer(timer) do
    382     receive do
    383       {:timeout, ^timer, {__MODULE__, _, _}} ->
    384         :ok
    385     after
    386       0 ->
    387         raise ArgumentError, "timer #{inspect(timer)} does not exist"
    388     end
    389   end
    390 
    391   defp handle_checkin(state, s) do
    392     %{backoff: backoff, client: client} = s
    393     backoff = backoff && Backoff.reset(backoff)
    394     demonitor(client)
    395     pool_update(state, %{s | client: nil, backoff: backoff})
    396   end
    397 
    398   defp pool_update(state, %{pool: pool, tag: tag, mod: mod} = s) do
    399     case Holder.update(pool, tag, mod, state) do
    400       {:ok, ref} ->
    401         {:noreply, %{s | client: {ref, :pool}, state: state}, :hibernate}
    402 
    403       :error ->
    404         {:stop, {:shutdown, :no_more_pool}, s}
    405     end
    406   end
    407 
    408   defp normal_status(mod, pdict, state) do
    409     try do
    410       mod.format_status(:normal, [pdict, state])
    411     catch
    412       _, _ ->
    413         normal_status_default(mod, state)
    414     else
    415       status ->
    416         status
    417     end
    418   end
    419 
    420   defp normal_status_default(mod, state) do
    421     [{:data, [{'Module', mod}, {'State', state}]}]
    422   end
    423 
    424   defp terminate_status(mod, pdict, state) do
    425     try do
    426       mod.format_status(:terminate, [pdict, state])
    427     catch
    428       _, _ ->
    429         state
    430     else
    431       status ->
    432         status
    433     end
    434   end
    435 
    436   defp cleanup_stacktrace(stack) do
    437     case stack do
    438       [{_, _, arity, _} | _rest] = stacktrace when is_integer(arity) ->
    439         stacktrace
    440 
    441       [{mod, fun, args, info} | rest] when is_list(args) ->
    442         [{mod, fun, length(args), info} | rest]
    443     end
    444   end
    445 
    446   defp notify_connection_listeners(message, %{} = state) do
    447     %{connection_listeners: connection_listeners} = state
    448 
    449     Enum.each(connection_listeners, &send(&1, message))
    450   end
    451 end