zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

db_connection.ex (60135B)


      defmodule DBConnection.Stream do
        # Holds the connection, query, params and options for a stream; every
        # field defaults to `nil`. The struct is enumerated through its
        # `Enumerable` implementation.
        defstruct conn: nil, query: nil, params: nil, opts: nil

        @type t :: %__MODULE__{
                conn: DBConnection.conn(),
                query: any,
                params: any,
                opts: Keyword.t()
              }
      end
      6 
      defimpl Enumerable, for: DBConnection.Stream do
        # Counting, membership and slicing have no shortcut for this struct:
        # each returns `{:error, __MODULE__}`, which per the `Enumerable`
        # protocol makes `Enum` fall back to an algorithm built on `reduce/3`.
        def count(_stream) do
          {:error, __MODULE__}
        end

        def member?(_stream, _element) do
          {:error, __MODULE__}
        end

        def slice(_stream) do
          {:error, __MODULE__}
        end

        # Enumeration itself is delegated to `DBConnection.reduce/3`.
        def reduce(stream, acc, fun) do
          DBConnection.reduce(stream, acc, fun)
        end
      end
     16 
     defmodule DBConnection.PrepareStream do
       # Same four fields as `DBConnection.Stream` (connection, query, params,
       # options), all defaulting to `nil`; enumerated through its own
       # `Enumerable` implementation.
       defstruct conn: nil, query: nil, params: nil, opts: nil

       @type t :: %__MODULE__{
               conn: DBConnection.conn(),
               query: any,
               params: any,
               opts: Keyword.t()
             }
     end
     22 
     defimpl Enumerable, for: DBConnection.PrepareStream do
       # No shortcut for count/member?/slice: `{:error, __MODULE__}` asks `Enum`
       # to use its default, `reduce/3`-based algorithm instead.
       def count(_prepare_stream) do
         {:error, __MODULE__}
       end

       def member?(_prepare_stream, _element) do
         {:error, __MODULE__}
       end

       def slice(_prepare_stream) do
         {:error, __MODULE__}
       end

       # The actual traversal is handled by `DBConnection.reduce/3`.
       def reduce(stream, acc, fun) do
         DBConnection.reduce(stream, acc, fun)
       end
     end
     32 
     33 defmodule DBConnection do
     34   @moduledoc """
     35   A behaviour module for implementing efficient database connection
     36   client processes, pools and transactions.
     37 
     38   `DBConnection` handles callbacks differently to most behaviours. Some
     39   callbacks will be called in the calling process, with the state
     40   copied to and from the calling process. This is useful when the data
     41   for a request is large and means that a calling process can interact
     42   with a socket directly.
     43 
     44   A side effect of this is that query handling can be written in a
     45   simple blocking fashion, while the connection process itself will
     46   remain responsive to OTP messages and can enqueue and cancel queued
     47   requests.
     48 
     49   If a request or series of requests takes too long to handle in the
     50   client process a timeout will trigger and the socket can be cleanly
     51   disconnected by the connection process.
     52 
     53   If a calling process waits too long to start its request it will
     54   timeout and its request will be cancelled. This prevents requests
     55   building up when the database can not keep up.
     56 
     57   If no requests are received for an idle interval, the pool will
     58   ping all stale connections which can then ping the database to keep
     59   the connection alive.
     60 
     61   Should the connection be lost, attempts will be made to reconnect with
     62   (configurable) exponential random backoff. All state is lost when a
     63   connection disconnects but the process is reused.
     64 
     65   The `DBConnection.Query` protocol provides utility functions so that
     66   queries can be encoded and decoded without blocking the connection or pool.
     67   """
     68   require Logger
     69 
     70   alias DBConnection.Holder
     71 
     72   require Holder
     73 
     74   defstruct [:pool_ref, :conn_ref, :conn_mode]
     75 
     defmodule EncodeError do
       # Exception struct with a single `:message` field (default `nil`).
       defexception message: nil
     end
     79 
     defmodule TransactionError do
       defexception [:status, :message]

       # Message for each transaction status accepted by `exception/1`.
       @messages %{
         idle: "transaction is not started",
         transaction: "transaction is already started",
         error: "transaction is aborted"
       }

       # Builds the exception from a transaction status. Any argument other than
       # `:idle`, `:transaction` or `:error` matches no clause and raises
       # `FunctionClauseError`.
       def exception(status) when status in [:idle, :transaction, :error] do
         %__MODULE__{status: status, message: Map.fetch!(@messages, status)}
       end
     end
     92 
     93   @typedoc """
     94   Run or transaction connection reference.
     95   """
     96   @type t :: %__MODULE__{pool_ref: any, conn_ref: reference}
     97   @type conn :: GenServer.server() | t
     98   @type query :: DBConnection.Query.t()
     99   @type params :: any
    100   @type result :: any
    101   @type cursor :: any
    102   @type status :: :idle | :transaction | :error
    103 
    104   @type start_option ::
    105           {:after_connect, (t -> any) | {module, atom, [any]} | nil}
    106           | {:after_connect_timeout, timeout}
    107           | {:connection_listeners, list(Process.dest()) | nil}
    108           | {:backoff_max, non_neg_integer}
    109           | {:backoff_min, non_neg_integer}
    110           | {:backoff_type, :stop | :exp | :rand | :rand_exp}
    111           | {:configure, (keyword -> keyword) | {module, atom, [any]} | nil}
    112           | {:idle_interval, non_neg_integer}
    113           | {:max_restarts, non_neg_integer}
    114           | {:max_seconds, pos_integer}
    115           | {:name, GenServer.name()}
    116           | {:pool, module}
    117           | {:pool_size, pos_integer}
    118           | {:queue_interval, non_neg_integer}
    119           | {:queue_target, non_neg_integer}
    120           | {:show_sensitive_data_on_connection_error, boolean}
    121 
    122   @type option ::
    123           {:log, (DBConnection.LogEntry.t() -> any) | {module, atom, [any]} | nil}
    124           | {:queue, boolean}
    125           | {:timeout, timeout}
    126           | {:deadline, integer | nil}
    127 
    128   @doc """
    129   Connect to the database. Return `{:ok, state}` on success or
    130   `{:error, exception}` on failure.
    131 
    132   If an error is returned it will be logged and another
    133   connection attempt will be made after a backoff interval.
    134 
    135   This callback is called in the connection process.
    136   """
    137   @callback connect(opts :: Keyword.t()) ::
    138               {:ok, state :: any} | {:error, Exception.t()}
    139 
    140   @doc """
    141   Checkouts the state from the connection process. Return `{:ok, state}`
    142   to allow the checkout or `{:disconnect, exception, state}` to disconnect.
    143 
    144   This callback is called immediately after the connection is established
    145   and the state is never effectively checked in again. That's because
    146   DBConnection keeps the connection state in an ETS table that is moved
    147   between the different clients checking out connections. There is no
    148   `checkin` callback. The state is only handed back to the connection
    149   process during pings and (re)connects.
    150 
    151   This callback is called in the connection process.
    152   """
    153   @callback checkout(state :: any) ::
    154               {:ok, new_state :: any} | {:disconnect, Exception.t(), new_state :: any}
    155 
    156   @doc """
    157   Called when the connection has been idle for a period of time. Return
    158   `{:ok, state}` to continue or `{:disconnect, exception, state}` to
    159   disconnect.
    160 
    161   This callback is called if no callbacks have been called after the
    162   idle timeout and a client process is not using the state. The idle
    163   timeout can be configured by the `:idle_interval` option. This function
    164   can be called whether the connection is checked in or checked out.
    165 
    166   This callback is called in the connection process.
    167   """
    168   @callback ping(state :: any) ::
    169               {:ok, new_state :: any} | {:disconnect, Exception.t(), new_state :: any}
    170 
    171   @doc """
    172   Handle the beginning of a transaction.
    173 
    174   Return `{:ok, result, state}` to continue, `{status, state}` to notify caller
    175   that the transaction can not begin due to the transaction status `status`,
    176   `{:error, exception, state}` (deprecated) to error without beginning the
    177   transaction, or `{:disconnect, exception, state}` to error and disconnect.
    178 
    179   A callback implementation should only return `status` if it
    180   can determine the database's transaction status without side effect.
    181 
    182   This callback is called in the client process.
    183   """
    184   @callback handle_begin(opts :: Keyword.t(), state :: any) ::
    185               {:ok, result, new_state :: any}
    186               | {status, new_state :: any}
    187               | {:disconnect, Exception.t(), new_state :: any}
    188 
    189   @doc """
    190   Handle committing a transaction. Return `{:ok, result, state}` on successfully
    191   committing transaction, `{status, state}` to notify caller that the
    192   transaction can not commit due to the transaction status `status`,
    193   `{:error, exception, state}` (deprecated) to error and no longer be inside
    194   transaction, or `{:disconnect, exception, state}` to error and disconnect.
    195 
    196   A callback implementation should only return `status` if it
    197   can determine the database's transaction status without side effect.
    198 
    199   This callback is called in the client process.
    200   """
    201   @callback handle_commit(opts :: Keyword.t(), state :: any) ::
    202               {:ok, result, new_state :: any}
    203               | {status, new_state :: any}
    204               | {:disconnect, Exception.t(), new_state :: any}
    205 
    206   @doc """
    207   Handle rolling back a transaction. Return `{:ok, result, state}` on successfully
    208   rolling back transaction, `{status, state}` to notify caller that the
    209   transaction can not rollback due to the transaction status `status`,
    210   `{:error, exception, state}` (deprecated) to
    211   error and no longer be inside transaction, or
    212   `{:disconnect, exception, state}` to error and disconnect.
    213 
    214   A callback implementation should only return `status` if it
    215   can determine the database's transaction status without side effect.
    216 
    217   This callback is called in the client and connection process.
    218   """
    219   @callback handle_rollback(opts :: Keyword.t(), state :: any) ::
    220               {:ok, result, new_state :: any}
    221               | {status, new_state :: any}
    222               | {:disconnect, Exception.t(), new_state :: any}
    223 
    224   @doc """
    225   Handle getting the transaction status. Return `{:idle, state}` if outside a
    226   transaction, `{:transaction, state}` if inside a transaction,
    227   `{:error, state}` if inside an aborted transaction, or
    228   `{:disconnect, exception, state}` to error and disconnect.
    229 
    230   If the callback returns a `:disconnect` tuple then `status/2` will return
    231   `:error`.
    232   """
    233   @callback handle_status(opts :: Keyword.t(), state :: any) ::
    234               {status, new_state :: any}
    235               | {:disconnect, Exception.t(), new_state :: any}
    236 
    237   @doc """
    238   Prepare a query with the database. Return `{:ok, query, state}` where
    239   `query` is a query to pass to `execute/4` or `close/3`,
    240   `{:error, exception, state}` to return an error and continue or
    241   `{:disconnect, exception, state}` to return an error and disconnect.
    242 
    243   This callback is intended for cases where the state of a connection is
    244   needed to prepare a query and/or the query can be saved in the
    245   database to call later.
    246 
    247   This callback is called in the client process.
    248   """
    249   @callback handle_prepare(query, opts :: Keyword.t(), state :: any) ::
    250               {:ok, query, new_state :: any}
    251               | {:error | :disconnect, Exception.t(), new_state :: any}
    252 
    253   @doc """
    254   Execute a query prepared by `c:handle_prepare/3`. Return
    255   `{:ok, query, result, state}` to return altered query `query` and result
    256   `result` and continue, `{:error, exception, state}` to return an error and
    257   continue or `{:disconnect, exception, state}` to return an error and
    258   disconnect.
    259 
    260   This callback is called in the client process.
    261   """
    262   @callback handle_execute(query, params, opts :: Keyword.t(), state :: any) ::
    263               {:ok, query, result, new_state :: any}
    264               | {:error | :disconnect, Exception.t(), new_state :: any}
    265 
    266   @doc """
    267   Close a query prepared by `c:handle_prepare/3` with the database. Return
    268   `{:ok, result, state}` on success and to continue,
    269   `{:error, exception, state}` to return an error and continue, or
    270   `{:disconnect, exception, state}` to return an error and disconnect.
    271 
    272   This callback is called in the client process.
    273   """
    274   @callback handle_close(query, opts :: Keyword.t(), state :: any) ::
    275               {:ok, result, new_state :: any}
    276               | {:error | :disconnect, Exception.t(), new_state :: any}
    277 
    278   @doc """
    279   Declare a cursor using a query prepared by `c:handle_prepare/3`. Return
    280   `{:ok, query, cursor, state}` to return altered query `query` and cursor
    281   `cursor` for a stream and continue, `{:error, exception, state}` to return an
    282   error and continue or `{:disconnect, exception, state}` to return an error
    283   and disconnect.
    284 
    285   This callback is called in the client process.
    286   """
    287   @callback handle_declare(query, params, opts :: Keyword.t(), state :: any) ::
    288               {:ok, query, cursor, new_state :: any}
    289               | {:error | :disconnect, Exception.t(), new_state :: any}
    290 
    291   @doc """
    292   Fetch the next result from a cursor declared by `c:handle_declare/4`. Return
    293   `{:cont, result, state}` to return the result `result` and continue using
    294   cursor, `{:halt, result, state}` to return the result `result` and close the
    295   cursor, `{:error, exception, state}` to return an error and close the
    296   cursor, `{:disconnect, exception, state}` to return an error and disconnect.
    297 
    298   This callback is called in the client process.
    299   """
    300   @callback handle_fetch(query, cursor, opts :: Keyword.t(), state :: any) ::
    301               {:cont | :halt, result, new_state :: any}
    302               | {:error | :disconnect, Exception.t(), new_state :: any}
    303 
    304   @doc """
    305   Deallocate a cursor declared by `c:handle_declare/4` with the database. Return
    306   `{:ok, result, state}` on success and to continue,
    307   `{:error, exception, state}` to return an error and continue, or
    308   `{:disconnect, exception, state}` to return an error and disconnect.
    309 
    310   This callback is called in the client process.
    311   """
    312   @callback handle_deallocate(query, cursor, opts :: Keyword.t(), state :: any) ::
    313               {:ok, result, new_state :: any}
    314               | {:error | :disconnect, Exception.t(), new_state :: any}
    315 
    316   @doc """
    317   Disconnect from the database. Return `:ok`.
    318 
    319   The exception as first argument is the exception from a `:disconnect`
    320   3-tuple returned by a previous callback.
    321 
    322   If the state is controlled by a client and it exits or takes too long
    323   to process a request the state will be last known state. In these
    324   cases the exception will be a `DBConnection.ConnectionError`.
    325 
    326   This callback is called in the connection process.
    327   """
    328   @callback disconnect(err :: Exception.t(), state :: any) :: :ok
    329 
    330   @connection_module_key :connection_module
    331 
    @doc """
    Sets `DBConnection` as a behaviour of the module calling `use DBConnection`.
    """
    defmacro __using__(_opts) do
      # The only injected code is the `@behaviour` attribute; options are ignored.
      quote(location: :keep, do: @behaviour(DBConnection))
    end
    340 
    341   @doc """
    342   Starts and links to a database connection process.
    343 
    344   By default the `DBConnection` starts a pool with a single connection.
    345   The size of the pool can be increased with `:pool_size`. A separate
    346   pool can be given with the `:pool` option.
    347 
    348   ### Options
    349 
    350     * `:backoff_min` - The minimum backoff interval (default: `1_000`)
    351     * `:backoff_max` - The maximum backoff interval (default: `30_000`)
    352     * `:backoff_type` - The backoff strategy, `:stop` for no backoff and
    353     to stop, `:exp` for exponential, `:rand` for random and `:rand_exp` for
    354     random exponential (default: `:rand_exp`)
    355     * `:configure` - A function to run before every connect attempt to
    356     dynamically configure the options, either a 1-arity fun,
    357     `{module, function, args}` with options prepended to `args` or `nil` where
    358     only returned options are passed to connect callback (default: `nil`)
    359     * `:after_connect` - A function to run on connect using `run/3`, either
    360     a 1-arity fun, `{module, function, args}` with `t:DBConnection.t/0` prepended
    361     to `args` or `nil` (default: `nil`)
    362     * `:after_connect_timeout` - The maximum time allowed to perform
    363     function specified by `:after_connect` option (default: `15_000`)
    364     * `:connection_listeners` - A list of process destinations to send
    365       notification messages whenever a connection is connected or disconnected.
    366       See "Connection listeners" below
    367     * `:name` - A name to register the started process (see the `:name` option
    368       in `GenServer.start_link/3`)
    369     * `:pool` - Chooses the pool to be started (default: `DBConnection.ConnectionPool`)
    370     * `:pool_size` - Chooses the size of the pool
    371     * `:idle_interval` - Controls the frequency we check for idle connections
    372       in the pool. We then notify each idle connection to ping the database.
    373       In practice, the ping happens within `idle_interval <= ping < 2 * idle_interval`.
    374       Defaults to 1000ms.
    375     * `:queue_target` and `:queue_interval` - See "Queue config" below
    376     * `:max_restarts` and `:max_seconds` - Configures the `:max_restarts` and
    377       `:max_seconds` for the connection pool supervisor (see the `Supervisor` docs).
    378       Typically speaking the connection process doesn't terminate, except due to
    379       faults in DBConnection. However, if backoff has been disabled, then they
    380       also terminate whenever a connection is disconnected (for instance, due to
    381       client or server errors)
    382     * `:show_sensitive_data_on_connection_error` - By default, `DBConnection`
    383       hides all information during connection errors to avoid leaking credentials
    384       or other sensitive information. You can set this option if you wish to
    385       see complete errors and stacktraces during connection errors
    386 
    387   ### Example
    388 
    389       {:ok, conn} = DBConnection.start_link(mod, [idle_interval: 5_000])
    390 
    391   ## Queue config
    392 
    393   Handling requests is done through a queue. When DBConnection is
    394   started, there are two relevant options to control the queue:
    395 
    396     * `:queue_target` in milliseconds, defaults to 50ms
    397     * `:queue_interval` in milliseconds, defaults to 1000ms
    398 
    399   Our goal is to wait at most `:queue_target` for a connection.
    400   If all connections checked out during a `:queue_interval` take
    401   more than `:queue_target`, then we double the `:queue_target`.
    402   If checking out connections takes longer than the new target,
    403   then we start dropping messages.
    404 
    405   For example, by default our target is 50ms. If all connection
    406   checkouts take longer than 50ms for a whole second, we double
    407   the target to 100ms and we start dropping messages if the
    408   time to checkout goes above the new limit.
    409 
    410   This allows us to better plan for overloads as we can refuse
    411   requests before they are sent to the database, which would
    412   otherwise increase the burden on the database, making the
    413   overload worse.
    414 
    415   ## Connection listeners
    416 
    417   The `:connection_listeners` option allows one or more processes to be notified
    418   whenever a connection is connected or disconnected. A listener may be a remote
    419   or local PID, a locally registered name, or a tuple in the form of
    420   `{registered_name, node}` for a registered name at another node.
    421 
    422   Each listener process may receive the following messages where `pid`
    423   identifies the connection process:
    424 
    425     * `{:connected, pid}`
    426     * `{:disconnected, pid}`
    427 
    428   ## Telemetry
    429 
    430   A `[:db_connection, :connection_error]` event is published whenever a connection checkout
    431   receives a `%DBConnection.ConnectionError{}`.
    432 
    433   Measurements:
    434 
    435     * `:count` - A fixed-value measurement which always measures 1.
    436 
    437   Metadata
    438 
    439     * `:error` - The `DBConnection.ConnectionError` struct which triggered the event.
    440 
    441     * `:opts` - All options given to the pool operation
    442 
    443   """
    @spec start_link(module, opts :: Keyword.t()) :: GenServer.on_start()
    def start_link(conn_mod, opts) do
      # `child_spec/2` may return either the six-element tuple form or the map
      # form of a child spec; both carry the start MFA, which is invoked here.
      {mod, fun, args} =
        case child_spec(conn_mod, opts) do
          %{start: {_, _, _} = mfa} -> mfa
          {_, {_, _, _} = mfa, _, _, _, _} -> mfa
        end

      apply(mod, fun, args)
    end
    451 
    @doc """
    Builds a supervisor child specification for a pool of connections.

    The pool module is taken from the `:pool` option (default:
    `DBConnection.ConnectionPool`) and its `child_spec/1` is called with
    `{conn_mod, opts}`.

    See `start_link/2` for options.
    """
    @spec child_spec(module, opts :: Keyword.t()) :: :supervisor.child_spec()
    def child_spec(conn_mod, opts) do
      pool_mod = Keyword.get(opts, :pool, DBConnection.ConnectionPool)
      child_arg = {conn_mod, opts}
      pool_mod.child_spec(child_arg)
    end
    462 
    463   @doc """
    464   Forces all connections in the pool to disconnect within the given interval.
    465 
    466   Once this function is called, the pool will disconnect all of its connections
    467   as they are checked in or as they are pinged. Checked in connections will be
    468   randomly disconnected within the given time interval. Pinged connections are
    469   immediately disconnected - as they are idle (according to `:idle_interval`).
    470 
    471   If the connection has a backoff configured (which is the case by default),
    472   disconnecting means an attempt at a new connection will be done immediately
    473   after, without starting a new process for each connection. However, if backoff
    474   has been disabled, the connection process will terminate. In such cases,
    475   disconnecting all connections may cause the pool supervisor to restart
    476   depending on the max_restarts/max_seconds configuration of the pool,
    477   so you will want to set those carefully.
    478   """
    @spec disconnect_all(conn, non_neg_integer, opts :: Keyword.t()) :: :ok
    def disconnect_all(conn, interval, opts \\ []) when interval >= 0 do
      pool_mod = Keyword.get(opts, :pool, DBConnection.ConnectionPool)

      # Callers pass milliseconds; the pool is handed native time units.
      native_interval = System.convert_time_unit(interval, :millisecond, :native)

      pool_mod.disconnect_all(conn, native_interval, opts)
    end
    485 
    486   @doc """
    487   Prepare a query with a database connection for later execution.
    488 
    489   It returns `{:ok, query}` on success or `{:error, exception}` if there was
    490   an error.
    491 
    492   The returned `query` can then be passed to `execute/4` and/or `close/3`
    493 
    494   ### Options
    495 
    496     * `:queue` - Whether to block waiting in an internal queue for the
    497     connection's state (boolean, default: `true`). See "Queue config" in
    498     `start_link/2` docs
    499     * `:timeout` - The maximum time that the caller is allowed to perform
    500     this operation (default: `15_000`)
    501     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    502     monotonic time in milliseconds by which caller must perform operation.
    503     See `System` module documentation for more information on monotonic time
    504     (default: `nil`)
    505     * `:log` - A function to log information about a call, either
    506     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
    507     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    508 
    509   The pool and connection module may support other options. All options
    510   are passed to `c:handle_prepare/3`.
    511 
    512   ### Example
    513 
    514       DBConnection.transaction(pool, fn conn ->
    515         query = %Query{statement: "SELECT * FROM table"}
    516         query = DBConnection.prepare!(conn, query)
    517         try do
    518           DBConnection.execute!(conn, query, [])
    519         after
    520           DBConnection.close(conn, query)
    521         end
    522       end)
    523 
    524   """
    @spec prepare(conn, query, opts :: Keyword.t()) ::
            {:ok, query} | {:error, Exception.t()}
    def prepare(conn, query, opts \\ []) do
      # Parse first; anything other than a successful parse is handed to
      # `log/4` unchanged.
      outcome =
        case parse(query, meter(opts), opts) do
          {:ok, parsed_query, parse_meter} ->
            run(conn, &run_prepare/4, parsed_query, parse_meter, opts)

          parse_failure ->
            parse_failure
        end

      # `log/4` receives the caller's original query, not the parsed one.
      log(outcome, :prepare, query, nil)
    end
    537 
    @doc """
    Same as `prepare/3`, but returns the prepared query directly and raises
    the returned exception on error.

    See `prepare/3`.
    """
    @spec prepare!(conn, query, opts :: Keyword.t()) :: query
    def prepare!(conn, query, opts \\ []) do
      case prepare(conn, query, opts) do
        {:error, exception} -> raise exception
        {:ok, prepared} -> prepared
      end
    end
    551 
    552   @doc """
    553   Prepare a query and execute it with a database connection and return both the
    554   prepared query and the result, `{:ok, query, result}` on success or
    555   `{:error, exception}` if there was an error.
    556 
    557   The returned `query` can be passed to `execute/4` and `close/3`.
    558 
    559   ### Options
    560 
    561     * `:queue` - Whether to block waiting in an internal queue for the
    562     connection's state (boolean, default: `true`). See "Queue config" in
    563     `start_link/2` docs
    564     * `:timeout` - The maximum time that the caller is allowed to perform
    565     this operation (default: `15_000`)
    566     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    567     monotonic time in milliseconds by which caller must perform operation.
    568     See `System` module documentation for more information on monotonic time
    569     (default: `nil`)
    570     * `:log` - A function to log information about a call, either
    571     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
    572     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    573 
    574   ### Example
    575 
    576       query                = %Query{statement: "SELECT id FROM table WHERE id=$1"}
    577       {:ok, query, result} = DBConnection.prepare_execute(conn, query, [1])
    578       {:ok, result2}       = DBConnection.execute(conn, query, [2])
    579       :ok                  = DBConnection.close(conn, query)
    580   """
    @spec prepare_execute(conn, query, params, Keyword.t()) ::
            {:ok, query, result}
            | {:error, Exception.t()}
    def prepare_execute(conn, query, params, opts \\ []) do
      # A failed parse skips the prepare/execute step and goes straight to
      # `log/4`.
      outcome =
        case parse(query, meter(opts), opts) do
          {:ok, parsed_query, parse_meter} ->
            parsed_prepare_execute(conn, parsed_query, params, parse_meter, opts)

          parse_failure ->
            parse_failure
        end

      # `log/4` receives the caller's original query and params.
      log(outcome, :prepare_execute, query, params)
    end
    592 
    # Runs prepare+execute for an already parsed query, then decodes the result.
    # Any non-`:ok` value from either step is returned unchanged.
    defp parsed_prepare_execute(conn, query, params, meter, opts) do
      case run(conn, &run_prepare_execute/5, query, params, meter, opts) do
        {:ok, prepared, raw_result, run_meter} ->
          case decode(prepared, raw_result, run_meter, opts) do
            {:ok, decoded, decode_meter} -> {:ok, prepared, decoded, decode_meter}
            decode_failure -> decode_failure
          end

        run_failure ->
          run_failure
      end
    end
    600 
    @doc """
    Prepare a query and execute it with a database connection and return both the
    prepared query and result. An exception is raised on error.

    Returns `{query, result}` when `prepare_execute/4` succeeds; when it returns
    `{:error, exception}`, that exception is raised.

    See `prepare_execute/4`.
    """
    # The spec lists `params` so that it matches the function's arguments; the
    # previous arity-3 spec typed the params position as `Keyword.t()`.
    @spec prepare_execute!(conn, query, params, Keyword.t()) :: {query, result}
    def prepare_execute!(conn, query, params, opts \\ []) do
      case prepare_execute(conn, query, params, opts) do
        {:ok, query, result} -> {query, result}
        {:error, err} -> raise err
      end
    end
    614 
    615   @doc """
    616   Execute a prepared query with a database connection and return
    617   `{:ok, query, result}` on success or `{:error, exception}` if there was an error.
    618 
    619   If the query is not prepared on the connection an attempt may be made to
    620   prepare it and then execute again.
    621 
    622   ### Options
    623 
    624     * `:queue` - Whether to block waiting in an internal queue for the
    625     connection's state (boolean, default: `true`). See "Queue config" in
    626     `start_link/2` docs
    627     * `:timeout` - The maximum time that the caller is allowed to perform
    628     this operation (default: `15_000`)
    629     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    630     monotonic time in milliseconds by which caller must perform operation.
    631     See `System` module documentation for more information on monotonic time
    632     (default: `nil`)
    633     * `:log` - A function to log information about a call, either
    634     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
    635     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    636 
    637   The pool and connection module may support other options. All options
    638   are passed to `c:handle_execute/4`.
    639 
    640   See `prepare/3`.
    641   """
    @spec execute(conn, query, params, opts :: Keyword.t()) ::
            {:ok, query, result} | {:error, Exception.t()}
    def execute(conn, query, params, opts \\ []) do
      outcome =
        case maybe_encode(query, params, meter(opts), opts) do
          # Params were encoded: execute, then decode a successful result.
          {:ok, encoded_params, encode_meter} ->
            case run(conn, &run_execute/5, query, encoded_params, encode_meter, opts) do
              {:ok, executed, raw_result, run_meter} ->
                case decode(executed, raw_result, run_meter, opts) do
                  {:ok, decoded, decode_meter} -> {:ok, executed, decoded, decode_meter}
                  decode_failure -> decode_failure
                end

              run_failure ->
                run_failure
            end

          # `maybe_encode/4` asked for a prepare: take the prepare+execute path
          # with the original, unencoded params.
          {:prepare, prepare_meter} ->
            parsed_prepare_execute(conn, query, params, prepare_meter, opts)

          # Any four-element tuple is handed to `log/4` unchanged.
          {_, _, _, _} = failure ->
            failure
        end

      # `log/4` receives the caller's original query and params.
      log(outcome, :execute, query, params)
    end
    663 
    664   @doc """
    665   Execute a prepared query with a database connection and return the
    666   result. Raises an exception on error.
    667 
    668   See `execute/4`
    669   """
    # Raising twin of execute/4: unwraps the result or raises the exception.
    @spec execute!(conn, query, params, opts :: Keyword.t()) :: result
    def execute!(conn, query, params, opts \\ []) do
      outcome = execute(conn, query, params, opts)

      case outcome do
        {:error, exception} -> raise exception
        {:ok, _executed, value} -> value
      end
    end
    677 
    678   @doc """
    679   Close a prepared query on a database connection and return `{:ok, result}` on
    680   success or `{:error, exception}` on error.
    681 
    682   This function should be used to free resources held by the connection
    683   process and/or the database server.
    684 
    685   ## Options
    686 
    687     * `:queue` - Whether to block waiting in an internal queue for the
    688     connection's state (boolean, default: `true`). See "Queue config" in
    689     `start_link/2` docs
    690     * `:timeout` - The maximum time that the caller is allowed to perform
    691     this operation (default: `15_000`)
    692     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    693     monotonic time in milliseconds by which caller must perform operation.
    694     See `System` module documentation for more information on monotonic time
    695     (default: `nil`)
    696     * `:log` - A function to log information about a call, either
    697     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
    698     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    699 
    700   The pool and connection module may support other options. All options
    701   are passed to `c:handle_close/3`.
    702 
    703   See `prepare/3`.
    704   """
    # Runs run_close/4 through run_cleanup/5 and logs the outcome as a `:close`
    # call (there are no params, so `nil` is logged in their place).
    @spec close(conn, query, opts :: Keyword.t()) ::
            {:ok, result} | {:error, Exception.t()}
    def close(conn, query, opts \\ []) do
      outcome = run_cleanup(conn, &run_close/4, [query], meter(opts), opts)
      log(outcome, :close, query, nil)
    end
    712 
    713   @doc """
    714   Close a prepared query on a database connection and return the result. Raises
    715   an exception on error.
    716 
    717   See `close/3`.
    718   """
    # Raising twin of close/3: unwraps the result or raises the exception.
    @spec close!(conn, query, opts :: Keyword.t()) :: result
    def close!(conn, query, opts \\ []) do
      outcome = close(conn, query, opts)

      case outcome do
        {:error, exception} -> raise exception
        {:ok, value} -> value
      end
    end
    726 
    727   @doc """
    728   Acquire a lock on a connection and run a series of requests on it.
    729 
    730   The return value of this function is the return value of `fun`.
    731 
    732   To use the locked connection call the request with the connection
    733   reference passed as the single argument to the `fun`. If the
    734   connection disconnects all future calls using that connection
    735   reference will fail.
    736 
    737   `run/3` and `transaction/3` can be nested multiple times but a
    738   `transaction/3` call inside another `transaction/3` will be treated
    739   the same as `run/3`.
    740 
    741   ### Options
    742 
    743     * `:queue` - Whether to block waiting in an internal queue for the
    744     connection's state (boolean, default: `true`). See "Queue config" in
    745     `start_link/2` docs
    746     * `:timeout` - The maximum time that the caller is allowed to perform
    747     this operation (default: `15_000`)
    748     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    749     monotonic time in milliseconds by which caller must perform operation.
    750     See `System` module documentation for more information on monotonic time
    751     (default: `nil`)
    752 
    753   The pool may support other options.
    754 
    755   ### Example
    756 
    757       res = DBConnection.run(conn, fn conn ->
    758         DBConnection.execute!(conn, query, [])
    759       end)
    760 
    761   """
    @spec run(conn, (t -> result), opts :: Keyword.t()) :: result when result: var
    def run(conn, fun, opts \\ [])

    # Already holding a connection struct: call the fun with it directly.
    def run(%DBConnection{} = conn, fun, _opts), do: fun.(conn)

    # Given a pool: check a connection out, call the fun, then compare the
    # connection status before and after to decide how to release it.
    def run(pool, fun, opts) do
      case checkout(pool, nil, opts) do
        {kind, reason, stack, _meter} ->
          :erlang.raise(kind, reason, stack)

        {:error, exception, _meter} ->
          raise exception

        {:ok, conn, _meter} ->
          status_before = status(conn, opts)

          try do
            value = fun.(conn)
            {value, run(conn, &run_status/3, nil, opts)}
          catch
            kind, reason ->
              stack = __STACKTRACE__
              checkin(conn)
              :erlang.raise(kind, reason, stack)
          else
            # The status call returned an error tuple: check in and return.
            {value, {:error, _, _}} ->
              checkin(conn)
              value

            # Same status as at checkout.
            {value, {^status_before, _meter}} ->
              checkin(conn)
              value

            # The fun left the connection in a different status: disconnect
            # instead of checking it back in, then raise.
            {_value, {status_after, _meter}} ->
              exception =
                DBConnection.ConnectionError.exception(
                  "connection was checked out with status #{inspect(status_before)} " <>
                    "but it was checked in with status #{inspect(status_after)}"
                )

              disconnect(conn, exception)
              raise exception

            {_value, {kind, reason, stack, _meter}} ->
              :erlang.raise(kind, reason, stack)
          end
      end
    end
    811 
    812   @doc """
    813   Acquire a lock on a connection and run a series of requests inside a
    814   transaction. The result of the transaction fun is returned inside an `:ok`
    815   tuple: `{:ok, result}`.
    816 
    817   To use the locked connection call the request with the connection
    818   reference passed as the single argument to the `fun`. If the
    819   connection disconnects all future calls using that connection
    820   reference will fail.
    821 
    822   `run/3` and `transaction/3` can be nested multiple times. If a transaction is
    823   rolled back or a nested transaction `fun` raises the transaction is marked as
    824   failed. All calls except `run/3`, `transaction/3`, `rollback/2`, `close/3` and
    825   `close!/3` will raise an exception inside a failed transaction until the outer
    826   transaction call returns. All `transaction/3` calls will return
    827   `{:error, :rollback}` if the transaction failed or connection closed and
    828   `rollback/2` is not called for that `transaction/3`.
    829 
    830   ### Options
    831 
    832     * `:queue` - Whether to block waiting in an internal queue for the
    833     connection's state (boolean, default: `true`). See "Queue config" in
    834     `start_link/2` docs
    835     * `:timeout` - The maximum time that the caller is allowed to perform
    836     this operation (default: `15_000`)
    837     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    838     monotonic time in milliseconds by which caller must perform operation.
    839     See `System` module documentation for more information on monotonic time
    840     (default: `nil`)
    841     * `:log` - A function to log information about begin, commit and rollback
    842     calls made as part of the transaction, either a 1-arity fun,
    843     `{module, function, args}` with `t:DBConnection.LogEntry.t/0` prepended to
    844     `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    845 
    846   The pool and connection module may support other options. All options
    847   are passed to `c:handle_begin/2`, `c:handle_commit/2` and
    848   `c:handle_rollback/2`.
    849 
    850   ### Example
    851 
    852       {:ok, res} = DBConnection.transaction(conn, fn conn ->
    853         DBConnection.execute!(conn, query, [])
    854       end)
    855   """
    @spec transaction(conn, (t -> result), opts :: Keyword.t()) ::
            {:ok, result} | {:error, reason :: any}
          when result: var
    def transaction(conn, fun, opts \\ [])

    # The connection is already in `:transaction` mode (a nested call): no new
    # begin is issued; the fun's result goes through conclude/2 and any
    # failure is recorded with fail/1.
    def transaction(
          %DBConnection{conn_mode: :transaction, conn_ref: conn_ref} = conn,
          fun,
          _opts
        ) do
      try do
        conclude(conn, fun.(conn))
      catch
        # The tuple thrown by rollback/2 for this connection reference.
        :throw, {__MODULE__, ^conn_ref, reason} ->
          fail(conn)
          {:error, reason}

        kind, reason ->
          stack = __STACKTRACE__
          fail(conn)
          :erlang.raise(kind, reason, stack)
      else
        value -> {:ok, value}
      end
    end

    # A connection struct that is not in transaction mode: begin on it, then
    # run the transaction with run/4 as the wrapper.
    def transaction(%DBConnection{} = conn, fun, opts) do
      started = begin(conn, &run/4, opts)

      case started do
        {:ok, _begun} -> run_transaction(conn, fun, &run/4, opts)
        {:error, %DBConnection.TransactionError{}} -> {:error, :rollback}
        {:error, exception} -> raise exception
      end
    end

    # A pool: begin via checkout/4, which yields the connection struct, and run
    # the transaction with checkin/4 as the wrapper.
    def transaction(pool, fun, opts) do
      started = begin(pool, &checkout/4, opts)

      case started do
        {:ok, conn, _begun} -> run_transaction(conn, fun, &checkin/4, opts)
        {:error, %DBConnection.TransactionError{}} -> {:error, :rollback}
        {:error, exception} -> raise exception
      end
    end
    907 
    908   @doc """
    909   Rollback a database transaction and release lock on connection.
    910 
    911   When inside of a `transaction/3` call does a non-local return, using a
    912   `throw/1` to cause the transaction to enter a failed state and the
    913   `transaction/3` call returns `{:error, reason}`. If `transaction/3` calls are
    914   nested the connection is marked as failed until the outermost transaction call
    915   does the database rollback.
    916 
    917   ### Example
    918 
    919       {:error, :oops} = DBConnection.transaction(pool, fn conn ->
    920         DBConnection.rollback(conn, :oops)
    921       end)
    922   """
    @spec rollback(t, reason :: any) :: no_return
    def rollback(conn, reason)

    # In transaction mode: throw the tuple that transaction/3 catches for this
    # connection reference.
    def rollback(%DBConnection{conn_mode: :transaction, conn_ref: conn_ref}, reason),
      do: throw({__MODULE__, conn_ref, reason})

    # Any other connection struct is not inside a transaction.
    def rollback(%DBConnection{}, _reason),
      do: raise("not inside transaction")
    934 
    935   @doc """
    936   Return the transaction status of a connection.
    937 
    938   The callback implementation should return the transaction status according to
    939   the database, and not make assumptions based on client-side state.
    940 
    941   This function will raise a `DBConnection.ConnectionError` when called inside a
    942   deprecated `transaction/3`.
    943 
    944   ### Options
    945 
    946   See module documentation. The pool and connection module may support other
    947   options. All options are passed to `c:handle_status/2`.
    948 
    949   ### Example
    950 
    951       # outside of the transaction, the status is `:idle`
    952       DBConnection.status(conn) #=> :idle
    953 
    954       DBConnection.transaction(conn, fn conn ->
    955         DBConnection.status(conn) #=> :transaction
    956 
    957         # run a query that will cause the transaction to rollback, e.g.
    958         # uniqueness constraint violation
    959         DBConnection.execute(conn, bad_query, [])
    960 
    961         DBConnection.status(conn) #=> :error
    962       end)
    963 
    964       DBConnection.status(conn) #=> :idle
    965   """
    # Asks the connection for its status via run_status/3. An error reply is
    # reported as `:error`; a caught exception is re-raised with its stack.
    @spec status(conn, opts :: Keyword.t()) :: status
    def status(conn, opts \\ []) do
      reply = run(conn, &run_status/3, nil, opts)

      case reply do
        {kind, reason, stack, _meter} -> :erlang.raise(kind, reason, stack)
        {:error, _exception, _meter} -> :error
        {current, _meter} -> current
      end
    end
    979 
    980   @doc """
    981   Create a stream that will prepare a query, execute it and stream results
    982   using a cursor.
    983 
    984   ### Options
    985 
    986     * `:queue` - Whether to block waiting in an internal queue for the
    987     connection's state (boolean, default: `true`). See "Queue config" in
    988     `start_link/2` docs
    989     * `:timeout` - The maximum time that the caller is allowed to perform
    990     this operation (default: `15_000`)
    991     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
    992     monotonic time in milliseconds by which caller must perform operation.
    993     See `System` module documentation for more information on monotonic time
    994     (default: `nil`)
    995     * `:log` - A function to log information about a call, either
    996     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
    997     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
    998 
    999   The pool and connection module may support other options. All options
   1000   are passed to `c:handle_prepare/3`, `c:handle_close/3`, `c:handle_declare/4`,
   1001   and `c:handle_deallocate/4`.
   1002 
   1003   ### Example
   1004 
   1005       {:ok, results} = DBConnection.transaction(conn, fn conn ->
   1006         query = %Query{statement: "SELECT id FROM table"}
   1007         stream = DBConnection.prepare_stream(conn, query, [])
   1008         Enum.to_list(stream)
   1009       end)
   1010   """
   # Only builds the struct; all the work happens when it is reduced.
   @spec prepare_stream(t, query, params, opts :: Keyword.t()) ::
           DBConnection.PrepareStream.t()
   def prepare_stream(%DBConnection{} = conn, query, params, opts \\ []) do
     %DBConnection.PrepareStream{
       conn: conn,
       query: query,
       params: params,
       opts: opts
     }
   end
   1016 
   1017   @doc """
   1018   Create a stream that will execute a prepared query and stream results using a
   1019   cursor.
   1020 
   1021   ### Options
   1022 
   1023     * `:queue` - Whether to block waiting in an internal queue for the
   1024     connection's state (boolean, default: `true`). See "Queue config" in
   1025     `start_link/2` docs
   1026     * `:timeout` - The maximum time that the caller is allowed to perform
   1027     this operation (default: `15_000`)
   1028     * `:deadline` - If set, overrides `:timeout` option and specifies absolute
   1029     monotonic time in milliseconds by which caller must perform operation.
   1030     See `System` module documentation for more information on monotonic time
   1031     (default: `nil`)
   1032     * `:log` - A function to log information about a call, either
   1033     a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
   1034     prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
   1035 
   1036   The pool and connection module may support other options. All options
   1037   are passed to `c:handle_declare/4` and `c:handle_deallocate/4`.
   1038 
   1039   ### Example
   1040 
   1041       DBConnection.transaction(pool, fn conn ->
   1042         query = %Query{statement: "SELECT id FROM table"}
   1043         query = DBConnection.prepare!(conn, query)
   1044         try do
   1045           stream = DBConnection.stream(conn, query, [])
   1046           Enum.to_list(stream)
   1047         after
   1048           # Make sure query is closed!
   1049           DBConnection.close(conn, query)
   1050         end
   1051       end)
   1052   """
   # Only builds the struct; all the work happens when it is reduced.
   @spec stream(t, query, params, opts :: Keyword.t()) :: DBConnection.Stream.t()
   def stream(%DBConnection{} = conn, query, params, opts \\ []) do
     %DBConnection.Stream{
       conn: conn,
       query: query,
       params: params,
       opts: opts
     }
   end
   1057 
   1058   @doc """
   1059   Reduces a previously built stream or prepared stream.
   1060   """
   # Prepared stream: the start function prepares and declares in one step.
   def reduce(%DBConnection.PrepareStream{} = prepare_stream, acc, fun) do
     %{conn: conn, query: query, params: params, opts: opts} = prepare_stream

     start = fn locked_conn, call_opts ->
       {prepared, cursor} = prepare_declare!(locked_conn, query, params, call_opts)
       {:cont, prepared, cursor}
     end

     resource(conn, start, &stream_fetch/3, &stream_deallocate/3, opts).(acc, fun)
   end

   # Plain stream: the start function declares a cursor for the given query.
   def reduce(%DBConnection.Stream{} = stream, acc, fun) do
     %{conn: conn, query: query, params: params, opts: opts} = stream

     start = fn locked_conn, call_opts ->
       case declare(locked_conn, query, params, call_opts) do
         {:ok, declared, cursor} ->
           {:cont, declared, cursor}

         # No query came back, so keep using the one from the stream.
         {:ok, cursor} ->
           {:cont, query, cursor}

         {:error, exception} ->
           raise exception
       end
     end

     resource(conn, start, &stream_fetch/3, &stream_deallocate/3, opts).(acc, fun)
   end
   1092 
   # Stores the connection module in the calling process's dictionary, where
   # connection_module/1 reads it back.
   @doc false
   def register_as_pool(conn_module),
     do: Process.put(@connection_module_key, conn_module)
   1097 
   1098   @doc """
   1099   Returns connection module used by the given connection pool.
   1100 
   1101   When given a process that is not a connection pool, returns an `:error`.
   1102   """
   @spec connection_module(conn) :: {:ok, module} | :error
   def connection_module(conn) do
     # pool_pid/1 falls back to `GenServer.whereis/1`, which can return `nil`
     # or a `{name, node}` tuple as well as a pid. `Process.info/2` only accepts
     # a local pid and raises for anything else, so everything that is not a
     # local pid is answered with `:error`, as documented.
     with pid when is_pid(pid) and node(pid) == node() <- pool_pid(conn),
          {:dictionary, dictionary} <- Process.info(pid, :dictionary),
          {:ok, module} <- fetch_from_dictionary(dictionary, @connection_module_key) do
       {:ok, module}
     else
       # Dead process (`Process.info/2` returned `nil`), missing key, or no
       # usable pid.
       _ -> :error
     end
   end
   1111 
   # A connection struct: take the `pool` field out of its pool reference.
   defp pool_pid(%DBConnection{pool_ref: Holder.pool_ref(pool: pool)}) do
     pool
   end

   # Anything else is resolved with `GenServer.whereis/1`.
   defp pool_pid(name) do
     GenServer.whereis(name)
   end
   1114 
   # Looks `key` up in a list of `{key, value}` pairs, returning `{:ok, value}`
   # for the first pair whose key matches exactly, or `:error` if there is none.
   defp fetch_from_dictionary(dictionary, key) do
     case Enum.find(dictionary, &match?({^key, _value}, &1)) do
       {_key, value} -> {:ok, value}
       nil -> :error
     end
   end
   1121 
   1122   ## Helpers
   1123 
   # Checks a connection out of `pool` through the pool module given in
   # `opts[:pool]` (default `DBConnection.ConnectionPool`) and records the
   # checkout start time on the meter.
   defp checkout(pool, meter, opts) do
     started_at = System.monotonic_time()
     pool_mod = Keyword.get(opts, :pool, DBConnection.ConnectionPool)

     # The caller goes in front of any callers already recorded for this process.
     inherited = Process.get(:"$callers") || []
     callers = [Keyword.get(opts, :caller, self()) | inherited]

     try do
       pool_mod.checkout(pool, callers, opts)
     catch
       kind, reason ->
         {kind, reason, __STACKTRACE__, past_event(meter, :checkout, started_at)}
     else
       {:error, exception} ->
         {:error, exception, past_event(meter, :checkout, started_at)}

       {:ok, pool_ref, _conn_mod, checkin_at, _conn_state} ->
         conn = %DBConnection{pool_ref: pool_ref, conn_ref: make_ref()}
         with_checkin = past_event(meter, :checkin, checkin_at)
         {:ok, conn, past_event(with_checkin, :checkout, started_at)}
     end
   end
   1147 
   # With a connection struct there is nothing to check out: call the fun and
   # add the connection to a successful reply.
   defp checkout(%DBConnection{} = conn, fun, meter, opts) do
     case fun.(conn, meter, opts) do
       {:ok, value, meter} -> {:ok, conn, value, meter}
       failure -> failure
     end
   end

   # With a pool: check out first, and keep the connection checked out only if
   # the fun succeeds.
   defp checkout(pool, fun, meter, opts) do
     case checkout(pool, meter, opts) do
       {:ok, conn, meter} ->
         case fun.(conn, meter, opts) do
           {:ok, value, meter} ->
             {:ok, conn, value, meter}

           failure ->
             checkin(conn)
             failure
         end

       checkout_failure ->
         checkout_failure
     end
   end
   1166 
   # Checks the connection's pool reference back in through the holder.
   defp checkin(%DBConnection{pool_ref: pool_ref}),
     do: Holder.checkin(pool_ref)
   1170 
   # With a connection struct: call the fun, check the connection in, and
   # return whatever the fun returned.
   defp checkin(%DBConnection{} = conn, fun, meter, opts) do
     outcome = fun.(conn, meter, opts)
     checkin(conn)
     outcome
   end

   # With a pool: run/4 does the checkout and checkin around the fun.
   defp checkin(pool, fun, meter, opts),
     do: run(pool, fun, meter, opts)
   1180 
   # Asks the holder to disconnect with `exception`; the holder's reply is
   # discarded and `:ok` is always returned.
   defp disconnect(%DBConnection{pool_ref: pool_ref}, exception) do
     _reply = Holder.disconnect(pool_ref, exception)
     :ok
   end
   1185 
   # Asks the holder to stop the connection, passing a ConnectionError that
   # describes what was raised in this client process. Always returns `:ok`.
   defp stop(%DBConnection{pool_ref: pool_ref}, kind, reason, stack) do
     formatted = Exception.format(kind, reason, stack)

     exception =
       DBConnection.ConnectionError.exception("client #{inspect(self())} stopped: " <> formatted)

     _reply = Holder.stop(pool_ref, exception)
     :ok
   end
   1192 
   # Translates a holder reply into the `{tag, ..., meter}` shape used by the
   # rest of this module. The connection state in the reply is dropped.
   defp handle_common_result({:ok, result, _conn_state}, _conn, meter),
     do: {:ok, result, meter}

   defp handle_common_result({:error, err, _conn_state}, _conn, meter),
     do: {:error, err, meter}

   # A disconnect reply disconnects the connection, then is reported as an error.
   defp handle_common_result({:disconnect, err, _conn_state}, conn, meter) do
     disconnect(conn, err)
     {:error, err, meter}
   end

   # Something was raised, exited or thrown: stop the connection and pass the
   # kind/reason/stack on.
   defp handle_common_result({:catch, kind, reason, stack}, conn, meter) do
     stop(conn, kind, reason, stack)
     {kind, reason, stack, meter}
   end

   defp handle_common_result(other, conn, meter),
     do: bad_return!(other, conn, meter)
   1213 
   @compile {:inline, bad_return!: 3}

   # Raises and immediately catches a ConnectionError for an unexpected reply,
   # which yields both the exception and a stacktrace. The connection is
   # stopped and the error is returned as `{:error, exception, stack, meter}`.
   defp bad_return!(other, conn, meter) do
     raise DBConnection.ConnectionError, "bad return value: #{inspect(other)}"
   catch
     :error, exception ->
       trace = __STACKTRACE__
       stop(conn, :error, exception, trace)
       {:error, exception, trace, meter}
   end
   1226 
   # Calls `DBConnection.Query.parse/2`, turning anything it raises, exits or
   # throws into a `{kind, reason, stack, meter}` tuple.
   defp parse(query, meter, opts) do
     DBConnection.Query.parse(query, opts)
   catch
     kind, reason ->
       {kind, reason, __STACKTRACE__, meter}
   else
     parsed ->
       {:ok, parsed, meter}
   end
   1239 
   # Calls `DBConnection.Query.describe/2`. If it raises, exits or throws, the
   # query is closed through raised_close/7 before the failure is reported.
   defp describe(conn, query, meter, opts) do
     DBConnection.Query.describe(query, opts)
   catch
     kind, reason ->
       raised_close(conn, query, meter, opts, kind, reason, __STACKTRACE__)
   else
     described ->
       {:ok, described, meter}
   end
   1252 
   # Calls `DBConnection.Query.encode/3`. If it raises, exits or throws, the
   # query is closed through raised_close/7 before the failure is reported.
   defp encode(conn, query, params, meter, opts) do
     try do
       DBConnection.Query.encode(query, params, opts)
     catch
       kind, reason ->
         trace = __STACKTRACE__
         raised_close(conn, query, meter, opts, kind, reason, trace)
     else
       encoded ->
         {:ok, encoded, meter}
     end
   end
   1265 
   # Tries `DBConnection.Query.encode/3`. A `DBConnection.EncodeError` is
   # answered with `{:prepare, meter}`; any other failure is returned as a
   # `{kind, reason, stack, meter}` tuple.
   defp maybe_encode(query, params, meter, opts) do
     DBConnection.Query.encode(query, params, opts)
   rescue
     DBConnection.EncodeError ->
       {:prepare, meter}
   catch
     kind, reason ->
       {kind, reason, __STACKTRACE__, meter}
   else
     encoded ->
       {:ok, encoded, meter}
   end
   1280 
   # Records a `:decode` event and calls `DBConnection.Query.decode/3`. The
   # event is recorded before the call, so it is present in the meter for both
   # the success and the failure reply.
   defp decode(query, result, meter, opts) do
     stamped = event(meter, :decode)

     try do
       DBConnection.Query.decode(query, result, opts)
     catch
       kind, reason ->
         {kind, reason, __STACKTRACE__, stamped}
     else
       decoded ->
         {:ok, decoded, stamped}
     end
   end
   1295 
   # Parses the query, then prepares and declares it, logging the outcome as a
   # `:prepare_declare` call against the original (unparsed) query.
   defp prepare_declare(conn, query, params, opts) do
     outcome =
       case parse(query, meter(opts), opts) do
         {:ok, parsed, meter} -> parsed_prepare_declare(conn, parsed, params, meter, opts)
         failure -> failure
       end

     log(outcome, :prepare_declare, query, params)
   end
   1304 
   # Runs run_prepare_declare/5 on the connection (or on one checked out of a pool).
   defp parsed_prepare_declare(conn, query, params, meter, opts),
     do: run(conn, &run_prepare_declare/5, query, params, meter, opts)
   1308 
   # Raising twin of prepare_declare/4: returns `{query, cursor}` or raises.
   defp prepare_declare!(conn, query, params, opts) do
     outcome = prepare_declare(conn, query, params, opts)

     case outcome do
       {:error, exception} -> raise exception
       {:ok, prepared, cursor} -> {prepared, cursor}
     end
   end
   1318 
   # Encodes `params` and declares a cursor, logging the outcome as a
   # `:declare` call. When maybe_encode/4 answers `{:prepare, meter}` the
   # request goes through parsed_prepare_declare/5 instead.
   defp declare(conn, query, params, opts) do
     outcome =
       case maybe_encode(query, params, meter(opts), opts) do
         {:ok, encoded, meter} ->
           run(conn, &run_declare/5, query, encoded, meter, opts)

         {:prepare, meter} ->
           parsed_prepare_declare(conn, query, params, meter, opts)

         {_, _, _, _} = raised ->
           raised
       end

     # The log entry is built from the caller's original params.
     log(outcome, :declare, query, params)
   end
   1334 
   # Runs run_deallocate/4 through run_cleanup/5 and logs the outcome as a
   # `:deallocate` call, with the cursor logged in the params position.
   defp deallocate(conn, query, cursor, opts) do
     outcome = run_cleanup(conn, &run_deallocate/4, [query, cursor], meter(opts), opts)
     log(outcome, :deallocate, query, cursor)
   end
   1340 
   # Prepares the query and, on success, describes the prepared query.
   defp run_prepare(conn, query, meter, opts) do
     case prepare(conn, query, meter, opts) do
       {:ok, prepared, meter} -> describe(conn, prepared, meter, opts)
       failure -> failure
     end
   end
   1346 
   # Calls the connection's `:handle_prepare` callback through the holder and
   # normalises the reply with handle_common_result/3.
   defp prepare(%DBConnection{pool_ref: pool_ref} = conn, query, meter, opts) do
     # Record the event BEFORE calling into the connection. Written inline as
     # the last argument of the pipeline, `event/2` would only be evaluated
     # after `Holder.handle/4` had returned, so the timestamp would mark the end
     # of the call. run_execute/5, run_close/4 and decode/4 all stamp their
     # event before doing the work.
     meter = event(meter, :prepare)

     pool_ref
     |> Holder.handle(:handle_prepare, [query], opts)
     |> handle_common_result(conn, meter)
   end
   1352 
   # Prepares the query, encodes the params against the prepared query, then
   # executes. The first step that does not succeed is returned as-is.
   defp run_prepare_execute(conn, query, params, meter, opts) do
     case run_prepare(conn, query, meter, opts) do
       {:ok, prepared, meter} ->
         case encode(conn, prepared, params, meter, opts) do
           {:ok, encoded, meter} -> run_execute(conn, prepared, encoded, meter, opts)
           encode_failure -> encode_failure
         end

       prepare_failure ->
         prepare_failure
     end
   end
   1359 
   # Records an `:execute` event, then calls the connection's `:handle_execute`
   # callback through the holder.
   defp run_execute(conn, query, params, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     meter = event(meter, :execute)
     reply = Holder.handle(pool_ref, :handle_execute, [query, params], opts)

     case reply do
       {:ok, executed, result, _conn_state} ->
         {:ok, executed, result, meter}

       # A three-element `:ok` reply is not valid for execute; it must be
       # rejected here because handle_common_result/3 would accept it.
       {:ok, _, _} ->
         bad_return!(reply, conn, meter)

       _ ->
         handle_common_result(reply, conn, meter)
     end
   end
   1375 
   # Closes the query after something was raised. If the close succeeds the
   # original kind/reason/stack is reported; otherwise the close's own reply is.
   defp raised_close(conn, query, meter, opts, kind, reason, stack) do
     case run_close(conn, [query], meter, opts) do
       {:ok, _closed, meter} -> {kind, reason, stack, meter}
       close_failure -> close_failure
     end
   end
   1381 
   # Records a `:close` event, then runs the `:handle_close` cleanup callback.
   defp run_close(conn, args, meter, opts),
     do: cleanup(conn, :handle_close, args, event(meter, :close), opts)
   1386 
   # With a connection struct: call the cleanup fun directly.
   defp run_cleanup(%DBConnection{} = conn, fun, args, meter, opts),
     do: fun.(conn, args, meter, opts)

   # With a pool: check out, call the cleanup fun, and always check back in.
   defp run_cleanup(pool, fun, args, meter, opts) do
     case checkout(pool, meter, opts) do
       {:ok, conn, meter} ->
         try do
           fun.(conn, args, meter, opts)
         after
           checkin(conn)
         end

       checkout_failure ->
         checkout_failure
     end
   end
   1400 
   # Runs cleanup callback `fun` (for example `:handle_close`) with `args`
   # through the holder and normalises the reply.
   #
   # The reply shapes and their handling are exactly those of
   # handle_common_result/3, so this delegates to it rather than repeating the
   # same clauses.
   defp cleanup(conn, fun, args, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn

     pool_ref
     |> Holder.cleanup(fun, args, opts)
     |> handle_common_result(conn, meter)
   end
   1423 
   # With a connection struct: call the fun directly.
   defp run(%DBConnection{} = conn, fun, meter, opts),
     do: fun.(conn, meter, opts)

   # With a pool: check out, call the fun, and always check back in.
   defp run(pool, fun, meter, opts) do
     case checkout(pool, meter, opts) do
       {:ok, conn, meter} ->
         try do
           fun.(conn, meter, opts)
         after
           checkin(conn)
         end

       checkout_failure ->
         checkout_failure
     end
   end
   1437 
   # With a connection struct: call the fun directly with its one extra argument.
   defp run(%DBConnection{} = conn, fun, arg, meter, opts),
     do: fun.(conn, arg, meter, opts)

   # With a pool: check out, call the fun, and always check back in.
   defp run(pool, fun, arg, meter, opts) do
     case checkout(pool, meter, opts) do
       {:ok, conn, meter} ->
         try do
           fun.(conn, arg, meter, opts)
         after
           checkin(conn)
         end

       checkout_failure ->
         checkout_failure
     end
   end
   1451 
   # With a connection struct: call the fun directly with its two extra arguments.
   defp run(%DBConnection{} = conn, fun, arg1, arg2, meter, opts),
     do: fun.(conn, arg1, arg2, meter, opts)

   # With a pool: check out, call the fun, and always check back in.
   defp run(pool, fun, arg1, arg2, meter, opts) do
     case checkout(pool, meter, opts) do
       {:ok, conn, meter} ->
         try do
           fun.(conn, arg1, arg2, meter, opts)
         after
           checkin(conn)
         end

       checkout_failure ->
         checkout_failure
     end
   end
   1465 
   # Builds the meter for a call: `nil` when no `:log` option is set, otherwise
   # the log target paired with an empty list of events.
   defp meter(opts) do
     target = Keyword.get(opts, :log)

     if is_nil(target) do
       nil
     else
       {target, []}
     end
   end
   1472 
   # No meter: nothing to record.
   defp event(nil, _name) do
     nil
   end

   # Prepends `{name, now}` to the meter's events, where `now` is the current
   # monotonic time.
   defp event({log, events}, name) do
     {log, [{name, System.monotonic_time()} | events]}
   end
   1478 
   # Like event/2 but with a timestamp supplied by the caller. With no meter,
   # or no timestamp, the meter is returned unchanged.
   defp past_event(nil, _name, _time), do: nil
   defp past_event(meter, _name, nil), do: meter
   defp past_event({log, events}, name, time), do: {log, [{name, time} | events]}
   1487 
   # Splits the meter off an internal result tuple and hands the rest to log/5.
   # The `:ok` four-tuple clause must come before the generic four-tuple clause.
   defp log({:ok, value, meter}, call, query, params) do
     log(meter, call, query, params, {:ok, value})
   end

   defp log({:ok, value1, value2, meter}, call, query, params) do
     log(meter, call, query, params, {:ok, value1, value2})
   end

   defp log({tag, value, meter}, call, query, cursor) when tag in [:cont, :halt] do
     log(meter, call, query, cursor, {tag, value})
   end

   defp log({:error, exception, meter}, call, query, params) do
     log(meter, call, query, params, {:error, exception})
   end

   defp log({kind, reason, stack, meter}, call, query, params) do
     log(meter, call, query, params, {kind, reason, stack})
   end
   1502 
   # No meter: nothing is logged, the result is only finalised.
   defp log(nil, _call, _query, _params, result), do: log_result(result)

   # Builds a log entry and passes it to the log target. A failure inside the
   # target is reported through log_raised/4 and does not affect the result.
   defp log({target, times}, call, query, params, result) do
     logged = entry_result(result)
     entry = DBConnection.LogEntry.new(call, query, params, times, logged)

     try do
       log(target, entry)
     catch
       kind, reason ->
         log_raised(entry, kind, reason, __STACKTRACE__)
     end

     log_result(result)
   end
   1519 
   # Converts a run result into the form stored in a log entry.
   #
   # * `{kind, reason, stack}` with kind `:error`, `:exit` or `:throw` becomes
   #   `{:error, %DBConnection.ConnectionError{}}` carrying the formatted
   #   exception as its message.
   # * `{:cont, res}` and `{:halt, res}` become `{:ok, res}`.
   # * Everything else is returned unchanged.
   defp entry_result(result) do
     case result do
       {kind, reason, stack} when kind in [:error, :exit, :throw] ->
         formatted = Exception.format(kind, reason, stack)
         msg = "an exception was raised: " <> formatted
         {:error, %DBConnection.ConnectionError{message: msg}}

       {tag, value} when tag in [:cont, :halt] ->
         {:ok, value}

       _ ->
         result
     end
   end
   1530 
   # Invokes the configured logger with `entry`.
   #
   # A `{module, function, args}` tuple is applied with `entry` prepended to
   # `args`; any other value is called as a one-argument function.
   defp log(logger, entry) do
     case logger do
       {mod, fun, args} -> apply(mod, fun, [entry | args])
       callback -> callback.(entry)
     end
   end
   1533 
   # Produces the final outcome of a logged call.
   #
   # A `{kind, reason, stack}` triple whose kind is `:error`, `:exit` or
   # `:throw` is re-raised with its original stacktrace; any other value is
   # returned as-is.
   defp log_result(result) do
     case result do
       {kind, reason, stack} when kind in [:error, :exit, :throw] ->
         :erlang.raise(kind, reason, stack)

       _ ->
         result
     end
   end
   1539 
   # Reports a failure that happened inside the user's log function.
   #
   # The failure is normalized and written through `Logger.error` with a
   # `:crash_reason` metadata entry. This is best-effort: anything raised,
   # thrown or exited while reporting is swallowed and `:ok` is returned.
   defp log_raised(entry, kind, reason, stack) do
     try do
       normalized = Exception.normalize(kind, reason, stack)

       Logger.error(
         fn ->
           "an exception was raised logging #{inspect(entry)}: " <>
             Exception.format(kind, normalized, stack)
         end,
         crash_reason: {crash_reason(kind, normalized), stack}
       )
     catch
       _kind, _reason ->
         :ok
     end
   end
   1554 
   # Shapes the reason placed in the `:crash_reason` logger metadata: thrown
   # values are wrapped as `{:nocatch, value}`, everything else is passed
   # through unchanged.
   defp crash_reason(kind, value) do
     if kind == :throw, do: {:nocatch, value}, else: value
   end
   1557 
   # Runs `fun` with the connection switched to `conn_mode: :transaction`,
   # then commits or rolls back.
   #
   # * Normal return: `conclude/2` checks the connection status, then
   #   `commit/3` runs. Yields `{:ok, result}`, or `{:error, :rollback}` when
   #   the commit reports a `DBConnection.TransactionError`; any other commit
   #   error is raised.
   # * A throw of `{DBConnection, conn_ref, reason}` for this connection:
   #   rolls back and yields `{:error, reason}`. Transaction and connection
   #   errors from the rollback are tolerated; any other error is raised.
   # * Any other raise/throw/exit: rolls back (ignoring the outcome) and
   #   re-raises with the original stacktrace.
   #
   # `reset/1` is called before each rollback and once more in `after`.
   defp run_transaction(conn, fun, run, opts) do
     %DBConnection{conn_ref: conn_ref} = conn

     try do
       outcome = fun.(%{conn | conn_mode: :transaction})
       conclude(conn, outcome)
     catch
       :throw, {__MODULE__, ^conn_ref, reason} ->
         reset(conn)
         rolled_back = rollback(conn, run, opts)

         case rolled_back do
           {:ok, _} ->
             {:error, reason}

           {:error, %mod{}}
           when mod in [DBConnection.TransactionError, DBConnection.ConnectionError] ->
             {:error, reason}

           {:error, unexpected} ->
             raise unexpected
         end

       kind, reason ->
         stacktrace = __STACKTRACE__
         reset(conn)
         _ = rollback(conn, run, opts)
         :erlang.raise(kind, reason, stacktrace)
     else
       value ->
         conn
         |> commit(run, opts)
         |> case do
           {:ok, _} ->
             {:ok, value}

           {:error, %DBConnection.TransactionError{}} ->
             {:error, :rollback}

           {:error, unexpected} ->
             raise unexpected
         end
     after
       reset(conn)
     end
   end
   1603 
   # Flips the holder status from `:ok` to `:aborted`. When the status is not
   # currently `:ok` nothing is changed and `:ok` is returned.
   defp fail(%DBConnection{pool_ref: pool_ref}) do
     pool_ref
     |> Holder.status?(:ok)
     |> case do
       false -> :ok
       true -> Holder.put_status(pool_ref, :aborted)
     end
   end
   1610 
   # Returns `result` when the holder status is still `:ok`; otherwise throws
   # `{DBConnection, conn_ref, :rollback}` so the enclosing transaction rolls
   # back.
   defp conclude(%DBConnection{pool_ref: pool_ref, conn_ref: conn_ref}, result) do
     still_ok? = Holder.status?(pool_ref, :ok)

     case still_ok? do
       false -> throw({__MODULE__, conn_ref, :rollback})
       true -> result
     end
   end
   1617 
   # Restores an `:aborted` holder status back to `:ok`. Any other status is
   # left alone and `:ok` is returned.
   defp reset(%DBConnection{pool_ref: pool_ref}) do
     pool_ref
     |> Holder.status?(:aborted)
     |> case do
       false -> :ok
       true -> Holder.put_status(pool_ref, :ok)
     end
   end
   1624 
   # Executes `run_begin/3` through the supplied `run` function with a fresh
   # meter, then logs the outcome as a `:begin` call.
   defp begin(conn, run, opts) do
     outcome = run.(conn, &run_begin/3, meter(opts), opts)
     log(outcome, :begin, :begin, nil)
   end
   1630 
   # Dispatches `:handle_begin` through `Holder.handle/4` after recording a
   # `:begin` event on the meter.
   #
   # A bare `{status, state}` reply (status `:idle`, `:transaction` or
   # `:error`) goes to `status_disconnect/3`; every other reply is delegated
   # to `handle_common_result/3`.
   defp run_begin(conn, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     timed = event(meter, :begin)
     reply = Holder.handle(pool_ref, :handle_begin, [], opts)

     case reply do
       {status, _state} when status in [:idle, :transaction, :error] ->
         status_disconnect(conn, status, timed)

       _ ->
         handle_common_result(reply, conn, timed)
     end
   end
   1643 
   # Executes `run_rollback/3` through the supplied `run` function with a
   # fresh meter, then logs the outcome as a `:rollback` call.
   defp rollback(conn, run, opts) do
     outcome = run.(conn, &run_rollback/3, meter(opts), opts)
     log(outcome, :rollback, :rollback, nil)
   end
   1649 
   # Dispatches `:handle_rollback` through `Holder.handle/4` after recording a
   # `:rollback` event on the meter.
   #
   # A bare `{status, state}` reply (status `:idle`, `:transaction` or
   # `:error`) goes to `status_disconnect/3`; every other reply is delegated
   # to `handle_common_result/3`.
   defp run_rollback(conn, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     timed = event(meter, :rollback)
     reply = Holder.handle(pool_ref, :handle_rollback, [], opts)

     case reply do
       {status, _state} when status in [:idle, :transaction, :error] ->
         status_disconnect(conn, status, timed)

       _ ->
         handle_common_result(reply, conn, timed)
     end
   end
   1662 
   # Executes `run_commit/3` through `run` and logs what actually happened.
   #
   # `run_commit/3` tags its result with the query that ran (`:commit` or
   # `:rollback`). A successful rollback-in-place-of-commit is logged and then
   # reported as `{:error, %DBConnection.TransactionError{}}`. Untagged
   # three- and four-element results are logged as `:commit`. Clause order
   # matters: the two-element patterns must be tried before the wider tuples.
   defp commit(conn, run, opts) do
     outcome = run.(conn, &run_commit/3, meter(opts), opts)

     case outcome do
       {:rollback, {:ok, result, meter}} ->
         log(meter, :commit, :rollback, nil, {:ok, result})
         {:error, DBConnection.TransactionError.exception(:error)}

       {query, tagged_result} ->
         log(tagged_result, :commit, query, nil)

       {:error, error, meter} ->
         log(meter, :commit, :commit, nil, {:error, error})

       {kind, reason, stacktrace, meter} ->
         log(meter, :commit, :commit, nil, {kind, reason, stacktrace})
     end
   end
   1680 
   # Dispatches `:handle_commit` through `Holder.handle/4` after recording a
   # `:commit` event, and tags the result with the query that ran.
   #
   # * `{:error, state}`: a rollback is run instead and the result is tagged
   #   `:rollback`.
   # * `{:idle | :transaction, state}`: goes to `status_disconnect/3`, tagged
   #   `:commit`.
   # * Anything else: delegated to `handle_common_result/3`, tagged `:commit`.
   defp run_commit(conn, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     timed = event(meter, :commit)
     reply = Holder.handle(pool_ref, :handle_commit, [], opts)

     case reply do
       {:error, _state} ->
         {:rollback, run_rollback(conn, timed, opts)}

       {status, _state} when status in [:idle, :transaction] ->
         {:commit, status_disconnect(conn, status, timed)}

       _ ->
         {:commit, handle_common_result(reply, conn, timed)}
     end
   end
   1696 
   # Builds a `DBConnection.TransactionError` for `status`, disconnects the
   # connection with it, and returns `{:error, error, meter}`.
   defp status_disconnect(conn, status, meter) do
     transaction_error = DBConnection.TransactionError.exception(status)
     disconnect(conn, transaction_error)

     {:error, transaction_error, meter}
   end
   1702 
   # Dispatches `:handle_status` through `Holder.handle/4` and maps the reply.
   #
   # * `{:idle | :transaction | :error, state}` yields `{status, meter}`.
   # * `{:disconnect, err, state}` disconnects and yields `{:error, err, meter}`.
   # * `{:catch, kind, reason, stack}` stops the connection and yields
   #   `{kind, reason, stack, meter}`.
   # * Any other reply is handed to `bad_return!/3`.
   #
   # No event is recorded on the meter here.
   defp run_status(conn, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     reply = Holder.handle(pool_ref, :handle_status, [], opts)

     case reply do
       {status, _state} when status in [:idle, :transaction, :error] ->
         {status, meter}

       {:disconnect, error, _state} ->
         disconnect(conn, error)
         {:error, error, meter}

       {:catch, kind, reason, stacktrace} ->
         stop(conn, kind, reason, stacktrace)
         {kind, reason, stacktrace, meter}

       _ ->
         bad_return!(reply, conn, meter)
     end
   end
   1722 
   # Chains prepare -> describe -> encode -> declare, threading the meter
   # through each step. The first step that does not return its `:ok` shape
   # short-circuits and its value is returned unchanged; on success the result
   # of `run_declare/5` is returned.
   defp run_prepare_declare(conn, query, params, meter, opts) do
     with {:ok, prepared, meter} <- prepare(conn, query, meter, opts),
          {:ok, described, meter} <- describe(conn, prepared, meter, opts),
          {:ok, encoded, meter} <- encode(conn, described, params, meter, opts) do
       # Success and failure of the declare step are both returned verbatim,
       # so it can simply be the final expression.
       run_declare(conn, described, encoded, meter, opts)
     end
   end
   1731 
   # Dispatches `:handle_declare` through `Holder.handle/4` after recording a
   # `:declare` event on the meter.
   #
   # * `{:ok, query, cursor, state}` yields `{:ok, query, cursor, meter}`.
   # * A three-element `{:ok, _, _}` reply is rejected via `bad_return!/3`.
   # * Anything else is delegated to `handle_common_result/3`.
   defp run_declare(conn, query, params, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     timed = event(meter, :declare)
     reply = Holder.handle(pool_ref, :handle_declare, [query, params], opts)

     case reply do
       {:ok, declared_query, cursor, _state} ->
         {:ok, declared_query, cursor, timed}

       {:ok, _, _} ->
         bad_return!(reply, conn, timed)

       _ ->
         handle_common_result(reply, conn, timed)
     end
   end
   1747 
   # Step function for the cursor stream.
   #
   # In the `:cont` state one batch is fetched (and logged as `:fetch`); the
   # batch is emitted and the state's tag becomes whatever the fetch returned
   # (`:cont` or `:halt`). An `{:error, err}` outcome is raised. In the
   # `:halt` state the stream is stopped with the state unchanged.
   defp stream_fetch(conn, {:cont, query, cursor}, opts) do
     fetched = run(conn, &run_stream_fetch/4, [query, cursor], meter(opts), opts)

     case log(fetched, :fetch, query, cursor) do
       {tag, result} when tag in [:cont, :halt] ->
         {[result], {tag, query, cursor}}

       {:error, error} ->
         raise error
     end
   end

   defp stream_fetch(_conn, {:halt, _query, _cursor} = state, _opts), do: {:halt, state}
   1764 
   # Fetches one batch for `[query, cursor]` and decodes it.
   #
   # A `:cont`/`:halt` fetch followed by a successful decode yields
   # `{tag, decoded, meter}` with the fetch's tag. Any other fetch or decode
   # result is returned unchanged.
   defp run_stream_fetch(conn, args, meter, opts) do
     [query, _cursor] = args

     case run_fetch(conn, args, meter, opts) do
       {tag, raw, fetch_meter} when tag in [:cont, :halt] ->
         case decode(query, raw, fetch_meter, opts) do
           {:ok, decoded, decode_meter} -> {tag, decoded, decode_meter}
           not_decoded -> not_decoded
         end

       not_fetched ->
         not_fetched
     end
   end
   1773 
   # Dispatches `:handle_fetch` through `Holder.handle/4` after recording a
   # `:fetch` event on the meter.
   #
   # `{:cont | :halt, result, state}` yields `{tag, result, meter}`; anything
   # else is delegated to `handle_common_result/3`.
   defp run_fetch(conn, args, meter, opts) do
     %DBConnection{pool_ref: pool_ref} = conn
     timed = event(meter, :fetch)
     reply = Holder.handle(pool_ref, :handle_fetch, args, opts)

     case reply do
       {tag, result, _state} when tag in [:cont, :halt] ->
         {tag, result, timed}

       _ ->
         handle_common_result(reply, conn, timed)
     end
   end
   1789 
   # Cleanup function for the cursor stream: deallocates the cursor held in
   # the stream state, whatever the state's tag is.
   defp stream_deallocate(conn, {_tag, query, cursor}, opts) do
     deallocate(conn, query, cursor, opts)
   end
   1792 
   # Records a `:deallocate` event on the meter and runs the
   # `:handle_deallocate` callback through `cleanup/5`.
   defp run_deallocate(conn, args, meter, opts) do
     cleanup(conn, :handle_deallocate, args, event(meter, :deallocate), opts)
   end
   1797 
   # Builds a lazy `Stream.resource/3` whose start, next and stop callbacks
   # are the given functions with `conn` and `opts` already supplied.
   defp resource(%DBConnection{} = conn, start, next, stop, opts) do
     Stream.resource(
       fn -> start.(conn, opts) end,
       &next.(conn, &1, opts),
       &stop.(conn, &1, opts)
     )
   end
   1804 end