notifications.ex (9273B)
defmodule Postgrex.Notifications do
  @moduledoc ~S"""
  API for notifications (pub/sub) in PostgreSQL.

  In order to use it, first you need to start the notification process.
  In your supervision tree:

      {Postgrex.Notifications, name: MyApp.Notifications}

  Then you can listen to certain channels:

      {:ok, listen_ref} = Postgrex.Notifications.listen(MyApp.Notifications, "channel")

  Now every time a message is broadcast on said channel, for example via
  PostgreSQL command line:

      NOTIFY "channel", 'Oh hai!';

  You will receive a message in the format:

      {:notification, notification_pid, listen_ref, channel, message}

  ## Async connect and auto-reconnects

  By default, the notification system establishes a connection to the
  database on initialization. You can configure the connection to happen
  asynchronously. You can also configure the connection to automatically
  reconnect.

  Note however that when the notification system is waiting for a connection,
  any notifications that occur during the disconnection period are not queued
  and cannot be recovered. Listen commands, on the other hand, are queued until
  the connection is up.

  ## A note on casing

  While PostgreSQL seems to behave as case-insensitive, it actually has a very
  peculiar behaviour on casing. When you write:

      SELECT * FROM POSTS

  PostgreSQL actually converts `POSTS` into the lowercase `posts`. That's why
  both `SELECT * FROM POSTS` and `SELECT * FROM posts` feel equivalent.
  However, if you wrap the table name in quotes, then the casing in quotes
  will be preserved.

  These same rules apply to PostgreSQL notification channels. More importantly,
  whenever `Postgrex.Notifications` listens to a channel, it wraps the channel
  name in quotes.
  Therefore, if you listen to a channel named "fooBar" and
  you send a notification without quotes in the channel name, such as:

      NOTIFY fooBar, 'Oh hai!';

  The notification will not be received by `Postgrex.Notifications` because the
  notification will be effectively sent to `"foobar"` and not `"fooBar"`. Therefore,
  you must guarantee one of the two following properties:

    1. If you can wrap the channel name in quotes when sending a notification,
       then make sure the channel name has the exact same casing when listening
       and sending notifications

    2. If you cannot wrap the channel name in quotes when sending a notification,
       then make sure to give the lowercased channel name when listening
  """

  alias Postgrex.SimpleConnection

  @behaviour SimpleConnection

  require Logger

  defstruct [
    :from,
    :ref,
    auto_reconnect: false,
    connected: false,
    listeners: %{},
    listener_channels: %{}
  ]

  @timeout 5000

  @doc false
  def child_spec(opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
  end

  @doc """
  Start the notification connection process and connect to postgres.

  The options that this function accepts are the same as those accepted by
  `Postgrex.start_link/1`, as well as the extra options `:sync_connect`,
  `:auto_reconnect`, `:reconnect_backoff`, and `:configure`.

  ## Options

    * `:sync_connect` - controls if the connection should be established on boot
      or asynchronously right after boot. Defaults to `true`.

    * `:auto_reconnect` - automatically attempt to reconnect to the database
      in the event of a disconnection. See the
      [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
      above. Defaults to `false`, which means the process terminates.

    * `:reconnect_backoff` - time (in ms) between reconnection attempts when
      `auto_reconnect` is enabled. Defaults to `500`.

    * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has
      a default of `5000ms` in `Postgrex.Notifications` (instead of 1000ms).

    * `:configure` - A function to run before every connect attempt to dynamically
      configure the options as a `{module, function, args}`, where the current
      options will be prepended to `args`. Defaults to `nil`.
  """
  @spec start_link(Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
  def start_link(opts) do
    args = Keyword.take(opts, [:auto_reconnect])

    SimpleConnection.start_link(__MODULE__, args, opts)
  end

  @doc """
  Listens to an asynchronous notification channel using the `LISTEN` command.

  A message `{:notification, connection_pid, ref, channel, payload}` will be
  sent to the calling process when a notification is received.

  It returns `{:ok, reference}`. It may also return `{:eventually, reference}`
  if the notification process is not currently connected to the database and
  it was started with `:sync_connect` set to false or `:auto_reconnect` set
  to true. The `reference` can be used to issue an `unlisten/3` command.

  ## Options

    * `:timeout` - Call timeout (default: `#{@timeout}`)
  """
  @spec listen(GenServer.server(), String.t(), Keyword.t()) ::
          {:ok, reference} | {:eventually, reference}
  def listen(pid, channel, opts \\ []) do
    SimpleConnection.call(pid, {:listen, channel}, Keyword.get(opts, :timeout, @timeout))
  end

  @doc """
  Listens to an asynchronous notification channel `channel`. See `listen/2`.
  """
  @spec listen!(GenServer.server(), String.t(), Keyword.t()) :: reference
  def listen!(pid, channel, opts \\ []) do
    {:ok, ref} = listen(pid, channel, opts)
    ref
  end

  @doc """
  Stops listening on the given channel by passing the reference returned from
  `listen/2`.

  ## Options

    * `:timeout` - Call timeout (default: `#{@timeout}`)
  """
  @spec unlisten(GenServer.server(), reference, Keyword.t()) :: :ok | :error
  def unlisten(pid, ref, opts \\ []) do
    SimpleConnection.call(pid, {:unlisten, ref}, Keyword.get(opts, :timeout, @timeout))
  end

  @doc """
  Stops listening on the given channel by passing the reference returned from
  `listen/2`. Raises `ArgumentError` if the reference is unknown.
  """
  @spec unlisten!(GenServer.server(), reference, Keyword.t()) :: :ok
  def unlisten!(pid, ref, opts \\ []) do
    case unlisten(pid, ref, opts) do
      :ok -> :ok
      :error -> raise ArgumentError, "unknown reference #{inspect(ref)}"
    end
  end

  ## CALLBACKS ##

  @impl true
  def init(args) do
    {:ok, struct!(__MODULE__, args)}
  end

  @impl true
  def notify(channel, payload, state) do
    for {ref, pid} <- Map.get(state.listener_channels, channel, []) do
      send(pid, {:notification, self(), ref, channel, payload})
    end

    :ok
  end

  @impl true
  def handle_connect(state) do
    state = %{state | connected: true}

    if map_size(state.listener_channels) > 0 do
      listen_statements =
        state.listener_channels
        |> Map.keys()
        |> Enum.map_join("\n", &~s(LISTEN "#{&1}";))

      query = "DO $$BEGIN #{listen_statements} END$$"

      {:query, query, state}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_disconnect(state) do
    state = %{state | connected: false}

    if state.auto_reconnect && state.from && state.ref do
      SimpleConnection.reply(state.from, {:eventually, state.ref})

      {:noreply, %{state | from: nil, ref: nil}}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_call({:listen, channel}, {pid, _} = from, state) do
    ref = Process.monitor(pid)

    state = put_in(state.listeners[ref], {channel, pid})
    state = update_in(state.listener_channels[channel], &Map.put(&1 || %{}, ref, pid))

    cond do
      not state.connected ->
        SimpleConnection.reply(from, {:eventually, ref})

        {:noreply, state}

      map_size(state.listener_channels[channel]) == 1 ->
        {:query, ~s(LISTEN "#{channel}"), %{state | from: from, ref: ref}}

      true ->
        SimpleConnection.reply(from, {:ok, ref})

        {:noreply, state}
    end
  end

  def handle_call({:unlisten, ref}, from, state) do
    case state.listeners do
      %{^ref => {channel, _pid}} ->
        Process.demonitor(ref, [:flush])

        {_, state} = pop_in(state.listeners[ref])
        {_, state} = pop_in(state.listener_channels[channel][ref])

        if map_size(state.listener_channels[channel]) == 0 do
          {_, state} = pop_in(state.listener_channels[channel])

          {:query, ~s(UNLISTEN "#{channel}"), %{state | from: from}}
        else
          from && SimpleConnection.reply(from, :ok)

          {:noreply, state}
        end

      _ ->
        from && SimpleConnection.reply(from, :error)

        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _, _}, state) do
    handle_call({:unlisten, ref}, nil, state)
  end

  def handle_info(msg, state) do
    Logger.info(fn ->
      context = " received unexpected message: "
      [inspect(__MODULE__), ?\s, inspect(self()), context | inspect(msg)]
    end)

    {:noreply, state}
  end

  @impl true
  def handle_result(_message, %{from: from, ref: ref} = state) do
    cond do
      from && ref ->
        SimpleConnection.reply(from, {:ok, ref})

        {:noreply, %{state | from: nil, ref: nil}}

      from ->
        SimpleConnection.reply(from, :ok)

        {:noreply, %{state | from: nil}}

      true ->
        {:noreply, state}
    end
  end
end
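
# Usage sketch (illustrative only, not part of Postgrex).
#
# The module above sends `{:notification, pid, ref, channel, payload}` to the
# process that called `listen/3`. The sketch below shows one way a caller
# might consume those messages from a GenServer. The module name
# `MyApp.ChannelListener`, the `:notifications` and `:channel` options, and
# the `:received` state key are assumptions made for this example. It
# accepts both `{:ok, ref}` and `{:eventually, ref}` from `listen/3`, since
# the latter is documented for connections that are not up yet.
defmodule MyApp.ChannelListener do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts) do
    # Name or pid of a running Postgrex.Notifications process (hypothetical option).
    notifications = Keyword.fetch!(opts, :notifications)
    # Channel name; casing must match what the sender uses (hypothetical option).
    channel = Keyword.fetch!(opts, :channel)

    # The status is :ok when LISTEN ran, :eventually when it was queued.
    {_status, ref} = Postgrex.Notifications.listen(notifications, channel)

    {:ok, %{ref: ref, channel: channel, received: []}}
  end

  @impl true
  # Only handle notifications carrying the reference stored at listen time.
  def handle_info({:notification, _pid, ref, channel, payload}, %{ref: ref} = state) do
    {:noreply, %{state | received: [{channel, payload} | state.received]}}
  end

  # Ignore anything else, including notifications for unknown references.
  def handle_info(_other, state) do
    {:noreply, state}
  end
end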