zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

simple_connection.ex (12669B)


      1 defmodule Postgrex.SimpleConnection do
      2   @moduledoc ~S"""
      3   A generic connection suitable for simple queries and pubsub functionality.
      4 
      5   On its own, a SimpleConnection server only maintains a connection. To execute
      6   queries, process results, or relay notices you must implement a callback module
      7   with the SimpleConnection behaviour.
      8 
      9   ## Example
     10 
     11   The SimpleConnection behaviour abstracts common client/server interactions,
     12   along with optional mechanisms for running queries or relaying notifications.
     13 
     14   Let's start with a minimal callback module that executes a query and relays
     15   the result back to the caller.
     16 
     17       defmodule MyConnection do
     18         @behaviour Postgrex.SimpleConnection
     19 
     20         @impl true
     21         def init(_args) do
     22           {:ok, %{from: nil}}
     23         end
     24 
     25         @impl true
     26         def handle_call({:query, query}, from, state) do
     27           {:query, query, %{state | from: from}}
     28         end
     29 
     30         @impl true
     31         def handle_result(results, state) when is_list(results) do
     32           SimpleConnection.reply(state.from, results)
     33 
     34           {:noreply, state}
     35         end
     36 
     37         @impl true
     38         def handle_result(%Postgrex.Error{} = error, state) do
     39           SimpleConnection.reply(state.from, error)
     40 
     41           {:noreply, state}
     42         end
     43       end
     44 
     45       # Start the connection
     46       {:ok, pid} = SimpleConnection.start_link(MyConnection, [], database: "demo")
     47 
     48       # Execute a literal query
     49       SimpleConnection.call(pid, {:query, "SELECT 1"})
     50       # => %Postgrex.Result{rows: [["1"]]}
     51 
     52   We start a connection by passing the callback module, callback options, and
     53   server options to `SimpleConnection.start_link/3`. The `init/1` function
     54   receives any callback options and returns the callback state.
     55 
     56   Queries are sent through `SimpleConnection.call/2`, executed on the server,
     57   and the result is handed off to `handle_result/2`. At that point the callback
     58   can process the result before replying back to the caller with
     59   `SimpleConnection.reply/2`.
     60 
     61   ## Building a PubSub Connection
     62 
     63   With the `notify/3` callback you can also build a pubsub server on top of
     64   `LISTEN/NOTIFY`. Here's a naive pubsub implementation:
     65 
     66       defmodule MyPubSub do
     67         @behaviour Postgrex.SimpleConnection
     68 
     69         defstruct [:from, listeners: %{}]
     70 
     71         @impl true
     72         def init(args) do
     73           {:ok, struct!(__MODULE__, args)}
     74         end
     75 
     76         @impl true
     77         def notify(channel, payload, state) do
     78           for pid <- state.listeners[channel] do
     79             send(pid, {:notice, channel, payload})
     80           end
     81         end
     82 
     83         @impl true
     84         def handle_call({:listen, channel}, {pid, _} = from, state) do
     85           listeners = Map.update(state.listeners, channel, [pid], &[pid | &1])
     86 
     87           {:query, ~s(LISTEN "#{channel}"), %{state | from: from, listeners: listeners}}
     88         end
     89 
     90         def handle_call({:query, query}, from, state) do
     91           {:query, query, %{state | from: from}}
     92         end
     93 
     94         @impl true
     95         def handle_result(_results, state) do
     96           SimpleConnection.reply(state.from, :ok)
     97 
     98           {:noreply, %{state | from: nil}}
     99         end
    100       end
    101 
    102       # Start the connection
    103       {:ok, pid} = SimpleConnection.start_link(MyPubSub, [], database: "demo")
    104 
    105       # Start listening to the "demo" channel
    106       SimpleConnection.call(pid, {:listen, "demo"})
    107       # => %Postgrex.Result{command: :listen}
    108 
    109       # Notify all listeners
    110       SimpleConnection.call(pid, {:query, ~s(NOTIFY "demo", 'hello')})
    111       # => %Postgrex.Result{command: :notify}
    112 
    113       # Check the inbox to see the notice message
    114       flush()
    115       # => {:notice, "demo", "hello"}
    116 
    117   See `Postgrex.Notifications` for a more complex implementation that can
    118   unlisten, handle process exits, and persist across reconnection.
    119 
    120   ## Name registration
    121 
    122   A `Postgrex.SimpleConnection` is bound to the same name registration rules as a
    123   `GenServer`. Read more about them in the `GenServer` docs.
    124   """
    125 
    126   use Connection
    127 
    128   require Logger
    129 
    130   alias Postgrex.Protocol
    131 
    132   @doc false
    133   defstruct idle_interval: 5000,
    134             protocol: nil,
    135             auto_reconnect: false,
    136             reconnect_backoff: 500,
    137             state: nil
    138 
    139   ## PUBLIC API ##
    140 
    141   @type query :: iodata
    142   @type state :: term
    143 
    144   @doc """
    145   Callback for process initialization.
    146 
    147   This is called once and before the Postgrex connection is established.
    148   """
    149   @callback init(term) :: {:ok, state}
    150 
    151   @doc """
    152   Callback for processing or relaying pubsub notifications.
    153   """
    154   @callback notify(binary, binary, state) :: :ok
    155 
    156   @doc """
    157   Invoked after connecting or reconnecting.
    158 
    159   This may be called multiple times if `:auto_reconnect` is true.
    160   """
    161   @callback handle_connect(state) :: {:noreply, state} | {:query, query, state}
    162 
    163   @doc """
    164   Invoked after disconnection.
    165 
    166   This is invoked regardless of the `:auto_reconnect` option.
    167   """
    168   @callback handle_disconnect(state) :: {:noreply, state}
    169 
    170   @doc """
    171   Callback for `SimpleConnection.call/3`.
    172 
    173   Replies must be sent with `SimpleConnection.reply/2`.
    174   """
    175   @callback handle_call(term, GenServer.from(), state) ::
    176               {:noreply, state} | {:query, query, state}
    177 
    178   @doc """
    179   Callback for `Kernel.send/2`.
    180   """
    181   @callback handle_info(term, state) :: {:noreply, state} | {:query, query, state}
    182 
    183   @doc """
    184   Callback for processing or relaying queries executed via `{:query, query, state}`.
    185 
    186   Either a list of successful query results or an error will be passed to this callback.
    187   A list is passed because the simple query protocol allows multiple commands to be
    188   issued in a single query.
    189   """
    190   @callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state) ::
    191               {:noreply, state}
    192 
    193   @optional_callbacks handle_call: 3,
    194                       handle_connect: 1,
    195                       handle_disconnect: 1,
    196                       handle_info: 2,
    197                       handle_result: 2
    198 
    199   @doc """
    200   Replies to the given client.
    201 
    202   Wrapper for `GenServer.reply/2`.
    203   """
    204   defdelegate reply(client, reply), to: GenServer
    205 
    206   @doc """
    207   Calls the given server.
    208 
    209   Wrapper for `GenServer.call/3`.
    210   """
    211   def call(server, message, timeout \\ 5000) do
    212     with {__MODULE__, reason} <- GenServer.call(server, message, timeout) do
    213       exit({reason, {__MODULE__, :call, [server, message, timeout]}})
    214     end
    215   end
    216 
    217   @doc false
    218   def child_spec(opts) do
    219     %{id: __MODULE__, start: {__MODULE__, :start_link, opts}}
    220   end
    221 
    222   @doc """
    223   Start the connection process and connect to Postgres.
    224 
    225   The options that this function accepts are the same as those accepted by
    226   `Postgrex.start_link/1`, as well as the extra options `:sync_connect`,
    227   `:auto_reconnect`, `:reconnect_backoff`, and `:configure`.
    228 
    229   ## Options
    230 
    231     * `:auto_reconnect` - automatically attempt to reconnect to the database
    232       in event of a disconnection. See the
    233       [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
    234       above. Defaults to `false`, which means the process terminates.
    235 
    236     * `:configure` - A function to run before every connect attempt to dynamically
    237       configure the options as a `{module, function, args}`, where the current
    238       options will prepended to `args`. Defaults to `nil`.
    239 
    240     * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has
    241       a default of `5000ms` in `Postgrex.SimpleConnection` (instead of 1000ms).
    242 
    243     * `:reconnect_backoff` - time (in ms) between reconnection attempts when
    244       `auto_reconnect` is enabled. Defaults to `500`.
    245 
    246     * `:sync_connect` - controls if the connection should be established on boot
    247       or asynchronously right after boot. Defaults to `true`.
    248   """
    249   @spec start_link(module, term, Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
    250   def start_link(module, args, opts) do
    251     {server_opts, opts} = Keyword.split(opts, [:name])
    252     opts = Keyword.put_new(opts, :sync_connect, true)
    253     connection_opts = Postgrex.Utils.default_opts(opts)
    254     Connection.start_link(__MODULE__, {module, args, connection_opts}, server_opts)
    255   end
    256 
    257   ## CALLBACKS ##
    258 
    259   @doc false
    260   def init({mod, args, opts}) do
    261     case mod.init(args) do
    262       {:ok, mod_state} ->
    263         idle_timeout = opts[:idle_timeout]
    264 
    265         if idle_timeout do
    266           Logger.warning(
    267             ":idle_timeout in Postgrex.SimpleConnection is deprecated, " <>
    268               "please use :idle_interval instead"
    269           )
    270         end
    271 
    272         {idle_interval, opts} = Keyword.pop(opts, :idle_interval, idle_timeout || 5000)
    273         {auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false)
    274         {reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500)
    275 
    276         state = %__MODULE__{
    277           idle_interval: idle_interval,
    278           auto_reconnect: auto_reconnect,
    279           reconnect_backoff: reconnect_backoff,
    280           state: {mod, mod_state}
    281         }
    282 
    283         put_opts(mod, opts)
    284 
    285         if opts[:sync_connect] do
    286           case connect(:init, state) do
    287             {:ok, _} = ok -> ok
    288             {:backoff, _, _} = backoff -> backoff
    289             {:stop, reason, _} -> {:stop, reason}
    290           end
    291         else
    292           {:connect, :init, state}
    293         end
    294     end
    295   end
    296 
    297   @doc false
    298   def connect(_, %{state: {mod, mod_state}} = state) do
    299     opts =
    300       case Keyword.get(opts(mod), :configure) do
    301         {module, fun, args} -> apply(module, fun, [opts(mod) | args])
    302         fun when is_function(fun, 1) -> fun.(opts(mod))
    303         nil -> opts(mod)
    304       end
    305 
    306     case Protocol.connect(opts) do
    307       {:ok, protocol} ->
    308         state = %{state | protocol: protocol}
    309 
    310         with {:noreply, state, _} <- maybe_handle(mod, :handle_connect, [mod_state], state) do
    311           {:ok, state}
    312         end
    313 
    314       {:error, reason} ->
    315         if state.auto_reconnect do
    316           {:backoff, state.reconnect_backoff, state}
    317         else
    318           {:stop, reason, state}
    319         end
    320     end
    321   end
    322 
    323   @doc false
    324   def handle_call(msg, from, %{state: {mod, mod_state}} = state) do
    325     handle(mod, :handle_call, [msg, from, mod_state], from, state)
    326   end
    327 
    328   @doc false
    329   def handle_info(:timeout, %{protocol: protocol} = state) do
    330     case Protocol.ping(protocol) do
    331       {:ok, protocol} ->
    332         {:noreply, %{state | protocol: protocol}, state.idle_interval}
    333 
    334       {error, reason, protocol} ->
    335         reconnect_or_stop(error, reason, protocol, state)
    336     end
    337   end
    338 
    339   def handle_info(msg, %{protocol: protocol, state: {mod, mod_state}} = state) do
    340     opts = [notify: &mod.notify(&1, &2, mod_state)]
    341 
    342     case Protocol.handle_info(msg, opts, protocol) do
    343       {:ok, protocol} ->
    344         {:noreply, %{state | protocol: protocol}, state.idle_interval}
    345 
    346       {:unknown, protocol} ->
    347         maybe_handle(mod, :handle_info, [msg, mod_state], %{state | protocol: protocol})
    348 
    349       {error, reason, protocol} ->
    350         reconnect_or_stop(error, reason, protocol, state)
    351     end
    352   end
    353 
    354   def handle_info(msg, %{state: {mod, mod_state}} = state) do
    355     maybe_handle(mod, :handle_info, [msg, mod_state], state)
    356   end
    357 
    358   defp maybe_handle(mod, fun, args, state) do
    359     if function_exported?(mod, fun, length(args)) do
    360       handle(mod, fun, args, nil, state)
    361     else
    362       {:noreply, state, state.idle_interval}
    363     end
    364   end
    365 
    366   defp handle(mod, fun, args, from, state) do
    367     case apply(mod, fun, args) do
    368       {:noreply, mod_state} ->
    369         {:noreply, %{state | state: {mod, mod_state}}, state.idle_interval}
    370 
    371       {:query, query, mod_state} ->
    372         opts = [notify: &mod.notify(&1, &2, mod_state)]
    373 
    374         state = %{state | state: {mod, mod_state}}
    375 
    376         with {:ok, results, protocol} <- Protocol.handle_simple(query, opts, state.protocol),
    377              {:ok, protocol} <- Protocol.checkin(protocol) do
    378           state = %{state | protocol: protocol}
    379 
    380           handle(mod, :handle_result, [results, mod_state], from, state)
    381         else
    382           {:error, %Postgrex.Error{} = error, protocol} ->
    383             handle(mod, :handle_result, [error, mod_state], from, %{state | protocol: protocol})
    384 
    385           {:disconnect, reason, protocol} ->
    386             reconnect_or_stop(:disconnect, reason, protocol, state)
    387         end
    388     end
    389   end
    390 
    391   defp reconnect_or_stop(error, reason, protocol, %{state: {mod, mod_state}} = state)
    392        when error in [:error, :disconnect] do
    393     {:noreply, state, _} = maybe_handle(mod, :handle_disconnect, [mod_state], state)
    394 
    395     if state.auto_reconnect do
    396       {:connect, :reconnect, state}
    397     else
    398       {:stop, reason, %{state | protocol: protocol}}
    399     end
    400   end
    401 
    402   defp opts(mod), do: Process.get(mod)
    403 
    404   defp put_opts(mod, opts), do: Process.put(mod, opts)
    405 end