replication_connection.ex (19410B)
defmodule Postgrex.ReplicationConnection do
  @moduledoc ~S"""
  A process that receives and sends PostgreSQL replication messages.

  > Note: this module is experimental and may be subject to changes
  > in the future.

  ## Logical replication

  Let's see how to use this module for connecting to PostgreSQL
  for logical replication. First of all, you need to configure the
  wal level in PostgreSQL to logical. Run this inside your PostgreSQL
  shell/configuration:

      ALTER SYSTEM SET wal_level='logical';
      ALTER SYSTEM SET max_wal_senders='10';
      ALTER SYSTEM SET max_replication_slots='10';

  Then **you must restart your server**. Alternatively, you can set
  those values when starting "postgres". This is useful, for example,
  when running it from Docker:

      services:
        postgres:
          image: postgres:14
          env:
            ...
          command: ["postgres", "-c", "wal_level=logical"]

  For CI, GitHub Actions do not support setting command, so you can
  update and restart Postgres instead in a step:

      - name: "Set PG settings"
        run: |
          docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf'
          docker restart ${{ job.services.postgres.id }}

  Then you must create a publication to be replicated.
  This can be done in any session:

      CREATE PUBLICATION postgrex_example FOR ALL TABLES;

  You can also filter if you want to publish insert, update,
  delete or a subset of them:

      -- Skips updates (keeps inserts, deletes, begins, commits, etc)
      CREATE PUBLICATION postgrex_example FOR ALL TABLES WITH (publish = 'insert,delete');

      -- Skips inserts, updates, and deletes (keeps begins, commits, etc)
      CREATE PUBLICATION postgrex_example FOR ALL TABLES WITH (publish = '');

  Now we are ready to create a module that starts a replication slot
  and listens to our publication. Our example will use the `pgoutput`
  plugin for logical replication and print all incoming messages to the
  terminal:

      Mix.install([:postgrex])

      defmodule Repl do
        use Postgrex.ReplicationConnection

        def start_link(opts) do
          # Automatically reconnect if we lose connection.
          extra_opts = [
            auto_reconnect: true
          ]

          Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
        end

        @impl true
        def init(:ok) do
          {:ok, %{step: :disconnected}}
        end

        @impl true
        def handle_connect(state) do
          query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
          {:query, query, %{state | step: :create_slot}}
        end

        @impl true
        def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
          query = "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'postgrex_example')"
          {:stream, query, [], %{state | step: :streaming}}
        end

        @impl true
        # https://www.postgresql.org/docs/14/protocol-replication.html
        def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
          IO.inspect(rest)
          {:noreply, state}
        end

        def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
          messages =
            case reply do
              1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
              0 -> []
            end

          {:noreply, messages, state}
        end

        @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
        defp current_time(), do: System.os_time(:microsecond) - @epoch
      end

      {:ok, _pid} =
        Repl.start_link(
          host: "localhost",
          database: "demo_dev",
          username: "postgres"
        )

      Process.sleep(:infinity)

  ## `use` options

  `use Postgrex.ReplicationConnection` accepts a list of options which configure the
  child specification and therefore how it runs under a supervisor.
  The generated `child_spec/1` can be customized with the following options:

    * `:id` - the child specification identifier, defaults to the current module
    * `:restart` - when the child should be restarted, defaults to `:permanent`
    * `:shutdown` - how to shut down the child, either immediately or by giving
      it time to shut down

  For example:

      use Postgrex.ReplicationConnection, restart: :transient, shutdown: 10_000

  See the "Child specification" section in the `Supervisor` module for more
  detailed information. The `@doc` annotation immediately preceding
  `use Postgrex.ReplicationConnection` will be attached to the generated `child_spec/1`
  function.

  ## Name registration

  A `Postgrex.ReplicationConnection` is bound to the same name registration rules as a
  `GenServer`. Read more about them in the `GenServer` docs.
  """

  use Connection
  require Logger
  import Bitwise

  alias Postgrex.Protocol

  @doc false
  defstruct protocol: nil,
            state: nil,
            auto_reconnect: false,
            reconnect_backoff: 500,
            streaming: nil

  ## PUBLIC API ##

  @type server :: GenServer.server()
  @type state :: term
  @type ack :: iodata
  @type query :: iodata

  @typedoc """
  The following options configure streaming:

    * `:max_messages` - The maximum number of replication messages that can be
      accumulated from the wire until they are relayed to `handle_data/2`.
      Defaults to `500`.

  """
  @type stream_opts :: [max_messages: pos_integer]
  @max_lsn_component_size 8
  @max_uint64 18_446_744_073_709_551_615
  @max_messages 500

  @doc """
  Callback for process initialization.

  This is called once and before the Postgrex connection is established.
  """
  @callback init(term) :: {:ok, state}

  @doc """
  Invoked after connecting.

  This may be invoked multiple times if `:auto_reconnect` is set to true.
  """
  @callback handle_connect(state) ::
              {:noreply, state}
              | {:noreply, ack, state}
              | {:query, query, state}
              | {:stream, query, stream_opts, state}

  @doc """
  Invoked when an established connection is lost or fails with an error.

  It is invoked regardless of `:auto_reconnect`: when `:auto_reconnect`
  is true, the process then attempts to reconnect (invoking
  `c:handle_connect/1` again once connected); otherwise the process
  stops with the disconnection reason.
  """
  @callback handle_disconnect(state) :: {:noreply, state}

  @doc """
  Callback for `:stream` outputs.

  If any callback returns `{:stream, iodata, opts, state}`, then this
  callback will be eventually called with the result of the query.
  It receives `binary` streaming messages and, once the stream
  finishes, the atom `:done`. New queries or streams may be started
  from the `:done` invocation.

  This can be useful for replication and copy commands. For replication,
  the format of the messages are described [under the START_REPLICATION
  section in PostgreSQL docs](https://www.postgresql.org/docs/14/protocol-replication.html).
  Replication messages may require explicit acknowledgement, which can
  be done by returning a list of binaries according to the replication
  protocol.
  """
  @callback handle_data(binary | :done, state) ::
              {:noreply, state}
              | {:noreply, ack, state}
              | {:query, query, state}
              | {:stream, query, stream_opts, state}

  @doc """
  Callback for messages sent to the process (for example via `Kernel.send/2`)
  that are not part of an ongoing replication stream.
  """
  @callback handle_info(term, state) ::
              {:noreply, state}
              | {:noreply, ack, state}
              | {:query, query, state}
              | {:stream, query, stream_opts, state}

  @doc """
  Callback for `call/3`.

  Replies must be sent with `reply/2`.

  If `auto_reconnect: false` (the default) and there is a disconnection,
  the process will terminate and the caller will exit even if no reply is
  sent. However, if `auto_reconnect` is set to true, a disconnection will
  keep the process alive, which means that any command that has not yet
  been replied to should eventually do so. One simple approach is to
  reply to any pending commands on `c:handle_disconnect/1`.
  """
  @callback handle_call(term, GenServer.from(), state) ::
              {:noreply, state}
              | {:noreply, ack, state}
              | {:query, query, state}
              | {:stream, query, stream_opts, state}

  @doc """
  Callback for `:query` outputs.

  If any callback returns `{:query, iodata, state}`,
  then this callback will be immediately called with
  the result of the query. Please note that even though
  replication connections use the simple query protocol,
  Postgres currently limits them to single command queries.
  Due to this constraint, this callback will be passed
  either a list with a single successful query result or
  an error.
  """
  @callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state) ::
              {:noreply, state}
              | {:noreply, ack, state}
              | {:query, query, state}
              | {:stream, query, stream_opts, state}

  @optional_callbacks handle_call: 3,
                      handle_connect: 1,
                      handle_data: 2,
                      handle_disconnect: 1,
                      handle_info: 2,
                      handle_result: 2

  @doc """
  Replies to the given `call/3`.
  """
  defdelegate reply(client, reply), to: GenServer

  @doc """
  Calls the given replication server.

  If the server answers with `{Postgrex.ReplicationConnection, reason}`
  (for example, when the callback tried to start a query or stream while
  another stream was in progress), the caller exits with `reason`.
  """
  def call(server, message, timeout \\ 5000) do
    with {__MODULE__, reason} <- GenServer.call(server, message, timeout) do
      exit({reason, {__MODULE__, :call, [server, message, timeout]}})
    end
  end

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour Postgrex.ReplicationConnection

      if not Module.has_attribute?(__MODULE__, :doc) do
        @doc """
        Returns a specification to start this module under a supervisor.

        See `Supervisor`.
        """
      end

      def child_spec(init_arg) do
        default = %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [init_arg]}
        }

        Supervisor.child_spec(default, unquote(Macro.escape(opts)))
      end

      defoverridable child_spec: 1
    end
  end

  @doc """
  Starts a replication process with the given callback `module`.

  ## Options

  The options that this function accepts are the same as those
  accepted by `Postgrex.start_link/1`, except for `:idle_interval`.

  It also accepts extra options for connection management, documented below.
  Note that `:replication` is automatically set to `"database"` as part of
  the connection `:parameters` if none is set yet.

  ### Connection options

    * `:sync_connect` - controls if the connection should be established on boot
      or asynchronously right after boot. Defaults to `true`.

    * `:auto_reconnect` - automatically attempt to reconnect to the database
      in event of a disconnection. When enabled, `c:handle_disconnect/1` is
      invoked, the process keeps running and `c:handle_connect/1` is invoked
      again once a new connection is established. Defaults to `false`, which
      means the process terminates.

    * `:reconnect_backoff` - time (in ms) between reconnection attempts when
      `:auto_reconnect` is enabled. Defaults to `500`.
  """
  @spec start_link(module(), term(), Keyword.t()) ::
          {:ok, pid} | {:error, Postgrex.Error.t() | term}
  def start_link(module, arg, opts) do
    {server_opts, opts} = Keyword.split(opts, [:name])
    opts = Keyword.put_new(opts, :sync_connect, true)
    connection_opts = Postgrex.Utils.default_opts(opts)
    Connection.start_link(__MODULE__, {module, arg, connection_opts}, server_opts)
  end

  @doc """
  Returns the string representation of an LSN value, given its integer representation.

  It returns `:error` if the provided integer falls outside the range for a valid
  unsigned 64-bit integer.

  ## Log Sequence Numbers

  PostgreSQL uses two representations for the Log Sequence Number (LSN):

    * An unsigned 64-bit integer. Used internally by PostgreSQL and sent in the XLogData
      replication messages.

    * A string of two hexadecimal numbers of up to eight digits each, separated
      by a slash. e.g. `1/F73E0220`. This is the form used in replication
      commands such as `START_REPLICATION`.

  For more information on Log Sequence Numbers, see
  [PostgreSQL pg_lsn docs](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) and
  [PostgreSQL WAL internals docs](https://www.postgresql.org/docs/current/wal-internals.html).
  """
  @spec encode_lsn(integer) :: {:ok, String.t()} | :error
  def encode_lsn(lsn) when is_integer(lsn) do
    if 0 <= lsn and lsn <= @max_uint64 do
      # The high 32 bits form the part before the slash, the low 32 bits the part after.
      <<file_id::32, offset::32>> = <<lsn::64>>
      {:ok, Integer.to_string(file_id, 16) <> "/" <> Integer.to_string(offset, 16)}
    else
      :error
    end
  end

  @doc """
  Returns the integer representation of an LSN value, given its string representation.

  It returns `:error` if the provided string is not a valid LSN, i.e. not exactly
  two hexadecimal numbers of one to eight digits each, separated by a single slash.
  Signs, whitespace and extra slashes are rejected.

  ## Log Sequence Numbers

  PostgreSQL uses two representations for the Log Sequence Number (LSN):

    * An unsigned 64-bit integer. Used internally by PostgreSQL and sent in the XLogData
      replication messages.

    * A string of two hexadecimal numbers of up to eight digits each, separated
      by a slash. e.g. `1/F73E0220`. This is the form used in replication
      commands such as `START_REPLICATION`.

  For more information on Log Sequence Numbers, see
  [PostgreSQL pg_lsn docs](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) and
  [PostgreSQL WAL internals docs](https://www.postgresql.org/docs/current/wal-internals.html).
  """
  @spec decode_lsn(String.t()) :: {:ok, integer} | :error
  def decode_lsn(lsn) when is_binary(lsn) do
    # No `trim: true`: "/1/2" or "1/2/" must not collapse into a valid two-part LSN.
    with [file_id, offset] <- String.split(lsn, "/"),
         {:ok, file_id} <- decode_lsn_component(file_id),
         {:ok, offset} <- decode_lsn_component(offset) do
      {:ok, (file_id <<< 32) ||| offset}
    else
      _ -> :error
    end
  end

  # Parses one side of an LSN string: 1 to @max_lsn_component_size hex digits and
  # nothing else. `Integer.parse/2` alone would also accept a leading "+" or "-".
  defp decode_lsn_component(component) do
    if byte_size(component) in 1..@max_lsn_component_size and hex_digits?(component) do
      {:ok, String.to_integer(component, 16)}
    else
      :error
    end
  end

  # True when every byte of the binary is an ASCII hexadecimal digit.
  defp hex_digits?(<<>>), do: true

  defp hex_digits?(<<char, rest::binary>>)
       when char in ?0..?9 or char in ?a..?f or char in ?A..?F,
       do: hex_digits?(rest)

  defp hex_digits?(_), do: false

  ## CALLBACKS ##

  @doc false
  def init({mod, arg, opts}) do
    case mod.init(arg) do
      {:ok, mod_state} ->
        opts =
          Keyword.update(
            opts,
            :parameters,
            [replication: "database"],
            &Keyword.put_new(&1, :replication, "database")
          )

        {auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false)
        {reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500)

        state = %__MODULE__{
          auto_reconnect: auto_reconnect,
          reconnect_backoff: reconnect_backoff,
          state: {mod, mod_state}
        }

        # Connection options are kept in the process dictionary (read back by
        # opts/0) so they are available on every (re)connect without being
        # stored in the state struct.
        put_opts(opts)

        if opts[:sync_connect] do
          case connect(:init, state) do
            {:ok, _} = ok -> ok
            {:backoff, _, _} = backoff -> backoff
            {:stop, reason, _} -> {:stop, reason}
          end
        else
          {:connect, :init, state}
        end
    end
  end

  @doc false
  def connect(_, %{state: {mod, mod_state}} = s) do
    case Protocol.connect(opts()) do
      {:ok, protocol} ->
        s = %{s | protocol: protocol}

        case maybe_handle(mod, :handle_connect, [mod_state], s) do
          {:noreply, s} ->
            {:ok, s}

          # A query/stream started from handle_connect/1 lost the connection and
          # auto_reconnect is enabled. `{:connect, ...}` is not a valid return
          # from connect/2, so schedule another attempt after the backoff.
          {:connect, _info, s} ->
            {:backoff, s.reconnect_backoff, s}

          {:stop, _reason, _s} = stop ->
            stop
        end

      {:error, reason} ->
        if s.auto_reconnect do
          {:backoff, s.reconnect_backoff, s}
        else
          {:stop, reason, s}
        end
    end
  end

  @doc false
  def handle_call(msg, from, %{state: {mod, mod_state}} = s) do
    handle(mod, :handle_call, [msg, from, mod_state], from, s)
  end

  @doc false
  def handle_info(msg, %{protocol: protocol, streaming: streaming} = s) do
    case Protocol.handle_copy_recv(msg, streaming, protocol) do
      {:ok, copies, protocol} ->
        handle_data(copies, %{s | protocol: protocol})

      :unknown ->
        %{state: {mod, mod_state}} = s
        maybe_handle(mod, :handle_info, [msg, mod_state], s)

      {error, reason, protocol} ->
        reconnect_or_stop(error, reason, protocol, s)
    end
  end

  # Relays each received copy message to the callback module's handle_data/2,
  # stopping early if a callback causes a reconnect or stop.
  defp handle_data([], s), do: {:noreply, s}

  defp handle_data([:copy_done | copies], %{state: {mod, mod_state}} = s) do
    # The stream is over: clear `streaming` *before* invoking the callback so
    # that a {:query, ...} or {:stream, ...} returned from handle_data(:done, _)
    # is executed instead of being rejected as "stream in progress".
    with {:noreply, s} <-
           handle(mod, :handle_data, [:done, mod_state], nil, %{s | streaming: nil}) do
      handle_data(copies, s)
    end
  end

  defp handle_data([copy | copies], %{state: {mod, mod_state}} = s) do
    with {:noreply, s} <- handle(mod, :handle_data, [copy, mod_state], nil, s) do
      handle_data(copies, s)
    end
  end

  # Invokes an optional callback only if the callback module exports it.
  defp maybe_handle(mod, fun, args, s) do
    if function_exported?(mod, fun, length(args)) do
      handle(mod, fun, args, nil, s)
    else
      {:noreply, s}
    end
  end

  # Invokes a callback and acts on its return value (sending acks, running a
  # query, or starting a stream). `from` is the caller to notify when a command
  # is rejected; it is nil outside of handle_call.
  defp handle(mod, fun, args, from, %{streaming: streaming} = s) do
    case apply(mod, fun, args) do
      {:noreply, mod_state} ->
        {:noreply, %{s | state: {mod, mod_state}}}

      {:noreply, replies, mod_state} ->
        s = %{s | state: {mod, mod_state}}

        case Protocol.handle_copy_send(replies, s.protocol) do
          :ok -> {:noreply, s}
          {error, reason, protocol} -> reconnect_or_stop(error, reason, protocol, s)
        end

      {:stream, query, opts, mod_state} when streaming == nil ->
        s = %{s | state: {mod, mod_state}}
        max_messages = opts[:max_messages] || @max_messages

        with {:ok, protocol} <- Protocol.handle_streaming(query, s.protocol),
             {:ok, protocol} <- Protocol.checkin(protocol) do
          {:noreply, %{s | protocol: protocol, streaming: max_messages}}
        else
          {error_or_disconnect, reason, protocol} ->
            reconnect_or_stop(error_or_disconnect, reason, protocol, s)
        end

      {:stream, _query, _opts, mod_state} ->
        stream_in_progress(:stream, mod, mod_state, from, s)

      {:query, query, mod_state} when streaming == nil ->
        case Protocol.handle_simple(query, [], s.protocol) do
          {:ok, results, protocol} when is_list(results) ->
            handle(mod, :handle_result, [results, mod_state], from, %{s | protocol: protocol})

          {:error, %Postgrex.Error{} = error, protocol} ->
            handle(mod, :handle_result, [error, mod_state], from, %{s | protocol: protocol})

          {:disconnect, reason, protocol} ->
            reconnect_or_stop(:disconnect, reason, protocol, %{s | state: {mod, mod_state}})
        end

      {:query, _query, mod_state} ->
        stream_in_progress(:query, mod, mod_state, from, s)
    end
  end

  # Rejects a query/stream request issued while a stream is active; a pending
  # call/3 caller (if any) is answered so that it exits instead of hanging.
  defp stream_in_progress(command, mod, mod_state, from, s) do
    Logger.warning("received #{command} while stream is already in progress")
    from && reply(from, {__MODULE__, :stream_in_progress})
    {:noreply, %{s | state: {mod, mod_state}}}
  end

  # Both branches invoke handle_disconnect/1 (when exported) first; then the
  # process either stops or asks Connection to reconnect.
  defp reconnect_or_stop(error, reason, protocol, %{auto_reconnect: false} = s)
       when error in [:error, :disconnect] do
    %{state: {mod, mod_state}} = s
    {:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], %{s | protocol: protocol})
    {:stop, reason, s}
  end

  defp reconnect_or_stop(error, _reason, _protocol, %{auto_reconnect: true} = s)
       when error in [:error, :disconnect] do
    %{state: {mod, mod_state}} = s
    {:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
    {:connect, :reconnect, %{s | streaming: nil}}
  end

  defp opts(), do: Process.get(__MODULE__)
  defp put_opts(opts), do: Process.put(__MODULE__, opts)
end