zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

replication_connection.ex (19410B)


      1 defmodule Postgrex.ReplicationConnection do
      2   @moduledoc ~S"""
      3   A process that receives and sends PostgreSQL replication messages.
      4 
      5   > Note: this module is experimental and may be subject to changes
      6   > in the future.
      7 
      8   ## Logical replication
      9 
     10   Let's see how to use this module for connecting to PostgreSQL
     11   for logical replication. First of all, you need to configure the
     12   wal level in PostgreSQL to logical. Run this inside your PostgreSQL
     13   shell/configuration:
     14 
     15       ALTER SYSTEM SET wal_level='logical';
     16       ALTER SYSTEM SET max_wal_senders='10';
     17       ALTER SYSTEM SET max_replication_slots='10';
     18 
     19   Then **you must restart your server**. Alternatively, you can set
     20   those values when starting "postgres". This is useful, for example,
     21   when running it from Docker:
     22 
     23       services:
     24         postgres:
     25           image: postgres:14
     26           env:
     27             ...
     28           command: ["postgres", "-c", "wal_level=logical"]
     29 
     30   For CI, GitHub Actions do not support setting command, so you can
     31   update and restart Postgres instead in a step:
     32 
     33       - name: "Set PG settings"
     34         run: |
     35           docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf'
     36           docker restart ${{ job.services.postgres.id }}
     37 
     38   Then you must create a publication to be replicated.
     39   This can be done in any session:
     40 
     41       CREATE PUBLICATION example FOR ALL TABLES;
     42 
     43   You can also filter if you want to publish insert, update,
     44   delete or a subset of them:
     45 
     46       # Skips updates (keeps inserts, deletes, begins, commits, etc)
     47       create PUBLICATION example FOR ALL TABLES WITH (publish = 'insert,delete');
     48 
     49       # Skips inserts, updates, and deletes (keeps begins, commits, etc)
     50       create PUBLICATION example FOR ALL TABLES WITH (publish = '');
     51 
     52   Now we are ready to create a module that starts a replication slot
     53   and listens to our publication. Our example will use the pgoutput
     54   for logical replication and print all incoming messages to the
     55   terminal:
     56 
     57       Mix.install([:postgrex])
     58 
     59       defmodule Repl do
     60         use Postgrex.ReplicationConnection
     61 
     62         def start_link(opts) do
     63           # Automatically reconnect if we lose connection.
     64           extra_opts = [
     65             auto_reconnect: true
     66           ]
     67 
     68           Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
     69         end
     70 
     71         @impl true
     72         def init(:ok) do
     73           {:ok, %{step: :disconnected}}
     74         end
     75 
     76         @impl true
     77         def handle_connect(state) do
     78           query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
     79           {:query, query, %{state | step: :create_slot}}
     80         end
     81 
     82         @impl true
     83         def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
     84           query = "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'example')"
     85           {:stream, query, [], %{state | step: :streaming}}
     86         end
     87 
     88         @impl true
     89         # https://www.postgresql.org/docs/14/protocol-replication.html
     90         def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
     91           IO.inspect(rest)
     92           {:noreply, state}
     93         end
     94 
     95         def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
     96           messages =
     97             case reply do
     98               1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
     99               0 -> []
    100             end
    101 
    102           {:noreply, messages, state}
    103         end
    104 
    105         @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
    106         defp current_time(), do: System.os_time(:microsecond) - @epoch
    107       end
    108 
    109       {:ok, pid} =
    110         Repl.start_link(
    111           host: "localhost",
    112           database: "demo_dev",
    113           username: "postgres",
    114         )
    115 
    116       Process.sleep(:infinity)
    117 
    118   ## `use` options
    119 
    120   `use Postgrex.ReplicationConnection` accepts a list of options which configures the
    121   child specification and therefore how it runs under a supervisor.
    122   The generated `child_spec/1` can be customized with the following options:
    123 
    124     * `:id` - the child specification identifier, defaults to the current module
    125     * `:restart` - when the child should be restarted, defaults to `:permanent`
    126     * `:shutdown` - how to shut down the child, either immediately or by giving
    127       it time to shut down
    128 
    129   For example:
    130 
    131       use Postgrex.ReplicationConnection, restart: :transient, shutdown: 10_000
    132 
    133   See the "Child specification" section in the `Supervisor` module for more
    134   detailed information. The `@doc` annotation immediately preceding
    135   `use Postgrex.ReplicationConnection` will be attached to the generated `child_spec/1`
    136   function.
    137 
    138   ## Name registration
    139 
    140   A `Postgrex.ReplicationConnection` is bound to the same name registration rules as a
    141   `GenServer`. Read more about them in the `GenServer` docs.
    142   """
    143 
    144   use Connection
    145   require Logger
    146   import Bitwise
    147 
    148   alias Postgrex.Protocol
    149 
    150   @doc false
    # Internal process state:
    #   * protocol          - connection state returned by `Protocol.connect/1`
    #   * state             - `{callback_module, callback_state}`
    #   * auto_reconnect    - whether to reconnect instead of stopping on failure
    #   * reconnect_backoff - delay in milliseconds between connection attempts
    #   * streaming         - `nil` while idle, otherwise the `:max_messages` value
    #                         of the stream in progress
    defstruct [
      protocol: nil,
      state: nil,
      auto_reconnect: false,
      reconnect_backoff: 500,
      streaming: nil
    ]
    156 
    157   ## PUBLIC API ##
    158 
    159   @type server :: GenServer.server()
    160   @type state :: term
    161   @type ack :: iodata
    162   @type query :: iodata
    163 
    164   @typedoc """
    165   The following options configure streaming:
    166 
    167     * `:max_messages` - The maximum number of replication messages that can be
    168       accumulated from the wire until they are relayed to `handle_data/2`.
    169       Defaults to `500`.
    170 
    171   """
    172   @type stream_opts :: [max_messages: pos_integer]
    # Maximum byte size of each half of a textual LSN accepted by `decode_lsn/1`.
    @max_lsn_component_size 8
    # Largest unsigned 64-bit integer (2^64 - 1), the upper bound for `encode_lsn/1`.
    @max_uint64 0xFFFF_FFFF_FFFF_FFFF
    # Default for the `:max_messages` streaming option.
    @max_messages 500
    176 
    177   @doc """
    178   Callback for process initialization.
    179 
    180   This is called once and before the Postgrex connection is established.
    181   """
    182   @callback init(term) :: {:ok, state}
    183 
    184   @doc """
    185   Invoked after connecting.
    186 
    187   This may be invoked multiple times if `:auto_reconnect` is set to true.
    188   """
    189   @callback handle_connect(state) ::
    190               {:noreply, state}
    191               | {:noreply, ack, state}
    192               | {:query, query, state}
    193               | {:stream, query, stream_opts, state}
    194 
    195   @doc """
    196   Invoked after disconnecting.
    197 
    198   This is invoked whether `:auto_reconnect` is enabled (before reconnecting) or not (before the process stops).
    199   """
    200   @callback handle_disconnect(state) :: {:noreply, state}
    201 
    202   @doc """
    203   Callback for `:stream` outputs.
    204 
    205   If any callback returns `{:stream, iodata, opts, state}`, then this
    206   callback will be eventually called with the result of the query.
    207   It receives `binary` streaming messages.
    208 
    209   This can be useful for replication and copy commands. For replication,
    210   the format of the messages is described [under the START_REPLICATION
    211   section in PostgreSQL docs](https://www.postgresql.org/docs/14/protocol-replication.html).
    212   Replication messages may require explicit acknowledgement, which can
    213   be done by returning a list of binaries according to the replication
    214   protocol.
    215   """
    216   @callback handle_data(binary | :done, state) ::
    217               {:noreply, state}
    218               | {:noreply, ack, state}
    219               | {:query, query, state}
    220               | {:stream, query, stream_opts, state}
    221 
    222   @doc """
    223   Callback for `Kernel.send/2`.
    224   """
    225   @callback handle_info(term, state) ::
    226               {:noreply, state}
    227               | {:noreply, ack, state}
    228               | {:query, query, state}
    229               | {:stream, query, stream_opts, state}
    230 
    231   @doc """
    232   Callback for `call/3`.
    233 
    234   Replies must be sent with `reply/2`.
    235 
    236   If `auto_reconnect: false` (the default) and there is a disconnection,
    237   the process will terminate and the caller will exit even if no reply is
    238   sent. However, if `auto_reconnect` is set to true, a disconnection will
    239   keep the process alive, which means that any command that has not yet
    240   been replied to should eventually do so. One simple approach is to
    241   reply to any pending commands on `c:handle_disconnect/1`.
    242   """
    243   @callback handle_call(term, GenServer.from(), state) ::
    244               {:noreply, state}
    245               | {:noreply, ack, state}
    246               | {:query, query, state}
    247               | {:stream, query, stream_opts, state}
    248 
    249   @doc """
    250   Callback for `:query` outputs.
    251 
    252   If any callback returns `{:query, iodata, state}`,
    253   then this callback will be immediately called with
    254   the result of the query. Please note that even though
    255   replication connections use the simple query protocol,
    256   Postgres currently limits them to single command queries.
    257   Due to this constraint, this callback will be passed
    258   either a list with a single successful query result or
    259   an error.
    260   """
    261   @callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state) ::
    262               {:noreply, state}
    263               | {:noreply, ack, state}
    264               | {:query, query, state}
    265               | {:stream, query, stream_opts, state}
    266 
    267   @optional_callbacks handle_call: 3,
    268                       handle_connect: 1,
    269                       handle_data: 2,
    270                       handle_disconnect: 1,
    271                       handle_info: 2,
    272                       handle_result: 2
    273 
    274   @doc """
    275   Replies to the given `call/3`.
    276   """
    def reply(client, reply), do: GenServer.reply(client, reply)
    278 
    279   @doc """
    280   Calls the given replication server.
    281   """
    def call(server, message, timeout \\ 5000) do
      # A reply tagged with this module's name signals a failure reported by the
      # server; it is turned into an exit of the caller. Any other reply is
      # returned untouched.
      case GenServer.call(server, message, timeout) do
        {__MODULE__, reason} ->
          exit({reason, {__MODULE__, :call, [server, message, timeout]}})

        reply ->
          reply
      end
    end
    287 
    288   @doc false
    defmacro __using__(opts) do
      quote location: :keep, bind_quoted: [opts: opts] do
        @behaviour Postgrex.ReplicationConnection

        # Only supply the default documentation when the using module did not
        # place its own `@doc` right before the `use` call.
        if not Module.has_attribute?(__MODULE__, :doc) do
          @doc """
          Returns a specification to start this module under a supervisor.

          See `Supervisor`.
          """
        end

        def child_spec(init_arg) do
          base_spec = %{
            id: __MODULE__,
            start: {__MODULE__, :start_link, [init_arg]}
          }

          # The options given to `use` override the defaults above.
          Supervisor.child_spec(base_spec, unquote(Macro.escape(opts)))
        end

        defoverridable child_spec: 1
      end
    end
    313 
    314   @doc """
    315   Starts a replication process with the given callback `module`.
    316 
    317   ## Options
    318 
    319   The options that this function accepts are the same as those
    320   accepted by `Postgrex.start_link/1`, except for `:idle_interval`.
    321 
    322   It also accepts extra options for connection management, documented below.
    323   Note that this function also automatically sets `:replication` to `"database"`
    324   as part of the connection `:parameters` if none is set yet.
    325 
    326   ### Connection options
    327 
    328     * `:sync_connect` - controls if the connection should be established on boot
    329       or asynchronously right after boot. Defaults to `true`.
    330 
    331     * `:auto_reconnect` - automatically attempt to reconnect to the database
    332       in event of a disconnection. See the
    333       [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
    334       above. Defaults to `false`, which means the process terminates.
    335 
    336     * `:reconnect_backoff` - time (in ms) between reconnection attempts when
    337       `:auto_reconnect` is enabled. Defaults to `500`.
    338   """
    339   @spec start_link(module(), term(), Keyword.t()) ::
    340           {:ok, pid} | {:error, Postgrex.Error.t() | term}
    def start_link(module, arg, opts) do
      # `:name` is for process registration; everything else configures the
      # connection itself.
      {server_opts, connection_opts} = Keyword.split(opts, [:name])

      connection_opts =
        connection_opts
        |> Keyword.put_new(:sync_connect, true)
        |> Postgrex.Utils.default_opts()

      Connection.start_link(__MODULE__, {module, arg, connection_opts}, server_opts)
    end
    347 
    348   @doc """
    349   Returns the string representation of an LSN value, given its integer representation.
    350 
    351   It returns `:error` if the provided integer falls outside the range for a valid
    352   unsigned 64-bit integer.
    353 
    354   ## Log Sequence Numbers
    355 
    356   PostgreSQL uses two representations for the Log Sequence Number (LSN):
    357 
    358     * An unsigned 64-bit integer. Used internally by PostgreSQL and sent in the XLogData
    359     replication messages.
    360 
    361     * A string of two hexadecimal numbers of up to eight digits each, separated
    362     by a slash. e.g. `1/F73E0220`. This is the form accepted by `start_replication/2`.
    363 
    364   For more information on Log Sequence Numbers, see
    365   [PostgreSQL pg_lsn docs](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) and
    366   [PostgreSQL WAL internals docs](https://www.postgresql.org/docs/current/wal-internals.html).
    367   """
    368   @spec encode_lsn(integer) :: {:ok, String.t()} | :error
    def encode_lsn(lsn) when is_integer(lsn) and lsn >= 0 and lsn <= @max_uint64 do
      # The upper 32 bits are the file id, the lower 32 bits the offset; each is
      # rendered in upper-case hexadecimal without padding.
      file_id = lsn >>> 32
      offset = lsn &&& 0xFFFF_FFFF

      {:ok, "#{Integer.to_string(file_id, 16)}/#{Integer.to_string(offset, 16)}"}
    end

    # Integers outside the unsigned 64-bit range have no LSN representation.
    def encode_lsn(lsn) when is_integer(lsn), do: :error
    377 
    378   @doc """
    379   Returns the integer representation of an LSN value, given its string representation.
    380 
    381   It returns `:error` if the provided string is not a valid LSN.
    382 
    383   ## Log Sequence Numbers
    384 
    385   PostgreSQL uses two representations for the Log Sequence Number (LSN):
    386 
    387     * An unsigned 64-bit integer. Used internally by PostgreSQL and sent in the XLogData
    388     replication messages.
    389 
    390     * A string of two hexadecimal numbers of up to eight digits each, separated
    391     by a slash. e.g. `1/F73E0220`. This is the form accepted by `start_replication/2`.
    392 
    393   For more information on Log Sequence Numbers, see
    394   [PostgreSQL pg_lsn docs](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) and
    395   [PostgreSQL WAL internals docs](https://www.postgresql.org/docs/current/wal-internals.html).
    396   """
    397   @spec decode_lsn(String.t()) :: {:ok, integer} | :error
    def decode_lsn(lsn) when is_binary(lsn) do
      # `:binary.split/2` splits on the first slash only and keeps empty parts,
      # so inputs such as "1//2", "/1/2" or "1/2/" leave an empty or
      # slash-containing component that the parser below rejects.
      with [file_id, offset] <- :binary.split(lsn, "/"),
           {:ok, file_id} <- parse_lsn_component(file_id),
           {:ok, offset} <- parse_lsn_component(offset) do
        {:ok, file_id <<< 32 ||| offset}
      else
        _ -> :error
      end
    end

    # Parses one half of a textual LSN: a non-empty run of hexadecimal digits no
    # longer than `@max_lsn_component_size` bytes. `Integer.parse/2` on its own
    # would accept a leading "+" or "-", which is not part of the LSN syntax, so
    # signed input is rejected up front.
    defp parse_lsn_component(<<first, _::binary>> = hex)
         when byte_size(hex) <= @max_lsn_component_size and first not in [?+, ?-] do
      case Integer.parse(hex, 16) do
        {value, ""} -> {:ok, value}
        _ -> :error
      end
    end

    defp parse_lsn_component(_), do: :error
    409 
    410   ## CALLBACKS ##
    411 
    412   @doc false
    def init({mod, arg, opts}) do
      case mod.init(arg) do
        {:ok, mod_state} ->
          # Request a replication connection unless the caller already chose a
          # `:replication` parameter.
          opts =
            Keyword.update(opts, :parameters, [replication: "database"], fn parameters ->
              Keyword.put_new(parameters, :replication, "database")
            end)

          # Reconnection settings are kept in the process state; the remaining
          # options are stored for later use by `connect/2`.
          {auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false)
          {reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500)
          put_opts(opts)

          state = %__MODULE__{
            state: {mod, mod_state},
            auto_reconnect: auto_reconnect,
            reconnect_backoff: reconnect_backoff
          }

          if opts[:sync_connect] do
            # Connect before returning; a stop result is returned without the state.
            case connect(:init, state) do
              {:stop, reason, _state} -> {:stop, reason}
              {:ok, _state} = connected -> connected
              {:backoff, _timeout, _state} = backoff -> backoff
            end
          else
            # Let the connection attempt happen right after init returns.
            {:connect, :init, state}
          end
      end
    end
    446 
    447   @doc false
    def connect(_, %{state: {mod, mod_state}} = s) do
      # Opens the connection with the options stored by `init/1` and, on success,
      # gives the callback module a chance to act through `handle_connect/1`.
      case Protocol.connect(opts()) do
        {:ok, protocol} ->
          s = %{s | protocol: protocol}

          case maybe_handle(mod, :handle_connect, [mod_state], s) do
            {:noreply, s} ->
              {:ok, s}

            # `handle_connect/1` may run a query or start a stream that fails.
            # With `auto_reconnect: true` that yields `{:connect, :reconnect, s}`,
            # which is not a valid return value for `connect/2` (nor matched by
            # `init/1`). Translate it into a backoff so the connection is retried,
            # like a failed `Protocol.connect/1`.
            {:connect, :reconnect, s} ->
              {:backoff, s.reconnect_backoff, s}

            # `{:stop, reason, s}` is already a valid result; pass it through.
            other ->
              other
          end

        {:error, reason} ->
          if s.auto_reconnect do
            {:backoff, s.reconnect_backoff, s}
          else
            {:stop, reason, s}
          end
      end
    end
    465 
    # Internal callback: hidden from the generated documentation like the other
    # process callbacks of this module.
    @doc false
    def handle_call(msg, from, %{state: {mod, mod_state}} = s) do
      # `from` is passed along so that a rejected :query/:stream can be answered.
      handle(mod, :handle_call, [msg, from, mod_state], from, s)
    end
    469 
    470   @doc false
    def handle_info(msg, %{protocol: protocol, streaming: streaming} = s) do
      # Every message is first offered to the protocol. Messages it does not
      # recognise are forwarded to the callback module's `handle_info/2`.
      case Protocol.handle_copy_recv(msg, streaming, protocol) do
        :unknown ->
          {mod, mod_state} = s.state
          maybe_handle(mod, :handle_info, [msg, mod_state], s)

        {:ok, copies, new_protocol} ->
          handle_data(copies, %{s | protocol: new_protocol})

        {kind, reason, new_protocol} ->
          reconnect_or_stop(kind, reason, new_protocol, s)
      end
    end
    484 
    # Relays the received copy messages to the callback module's `handle_data/2`
    # one at a time, stopping early when a callback result is anything other
    # than `{:noreply, state}`.
    defp handle_data([], s), do: {:noreply, s}

    defp handle_data([:copy_done | copies], %{state: {mod, mod_state}} = s) do
      # The stream has ended, so `streaming` is cleared *before* the callback
      # runs. Otherwise a `{:query, ...}` or `{:stream, ...}` returned from
      # `handle_data(:done, state)` would be rejected as "stream in progress".
      idle = %{s | streaming: nil}

      with {:noreply, s} <- handle(mod, :handle_data, [:done, mod_state], nil, idle) do
        handle_data(copies, s)
      end
    end

    defp handle_data([copy | copies], %{state: {mod, mod_state}} = s) do
      with {:noreply, s} <- handle(mod, :handle_data, [copy, mod_state], nil, s) do
        handle_data(copies, s)
      end
    end
    498 
    # Invokes an optional callback; a module that does not export it leaves the
    # state untouched.
    defp maybe_handle(mod, fun, args, s) do
      case function_exported?(mod, fun, length(args)) do
        true -> handle(mod, fun, args, nil, s)
        false -> {:noreply, s}
      end
    end
    506 
    # Calls `mod.fun(args...)` and carries out what the callback asked for:
    # store the new state, send acknowledgements, run a query or start a stream.
    # `from` is the pending caller (or `nil`), notified when a :query or :stream
    # is rejected because a stream is already in progress.
    defp handle(mod, fun, args, from, %{streaming: streaming} = s) do
      case apply(mod, fun, args) do
        {:noreply, mod_state} ->
          {:noreply, %{s | state: {mod, mod_state}}}

        {:noreply, replies, mod_state} ->
          send_replies(replies, %{s | state: {mod, mod_state}})

        {:stream, query, opts, mod_state} when is_nil(streaming) ->
          start_stream(query, opts, %{s | state: {mod, mod_state}})

        {:stream, _query, _opts, mod_state} ->
          stream_in_progress(:stream, mod, mod_state, from, s)

        {:query, query, mod_state} when is_nil(streaming) ->
          run_query(query, mod, mod_state, from, s)

        {:query, _query, mod_state} ->
          stream_in_progress(:query, mod, mod_state, from, s)
      end
    end

    # Writes the callback's acknowledgement messages to the connection.
    defp send_replies(replies, s) do
      case Protocol.handle_copy_send(replies, s.protocol) do
        :ok -> {:noreply, s}
        {error, reason, protocol} -> reconnect_or_stop(error, reason, protocol, s)
      end
    end

    # Starts streaming `query` and records the message limit in `streaming`.
    defp start_stream(query, opts, s) do
      max_messages = opts[:max_messages] || @max_messages

      with {:ok, protocol} <- Protocol.handle_streaming(query, s.protocol),
           {:ok, protocol} <- Protocol.checkin(protocol) do
        {:noreply, %{s | protocol: protocol, streaming: max_messages}}
      else
        {error_or_disconnect, reason, protocol} ->
          reconnect_or_stop(error_or_disconnect, reason, protocol, s)
      end
    end

    # Runs a simple query and hands its outcome (results or error) to the
    # callback module's `handle_result/2`.
    defp run_query(query, mod, mod_state, from, s) do
      case Protocol.handle_simple(query, [], s.protocol) do
        {:ok, results, protocol} when is_list(results) ->
          handle(mod, :handle_result, [results, mod_state], from, %{s | protocol: protocol})

        {:error, %Postgrex.Error{} = error, protocol} ->
          handle(mod, :handle_result, [error, mod_state], from, %{s | protocol: protocol})

        {:disconnect, reason, protocol} ->
          reconnect_or_stop(:disconnect, reason, protocol, %{s | state: {mod, mod_state}})
      end
    end
    551 
    # Rejects a :query/:stream request made while a stream is active: logs a
    # warning, notifies the pending caller (if any) and keeps the callback's
    # new state.
    defp stream_in_progress(command, mod, mod_state, from, s) do
      Logger.warning("received #{command} while stream is already in progress")

      if from do
        reply(from, {__MODULE__, :stream_in_progress})
      end

      {:noreply, %{s | state: {mod, mod_state}}}
    end
    557 
    # Reacts to a failed connection: the callback module's `handle_disconnect/1`
    # is invoked (when exported), then the process either asks for a
    # reconnection or stops, depending on `auto_reconnect`.
    defp reconnect_or_stop(error, reason, protocol, %{auto_reconnect: auto_reconnect} = s)
         when error in [:error, :disconnect] and is_boolean(auto_reconnect) do
      {mod, mod_state} = s.state

      if auto_reconnect do
        {:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
        # No stream survives a reconnection.
        {:connect, :reconnect, %{s | streaming: nil}}
      else
        # Keep the protocol state reported with the failure while stopping.
        stopping = %{s | protocol: protocol}
        {:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], stopping)
        {:stop, reason, s}
      end
    end
    571 
    # The connection options are kept in the process dictionary under this
    # module's name rather than in the process state.
    defp opts do
      Process.get(__MODULE__)
    end

    defp put_opts(opts) do
      Process.put(__MODULE__, opts)
    end
    574 end