zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

postgrex.ex (27347B)


      1 defmodule Postgrex do
      2   @moduledoc """
      3   PostgreSQL driver for Elixir.
      4 
      5   Postgrex is a partial implementation of the Postgres [frontend/backend
      6   message protocol](https://www.postgresql.org/docs/current/protocol.html).
      7   It performs wire messaging in Elixir, as opposed to binding to a library
      8   such as `libpq` in C.
      9 
     10   A Postgrex query is performed as "[extended query](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY)".
     11   An "extended query" involves separate server-side parse, bind, and execute
     12   stages, each of which may be re-used for efficiency. For example, libraries
     13   like Ecto cache queries, so a query only has to be parsed and planned once.
     14   This is all done via wire messaging, without relying on `PREPARE q AS (...)`
     15   and `EXECUTE q()` SQL statements directly.
     16 
     17   This module handles the connection to PostgreSQL, providing support
     18   for queries, transactions, connection backoff, logging, pooling and
     19   more.
     20 
     21   Note that the notifications API (pub/sub) supported by PostgreSQL is
     22   handled by `Postgrex.Notifications`. Hence, to use this feature,
     23   you need to start a separate (notifications) connection.
     24   """
     25 
     26   alias Postgrex.Query
     27 
     28   @typedoc """
     29   A connection process name, pid or reference.
     30 
     31   A connection reference is used when making multiple requests to the same
     32   connection, see `transaction/3`.
     33   """
     34   @type conn :: DBConnection.conn()
     35 
     36   @type start_option ::
     37           {:hostname, String.t()}
     38           | {:endpoints, [tuple()]}
     39           | {:socket_dir, Path.t()}
     40           | {:socket, Path.t()}
     41           | {:port, :inet.port_number()}
     42           | {:database, String.t()}
     43           | {:username, String.t()}
     44           | {:password, String.t()}
     45           | {:parameters, keyword}
     46           | {:timeout, timeout}
     47           | {:connect_timeout, timeout}
     48           | {:handshake_timeout, timeout}
     49           | {:ping_timeout, timeout}
     50           | {:ssl, boolean}
     51           | {:ssl_opts, [:ssl.tls_client_option()]}
     52           | {:socket_options, [:gen_tcp.connect_option()]}
     53           | {:prepare, :named | :unnamed}
     54           | {:transactions, :strict | :naive}
     55           | {:types, module}
     56           | {:search_path, [String.t()]}
     57           | {:disconnect_on_error_codes, [atom]}
     58           | DBConnection.start_option()
     59 
     60   @type option ::
     61           {:mode, :transaction | :savepoint}
     62           | DBConnection.option()
     63 
     64   @type execute_option ::
     65           {:decode_mapper, (list -> term)}
     66           | option
     67 
     68   @max_rows 500
     69   @timeout 15_000
     70 
     71   ### PUBLIC API ###
     72 
     73   @doc """
     74   Start the connection process and connect to postgres.
     75 
     76   ## Options
     77 
     78   Postgrex provides multiple ways to connect to the server, listed in order of
     79   precedence below:
     80 
     81     * `:hostname` - Server hostname (default: PGHOST env variable, then localhost);
     82     * `:port` - Server port (default: PGPORT env variable, then 5432);
     83     * `:endpoints` - A list of endpoints (host and port pairs, with an optional
     84       extra_opts keyword list);
     85       Postgrex will try each endpoint in order, one by one, until the connection succeeds;
     86       The syntax is `[{host1, port1},{host2, port2},{host3, port3}]` or
     87       `[{host1, port1, extra_opt1: value},{host2, port2, extra_opt2: value}]`;
     88       This option takes precedence over `:hostname+:port`;
     89     * `:socket_dir` - Connect to PostgreSQL via UNIX sockets in the given directory;
     90       The socket name is derived based on the port. This is the preferred method
     91       for configuring sockets and it takes precedence over the hostname. If you are
     92       connecting to a socket outside of the PostgreSQL convention, use `:socket` instead;
     93     * `:socket` - Connect to PostgreSQL via UNIX sockets in the given path.
     94       This option takes precedence over the `:hostname`, `:endpoints` and `:socket_dir`;
     95 
     96   Once a server is specified, you can configure the connection with the following:
     97 
     98     * `:database` - Database (default: PGDATABASE env variable; otherwise required);
     99 
    100     * `:username` - Username (default: PGUSER env variable, then USER env var);
    101 
    102     * `:password` - User password (default: PGPASSWORD env variable);
    103 
    104     * `:parameters` - Keyword list of connection parameters;
    105 
    106     * `:timeout` - Socket receive timeout when idle in milliseconds (default:
    107     `#{@timeout}`);
    108 
    109     * `:connect_timeout` - Socket connect timeout in milliseconds (defaults to
    110       `:timeout` value);
    111 
    112     * `:handshake_timeout` - Connection handshake timeout in milliseconds
    113       (defaults to `:timeout` value);
    114 
    115     * `:ping_timeout` - Socket receive timeout when idle in milliseconds (defaults to
    116       `:timeout` value);
    117 
    118     * `:idle_interval` - Ping connections after a period of inactivity in milliseconds.
    119       Defaults to 1000ms;
    120 
    121     * `:ssl` - Set to `true` if ssl should be used (default: `false`);
    122 
    123     * `:ssl_opts` - A list of ssl options, see the
    124       [`tls_client_option`](http://erlang.org/doc/man/ssl.html#type-tls_client_option)
    125       from the ssl docs;
    126 
    127     * `:socket_options` - Options to be given to the underlying socket
    128       (applies to both TCP and UNIX sockets);
    129 
    130     * `:target_server_type` - Allows opening connections to a server in the given
    131       replica mode. The allowed values are `:any`, `:primary` and `:secondary`
    132       (default: `:any`). If this option is used together with `endpoints`, we will
    133       traverse all endpoints until we find an endpoint matching the server type;
    134 
    135     * `:disconnect_on_error_codes` - List of error code atoms that when encountered
    136       will disconnect the connection. This is useful when using Postgrex against systems that
    137       support failover, which when it occurs will emit certain error codes
    138       e.g. `:read_only_sql_transaction` (default: `[]`);
    139 
    140     * `:show_sensitive_data_on_connection_error` - By default, `Postgrex`
    141       hides all information during connection errors to avoid leaking credentials
    142       or other sensitive information. You can set this option if you wish to
    143       see complete errors and stacktraces during connection errors;
    144 
    145   The following options control the pool and other Postgrex features:
    146 
    147     * `:prepare` - How to prepare queries, either `:named` to use named queries
    148     or `:unnamed` to force unnamed queries (default: `:named`);
    149 
    150     * `:transactions` - Set to `:strict` to error on unexpected transaction
    151       state, otherwise set to `:naive` (default: `:strict`);
    152 
    153     * `:pool` - The pool module to use, defaults to `DBConnection.ConnectionPool`.
    154       See the pool documentation for more options. The default `:pool_size` for
    155       the default pool is 1. If you set a different pool, this option must be
    156       included with all requests contacting the pool;
    157 
    158     * `:types` - The types module to use, see `Postgrex.Types.define/3`, this
    159       option is only required when using custom encoding or decoding (default:
    160       `Postgrex.DefaultTypes`);
    161 
    162     * `:search_path` - A list of strings used to set the search path for the connection.
    163       This is useful when, for instance, an extension like `citext` is installed in a
    164       separate schema. If that schema is not in the connection's search path, Postgrex
    165       might not be able to recognize the extension's data type. When this option is `nil`,
    166       the search path is not modified. (default: `nil`).
    167       See the [PostgreSQL docs](https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-PATH)
    168       for more details.
    169 
    170 
    171   `Postgrex` uses the `DBConnection` library and supports all `DBConnection`
    172   options like `:idle`, `:after_connect` etc. See `DBConnection.start_link/2`
    173   for more information.
    174 
    175   ## Examples
    176 
    177       iex> {:ok, pid} = Postgrex.start_link(database: "postgres")
    178       {:ok, #PID<0.69.0>}
    179 
    180   Run a query after connection has been established:
    181 
    182       iex> {:ok, pid} = Postgrex.start_link(after_connect: &Postgrex.query!(&1, "SET TIME ZONE 'UTC';", []))
    183       {:ok, #PID<0.69.0>}
    184 
    185   Connect to a Postgres instance through a Unix domain socket:
    186 
    187       iex> {:ok, pid} = Postgrex.start_link(socket_dir: "/tmp", database: "postgres")
    188       {:ok, #PID<0.69.0>}
    189 
    190   ## SSL client authentication
    191 
    192   When connecting to Postgres or CockroachDB instances over SSL it is idiomatic to use
    193   certificate authentication. Config files do not allow passing functions,
    194   so use the `init` callback of the Ecto supervisor.
    195 
    196   In your Repository configuration:
    197 
    198       config :app, App.Repo,
    199         ssl: String.to_existing_atom(System.get_env("DB_SSL_ENABLED", "true")),
    200         verify_ssl: true
    201 
    202   And in App.Repo, set your `:ssl_opts`:
    203 
    204       def init(_type, config) do
    205         config =
    206           if config[:verify_ssl] do
    207             Keyword.put(config, :ssl_opts, my_ssl_opts(config[:hostname]))
    208           else
    209             config
    210           end
    211 
    212         {:ok, config}
    213       end
    214 
    215       def my_ssl_opts(server) do
    216         [
    217           verify: :verify_peer,
    218           cacertfile: System.get_env("DB_CA_CERT_FILE"),
    219           server_name_indication: String.to_charlist(server),
    220           customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)],
    221           depth: 3
    222         ]
    223       end
    224 
    225   ## PgBouncer
    226 
    227   When using PgBouncer with transaction or statement pooling named prepared
    228   queries can not be used because the bouncer may route requests from
    229   the same postgrex connection to different PostgreSQL backend processes
    230   and discards named queries after the transaction closes.
    231   To force unnamed prepared queries set the `:prepare` option to `:unnamed`.
    232 
    233   ## Handling failover
    234 
    235   Some services, such as AWS Aurora, support failovers. The 2 options
    236   `endpoints` and `target_server_type` can be used together to achieve a faster fail-over.
    237 
    238   Imagine an AWS Aurora cluster named "test" with 2 instances. Use the
    239   following options to minimize downtime by ensuring that Postgrex connects to
    240   the new primary instance as soon as possible.
    241 
    242       {:ok, pid} = Postgrex.start_link(
    243         endpoints: [
    244           {"test.cluster-xyz.eu-west-1.rds.amazonaws.com", 5432},
    245           {"test.cluster-ro-xyz.eu-west-1.rds.amazonaws.com", 5432}
    246         ],
    247         target_server_type: :primary,
    248         (...)
    249       )
    250 
    251   In the event of a fail-over, Postgrex gets first disconnected from what used
    252   to be the primary instance. The primary instance will then reboot and turn into
    253   a secondary instance. Meanwhile, one of the secondary instances will have turned
    254   into the new primary instance. However, the DNS entry of the primary endpoint
    255   provided by AWS can take some time to get updated. That is why it can be faster to
    256   let Postgrex iterate over all the instances of the cluster to find the new
    257   primary instance instead of waiting for the DNS update.
    258 
    259   If the cluster does not have DNS-backed primary and secondary endpoints (like the
    260   ones provided by AWS Aurora) or if the cluster is made of more than 2 instances,
    261   the hostname (and port) of all of the individual instances can be specified
    262   in the `endpoints` list:
    263 
    264       endpoints: [
    265         {"test-instance-1.xyz.eu-west-1.rds.amazonaws.com", 5432},
    266         {"test-instance-2.xyz.eu-west-1.rds.amazonaws.com", 5432},
    267         (...),
    268         {"test-instance-N.xyz.eu-west-1.rds.amazonaws.com", 5432}
    269       ]
    270 
    271   ### Failover with SSL support
    272 
    273   As specified in Erlang [:ssl.connect](https://erlang.org/doc/man/ssl.html#connect-3),
    274   host verification using `:public_key.pkix_verify_hostname_match_fun(:https)`
    275   requires that the ssl_opt `server_name_indication` is set, and for this reason,
    276   the aforementioned `endpoints` list can become a three element tuple as:
    277 
    278       endpoints: [
    279         {
    280           "test-instance-1.xyz.eu-west-1.rds.amazonaws.com",
    281           5432,
    282           [ssl: [server_name_indication: String.to_charlist("test-instance-1.xyz.eu-west-1.rds.amazonaws.com")]]
    283         },
    284         (...),
    285         {
    286           "test-instance-2.xyz.eu-west-1.rds.amazonaws.com",
    287           5432,
    288           [ssl: [server_name_indication: String.to_charlist("test-instance-2.xyz.eu-west-1.rds.amazonaws.com")]]
    289         }
    290       ]
    291 
    292   """
    293   @spec start_link([start_option]) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
    294   def start_link(opts) do
    295     ensure_deps_started!(opts)
    296     opts = Postgrex.Utils.default_opts(opts)
    297     DBConnection.start_link(Postgrex.Protocol, opts)
    298   end
    299 
    300   @doc """
    301   Runs an (extended) query and returns the result as `{:ok, %Postgrex.Result{}}`
    302   or `{:error, %Postgrex.Error{}}` if there was a database error. Parameters can
    303   be set in the query as `$1` embedded in the query string. Parameters are given
    304   as a list of elixir values. See the README for information on how Postgrex
    305   encodes and decodes Elixir values by default. See `Postgrex.Result` for the
    306   result data.
    307 
    308   This function may still raise an exception if there is an issue with types
    309   (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
    310   (`DBConnection.OwnershipError`) or other error (`RuntimeError`).
    311 
    312   ## Options
    313 
    314     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    315     * `:timeout` - Query request timeout (default: `#{@timeout}`);
    316     * `:decode_mapper` - Fun to map each row in the result to a term after
    317     decoding, (default: `fn x -> x end`);
    318     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
    319     query on error, otherwise set to `:transaction` (default: `:transaction`);
    320     * `:cache_statement` - Caches the query with the given name
    321 
    322   ## Examples
    323 
    324       Postgrex.query(conn, "CREATE TABLE posts (id serial, title text)", [])
    325 
    326       Postgrex.query(conn, "INSERT INTO posts (title) VALUES ('my title')", [])
    327 
    328       Postgrex.query(conn, "SELECT title FROM posts", [])
    329 
    330       Postgrex.query(conn, "SELECT id FROM posts WHERE title like $1", ["%my%"])
    331 
    332       Postgrex.query(conn, "COPY posts TO STDOUT", [])
    333   """
    334   @spec query(conn, iodata, list, [execute_option]) ::
    335           {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
    336   def query(conn, statement, params, opts \\ []) do
    337     if name = Keyword.get(opts, :cache_statement) do
    338       query = %Query{name: name, cache: :statement, statement: IO.iodata_to_binary(statement)}
    339 
    340       case DBConnection.prepare_execute(conn, query, params, opts) do
    341         {:ok, _, result} ->
    342           {:ok, result}
    343 
    344         {:error, %Postgrex.Error{postgres: %{code: :feature_not_supported}}} = error ->
    345           with %DBConnection{} <- conn,
    346                :error <- DBConnection.status(conn) do
    347             error
    348           else
    349             _ -> query_prepare_execute(conn, query, params, opts)
    350           end
    351 
    352         {:error, _} = error ->
    353           error
    354       end
    355     else
    356       query_prepare_execute(conn, %Query{name: "", statement: statement}, params, opts)
    357     end
    358   end
    359 
    360   defp query_prepare_execute(conn, query, params, opts) do
    361     case DBConnection.prepare_execute(conn, query, params, opts) do
    362       {:ok, _, result} -> {:ok, result}
    363       {:error, _} = error -> error
    364     end
    365   end
    366 
    367   @doc """
    368   Runs an (extended) query and returns the result or raises `Postgrex.Error` if
    369   there was an error. See `query/3`.
    370   """
    371   @spec query!(conn, iodata, list, [execute_option]) :: Postgrex.Result.t()
    372   def query!(conn, statement, params, opts \\ []) do
    373     case query(conn, statement, params, opts) do
    374       {:ok, result} -> result
    375       {:error, err} -> raise err
    376     end
    377   end
    378 
    379   @doc """
    380   Prepares an (extended) query and returns the result as
    381   `{:ok, %Postgrex.Query{}}` or `{:error, %Postgrex.Error{}}` if there was an
    382   error. Parameters can be set in the query as `$1` embedded in the query
    383   string. To execute the query call `execute/4`. To close the prepared query
    384   call `close/3`. See `Postgrex.Query` for the query data.
    385 
    386   This function may still raise an exception if there is an issue with types
    387   (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
    388   (`DBConnection.OwnershipError`) or other error (`RuntimeError`).
    389 
    390   ## Options
    391 
    392     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    393     * `:timeout` - Prepare request timeout (default: `#{@timeout}`);
    394     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
    395     prepare on error, otherwise set to `:transaction` (default: `:transaction`);
    396 
    397   ## Examples
    398 
    399       Postgrex.prepare(conn, "", "CREATE TABLE posts (id serial, title text)")
    400   """
    401   @spec prepare(conn, iodata, iodata, [option]) ::
    402           {:ok, Postgrex.Query.t()} | {:error, Exception.t()}
    403   def prepare(conn, name, statement, opts \\ []) do
    404     query = %Query{name: name, statement: statement}
    405     opts = Keyword.put(opts, :postgrex_prepare, true)
    406     DBConnection.prepare(conn, query, opts)
    407   end
    408 
    409   @doc """
    410   Prepares an (extended) query and returns the prepared query or raises
    411   `Postgrex.Error` if there was an error. See `prepare/4`.
    412   """
    413   @spec prepare!(conn, iodata, iodata, [option]) :: Postgrex.Query.t()
    414   def prepare!(conn, name, statement, opts \\ []) do
    415     opts = Keyword.put(opts, :postgrex_prepare, true)
    416     DBConnection.prepare!(conn, %Query{name: name, statement: statement}, opts)
    417   end
    418 
    419   @doc """
    420   Prepares and executes a query in a single step.
    421 
    422   It returns the result as `{:ok, %Postgrex.Query{}, %Postgrex.Result{}}` or
    423   `{:error, %Postgrex.Error{}}` if there was an error. Parameters are given as
    424   part of the prepared query, `%Postgrex.Query{}`.
    425 
    426   See the README for information on how Postgrex encodes and decodes Elixir
    427   values by default. See `Postgrex.Query` for the query data and
    428   `Postgrex.Result` for the result data.
    429 
    430   ## Options
    431 
    432     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    433     * `:timeout` - Execute request timeout (default: `#{@timeout}`);
    434     * `:decode_mapper` - Fun to map each row in the result to a term after
    435     decoding, (default: `fn x -> x end`);
    436     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
    437     execute on error, otherwise set to `:transaction` (default: `:transaction`);
    438 
    439   ## Examples
    440 
    441       Postgrex.prepare_execute(conn, "", "SELECT id FROM posts WHERE title like $1", ["%my%"])
    442 
    443   """
    444   @spec prepare_execute(conn, iodata, iodata, list, [execute_option]) ::
    445           {:ok, Postgrex.Query.t(), Postgrex.Result.t()} | {:error, Postgrex.Error.t()}
    446   def prepare_execute(conn, name, statement, params, opts \\ []) do
    447     query = %Query{name: name, statement: statement}
    448     DBConnection.prepare_execute(conn, query, params, opts)
    449   end
    450 
    451   @doc """
    452   Prepares and runs a query and returns the result or raises
    453   `Postgrex.Error` if there was an error. See `prepare_execute/5`.
    454   """
    455   @spec prepare_execute!(conn, iodata, iodata, list, [execute_option]) ::
    456           {Postgrex.Query.t(), Postgrex.Result.t()}
    457   def prepare_execute!(conn, name, statement, params, opts \\ []) do
    458     query = %Query{name: name, statement: statement}
    459     DBConnection.prepare_execute!(conn, query, params, opts)
    460   end
    461 
    462   @doc """
    463   Runs an (extended) prepared query.
    464 
    465   It returns the result as `{:ok, %Postgrex.Query{}, %Postgrex.Result{}}` or
    466   `{:error, %Postgrex.Error{}}` if there was an error. Parameters are given as
    467   part of the prepared query, `%Postgrex.Query{}`.
    468 
    469   See the README for information on how Postgrex encodes and decodes Elixir
    470   values by default. See `Postgrex.Query` for the query data and
    471   `Postgrex.Result` for the result data.
    472 
    473   ## Options
    474 
    475     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    476     * `:timeout` - Execute request timeout (default: `#{@timeout}`);
    477     * `:decode_mapper` - Fun to map each row in the result to a term after
    478     decoding, (default: `fn x -> x end`);
    479     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
    480     execute on error, otherwise set to `:transaction` (default: `:transaction`);
    481 
    482   ## Examples
    483 
    484       query = Postgrex.prepare!(conn, "", "CREATE TABLE posts (id serial, title text)")
    485       Postgrex.execute(conn, query, [])
    486 
    487       query = Postgrex.prepare!(conn, "", "SELECT id FROM posts WHERE title like $1")
    488       Postgrex.execute(conn, query, ["%my%"])
    489   """
    490   @spec execute(conn, Postgrex.Query.t(), list, [execute_option]) ::
    491           {:ok, Postgrex.Query.t(), Postgrex.Result.t()} | {:error, Postgrex.Error.t()}
    492   def execute(conn, query, params, opts \\ []) do
    493     DBConnection.execute(conn, query, params, opts)
    494   end
    495 
    496   @doc """
    497   Runs an (extended) prepared query and returns the result or raises
    498   `Postgrex.Error` if there was an error. See `execute/4`.
    499   """
    500   @spec execute!(conn, Postgrex.Query.t(), list, [execute_option]) ::
    501           Postgrex.Result.t()
    502   def execute!(conn, query, params, opts \\ []) do
    503     DBConnection.execute!(conn, query, params, opts)
    504   end
    505 
    506   @doc """
    507   Closes an (extended) prepared query and returns `:ok` or
    508   `{:error, %Postgrex.Error{}}` if there was an error. Closing a query releases
    509   any resources held by postgresql for a prepared query with that name. See
    510   `Postgrex.Query` for the query data.
    511 
    512   This function may still raise an exception if there is an issue with types
    513   (`ArgumentError`), connection (`DBConnection.ConnectionError`), ownership
    514   (`DBConnection.OwnershipError`) or other error (`RuntimeError`).
    515 
    516   ## Options
    517 
    518     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    519     * `:timeout` - Close request timeout (default: `#{@timeout}`);
    520     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before the
    521     close on error, otherwise set to `:transaction` (default: `:transaction`);
    522 
    523   ## Examples
    524 
    525       query = Postgrex.prepare!(conn, "", "CREATE TABLE posts (id serial, title text)")
    526       Postgrex.close(conn, query)
    527   """
    528   @spec close(conn, Postgrex.Query.t(), [option]) :: :ok | {:error, Exception.t()}
    529   def close(conn, query, opts \\ []) do
    530     with {:ok, _} <- DBConnection.close(conn, query, opts) do
    531       :ok
    532     end
    533   end
    534 
    535   @doc """
    536   Closes an (extended) prepared query and returns `:ok` or raises
    537   `Postgrex.Error` if there was an error. See `close/3`.
    538   """
    539   @spec close!(conn, Postgrex.Query.t(), [option]) :: :ok
    540   def close!(conn, query, opts \\ []) do
    541     DBConnection.close!(conn, query, opts)
    542     :ok
    543   end
    544 
    545   @doc """
    546   Acquire a lock on a connection and run a series of requests inside a
    547   transaction. The result of the transaction fun is return inside an `:ok`
    548   tuple: `{:ok, result}`.
    549 
    550   To use the locked connection call the request with the connection
    551   reference passed as the single argument to the `fun`. If the
    552   connection disconnects all future calls using that connection
    553   reference will fail.
    554 
    555   `rollback/2` rolls back the transaction and causes the function to
    556   return `{:error, reason}`.
    557 
    558   `transaction/3` can be nested multiple times if the connection
    559   reference is used to start a nested transaction. The top level
    560   transaction function is the actual transaction.
    561 
    562   ## Options
    563 
    564     * `:queue` - Whether to wait for connection in a queue (default: `true`);
    565     * `:timeout` - Transaction timeout (default: `#{@timeout}`);
    566     * `:mode` - Set to `:savepoint` to use savepoints instead of an SQL
    567     transaction, otherwise set to `:transaction` (default: `:transaction`);
    568 
    569   The `:timeout` is for the duration of the transaction and all nested
    570   transactions and requests. This timeout overrides timeouts set by internal
    571   transactions and requests. The `:mode` will be used for all requests inside
    572   the transaction function.
    573 
    574   ## Example
    575 
    576       {:ok, res} = Postgrex.transaction(pid, fn(conn) ->
    577         Postgrex.query!(conn, "SELECT title FROM posts", [])
    578       end)
    579   """
    580   @spec transaction(conn, (DBConnection.t() -> result), [option]) ::
    581           {:ok, result} | {:error, any}
    582         when result: var
    583   def transaction(conn, fun, opts \\ []) do
    584     DBConnection.transaction(conn, fun, opts)
    585   end
    586 
    587   @doc """
    588   Rollback a transaction, does not return.
    589 
    590   Aborts the current transaction fun. If inside multiple `transaction/3`
    591   functions, bubbles up to the top level.
    592 
    593   ## Example
    594 
    595       {:error, :oops} = Postgrex.transaction(pid, fn(conn) ->
    596         DBConnection.rollback(conn, :bar)
    597         IO.puts "never reaches here!"
    598       end)
    599   """
    600   @spec rollback(DBConnection.t(), reason :: any) :: no_return()
    601   defdelegate rollback(conn, reason), to: DBConnection
    602 
    603   @doc """
    604   Returns a cached map of connection parameters.
    605 
    606   ## Options
    607 
    608     * `:timeout` - Call timeout (default: `#{@timeout}`)
    609 
    610   """
    611   @spec parameters(conn, [option]) :: %{binary => binary}
    612         when option: {:timeout, timeout}
    613   def parameters(conn, opts \\ []) do
    614     DBConnection.execute!(conn, %Postgrex.Parameters{}, nil, opts)
    615   end
    616 
    617   @doc """
    618   Returns a supervisor child specification for a DBConnection pool.
    619   """
    620   @spec child_spec([start_option]) :: :supervisor.child_spec()
    621   def child_spec(opts) do
    622     ensure_deps_started!(opts)
    623     opts = Postgrex.Utils.default_opts(opts)
    624     DBConnection.child_spec(Postgrex.Protocol, opts)
    625   end
    626 
    627   @doc """
    628   Returns a stream for a query on a connection.
    629 
    630   Stream consumes memory in chunks of at most `max_rows` rows (see Options).
    631   This is useful for processing _large_ datasets.
    632 
    633   A stream must be wrapped in a transaction and may be used as an `Enumerable`
    634   or a `Collectable`.
    635 
    636   When used as an `Enumerable` with a `COPY .. TO STDOUT` SQL query no other
    637   queries or streams can be interspersed until the copy has finished. Otherwise
    638   it is possible to intersperse enumerable streams and queries.
    639 
    640   When used as a `Collectable` the values are passed as copy data with the
    641   query. No other queries or streams can be interspersed until the copy has
    642   finished. If the query is not copying to the database the copy data will still
    643   be sent but is silently discarded.
    644 
    645   ### Options
    646 
    647     * `:max_rows` - Maximum numbers of rows in a result (default to `#{@max_rows}`)
    648     * `:decode_mapper` - Fun to map each row in the result to a term after
    649     decoding, (default: `fn x -> x end`);
    650     * `:mode` - set to `:savepoint` to use a savepoint to rollback to before an
    651     execute on error, otherwise set to `:transaction` (default: `:transaction`);
    652 
    653   ## Examples
    654 
    655       Postgrex.transaction(pid, fn(conn) ->
    656         query = Postgrex.prepare!(conn, "", "COPY posts TO STDOUT")
    657         stream = Postgrex.stream(conn, query, [])
    658         result_to_iodata = fn(%Postgrex.Result{rows: rows}) -> rows end
    659         Enum.into(stream, File.stream!("posts"), result_to_iodata)
    660       end)
    661 
    662       Postgrex.transaction(pid, fn(conn) ->
    663         stream = Postgrex.stream(conn, "COPY posts FROM STDIN", [])
    664         Enum.into(File.stream!("posts"), stream)
    665       end)
    666   """
    667   @spec stream(DBConnection.t(), iodata | Postgrex.Query.t(), list, [option]) ::
    668           Postgrex.Stream.t()
    669         when option: execute_option | {:max_rows, pos_integer}
    670   def stream(%DBConnection{} = conn, query, params, options \\ []) do
    671     options = Keyword.put_new(options, :max_rows, @max_rows)
    672     %Postgrex.Stream{conn: conn, query: query, params: params, options: options}
    673   end
    674 
    675   ## Helpers
    676 
    677   defp ensure_deps_started!(opts) do
    678     if Keyword.get(opts, :ssl, false) and
    679          not List.keymember?(:application.which_applications(), :ssl, 0) do
    680       raise """
    681       SSL connection can not be established because `:ssl` application is not started,
    682       you can add it to `extra_applications` in your `mix.exs`:
    683 
    684         def application do
    685           [extra_applications: [:ssl]]
    686         end
    687       """
    688     end
    689   end
    690 end