zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

postgres.ex (16925B)


      1 defmodule Ecto.Adapters.Postgres do
      2   @moduledoc """
      3   Adapter module for PostgreSQL.
      4 
      5   It uses `Postgrex` for communicating to the database.
      6 
      7   ## Features
      8 
      9     * Full query support (including joins, preloads and associations)
     10     * Support for transactions
     11     * Support for data migrations
     12     * Support for ecto.create and ecto.drop operations
     13     * Support for transactional tests via `Ecto.Adapters.SQL`
     14 
     15   ## Options
     16 
     17   Postgres options split in different categories described
     18   below. All options can be given via the repository
     19   configuration:
     20 
     21       config :your_app, YourApp.Repo,
     22         ...
     23 
     24   The `:prepare` option may be specified per operation:
     25 
     26       YourApp.Repo.all(Queryable, prepare: :unnamed)
     27 
     28   ### Migration options
     29 
     30     * `:migration_lock` - prevent multiple nodes from running migrations at the same
     31       time by obtaining a lock. The value `:table_lock` will lock migrations by wrapping
     32       the entire migration inside a database transaction, including inserting the
     33       migration version into the migration source (by default, "schema_migrations").
     34       You may alternatively select `:pg_advisory_lock` which has the benefit
     35       of allowing concurrent operations such as creating indexes. (default: `:table_lock`)
     36 
     37   When using the `:pg_advisory_lock` migration lock strategy and Ecto cannot obtain
     38   the lock due to another instance occupying the lock, Ecto will wait for 5 seconds
     39   and then retry infinity times. This is configurable on the repo with keys
     40   `:migration_advisory_lock_retry_interval_ms` and `:migration_advisory_lock_max_tries`.
     41   If the retries are exhausted, the migration will fail.
     42 
     43   Some downsides to using advisory locks is that some Postgres-compatible systems or plugins
     44   may not support session level locks well and therefore result in inconsistent behavior.
     45   For example, PgBouncer when using pool_modes other than session won't work well with
     46   advisory locks. CockroachDB is another system that is designed in a way that advisory
     47   locks don't make sense for their distributed database.
     48 
     49   ### Connection options
     50 
     51     * `:hostname` - Server hostname
     52     * `:socket_dir` - Connect to Postgres via UNIX sockets in the given directory
     53       The socket name is derived based on the port. This is the preferred method
     54       for configuring sockets and it takes precedence over the hostname. If you are
     55       connecting to a socket outside of the Postgres convention, use `:socket` instead;
     56     * `:socket` - Connect to Postgres via UNIX sockets in the given path.
     57       This option takes precedence over the `:hostname` and `:socket_dir`
     58     * `:username` - Username
     59     * `:password` - User password
     60     * `:port` - Server port (default: 5432)
     61     * `:database` - the database to connect to
     62     * `:maintenance_database` - Specifies the name of the database to connect to when
     63       creating or dropping the database. Defaults to `"postgres"`
     64     * `:pool` - The connection pool module, may be set to `Ecto.Adapters.SQL.Sandbox`
     65     * `:ssl` - Set to true if ssl should be used (default: false)
     66     * `:ssl_opts` - A list of ssl options, see Erlang's `ssl` docs
     67     * `:parameters` - Keyword list of connection parameters
     68     * `:connect_timeout` - The timeout for establishing new connections (default: 5000)
     69     * `:prepare` - How to prepare queries, either `:named` to use named queries
     70       or `:unnamed` to force unnamed queries (default: `:named`)
     71     * `:socket_options` - Specifies socket configuration
     72     * `:show_sensitive_data_on_connection_error` - show connection data and
     73       configuration whenever there is an error attempting to connect to the
     74       database
     75 
     76   The `:socket_options` are particularly useful when configuring the size
     77   of both send and receive buffers. For example, when Ecto starts with a
     78   pool of 20 connections, the memory usage may quickly grow from 20MB to
     79   50MB based on the operating system default values for TCP buffers. It is
     80   advised to stick with the operating system defaults but they can be
     81   tweaked if desired:
     82 
     83       socket_options: [recbuf: 8192, sndbuf: 8192]
     84 
     85   We also recommend developers to consult the `Postgrex.start_link/1`
     86   documentation for a complete listing of all supported options.
     87 
     88   ### Storage options
     89 
     90     * `:encoding` - the database encoding (default: "UTF8")
     91       or `:unspecified` to remove encoding parameter (alternative engine compatibility)
     92     * `:template` - the template to create the database from
     93     * `:lc_collate` - the collation order
     94     * `:lc_ctype` - the character classification
     95     * `:dump_path` - where to place dumped structures
     96     * `:force_drop` - force the database to be dropped even
     97       if it has connections to it (requires PostgreSQL 13+)
     98 
     99   ### After connect callback
    100 
    101   If you want to execute a callback as soon as connection is established
    102   to the database, you can use the `:after_connect` configuration. For
    103   example, in your repository configuration you can add:
    104 
    105       after_connect: {Postgrex, :query!, ["SET search_path TO global_prefix", []]}
    106 
    107   You can also specify your own module that will receive the Postgrex
    108   connection as argument.
    109 
    110   ## Extensions
    111 
    112   Both PostgreSQL and its adapter for Elixir, Postgrex, support an
    113   extension system. If you want to use custom extensions for Postgrex
    114   alongside Ecto, you must define a type module with your extensions.
    115   Create a new file anywhere in your application with the following:
    116 
    117       Postgrex.Types.define(MyApp.PostgresTypes,
    118                             [MyExtension.Foo, MyExtensionBar] ++ Ecto.Adapters.Postgres.extensions())
    119 
    120   Once your type module is defined, you can configure the repository to use it:
    121 
    122       config :my_app, MyApp.Repo, types: MyApp.PostgresTypes
    123 
    124   """
    125 
    126   # Inherit all behaviour from Ecto.Adapters.SQL
    127   use Ecto.Adapters.SQL, driver: :postgrex
    128 
    129   require Logger
    130 
    131   # And provide a custom storage implementation
    132   @behaviour Ecto.Adapter.Storage
    133   @behaviour Ecto.Adapter.Structure
    134 
    135   @default_maintenance_database "postgres"
    136   @default_prepare_opt :named
    137 
    138   @doc """
    139   All Ecto extensions for Postgrex.
    140   """
    141   def extensions do
    142     []
    143   end
    144 
    145   # Support arrays in place of IN
    146   @impl true
    147   def dumpers({:map, _}, type),        do: [&Ecto.Type.embedded_dump(type, &1, :json)]
    148   def dumpers({:in, sub}, {:in, sub}), do: [{:array, sub}]
    149   def dumpers(:binary_id, type),       do: [type, Ecto.UUID]
    150   def dumpers(_, type),                do: [type]
    151 
    152   ## Query API
    153 
    154   @impl Ecto.Adapter.Queryable
    155   def execute(adapter_meta, query_meta, query, params, opts) do
    156     prepare = Keyword.get(opts, :prepare, @default_prepare_opt)
    157 
    158     unless valid_prepare?(prepare) do
    159       raise ArgumentError,
    160         "expected option `:prepare` to be either `:named` or `:unnamed`, got: #{inspect(prepare)}"
    161     end
    162 
    163     Ecto.Adapters.SQL.execute(prepare, adapter_meta, query_meta, query, params, opts)
    164   end
    165 
    166   defp valid_prepare?(prepare) when prepare in [:named, :unnamed], do: true
    167   defp valid_prepare?(_), do: false
    168 
    169   ## Storage API
    170 
    171   @impl true
    172   def storage_up(opts) do
    173     database =
    174       Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    175 
    176     encoding = if opts[:encoding] == :unspecified, do: nil, else: opts[:encoding] || "UTF8"
    177     maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    178     opts = Keyword.put(opts, :database, maintenance_database)
    179 
    180     check_existence_command = "SELECT FROM pg_database WHERE datname = '#{database}'"
    181 
    182     case run_query(check_existence_command, opts) do
    183       {:ok, %{num_rows: 1}} ->
    184         {:error, :already_up}
    185 
    186       _ ->
    187         create_command =
    188           ~s(CREATE DATABASE "#{database}")
    189           |> concat_if(encoding, &"ENCODING '#{&1}'")
    190           |> concat_if(opts[:template], &"TEMPLATE=#{&1}")
    191           |> concat_if(opts[:lc_ctype], &"LC_CTYPE='#{&1}'")
    192           |> concat_if(opts[:lc_collate], &"LC_COLLATE='#{&1}'")
    193 
    194         case run_query(create_command, opts) do
    195           {:ok, _} ->
    196             :ok
    197 
    198           {:error, %{postgres: %{code: :duplicate_database}}} ->
    199             {:error, :already_up}
    200 
    201           {:error, error} ->
    202             {:error, Exception.message(error)}
    203         end
    204     end
    205   end
    206 
    207   defp concat_if(content, nil, _),  do: content
    208   defp concat_if(content, false, _),  do: content
    209   defp concat_if(content, value, fun), do: content <> " " <> fun.(value)
    210 
    211   @impl true
    212   def storage_down(opts) do
    213     database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    214     command = "DROP DATABASE \"#{database}\""
    215               |> concat_if(opts[:force_drop], fn _ -> "WITH (FORCE)" end)
    216     maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    217     opts = Keyword.put(opts, :database, maintenance_database)
    218 
    219     case run_query(command, opts) do
    220       {:ok, _} ->
    221         :ok
    222       {:error, %{postgres: %{code: :invalid_catalog_name}}} ->
    223         {:error, :already_down}
    224       {:error, error} ->
    225         {:error, Exception.message(error)}
    226     end
    227   end
    228 
    229   @impl Ecto.Adapter.Storage
    230   def storage_status(opts) do
    231     database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    232     maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    233     opts = Keyword.put(opts, :database, maintenance_database)
    234 
    235     check_database_query = "SELECT datname FROM pg_catalog.pg_database WHERE datname = '#{database}'"
    236 
    237     case run_query(check_database_query, opts) do
    238       {:ok, %{num_rows: 0}} -> :down
    239       {:ok, %{num_rows: _num_rows}} -> :up
    240       other -> {:error, other}
    241     end
    242   end
    243 
    244   @impl true
    245   def supports_ddl_transaction? do
    246     true
    247   end
    248 
    249   @impl true
    250   def lock_for_migrations(meta, opts, fun) do
    251     %{opts: adapter_opts, repo: repo} = meta
    252 
    253     if Keyword.fetch(adapter_opts, :pool_size) == {:ok, 1} do
    254       Ecto.Adapters.SQL.raise_migration_pool_size_error()
    255     end
    256 
    257     opts = Keyword.merge(opts, [timeout: :infinity, telemetry_options: [schema_migration: true]])
    258     config = repo.config()
    259     lock_strategy = Keyword.get(config, :migration_lock, :table_lock)
    260     do_lock_for_migrations(lock_strategy, meta, opts, config, fun)
    261   end
    262 
    263   defp do_lock_for_migrations(:pg_advisory_lock, meta, opts, config, fun) do
    264     lock = :erlang.phash2({:ecto, opts[:prefix], meta.repo})
    265 
    266     retry_state = %{
    267       retry_interval_ms: config[:migration_advisory_lock_retry_interval_ms] || 5000,
    268       max_tries: config[:migration_advisory_lock_max_tries] || :infinity,
    269       tries: 0
    270     }
    271 
    272     advisory_lock(meta, opts, lock, retry_state, fun)
    273   end
    274 
    275   defp do_lock_for_migrations(:table_lock, meta, opts, _config, fun) do
    276     {:ok, res} =
    277       transaction(meta, opts, fn ->
    278         # SHARE UPDATE EXCLUSIVE MODE is the first lock that locks
    279         # itself but still allows updates to happen, see
    280         # # https://www.postgresql.org/docs/9.4/explicit-locking.html
    281         source = Keyword.get(opts, :migration_source, "schema_migrations")
    282         table = if prefix = opts[:prefix], do: ~s|"#{prefix}"."#{source}"|, else: ~s|"#{source}"|
    283         {:ok, _} = Ecto.Adapters.SQL.query(meta, "LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE", [], opts)
    284         fun.()
    285       end)
    286 
    287     res
    288   end
    289 
    290   defp advisory_lock(meta, opts, lock, retry_state, fun) do
    291     result = checkout(meta, opts, fn ->
    292       case Ecto.Adapters.SQL.query(meta, "SELECT pg_try_advisory_lock(#{lock})", [], opts) do
    293         {:ok, %{rows: [[true]]}} ->
    294           try do
    295             {:ok, fun.()}
    296           after
    297             release_advisory_lock(meta, opts, lock)
    298           end
    299         _ ->
    300           :no_advisory_lock
    301       end
    302     end)
    303 
    304     case result do
    305       {:ok, fun_result} ->
    306         fun_result
    307 
    308       :no_advisory_lock ->
    309         maybe_retry_advisory_lock(meta, opts, lock, retry_state, fun)
    310     end
    311   end
    312 
    313   defp release_advisory_lock(meta, opts, lock) do
    314     case Ecto.Adapters.SQL.query(meta, "SELECT pg_advisory_unlock(#{lock})", [], opts) do
    315       {:ok, %{rows: [[true]]}} ->
    316         :ok
    317       _ ->
    318         raise "failed to release advisory lock"
    319     end
    320   end
    321 
    322   defp maybe_retry_advisory_lock(meta, opts, lock, retry_state, fun) do
    323     %{retry_interval_ms: interval, max_tries: max_tries, tries: tries} = retry_state
    324 
    325     if max_tries != :infinity && max_tries <= tries do
    326       raise "failed to obtain advisory lock. Tried #{max_tries} times waiting #{interval}ms between tries"
    327     else
    328       if Keyword.get(opts, :log_migrator_sql, false) do
    329         Logger.info("Migration lock occupied for #{inspect(meta.repo)}. Retry #{tries + 1}/#{max_tries} at #{interval}ms intervals.")
    330       end
    331 
    332       Process.sleep(interval)
    333       retry_state = %{retry_state | tries: tries + 1}
    334       advisory_lock(meta, opts, lock, retry_state, fun)
    335     end
    336   end
    337 
    338   @impl true
    339   def structure_dump(default, config) do
    340     table = config[:migration_source] || "schema_migrations"
    341     with {:ok, versions} <- select_versions(table, config),
    342          {:ok, path} <- pg_dump(default, config),
    343          do: append_versions(table, versions, path)
    344   end
    345 
    346   defp select_versions(table, config) do
    347     case run_query(~s[SELECT version FROM public."#{table}" ORDER BY version], config) do
    348       {:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
    349       {:error, %{postgres: %{code: :undefined_table}}} -> {:ok, []}
    350       {:error, _} = error -> error
    351     end
    352   end
    353 
    354   defp pg_dump(default, config) do
    355     path = config[:dump_path] || Path.join(default, "structure.sql")
    356     File.mkdir_p!(Path.dirname(path))
    357 
    358     case run_with_cmd("pg_dump", config, ["--file", path, "--schema-only", "--no-acl", "--no-owner"]) do
    359       {_output, 0} ->
    360         {:ok, path}
    361       {output, _} ->
    362         {:error, output}
    363     end
    364   end
    365 
    366   defp append_versions(_table, [], path) do
    367     {:ok, path}
    368   end
    369 
    370   defp append_versions(table, versions, path) do
    371     sql = Enum.map_join(versions, &~s[INSERT INTO public."#{table}" (version) VALUES (#{&1});\n])
    372 
    373     File.open!(path, [:append], fn file ->
    374       IO.write(file, sql)
    375     end)
    376 
    377     {:ok, path}
    378   end
    379 
    380   @impl true
    381   def structure_load(default, config) do
    382     path = config[:dump_path] || Path.join(default, "structure.sql")
    383     args = ["--quiet", "--file", path, "-vON_ERROR_STOP=1", "--single-transaction"]
    384     case run_with_cmd("psql", config, args) do
    385       {_output, 0} -> {:ok, path}
    386       {output, _}  -> {:error, output}
    387     end
    388   end
    389 
    390   @impl true
    391   def dump_cmd(args, opts \\ [], config) when is_list(config) and is_list(args),
    392     do: run_with_cmd("pg_dump", config, args, opts)
    393 
    394   ## Helpers
    395 
    396   defp run_query(sql, opts) do
    397     {:ok, _} = Application.ensure_all_started(:ecto_sql)
    398     {:ok, _} = Application.ensure_all_started(:postgrex)
    399 
    400     opts =
    401       opts
    402       |> Keyword.drop([:name, :log, :pool, :pool_size])
    403       |> Keyword.put(:backoff_type, :stop)
    404       |> Keyword.put(:max_restarts, 0)
    405 
    406     task = Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn ->
    407       {:ok, conn} = Postgrex.start_link(opts)
    408 
    409       value = Postgrex.query(conn, sql, [], opts)
    410       GenServer.stop(conn)
    411       value
    412     end)
    413 
    414     timeout = Keyword.get(opts, :timeout, 15_000)
    415 
    416     case Task.yield(task, timeout) || Task.shutdown(task) do
    417       {:ok, {:ok, result}} ->
    418         {:ok, result}
    419       {:ok, {:error, error}} ->
    420         {:error, error}
    421       {:exit, {%{__struct__: struct} = error, _}}
    422           when struct in [Postgrex.Error, DBConnection.Error] ->
    423         {:error, error}
    424       {:exit, reason}  ->
    425         {:error, RuntimeError.exception(Exception.format_exit(reason))}
    426       nil ->
    427         {:error, RuntimeError.exception("command timed out")}
    428     end
    429   end
    430 
    431   defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do
    432     unless System.find_executable(cmd) do
    433       raise "could not find executable `#{cmd}` in path, " <>
    434             "please guarantee it is available before running ecto commands"
    435     end
    436 
    437     env =
    438       [{"PGCONNECT_TIMEOUT", "10"}]
    439     env =
    440       if password = opts[:password] do
    441         [{"PGPASSWORD", password}|env]
    442       else
    443         env
    444       end
    445 
    446     args =
    447       []
    448     args =
    449       if username = opts[:username], do: ["--username", username | args], else: args
    450     args =
    451       if port = opts[:port], do: ["--port", to_string(port) | args], else: args
    452     args =
    453       if database = opts[:database], do: ["--dbname", database | args], else: args
    454 
    455     host = opts[:socket_dir] || opts[:hostname] || System.get_env("PGHOST") || "localhost"
    456 
    457     if opts[:socket] do
    458       IO.warn(
    459         ":socket option is ignored when connecting in structure_load/2 and structure_dump/2," <>
    460           " use :socket_dir or :hostname instead"
    461       )
    462     end
    463 
    464     args = ["--host", host | args]
    465     args = args ++ opt_args
    466 
    467     cmd_opts =
    468       cmd_opts
    469       |> Keyword.put_new(:stderr_to_stdout, true)
    470       |> Keyword.update(:env, env, &Enum.concat(env, &1))
    471 
    472     System.cmd(cmd, args, cmd_opts)
    473   end
    474 end