zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

notifications.ex (9273B)


      1 defmodule Postgrex.Notifications do
      2   @moduledoc ~S"""
      3   API for notifications (pub/sub) in PostgreSQL.
      4 
      5   In order to use it, first you need to start the notification process.
      6   In your supervision tree:
      7 
      8       {Postgrex.Notifications, name: MyApp.Notifications}
      9 
     10   Then you can listen to certain channels:
     11 
     12       {:ok, listen_ref} = Postgrex.Notifications.listen(MyApp.Notifications, "channel")
     13 
     14   Now every time a message is broadcast on said channel, for example via
     15   PostgreSQL command line:
     16 
     17       NOTIFY "channel", 'Oh hai!';
     18 
     19   You will receive a message in the format:
     20 
     21       {:notification, notification_pid, listen_ref, channel, message}
     22 
     23   ## Async connect and auto-reconnects
     24 
     25   By default, the notification system establishes a connection to the
     26   database on initialization, but you can configure the connection to happen
     27   asynchronously. You can also configure the connection to automatically
     28   reconnect.
     29 
     30   Note however that when the notification system is waiting for a connection,
     31   any notifications that occur during the disconnection period are not queued
     32   and cannot be recovered. Similarly, any listen command will be queued until
     33   the connection is up.
     34 
     35   ## A note on casing
     36 
     37   While PostgreSQL seems to behave as case-insensitive, it actually has a very
     38   peculiar behaviour on casing. When you write:
     39 
     40       SELECT * FROM POSTS
     41 
     42   PostgreSQL actually converts `POSTS` into the lowercase `posts`. That's why
     43   both `SELECT * FROM POSTS` and `SELECT * FROM posts` feel equivalent.
     44   However, if you wrap the table name in quotes, then the casing in quotes
     45   will be preserved.
     46 
     47   These same rules apply to PostgreSQL notification channels. More importantly,
     48   whenever `Postgrex.Notifications` listens to a channel, it wraps the channel
     49   name in quotes. Therefore, if you listen to a channel named "fooBar" and
     50   you send a notification without quotes in the channel name, such as:
     51 
     52       NOTIFY fooBar, 'Oh hai!';
     53 
     54   The notification will not be received by Postgrex.Notifications because the
     55   notification will be effectively sent to `"foobar"` and not `"fooBar"`. Therefore,
     56   you must guarantee one of the two following properties:
     57 
     58     1. If you can wrap the channel name in quotes when sending a notification,
     59        then make sure the channel name has the exact same casing when listening
     60        and sending notifications
     61 
     62     2. If you cannot wrap the channel name in quotes when sending a notification,
     63        then make sure to give the lowercased channel name when listening
     64   """
     65 
     66   alias Postgrex.SimpleConnection
     67 
     68   @behaviour SimpleConnection
     69 
     70   require Logger
     71 
     72   defstruct [
     73     :from,
     74     :ref,
     75     auto_reconnect: false,
     76     connected: false,
     77     listeners: %{},
     78     listener_channels: %{}
     79   ]
     80 
     81   @timeout 5000
     82 
     83   @doc false
     84   def child_spec(opts) do
     85     %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
     86   end
     87 
     88   @doc """
     89   Start the notification connection process and connect to postgres.
     90 
     91   The options that this function accepts are the same as those accepted by
     92   `Postgrex.start_link/1`, as well as the extra options `:sync_connect`,
     93   `:auto_reconnect`, `:reconnect_backoff`, and `:configure`.
     94 
     95   ## Options
     96 
     97     * `:sync_connect` - controls if the connection should be established on boot
     98       or asynchronously right after boot. Defaults to `true`.
     99 
    100     * `:auto_reconnect` - automatically attempt to reconnect to the database
    101       in event of a disconnection. See the
    102       [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
    103       above. Defaults to `false`, which means the process terminates.
    104 
    105     * `:reconnect_backoff` - time (in ms) between reconnection attempts when
    106       `auto_reconnect` is enabled. Defaults to `500`.
    107 
    108     * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has
    109       a default of `5000ms` in `Postgrex.Notifications` (instead of 1000ms).
    110 
    111     * `:configure` - A function to run before every connect attempt to dynamically
    112       configure the options as a `{module, function, args}`, where the current
    113       options will prepended to `args`. Defaults to `nil`.
    114   """
    115   @spec start_link(Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
    116   def start_link(opts) do
    117     args = Keyword.take(opts, [:auto_reconnect])
    118 
    119     SimpleConnection.start_link(__MODULE__, args, opts)
    120   end
    121 
    122   @doc """
    123   Listens to an asynchronous notification channel using the `LISTEN` command.
    124 
    125   A message `{:notification, connection_pid, ref, channel, payload}` will be
    126   sent to the calling process when a notification is received.
    127 
    128   It returns `{:ok, reference}`. It may also return `{:eventually, reference}`
    129   if the notification process is not currently connected to the database and
    130   it was started with `:sync_connect` set to false or `:auto_reconnect` set
    131   to true. The `reference` can be used to issue an `unlisten/3` command.
    132 
    133   ## Options
    134 
    135     * `:timeout` - Call timeout (default: `#{@timeout}`)
    136   """
    137   @spec listen(GenServer.server(), String.t(), Keyword.t()) ::
    138           {:ok, reference} | {:eventually, reference}
    139   def listen(pid, channel, opts \\ []) do
    140     SimpleConnection.call(pid, {:listen, channel}, Keyword.get(opts, :timeout, @timeout))
    141   end
    142 
    143   @doc """
    144   Listens to an asynchronous notification channel `channel`. See `listen/2`.
    145   """
    146   @spec listen!(GenServer.server(), String.t(), Keyword.t()) :: reference
    147   def listen!(pid, channel, opts \\ []) do
    148     {:ok, ref} = listen(pid, channel, opts)
    149     ref
    150   end
    151 
    152   @doc """
    153   Stops listening on the given channel by passing the reference returned from
    154   `listen/2`.
    155 
    156   ## Options
    157 
    158     * `:timeout` - Call timeout (default: `#{@timeout}`)
    159   """
    160   @spec unlisten(GenServer.server(), reference, Keyword.t()) :: :ok | :error
    161   def unlisten(pid, ref, opts \\ []) do
    162     SimpleConnection.call(pid, {:unlisten, ref}, Keyword.get(opts, :timeout, @timeout))
    163   end
    164 
    165   @doc """
    166   Stops listening on the given channel by passing the reference returned from
    167   `listen/2`.
    168   """
    169   @spec unlisten!(GenServer.server(), reference, Keyword.t()) :: :ok
    170   def unlisten!(pid, ref, opts \\ []) do
    171     case unlisten(pid, ref, opts) do
    172       :ok -> :ok
    173       :error -> raise ArgumentError, "unknown reference #{inspect(ref)}"
    174     end
    175   end
    176 
    177   ## CALLBACKS ##
    178 
    179   @impl true
    180   def init(args) do
    181     {:ok, struct!(__MODULE__, args)}
    182   end
    183 
    184   @impl true
    185   def notify(channel, payload, state) do
    186     for {ref, pid} <- Map.get(state.listener_channels, channel, []) do
    187       send(pid, {:notification, self(), ref, channel, payload})
    188     end
    189 
    190     :ok
    191   end
    192 
    193   @impl true
    194   def handle_connect(state) do
    195     state = %{state | connected: true}
    196 
    197     if map_size(state.listener_channels) > 0 do
    198       listen_statements =
    199         state.listener_channels
    200         |> Map.keys()
    201         |> Enum.map_join("\n", &~s(LISTEN "#{&1}";))
    202 
    203       query = "DO $$BEGIN #{listen_statements} END$$"
    204 
    205       {:query, query, state}
    206     else
    207       {:noreply, state}
    208     end
    209   end
    210 
    211   @impl true
    212   def handle_disconnect(state) do
    213     state = %{state | connected: false}
    214 
    215     if state.auto_reconnect && state.from && state.ref do
    216       SimpleConnection.reply(state.from, {:eventually, state.ref})
    217 
    218       {:noreply, %{state | from: nil, ref: nil}}
    219     else
    220       {:noreply, state}
    221     end
    222   end
    223 
    224   @impl true
    225   def handle_call({:listen, channel}, {pid, _} = from, state) do
    226     ref = Process.monitor(pid)
    227 
    228     state = put_in(state.listeners[ref], {channel, pid})
    229     state = update_in(state.listener_channels[channel], &Map.put(&1 || %{}, ref, pid))
    230 
    231     cond do
    232       not state.connected ->
    233         SimpleConnection.reply(from, {:eventually, ref})
    234 
    235         {:noreply, state}
    236 
    237       map_size(state.listener_channels[channel]) == 1 ->
    238         {:query, ~s(LISTEN "#{channel}"), %{state | from: from, ref: ref}}
    239 
    240       true ->
    241         SimpleConnection.reply(from, {:ok, ref})
    242 
    243         {:noreply, state}
    244     end
    245   end
    246 
    247   def handle_call({:unlisten, ref}, from, state) do
    248     case state.listeners do
    249       %{^ref => {channel, _pid}} ->
    250         Process.demonitor(ref, [:flush])
    251 
    252         {_, state} = pop_in(state.listeners[ref])
    253         {_, state} = pop_in(state.listener_channels[channel][ref])
    254 
    255         if map_size(state.listener_channels[channel]) == 0 do
    256           {_, state} = pop_in(state.listener_channels[channel])
    257 
    258           {:query, ~s(UNLISTEN "#{channel}"), %{state | from: from}}
    259         else
    260           from && SimpleConnection.reply(from, :ok)
    261 
    262           {:noreply, state}
    263         end
    264 
    265       _ ->
    266         from && SimpleConnection.reply(from, :error)
    267 
    268         {:noreply, state}
    269     end
    270   end
    271 
    272   @impl true
    273   def handle_info({:DOWN, ref, :process, _, _}, state) do
    274     handle_call({:unlisten, ref}, nil, state)
    275   end
    276 
    277   def handle_info(msg, state) do
    278     Logger.info(fn ->
    279       context = " received unexpected message: "
    280       [inspect(__MODULE__), ?\s, inspect(self()), context | inspect(msg)]
    281     end)
    282 
    283     {:noreply, state}
    284   end
    285 
    286   @impl true
    287   def handle_result(_message, %{from: from, ref: ref} = state) do
    288     cond do
    289       from && ref ->
    290         SimpleConnection.reply(from, {:ok, ref})
    291 
    292         {:noreply, %{state | from: nil, ref: nil}}
    293 
    294       from ->
    295         SimpleConnection.reply(from, :ok)
    296 
    297         {:noreply, %{state | from: nil}}
    298 
    299       true ->
    300         {:noreply, state}
    301     end
    302   end
    303 end