simple_connection.ex (12669B)
1 defmodule Postgrex.SimpleConnection do 2 @moduledoc ~S""" 3 A generic connection suitable for simple queries and pubsub functionality. 4 5 On its own, a SimpleConnection server only maintains a connection. To execute 6 queries, process results, or relay notices you must implement a callback module 7 with the SimpleConnection behaviour. 8 9 ## Example 10 11 The SimpleConnection behaviour abstracts common client/server interactions, 12 along with optional mechanisms for running queries or relaying notifications. 13 14 Let's start with a minimal callback module that executes a query and relays 15 the result back to the caller. 16 17 defmodule MyConnection do 18 @behaviour Postgrex.SimpleConnection 19 20 @impl true 21 def init(_args) do 22 {:ok, %{from: nil}} 23 end 24 25 @impl true 26 def handle_call({:query, query}, from, state) do 27 {:query, query, %{state | from: from}} 28 end 29 30 @impl true 31 def handle_result(results, state) when is_list(results) do 32 SimpleConnection.reply(state.from, results) 33 34 {:noreply, state} 35 end 36 37 @impl true 38 def handle_result(%Postgrex.Error{} = error, state) do 39 SimpleConnection.reply(state.from, error) 40 41 {:noreply, state} 42 end 43 end 44 45 # Start the connection 46 {:ok, pid} = SimpleConnection.start_link(MyConnection, [], database: "demo") 47 48 # Execute a literal query 49 SimpleConnection.call(pid, {:query, "SELECT 1"}) 50 # => %Postgrex.Result{rows: [["1"]]} 51 52 We start a connection by passing the callback module, callback options, and 53 server options to `SimpleConnection.start_link/3`. The `init/1` function 54 receives any callback options and returns the callback state. 55 56 Queries are sent through `SimpleConnection.call/2`, executed on the server, 57 and the result is handed off to `handle_result/2`. At that point the callback 58 can process the result before replying back to the caller with 59 `SimpleConnection.reply/2`. 60 61 ## Building a PubSub Connection 62 63 With the `notify/3` callback you can also build a pubsub server on top of 64 `LISTEN/NOTIFY`. Here's a naive pubsub implementation: 65 66 defmodule MyPubSub do 67 @behaviour Postgrex.SimpleConnection 68 69 defstruct [:from, listeners: %{}] 70 71 @impl true 72 def init(args) do 73 {:ok, struct!(__MODULE__, args)} 74 end 75 76 @impl true 77 def notify(channel, payload, state) do 78 for pid <- state.listeners[channel] do 79 send(pid, {:notice, channel, payload}) 80 end 81 end 82 83 @impl true 84 def handle_call({:listen, channel}, {pid, _} = from, state) do 85 listeners = Map.update(state.listeners, channel, [pid], &[pid | &1]) 86 87 {:query, ~s(LISTEN "#{channel}"), %{state | from: from, listeners: listeners}} 88 end 89 90 def handle_call({:query, query}, from, state) do 91 {:query, query, %{state | from: from}} 92 end 93 94 @impl true 95 def handle_result(_results, state) do 96 SimpleConnection.reply(state.from, :ok) 97 98 {:noreply, %{state | from: nil}} 99 end 100 end 101 102 # Start the connection 103 {:ok, pid} = SimpleConnection.start_link(MyPubSub, [], database: "demo") 104 105 # Start listening to the "demo" channel 106 SimpleConnection.call(pid, {:listen, "demo"}) 107 # => %Postgrex.Result{command: :listen} 108 109 # Notify all listeners 110 SimpleConnection.call(pid, {:query, ~s(NOTIFY "demo", 'hello')}) 111 # => %Postgrex.Result{command: :notify} 112 113 # Check the inbox to see the notice message 114 flush() 115 # => {:notice, "demo", "hello"} 116 117 See `Postgrex.Notifications` for a more complex implementation that can 118 unlisten, handle process exits, and persist across reconnection. 119 120 ## Name registration 121 122 A `Postgrex.SimpleConnection` is bound to the same name registration rules as a 123 `GenServer`. Read more about them in the `GenServer` docs. 124 """ 125 126 use Connection 127 128 require Logger 129 130 alias Postgrex.Protocol 131 132 @doc false 133 defstruct idle_interval: 5000, 134 protocol: nil, 135 auto_reconnect: false, 136 reconnect_backoff: 500, 137 state: nil 138 139 ## PUBLIC API ## 140 141 @type query :: iodata 142 @type state :: term 143 144 @doc """ 145 Callback for process initialization. 146 147 This is called once and before the Postgrex connection is established. 148 """ 149 @callback init(term) :: {:ok, state} 150 151 @doc """ 152 Callback for processing or relaying pubsub notifications. 153 """ 154 @callback notify(binary, binary, state) :: :ok 155 156 @doc """ 157 Invoked after connecting or reconnecting. 158 159 This may be called multiple times if `:auto_reconnect` is true. 160 """ 161 @callback handle_connect(state) :: {:noreply, state} | {:query, query, state} 162 163 @doc """ 164 Invoked after disconnection. 165 166 This is invoked regardless of the `:auto_reconnect` option. 167 """ 168 @callback handle_disconnect(state) :: {:noreply, state} 169 170 @doc """ 171 Callback for `SimpleConnection.call/3`. 172 173 Replies must be sent with `SimpleConnection.reply/2`. 174 """ 175 @callback handle_call(term, GenServer.from(), state) :: 176 {:noreply, state} | {:query, query, state} 177 178 @doc """ 179 Callback for `Kernel.send/2`. 180 """ 181 @callback handle_info(term, state) :: {:noreply, state} | {:query, query, state} 182 183 @doc """ 184 Callback for processing or relaying queries executed via `{:query, query, state}`. 185 186 Either a list of successful query results or an error will be passed to this callback. 187 A list is passed because the simple query protocol allows multiple commands to be 188 issued in a single query. 189 """ 190 @callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state) :: 191 {:noreply, state} 192 193 @optional_callbacks handle_call: 3, 194 handle_connect: 1, 195 handle_disconnect: 1, 196 handle_info: 2, 197 handle_result: 2 198 199 @doc """ 200 Replies to the given client. 201 202 Wrapper for `GenServer.reply/2`. 203 """ 204 defdelegate reply(client, reply), to: GenServer 205 206 @doc """ 207 Calls the given server. 208 209 Wrapper for `GenServer.call/3`. 210 """ 211 def call(server, message, timeout \\ 5000) do 212 with {__MODULE__, reason} <- GenServer.call(server, message, timeout) do 213 exit({reason, {__MODULE__, :call, [server, message, timeout]}}) 214 end 215 end 216 217 @doc false 218 def child_spec(opts) do 219 %{id: __MODULE__, start: {__MODULE__, :start_link, opts}} 220 end 221 222 @doc """ 223 Start the connection process and connect to Postgres. 224 225 The options that this function accepts are the same as those accepted by 226 `Postgrex.start_link/1`, as well as the extra options `:sync_connect`, 227 `:auto_reconnect`, `:reconnect_backoff`, and `:configure`. 228 229 ## Options 230 231 * `:auto_reconnect` - automatically attempt to reconnect to the database 232 in event of a disconnection. See the 233 [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects) 234 above. Defaults to `false`, which means the process terminates. 235 236 * `:configure` - A function to run before every connect attempt to dynamically 237 configure the options as a `{module, function, args}`, where the current 238 options will prepended to `args`. Defaults to `nil`. 239 240 * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has 241 a default of `5000ms` in `Postgrex.SimpleConnection` (instead of 1000ms). 242 243 * `:reconnect_backoff` - time (in ms) between reconnection attempts when 244 `auto_reconnect` is enabled. Defaults to `500`. 245 246 * `:sync_connect` - controls if the connection should be established on boot 247 or asynchronously right after boot. Defaults to `true`. 248 """ 249 @spec start_link(module, term, Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term} 250 def start_link(module, args, opts) do 251 {server_opts, opts} = Keyword.split(opts, [:name]) 252 opts = Keyword.put_new(opts, :sync_connect, true) 253 connection_opts = Postgrex.Utils.default_opts(opts) 254 Connection.start_link(__MODULE__, {module, args, connection_opts}, server_opts) 255 end 256 257 ## CALLBACKS ## 258 259 @doc false 260 def init({mod, args, opts}) do 261 case mod.init(args) do 262 {:ok, mod_state} -> 263 idle_timeout = opts[:idle_timeout] 264 265 if idle_timeout do 266 Logger.warning( 267 ":idle_timeout in Postgrex.SimpleConnection is deprecated, " <> 268 "please use :idle_interval instead" 269 ) 270 end 271 272 {idle_interval, opts} = Keyword.pop(opts, :idle_interval, idle_timeout || 5000) 273 {auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false) 274 {reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500) 275 276 state = %__MODULE__{ 277 idle_interval: idle_interval, 278 auto_reconnect: auto_reconnect, 279 reconnect_backoff: reconnect_backoff, 280 state: {mod, mod_state} 281 } 282 283 put_opts(mod, opts) 284 285 if opts[:sync_connect] do 286 case connect(:init, state) do 287 {:ok, _} = ok -> ok 288 {:backoff, _, _} = backoff -> backoff 289 {:stop, reason, _} -> {:stop, reason} 290 end 291 else 292 {:connect, :init, state} 293 end 294 end 295 end 296 297 @doc false 298 def connect(_, %{state: {mod, mod_state}} = state) do 299 opts = 300 case Keyword.get(opts(mod), :configure) do 301 {module, fun, args} -> apply(module, fun, [opts(mod) | args]) 302 fun when is_function(fun, 1) -> fun.(opts(mod)) 303 nil -> opts(mod) 304 end 305 306 case Protocol.connect(opts) do 307 {:ok, protocol} -> 308 state = %{state | protocol: protocol} 309 310 with {:noreply, state, _} <- maybe_handle(mod, :handle_connect, [mod_state], state) do 311 {:ok, state} 312 end 313 314 {:error, reason} -> 315 if state.auto_reconnect do 316 {:backoff, state.reconnect_backoff, state} 317 else 318 {:stop, reason, state} 319 end 320 end 321 end 322 323 @doc false 324 def handle_call(msg, from, %{state: {mod, mod_state}} = state) do 325 handle(mod, :handle_call, [msg, from, mod_state], from, state) 326 end 327 328 @doc false 329 def handle_info(:timeout, %{protocol: protocol} = state) do 330 case Protocol.ping(protocol) do 331 {:ok, protocol} -> 332 {:noreply, %{state | protocol: protocol}, state.idle_interval} 333 334 {error, reason, protocol} -> 335 reconnect_or_stop(error, reason, protocol, state) 336 end 337 end 338 339 def handle_info(msg, %{protocol: protocol, state: {mod, mod_state}} = state) do 340 opts = [notify: &mod.notify(&1, &2, mod_state)] 341 342 case Protocol.handle_info(msg, opts, protocol) do 343 {:ok, protocol} -> 344 {:noreply, %{state | protocol: protocol}, state.idle_interval} 345 346 {:unknown, protocol} -> 347 maybe_handle(mod, :handle_info, [msg, mod_state], %{state | protocol: protocol}) 348 349 {error, reason, protocol} -> 350 reconnect_or_stop(error, reason, protocol, state) 351 end 352 end 353 354 def handle_info(msg, %{state: {mod, mod_state}} = state) do 355 maybe_handle(mod, :handle_info, [msg, mod_state], state) 356 end 357 358 defp maybe_handle(mod, fun, args, state) do 359 if function_exported?(mod, fun, length(args)) do 360 handle(mod, fun, args, nil, state) 361 else 362 {:noreply, state, state.idle_interval} 363 end 364 end 365 366 defp handle(mod, fun, args, from, state) do 367 case apply(mod, fun, args) do 368 {:noreply, mod_state} -> 369 {:noreply, %{state | state: {mod, mod_state}}, state.idle_interval} 370 371 {:query, query, mod_state} -> 372 opts = [notify: &mod.notify(&1, &2, mod_state)] 373 374 state = %{state | state: {mod, mod_state}} 375 376 with {:ok, results, protocol} <- Protocol.handle_simple(query, opts, state.protocol), 377 {:ok, protocol} <- Protocol.checkin(protocol) do 378 state = %{state | protocol: protocol} 379 380 handle(mod, :handle_result, [results, mod_state], from, state) 381 else 382 {:error, %Postgrex.Error{} = error, protocol} -> 383 handle(mod, :handle_result, [error, mod_state], from, %{state | protocol: protocol}) 384 385 {:disconnect, reason, protocol} -> 386 reconnect_or_stop(:disconnect, reason, protocol, state) 387 end 388 end 389 end 390 391 defp reconnect_or_stop(error, reason, protocol, %{state: {mod, mod_state}} = state) 392 when error in [:error, :disconnect] do 393 {:noreply, state, _} = maybe_handle(mod, :handle_disconnect, [mod_state], state) 394 395 if state.auto_reconnect do 396 {:connect, :reconnect, state} 397 else 398 {:stop, reason, %{state | protocol: protocol}} 399 end 400 end 401 402 defp opts(mod), do: Process.get(mod) 403 404 defp put_opts(mod, opts), do: Process.put(mod, opts) 405 end