postgres.ex (16925B)
defmodule Ecto.Adapters.Postgres do
  @moduledoc """
  Adapter module for PostgreSQL.

  It uses `Postgrex` for communicating with the database.

  ## Features

    * Full query support (including joins, preloads and associations)
    * Support for transactions
    * Support for data migrations
    * Support for ecto.create and ecto.drop operations
    * Support for transactional tests via `Ecto.Adapters.SQL`

  ## Options

  Postgres options are split into different categories, described
  below. All options can be given via the repository
  configuration:

      config :your_app, YourApp.Repo,
        ...

  The `:prepare` option may be specified per operation:

      YourApp.Repo.all(Queryable, prepare: :unnamed)

  ### Migration options

    * `:migration_lock` - prevent multiple nodes from running migrations at the same
      time by obtaining a lock. The value `:table_lock` will lock migrations by wrapping
      the entire migration inside a database transaction, including inserting the
      migration version into the migration source (by default, "schema_migrations").
      You may alternatively select `:pg_advisory_lock` which has the benefit
      of allowing concurrent operations such as creating indexes. (default: `:table_lock`)

  When using the `:pg_advisory_lock` migration lock strategy and Ecto cannot obtain
  the lock due to another instance occupying the lock, Ecto will wait for 5 seconds
  and then retry indefinitely. This is configurable on the repo with keys
  `:migration_advisory_lock_retry_interval_ms` and `:migration_advisory_lock_max_tries`.
  If the retries are exhausted, the migration will fail.

  One downside of using advisory locks is that some Postgres-compatible systems or plugins
  may not support session-level locks well, which results in inconsistent behavior.
  For example, PgBouncer does not work well with advisory locks when using a pool mode
  other than session.
  CockroachDB is another system designed in a way that advisory
  locks do not make sense for its distributed database.

  ### Connection options

    * `:hostname` - Server hostname
    * `:socket_dir` - Connect to Postgres via UNIX sockets in the given directory.
      The socket name is derived based on the port. This is the preferred method
      for configuring sockets and it takes precedence over the hostname. If you are
      connecting to a socket outside of the Postgres convention, use `:socket` instead
    * `:socket` - Connect to Postgres via UNIX sockets in the given path.
      This option takes precedence over `:hostname` and `:socket_dir`
    * `:username` - Username
    * `:password` - User password
    * `:port` - Server port (default: 5432)
    * `:database` - the database to connect to
    * `:maintenance_database` - Specifies the name of the database to connect to when
      creating or dropping the database. Defaults to `"postgres"`
    * `:pool` - The connection pool module, may be set to `Ecto.Adapters.SQL.Sandbox`
    * `:ssl` - Set to `true` if SSL should be used (default: `false`)
    * `:ssl_opts` - A list of SSL options, see Erlang's `ssl` docs
    * `:parameters` - Keyword list of connection parameters
    * `:connect_timeout` - The timeout for establishing new connections (default: 5000)
    * `:prepare` - How to prepare queries, either `:named` to use named queries
      or `:unnamed` to force unnamed queries (default: `:named`)
    * `:socket_options` - Specifies socket configuration
    * `:show_sensitive_data_on_connection_error` - show connection data and
      configuration whenever there is an error attempting to connect to the
      database

  The `:socket_options` are particularly useful when configuring the size
  of both send and receive buffers. For example, when Ecto starts with a
  pool of 20 connections, the memory usage may quickly grow from 20MB to
  50MB based on the operating system default values for TCP buffers.
  It is advised to stick with the operating system defaults, but they can be
  tweaked if desired:

      socket_options: [recbuf: 8192, sndbuf: 8192]

  We also recommend that developers consult the `Postgrex.start_link/1`
  documentation for a complete listing of all supported options.

  ### Storage options

    * `:encoding` - the database encoding (default: "UTF8")
      or `:unspecified` to remove the encoding parameter (alternative engine compatibility)
    * `:template` - the template to create the database from
    * `:lc_collate` - the collation order
    * `:lc_ctype` - the character classification
    * `:dump_path` - where to place dumped structures
    * `:force_drop` - force the database to be dropped even
      if it has connections to it (requires PostgreSQL 13+)

  ### After connect callback

  If you want to execute a callback as soon as a connection to the
  database is established, you can use the `:after_connect` configuration.
  For example, in your repository configuration you can add:

      after_connect: {Postgrex, :query!, ["SET search_path TO global_prefix", []]}

  You can also specify your own module that will receive the Postgrex
  connection as argument.

  ## Extensions

  Both PostgreSQL and its adapter for Elixir, Postgrex, support an
  extension system. If you want to use custom extensions for Postgrex
  alongside Ecto, you must define a type module with your extensions.
  Create a new file anywhere in your application with the following:

      Postgrex.Types.define(MyApp.PostgresTypes,
        [MyExtension.Foo, MyExtension.Bar] ++ Ecto.Adapters.Postgres.extensions())

  Once your type module is defined, you can configure the repository to use it:

      config :my_app, MyApp.Repo, types: MyApp.PostgresTypes

  """

  # Inherit all behaviour from Ecto.Adapters.SQL
  use Ecto.Adapters.SQL, driver: :postgrex

  require Logger

  # And provide a custom storage implementation
  @behaviour Ecto.Adapter.Storage
  @behaviour Ecto.Adapter.Structure

  @default_maintenance_database "postgres"
  @default_prepare_opt :named

  @doc """
  All Ecto extensions for Postgrex.
  """
  def extensions do
    []
  end

  # Support arrays in place of IN
  @impl true
  def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
  def dumpers({:in, sub}, {:in, sub}), do: [{:array, sub}]
  def dumpers(:binary_id, type), do: [type, Ecto.UUID]
  def dumpers(_, type), do: [type]

  ## Query API

  @impl Ecto.Adapter.Queryable
  def execute(adapter_meta, query_meta, query, params, opts) do
    prepare = Keyword.get(opts, :prepare, @default_prepare_opt)

    unless valid_prepare?(prepare) do
      raise ArgumentError,
        "expected option `:prepare` to be either `:named` or `:unnamed`, got: #{inspect(prepare)}"
    end

    Ecto.Adapters.SQL.execute(prepare, adapter_meta, query_meta, query, params, opts)
  end

  defp valid_prepare?(prepare) when prepare in [:named, :unnamed], do: true
  defp valid_prepare?(_), do: false

  ## Storage API

  @impl true
  def storage_up(opts) do
    database =
      Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"

    encoding = if opts[:encoding] == :unspecified, do: nil, else: opts[:encoding] || "UTF8"
    maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    opts = Keyword.put(opts, :database, maintenance_database)

    check_existence_command = "SELECT FROM pg_database WHERE datname = '#{database}'"

    case run_query(check_existence_command, opts) do
      {:ok, %{num_rows: 1}} ->
        {:error, :already_up}

      _ ->
        create_command =
          ~s(CREATE DATABASE "#{database}")
          |> concat_if(encoding, &"ENCODING '#{&1}'")
          |> concat_if(opts[:template], &"TEMPLATE=#{&1}")
          |> concat_if(opts[:lc_ctype], &"LC_CTYPE='#{&1}'")
          |> concat_if(opts[:lc_collate], &"LC_COLLATE='#{&1}'")

        case run_query(create_command, opts) do
          {:ok, _} ->
            :ok

          {:error, %{postgres: %{code: :duplicate_database}}} ->
            {:error, :already_up}

          {:error, error} ->
            {:error, Exception.message(error)}
        end
    end
  end

  defp concat_if(content, nil, _), do: content
  defp concat_if(content, false, _), do: content
  defp concat_if(content, value, fun), do: content <> " " <> fun.(value)

  @impl true
  def storage_down(opts) do
    database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    command = "DROP DATABASE \"#{database}\""
              |> concat_if(opts[:force_drop], fn _ -> "WITH (FORCE)" end)
    maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    opts = Keyword.put(opts, :database, maintenance_database)

    case run_query(command, opts) do
      {:ok, _} ->
        :ok
      {:error, %{postgres: %{code: :invalid_catalog_name}}} ->
        {:error, :already_down}
      {:error, error} ->
        {:error, Exception.message(error)}
    end
  end

  @impl Ecto.Adapter.Storage
  def storage_status(opts) do
    database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    maintenance_database = Keyword.get(opts, :maintenance_database, @default_maintenance_database)
    opts = Keyword.put(opts, :database, maintenance_database)

    check_database_query = "SELECT datname FROM pg_catalog.pg_database WHERE datname = '#{database}'"

    case run_query(check_database_query, opts) do
      {:ok, %{num_rows: 0}} -> :down
      {:ok, %{num_rows: _num_rows}} -> :up
      other -> {:error, other}
    end
  end

  @impl true
  def supports_ddl_transaction? do
    true
  end

  @impl true
  def lock_for_migrations(meta, opts, fun) do
    %{opts: adapter_opts, repo: repo} = meta

    if Keyword.fetch(adapter_opts, :pool_size) == {:ok, 1} do
      Ecto.Adapters.SQL.raise_migration_pool_size_error()
    end

    opts = Keyword.merge(opts, [timeout: :infinity, telemetry_options: [schema_migration: true]])
    config = repo.config()
    lock_strategy = Keyword.get(config, :migration_lock, :table_lock)
    do_lock_for_migrations(lock_strategy, meta, opts, config, fun)
  end

  defp do_lock_for_migrations(:pg_advisory_lock, meta, opts, config, fun) do
    lock = :erlang.phash2({:ecto, opts[:prefix], meta.repo})

    retry_state = %{
      retry_interval_ms: config[:migration_advisory_lock_retry_interval_ms] || 5000,
      max_tries: config[:migration_advisory_lock_max_tries] || :infinity,
      tries: 0
    }

    advisory_lock(meta, opts, lock, retry_state, fun)
  end

  defp do_lock_for_migrations(:table_lock, meta, opts, _config, fun) do
    {:ok, res} =
      transaction(meta, opts, fn ->
        # SHARE UPDATE EXCLUSIVE MODE is the first lock that locks
        # itself but still allows updates to happen, see
        # https://www.postgresql.org/docs/9.4/explicit-locking.html
        source = Keyword.get(opts, :migration_source, "schema_migrations")
        table = if prefix = opts[:prefix], do: ~s|"#{prefix}"."#{source}"|, else: ~s|"#{source}"|
        {:ok, _} = Ecto.Adapters.SQL.query(meta, "LOCK TABLE #{table} IN SHARE UPDATE EXCLUSIVE MODE", [], opts)
        fun.()
      end)

    res
  end

  defp advisory_lock(meta, opts, lock, retry_state, fun) do
    result = checkout(meta, opts, fn ->
      case Ecto.Adapters.SQL.query(meta, "SELECT pg_try_advisory_lock(#{lock})", [], opts) do
        {:ok, %{rows: [[true]]}} ->
          try do
            {:ok, fun.()}
          after
            release_advisory_lock(meta, opts, lock)
          end
        _ ->
          :no_advisory_lock
      end
    end)

    case result do
      {:ok, fun_result} ->
        fun_result

      :no_advisory_lock ->
        maybe_retry_advisory_lock(meta, opts, lock, retry_state, fun)
    end
  end

  defp release_advisory_lock(meta, opts, lock) do
    case Ecto.Adapters.SQL.query(meta, "SELECT pg_advisory_unlock(#{lock})", [], opts) do
      {:ok, %{rows: [[true]]}} ->
        :ok
      _ ->
        raise "failed to release advisory lock"
    end
  end

  defp maybe_retry_advisory_lock(meta, opts, lock, retry_state, fun) do
    %{retry_interval_ms: interval, max_tries: max_tries, tries: tries} = retry_state

    if max_tries != :infinity && max_tries <= tries do
      raise "failed to obtain advisory lock. Tried #{max_tries} times waiting #{interval}ms between tries"
    else
      if Keyword.get(opts, :log_migrator_sql, false) do
        Logger.info("Migration lock occupied for #{inspect(meta.repo)}. Retry #{tries + 1}/#{max_tries} at #{interval}ms intervals.")
      end

      Process.sleep(interval)
      retry_state = %{retry_state | tries: tries + 1}
      advisory_lock(meta, opts, lock, retry_state, fun)
    end
  end

  @impl true
  def structure_dump(default, config) do
    table = config[:migration_source] || "schema_migrations"
    with {:ok, versions} <- select_versions(table, config),
         {:ok, path} <- pg_dump(default, config),
         do: append_versions(table, versions, path)
  end

  defp select_versions(table, config) do
    case run_query(~s[SELECT version FROM public."#{table}" ORDER BY version], config) do
      {:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
      {:error, %{postgres: %{code: :undefined_table}}} -> {:ok, []}
      {:error, _} = error -> error
    end
  end

  defp pg_dump(default, config) do
    path = config[:dump_path] || Path.join(default, "structure.sql")
    File.mkdir_p!(Path.dirname(path))

    case run_with_cmd("pg_dump", config, ["--file", path, "--schema-only", "--no-acl", "--no-owner"]) do
      {_output, 0} ->
        {:ok, path}
      {output, _} ->
        {:error, output}
    end
  end

  defp append_versions(_table, [], path) do
    {:ok, path}
  end

  defp append_versions(table, versions, path) do
    sql = Enum.map_join(versions, &~s[INSERT INTO public."#{table}" (version) VALUES (#{&1});\n])

    File.open!(path, [:append], fn file ->
      IO.write(file, sql)
    end)

    {:ok, path}
  end

  @impl true
  def structure_load(default, config) do
    path = config[:dump_path] || Path.join(default, "structure.sql")
    args = ["--quiet", "--file", path, "-vON_ERROR_STOP=1", "--single-transaction"]
    case run_with_cmd("psql", config, args) do
      {_output, 0} -> {:ok, path}
      {output, _} -> {:error, output}
    end
  end

  @impl true
  def dump_cmd(args, opts \\ [], config) when is_list(config) and is_list(args),
    do: run_with_cmd("pg_dump", config, args, opts)

  ## Helpers

  defp run_query(sql, opts) do
    {:ok, _} = Application.ensure_all_started(:ecto_sql)
    {:ok, _} = Application.ensure_all_started(:postgrex)

    opts =
      opts
      |> Keyword.drop([:name, :log, :pool, :pool_size])
      |> Keyword.put(:backoff_type, :stop)
      |> Keyword.put(:max_restarts, 0)

    task = Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn ->
      {:ok, conn} = Postgrex.start_link(opts)

      value = Postgrex.query(conn, sql, [], opts)
      GenServer.stop(conn)
      value
    end)

    timeout = Keyword.get(opts, :timeout, 15_000)

    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, {:ok, result}} ->
        {:ok, result}
      {:ok, {:error, error}} ->
        {:error, error}
      {:exit, {%{__struct__: struct} = error, _}}
          when struct in [Postgrex.Error, DBConnection.Error] ->
        {:error, error}
      {:exit, reason} ->
        {:error, RuntimeError.exception(Exception.format_exit(reason))}
      nil ->
        {:error, RuntimeError.exception("command timed out")}
    end
  end

  defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do
    unless System.find_executable(cmd) do
      raise "could not find executable `#{cmd}` in path, " <>
            "please guarantee it is available before running ecto commands"
    end

    env =
      [{"PGCONNECT_TIMEOUT", "10"}]
    env =
      if password = opts[:password] do
        [{"PGPASSWORD", password}|env]
      else
        env
      end

    args =
      []
    args =
      if username = opts[:username], do: ["--username", username | args], else: args
    args =
      if port = opts[:port], do: ["--port", to_string(port) | args], else: args
    args =
      if database = opts[:database], do: ["--dbname", database | args], else: args

    host = opts[:socket_dir] || opts[:hostname] || System.get_env("PGHOST") || "localhost"

    if opts[:socket] do
      IO.warn(
        ":socket option is ignored when connecting in structure_load/2 and structure_dump/2," <>
          " use :socket_dir or :hostname instead"
      )
    end

    args = ["--host", host | args]
    args = args ++ opt_args

    cmd_opts =
      cmd_opts
      |> Keyword.put_new(:stderr_to_stdout, true)
      |> Keyword.update(:env, env, &Enum.concat(env, &1))

    System.cmd(cmd, args, cmd_opts)
  end
end