postgrex.ex (27347B)
defmodule Postgrex do
  @moduledoc """
  PostgreSQL driver for Elixir.

  Postgrex is a partial implementation of the Postgres [frontend/backend
  message protocol](https://www.postgresql.org/docs/current/protocol.html).
  It performs wire messaging in Elixir, as opposed to binding to a library
  such as `libpq` in C.

  A Postgrex query is performed as "[extended query](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY)".
  An "extended query" involves separate server-side parse, bind, and execute
  stages, each of which may be re-used for efficiency. For example, libraries
  like Ecto cache queries, so a query only has to be parsed and planned once.
  This is all done via wire messaging, without relying on `PREPARE q AS (...)`
  and `EXECUTE q()` SQL statements directly.

  This module handles the connection to PostgreSQL, providing support
  for queries, transactions, connection backoff, logging, pooling and
  more.

  Note that the notifications API (pub/sub) supported by PostgreSQL is
  handled by `Postgrex.Notifications`. Hence, to use this feature,
  you need to start a separate (notifications) connection.
  """

  alias Postgrex.Query

  @typedoc """
  A connection process name, pid or reference.

  A connection reference is used when making multiple requests to the same
  connection, see `transaction/3`.
  """
  @type conn :: DBConnection.conn()

  @type start_option ::
          {:hostname, String.t()}
          | {:endpoints, [tuple()]}
          | {:socket_dir, Path.t()}
          | {:socket, Path.t()}
          | {:port, :inet.port_number()}
          | {:database, String.t()}
          | {:username, String.t()}
          | {:password, String.t()}
          | {:parameters, keyword}
          | {:timeout, timeout}
          | {:connect_timeout, timeout}
          | {:handshake_timeout, timeout}
          | {:ping_timeout, timeout}
          | {:ssl, boolean}
          | {:ssl_opts, [:ssl.tls_client_option()]}
          | {:socket_options, [:gen_tcp.connect_option()]}
          | {:prepare, :named | :unnamed}
          | {:transactions, :strict | :naive}
          | {:types, module}
          | {:search_path, [String.t()]}
          | {:disconnect_on_error_codes, [atom]}
          | DBConnection.start_option()

  @type option ::
          {:mode, :transaction | :savepoint}
          | DBConnection.option()

  @type execute_option ::
          {:decode_mapper, (list -> term)}
          | option

  @max_rows 500
  @timeout 15_000

  ### PUBLIC API ###

  @doc """
  Start the connection process and connect to postgres.

  ## Options

  Postgrex provides multiple ways to connect to the server, listed in order of
  precedence below:

    * `:hostname` - Server hostname (default: PGHOST env variable, then localhost);
    * `:port` - Server port (default: PGPORT env variable, then 5432);
    * `:endpoints` - A list of endpoints (host and port pairs, with an optional
      extra_opts keyword list);
      Postgrex will try each endpoint in order, one by one, until the connection succeeds;
      The syntax is `[{host1, port1},{host2, port2},{host3, port3}]` or
      `[{host1, port1, extra_opt1: value},{host2, port2, extra_opt2: value}]`;
      This option takes precedence over `:hostname+:port`;
    * `:socket_dir` - Connect to PostgreSQL via UNIX sockets in the given directory;
      The socket name is derived based on the port. This is the preferred method
      for configuring sockets and it takes precedence over the hostname.
      If you are connecting to a socket outside of the PostgreSQL convention,
      use `:socket` instead;
    * `:socket` - Connect to PostgreSQL via UNIX sockets in the given path.
      This option takes precedence over the `:hostname`, `:endpoints` and `:socket_dir`;

  Once a server is specified, you can configure the connection with the following:

    * `:database` - Database (default: PGDATABASE env variable; otherwise required);

    * `:username` - Username (default: PGUSER env variable, then USER env var);

    * `:password` - User password (default: PGPASSWORD env variable);

    * `:parameters` - Keyword list of connection parameters;

    * `:timeout` - Socket receive timeout when idle in milliseconds (default:
      `#{@timeout}`);

    * `:connect_timeout` - Socket connect timeout in milliseconds (defaults to
      `:timeout` value);

    * `:handshake_timeout` - Connection handshake timeout in milliseconds
      (defaults to `:timeout` value);

    * `:ping_timeout` - Socket receive timeout when idle in milliseconds (defaults to
      `:timeout` value);

    * `:idle_interval` - Ping connections after a period of inactivity in milliseconds.
      Defaults to 1000ms;

    * `:ssl` - Set to `true` if ssl should be used (default: `false`);

    * `:ssl_opts` - A list of ssl options, see the
      [`tls_client_option`](http://erlang.org/doc/man/ssl.html#type-tls_client_option)
      from the ssl docs;

    * `:socket_options` - Options to be given to the underlying socket
      (applies to both TCP and UNIX sockets);

    * `:target_server_type` - Allows opening connections to a server in the given
      replica mode. The allowed values are `:any`, `:primary` and `:secondary`
      (default: `:any`).
      If this option is used together with `endpoints`, we will
      traverse all endpoints until we find an endpoint matching the server type;

    * `:disconnect_on_error_codes` - List of error code atoms that when encountered
      will disconnect the connection. This is useful when using Postgrex against systems that
      support failover, which when it occurs will emit certain error codes
      e.g. `:read_only_sql_transaction` (default: `[]`);

    * `:show_sensitive_data_on_connection_error` - By default, `Postgrex`
      hides all information during connection errors to avoid leaking credentials
      or other sensitive information. You can set this option if you wish to
      see complete errors and stacktraces during connection errors;

  The following options control the pool and other Postgrex features:

    * `:prepare` - How to prepare queries, either `:named` to use named queries
      or `:unnamed` to force unnamed queries (default: `:named`);

    * `:transactions` - Set to `:strict` to error on unexpected transaction
      state, otherwise set to `:naive` (default: `:strict`);

    * `:pool` - The pool module to use, defaults to `DBConnection.ConnectionPool`.
      See the pool documentation for more options. The default `:pool_size` for
      the default pool is 1. If you set a different pool, this option must be
      included with all requests contacting the pool;

    * `:types` - The types module to use, see `Postgrex.Types.define/3`, this
      option is only required when using custom encoding or decoding (default:
      `Postgrex.DefaultTypes`);

    * `:search_path` - A list of strings used to set the search path for the connection.
      This is useful when, for instance, an extension like `citext` is installed in a
      separate schema. If that schema is not in the connection's search path, Postgrex
      might not be able to recognize the extension's data type. When this option is `nil`,
      the search path is not modified. (default: `nil`).
      See the [PostgreSQL docs](https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-PATH)
      for more details.


  `Postgrex` uses the `DBConnection` library and supports all `DBConnection`
  options like `:idle`, `:after_connect` etc. See `DBConnection.start_link/2`
  for more information.

  ## Examples

      iex> {:ok, pid} = Postgrex.start_link(database: "postgres")
      {:ok, #PID<0.69.0>}

  Run a query after connection has been established:

      iex> {:ok, pid} = Postgrex.start_link(after_connect: &Postgrex.query!(&1, "SET TIME ZONE 'UTC';", []))
      {:ok, #PID<0.69.0>}

  Connect to a postgres instance through a unix domain socket:

      iex> {:ok, pid} = Postgrex.start_link(socket_dir: "/tmp", database: "postgres")
      {:ok, #PID<0.69.0>}

  ## SSL client authentication

  When connecting to Postgres or CockroachDB instances over SSL it is idiomatic to use
  certificate authentication. Config files do not allow passing functions,
  so use the `init` callback of the Ecto supervisor.

  In your Repository configuration:

      config :app, App.Repo,
        ssl: String.to_existing_atom(System.get_env("DB_SSL_ENABLED", "true")),
        verify_ssl: true

  And in App.Repo, set your `:ssl_opts`:

      def init(_type, config) do
        config =
          if config[:verify_ssl] do
            Keyword.put(config, :ssl_opts, my_ssl_opts(config[:hostname]))
          else
            config
          end

        {:ok, config}
      end

      def my_ssl_opts(server) do
        [
          verify: :verify_peer,
          cacertfile: System.get_env("DB_CA_CERT_FILE"),
          server_name_indication: String.to_charlist(server),
          customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)],
          depth: 3
        ]
      end

  ## PgBouncer

  When using PgBouncer with transaction or statement pooling, named prepared
  queries cannot be used because the bouncer may route requests from
  the same postgrex connection to different PostgreSQL backend processes
  and discards named queries after the transaction closes.
  To force unnamed prepared queries set the `:prepare` option to `:unnamed`.

  ## Handling failover

  Some services, such as AWS Aurora, support failovers. The 2 options
  `endpoints` and `target_server_type` can be used together to achieve a faster fail-over.

  Imagine an AWS Aurora cluster named "test" with 2 instances. Use the
  following options to minimize downtime by ensuring that Postgrex connects to
  the new primary instance as soon as possible.

      {:ok, pid} = Postgrex.start_link(
        endpoints: [
          {"test.cluster-xyz.eu-west-1.rds.amazonaws.com", 5432},
          {"test.cluster-ro-xyz.eu-west-1.rds.amazonaws.com", 5432}
        ],
        target_server_type: :primary,
        (...)
      )

  In the event of a fail-over, Postgrex first gets disconnected from what used
  to be the primary instance. The primary instance will then reboot and turn into
  a secondary instance.
  Meanwhile, one of the secondary instances will have turned
  into the new primary instance. However, the DNS entry of the primary endpoint
  provided by AWS can take some time to get updated. That is why it can be faster to
  let Postgrex iterate over all the instances of the cluster to find the new
  primary instance instead of waiting for the DNS update.

  If the cluster does not have DNS-backed primary and secondary endpoints (like the
  ones provided by AWS Aurora) or if the cluster is made of more than 2 instances,
  the hostname (and port) of all of the individual instances can be specified
  in the `endpoints` list:

      endpoints: [
        {"test-instance-1.xyz.eu-west-1.rds.amazonaws.com", 5432},
        {"test-instance-2.xyz.eu-west-1.rds.amazonaws.com", 5432},
        (...),
        {"test-instance-N.xyz.eu-west-1.rds.amazonaws.com", 5432}
      ]

  ### Failover with SSL support

  As specified in Erlang [:ssl.connect](https://erlang.org/doc/man/ssl.html#connect-3),
  host verification using `:public_key.pkix_verify_hostname_match_fun(:https)`
  requires that the ssl_opt `server_name_indication` is set, and for this reason,
  the aforementioned `endpoints` list can become a three element tuple as:

      endpoints: [
        {
          "test-instance-1.xyz.eu-west-1.rds.amazonaws.com",
          5432,
          [ssl: [server_name_indication: String.to_charlist("test-instance-1.xyz.eu-west-1.rds.amazonaws.com")]]
        },
        (...),
        {
          "test-instance-2.xyz.eu-west-1.rds.amazonaws.com",
          5432,
          [ssl: [server_name_indication: String.to_charlist("test-instance-2.xyz.eu-west-1.rds.amazonaws.com")]]
        }
      ]

  """
  @spec start_link([start_option]) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
  def start_link(opts) do
    ensure_deps_started!(opts)
    opts = Postgrex.Utils.default_opts(opts)
    DBConnection.start_link(Postgrex.Protocol, opts)
  end

  @doc """
  Runs an (extended) query and returns the result as
  `{:ok, %Postgrex.Result{}}`
  or `{:error, %Postgrex.Error{}}` if there was a database error. Parameters can
  be set in the query as `$1` embedded in the query string. Parameters are given
  as a list of Elixir values. See the README for information on how Postgrex
  encodes and decodes Elixir values by default. See `Postgrex.Result` for the
  result data.

  This function may still raise an exception if there is an issue with types
  (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
  (`DBConnection.OwnershipError`) or other error (`RuntimeError`).

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Query request timeout (default: `#{@timeout}`);
    * `:decode_mapper` - Fun to map each row in the result to a term after
      decoding, (default: `fn x -> x end`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
      query on error, otherwise set to `:transaction` (default: `:transaction`);
    * `:cache_statement` - Caches the query with the given name

  ## Examples

      Postgrex.query(conn, "CREATE TABLE posts (id serial, title text)", [])

      Postgrex.query(conn, "INSERT INTO posts (title) VALUES ('my title')", [])

      Postgrex.query(conn, "SELECT title FROM posts", [])

      Postgrex.query(conn, "SELECT id FROM posts WHERE title like $1", ["%my%"])

      Postgrex.query(conn, "COPY posts TO STDOUT", [])
  """
  @spec query(conn, iodata, list, [execute_option]) ::
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
  def query(conn, statement, params, opts \\ []) do
    if name = Keyword.get(opts, :cache_statement) do
      query = %Query{name: name, cache: :statement, statement: IO.iodata_to_binary(statement)}

      case DBConnection.prepare_execute(conn, query, params, opts) do
        {:ok, _, result} ->
          {:ok, result}

        {:error, %Postgrex.Error{postgres: %{code: :feature_not_supported}}} = error ->
          with %DBConnection{} <- conn,
               :error <- DBConnection.status(conn) do
            error
          else
            _ -> query_prepare_execute(conn, query, params, opts)
          end

        {:error, _} = error ->
          error
      end
    else
      query_prepare_execute(conn, %Query{name: "", statement: statement}, params, opts)
    end
  end

  defp query_prepare_execute(conn, query, params, opts) do
    case DBConnection.prepare_execute(conn, query, params, opts) do
      {:ok, _, result} -> {:ok, result}
      {:error, _} = error -> error
    end
  end

  @doc """
  Runs an (extended) query and returns the result or raises `Postgrex.Error` if
  there was an error. See `query/4`.
  """
  @spec query!(conn, iodata, list, [execute_option]) :: Postgrex.Result.t()
  def query!(conn, statement, params, opts \\ []) do
    case query(conn, statement, params, opts) do
      {:ok, result} -> result
      {:error, err} -> raise err
    end
  end

  @doc """
  Prepares an (extended) query and returns the result as
  `{:ok, %Postgrex.Query{}}` or `{:error, %Postgrex.Error{}}` if there was an
  error. Parameters can be set in the query as `$1` embedded in the query
  string. To execute the query call `execute/4`. To close the prepared query
  call `close/3`. See `Postgrex.Query` for the query data.

  This function may still raise an exception if there is an issue with types
  (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
  (`DBConnection.OwnershipError`) or other error (`RuntimeError`).
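
  As a minimal sketch of the lifecycle described above (prepare, then
  `execute/4`, then `close/3`), assuming a started connection `conn`, the
  `posts` table used in the other examples of this module, and a hypothetical
  query name `"posts_by_title"`:

  ```elixir
  # "posts_by_title" is an illustrative name, not one defined by Postgrex.
  {:ok, query} =
    Postgrex.prepare(conn, "posts_by_title", "SELECT id FROM posts WHERE title like $1")

  # The prepared query can be executed repeatedly with different parameters.
  {:ok, _query, %Postgrex.Result{rows: rows}} = Postgrex.execute(conn, query, ["%my%"])

  # Release the server-side resources once the query is no longer needed.
  :ok = Postgrex.close(conn, query)
  ```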

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Prepare request timeout (default: `#{@timeout}`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
      prepare on error, otherwise set to `:transaction` (default: `:transaction`);

  ## Examples

      Postgrex.prepare(conn, "", "CREATE TABLE posts (id serial, title text)")
  """
  @spec prepare(conn, iodata, iodata, [option]) ::
          {:ok, Postgrex.Query.t()} | {:error, Exception.t()}
  def prepare(conn, name, statement, opts \\ []) do
    query = %Query{name: name, statement: statement}
    opts = Keyword.put(opts, :postgrex_prepare, true)
    DBConnection.prepare(conn, query, opts)
  end

  @doc """
  Prepares an (extended) query and returns the prepared query or raises
  `Postgrex.Error` if there was an error. See `prepare/4`.
  """
  @spec prepare!(conn, iodata, iodata, [option]) :: Postgrex.Query.t()
  def prepare!(conn, name, statement, opts \\ []) do
    opts = Keyword.put(opts, :postgrex_prepare, true)
    DBConnection.prepare!(conn, %Query{name: name, statement: statement}, opts)
  end

  @doc """
  Prepares and executes a query in a single step.

  It returns the result as `{:ok, %Postgrex.Query{}, %Postgrex.Result{}}` or
  `{:error, %Postgrex.Error{}}` if there was an error. Parameters are given as
  part of the prepared query, `%Postgrex.Query{}`.

  See the README for information on how Postgrex encodes and decodes Elixir
  values by default. See `Postgrex.Query` for the query data and
  `Postgrex.Result` for the result data.

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Execute request timeout (default: `#{@timeout}`);
    * `:decode_mapper` - Fun to map each row in the result to a term after
      decoding, (default: `fn x -> x end`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
      execute on error, otherwise set to `:transaction` (default: `:transaction`);

  ## Examples

      Postgrex.prepare_execute(conn, "", "SELECT id FROM posts WHERE title like $1", ["%my%"])

  """
  @spec prepare_execute(conn, iodata, iodata, list, [execute_option]) ::
          {:ok, Postgrex.Query.t(), Postgrex.Result.t()} | {:error, Postgrex.Error.t()}
  def prepare_execute(conn, name, statement, params, opts \\ []) do
    query = %Query{name: name, statement: statement}
    DBConnection.prepare_execute(conn, query, params, opts)
  end

  @doc """
  Prepares and runs a query and returns the result or raises
  `Postgrex.Error` if there was an error. See `prepare_execute/5`.
  """
  @spec prepare_execute!(conn, iodata, iodata, list, [execute_option]) ::
          {Postgrex.Query.t(), Postgrex.Result.t()}
  def prepare_execute!(conn, name, statement, params, opts \\ []) do
    query = %Query{name: name, statement: statement}
    DBConnection.prepare_execute!(conn, query, params, opts)
  end

  @doc """
  Runs an (extended) prepared query.

  It returns the result as `{:ok, %Postgrex.Query{}, %Postgrex.Result{}}` or
  `{:error, %Postgrex.Error{}}` if there was an error. Parameters are given as
  part of the prepared query, `%Postgrex.Query{}`.

  See the README for information on how Postgrex encodes and decodes Elixir
  values by default. See `Postgrex.Query` for the query data and
  `Postgrex.Result` for the result data.

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Execute request timeout (default: `#{@timeout}`);
    * `:decode_mapper` - Fun to map each row in the result to a term after
      decoding, (default: `fn x -> x end`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
      execute on error, otherwise set to `:transaction` (default: `:transaction`);

  ## Examples

      query = Postgrex.prepare!(conn, "", "CREATE TABLE posts (id serial, title text)")
      Postgrex.execute(conn, query, [])

      query = Postgrex.prepare!(conn, "", "SELECT id FROM posts WHERE title like $1")
      Postgrex.execute(conn, query, ["%my%"])
  """
  @spec execute(conn, Postgrex.Query.t(), list, [execute_option]) ::
          {:ok, Postgrex.Query.t(), Postgrex.Result.t()} | {:error, Postgrex.Error.t()}
  def execute(conn, query, params, opts \\ []) do
    DBConnection.execute(conn, query, params, opts)
  end

  @doc """
  Runs an (extended) prepared query and returns the result or raises
  `Postgrex.Error` if there was an error. See `execute/4`.
  """
  @spec execute!(conn, Postgrex.Query.t(), list, [execute_option]) ::
          Postgrex.Result.t()
  def execute!(conn, query, params, opts \\ []) do
    DBConnection.execute!(conn, query, params, opts)
  end

  @doc """
  Closes an (extended) prepared query and returns `:ok` or
  `{:error, %Postgrex.Error{}}` if there was an error. Closing a query releases
  any resources held by postgresql for a prepared query with that name. See
  `Postgrex.Query` for the query data.

  This function may still raise an exception if there is an issue with types
  (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
  (`DBConnection.OwnershipError`) or other error (`RuntimeError`).

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Close request timeout (default: `#{@timeout}`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
      close on error, otherwise set to `:transaction` (default: `:transaction`);

  ## Examples

      query = Postgrex.prepare!(conn, "", "CREATE TABLE posts (id serial, title text)")
      Postgrex.close(conn, query)
  """
  @spec close(conn, Postgrex.Query.t(), [option]) :: :ok | {:error, Exception.t()}
  def close(conn, query, opts \\ []) do
    with {:ok, _} <- DBConnection.close(conn, query, opts) do
      :ok
    end
  end

  @doc """
  Closes an (extended) prepared query and returns `:ok` or raises
  `Postgrex.Error` if there was an error. See `close/3`.
  """
  @spec close!(conn, Postgrex.Query.t(), [option]) :: :ok
  def close!(conn, query, opts \\ []) do
    DBConnection.close!(conn, query, opts)
    :ok
  end

  @doc """
  Acquire a lock on a connection and run a series of requests inside a
  transaction. The result of the transaction fun is returned inside an `:ok`
  tuple: `{:ok, result}`.

  To use the locked connection call the request with the connection
  reference passed as the single argument to the `fun`. If the
  connection disconnects all future calls using that connection
  reference will fail.

  `rollback/2` rolls back the transaction and causes the function to
  return `{:error, reason}`.

  `transaction/3` can be nested multiple times if the connection
  reference is used to start a nested transaction. The top level
  transaction function is the actual transaction.
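
  As a minimal sketch of the rollback behaviour described above, assuming a
  started connection `pid`, the `posts` table from the other examples in this
  module, and an illustrative reason `:invalid_title`:

  ```elixir
  {:error, :invalid_title} =
    Postgrex.transaction(pid, fn conn ->
      Postgrex.query!(conn, "INSERT INTO posts (title) VALUES ($1)", ["draft"])
      # Aborts the transaction fun; the INSERT above is rolled back.
      Postgrex.rollback(conn, :invalid_title)
    end)
  ```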

  ## Options

    * `:queue` - Whether to wait for connection in a queue (default: `true`);
    * `:timeout` - Transaction timeout (default: `#{@timeout}`);
    * `:mode` - Set to `:savepoint` to use savepoints instead of an SQL
      transaction, otherwise set to `:transaction` (default: `:transaction`);

  The `:timeout` is for the duration of the transaction and all nested
  transactions and requests. This timeout overrides timeouts set by internal
  transactions and requests. The `:mode` will be used for all requests inside
  the transaction function.

  ## Example

      {:ok, res} = Postgrex.transaction(pid, fn(conn) ->
        Postgrex.query!(conn, "SELECT title FROM posts", [])
      end)
  """
  @spec transaction(conn, (DBConnection.t() -> result), [option]) ::
          {:ok, result} | {:error, any}
        when result: var
  def transaction(conn, fun, opts \\ []) do
    DBConnection.transaction(conn, fun, opts)
  end

  @doc """
  Rollback a transaction, does not return.

  Aborts the current transaction fun. If inside multiple `transaction/3`
  functions, bubbles up to the top level.

  ## Example

      {:error, :oops} = Postgrex.transaction(pid, fn(conn) ->
        DBConnection.rollback(conn, :oops)
        IO.puts "never reaches here!"
      end)
  """
  @spec rollback(DBConnection.t(), reason :: any) :: no_return()
  defdelegate rollback(conn, reason), to: DBConnection

  @doc """
  Returns a cached map of connection parameters.

  ## Options

    * `:timeout` - Call timeout (default: `#{@timeout}`)

  """
  @spec parameters(conn, [option]) :: %{binary => binary}
        when option: {:timeout, timeout}
  def parameters(conn, opts \\ []) do
    DBConnection.execute!(conn, %Postgrex.Parameters{}, nil, opts)
  end

  @doc """
  Returns a supervisor child specification for a DBConnection pool.
  """
  @spec child_spec([start_option]) :: :supervisor.child_spec()
  def child_spec(opts) do
    ensure_deps_started!(opts)
    opts = Postgrex.Utils.default_opts(opts)
    DBConnection.child_spec(Postgrex.Protocol, opts)
  end

  @doc """
  Returns a stream for a query on a connection.

  Stream consumes memory in chunks of at most `max_rows` rows (see Options).
  This is useful for processing _large_ datasets.

  A stream must be wrapped in a transaction and may be used as an `Enumerable`
  or a `Collectable`.

  When used as an `Enumerable` with a `COPY .. TO STDOUT` SQL query no other
  queries or streams can be interspersed until the copy has finished. Otherwise
  it is possible to intersperse enumerable streams and queries.

  When used as a `Collectable` the values are passed as copy data with the
  query. No other queries or streams can be interspersed until the copy has
  finished. If the query is not copying to the database the copy data will still
  be sent but is silently discarded.
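
  As a minimal sketch of the `Enumerable` usage described above with a plain
  `SELECT`, assuming a started connection `pid` and the `posts` table from the
  other examples in this module (the `max_rows: 100` value is illustrative):

  ```elixir
  {:ok, count} =
    Postgrex.transaction(pid, fn conn ->
      conn
      |> Postgrex.stream("SELECT title FROM posts", [], max_rows: 100)
      # Each element is a %Postgrex.Result{} holding at most `max_rows` rows,
      # so only one chunk needs to be held in memory at a time.
      |> Enum.reduce(0, fn %Postgrex.Result{num_rows: n}, acc -> acc + n end)
    end)
  ```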

  ### Options

    * `:max_rows` - Maximum number of rows in a result (defaults to `#{@max_rows}`)
    * `:decode_mapper` - Fun to map each row in the result to a term after
      decoding, (default: `fn x -> x end`);
    * `:mode` - set to `:savepoint` to use a savepoint to rollback to before an
      execute on error, otherwise set to `:transaction` (default: `:transaction`);

  ## Examples

      Postgrex.transaction(pid, fn(conn) ->
        query = Postgrex.prepare!(conn, "", "COPY posts TO STDOUT")
        stream = Postgrex.stream(conn, query, [])
        result_to_iodata = fn(%Postgrex.Result{rows: rows}) -> rows end
        Enum.into(stream, File.stream!("posts"), result_to_iodata)
      end)

      Postgrex.transaction(pid, fn(conn) ->
        stream = Postgrex.stream(conn, "COPY posts FROM STDIN", [])
        Enum.into(File.stream!("posts"), stream)
      end)
  """
  @spec stream(DBConnection.t(), iodata | Postgrex.Query.t(), list, [option]) ::
          Postgrex.Stream.t()
        when option: execute_option | {:max_rows, pos_integer}
  def stream(%DBConnection{} = conn, query, params, options \\ []) do
    options = Keyword.put_new(options, :max_rows, @max_rows)
    %Postgrex.Stream{conn: conn, query: query, params: params, options: options}
  end

  ## Helpers

  defp ensure_deps_started!(opts) do
    if Keyword.get(opts, :ssl, false) and
         not List.keymember?(:application.which_applications(), :ssl, 0) do
      raise """
      SSL connection can not be established because `:ssl` application is not started,
      you can add it to `extra_applications` in your `mix.exs`:

          def application do
            [extra_applications: [:ssl]]
          end
      """
    end
  end
end