zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

sql.ex (40713B)


      1 defmodule Ecto.Adapters.SQL do
      2   @moduledoc ~S"""
      3   This application provides functionality for working with
      4   SQL databases in `Ecto`.
      5 
      6   ## Built-in adapters
      7 
      8   By default, we support the following adapters:
      9 
     10     * `Ecto.Adapters.Postgres` for Postgres
     11     * `Ecto.Adapters.MyXQL` for MySQL
     12     * `Ecto.Adapters.Tds` for SQLServer
     13 
     14   ## Additional functions
     15 
     16   If your `Ecto.Repo` is backed by any of the SQL adapters above,
     17   this module will inject additional functions into your repository:
     18 
     19     * `disconnect_all(interval, options \\ [])` -
     20        shortcut for `Ecto.Adapters.SQL.disconnect_all/3`
     21 
     22     * `explain(type, query, options \\ [])` -
     23        shortcut for `Ecto.Adapters.SQL.explain/4`
     24 
     25     * `query(sql, params, options \\ [])` -
     26        shortcut for `Ecto.Adapters.SQL.query/4`
     27 
     28     * `query!(sql, params, options \\ [])` -
     29        shortcut for `Ecto.Adapters.SQL.query!/4`
     30 
     31     * `query_many(sql, params, options \\ [])` -
     32        shortcut for `Ecto.Adapters.SQL.query_many/4`
     33 
     34     * `query_many!(sql, params, options \\ [])` -
     35        shortcut for `Ecto.Adapters.SQL.query_many!/4`
     36 
     37     * `to_sql(type, query)` -
     38        shortcut for `Ecto.Adapters.SQL.to_sql/3`
     39 
     40   Generally speaking, you must invoke those functions directly from
     41   your repository, for example: `MyApp.Repo.query("SELECT true")`.
     42   You can also invoke them directly from `Ecto.Adapters.SQL`, but
     43   keep in mind that in such cases features such as "dynamic repositories"
     44   won't be available.
     45 
     46   ## Migrations
     47 
     48   `ecto_sql` supports database migrations. You can generate a migration
     49   with:
     50 
     51       $ mix ecto.gen.migration create_posts
     52 
     53   This will create a new file inside `priv/repo/migrations` with the
     54   `change` function. Check `Ecto.Migration` for more information.
     55 
     56   To interface with migrations, developers typically use mix tasks:
     57 
     58     * `mix ecto.migrations` - lists all available migrations and their status
     59     * `mix ecto.migrate` - runs a migration
     60     * `mix ecto.rollback` - rolls back a previously run migration
     61 
     62   If you want to run migrations programmatically, see `Ecto.Migrator`.
     63 
     64   ## SQL sandbox
     65 
     66   `ecto_sql` provides a sandbox for testing. The sandbox wraps each
     67   test in a transaction, making sure the tests are isolated and can
     68   run concurrently. See `Ecto.Adapters.SQL.Sandbox` for more information.
     69 
     70   ## Structure load and dumping
     71 
     72   If you have an existing database, you may want to dump its existing
     73   structure and make it reproducible from within Ecto. This can be
     74   achieved with two Mix tasks:
     75 
     76     * `mix ecto.load` - loads an existing structure into the database
     77     * `mix ecto.dump` - dumps the existing database structure to the filesystem
     78 
     79   For creating and dropping databases, see `mix ecto.create`
     80   and `mix ecto.drop` that are included as part of Ecto.
     81 
     82   ## Custom adapters
     83 
     84   Developers can implement their own SQL adapters by using
     85   `Ecto.Adapters.SQL` and by implementing the callbacks required
     86   by `Ecto.Adapters.SQL.Connection`  for handling connections and
     87   performing queries. The connection handling and pooling for SQL
     88   adapters should be built using the `DBConnection` library.
     89 
     90   When using `Ecto.Adapters.SQL`, the following options are required:
     91 
     92     * `:driver` (required) - the database driver library.
     93       For example: `:postgrex`
     94 
     95   """
     96 
     97   require Logger
     98 
  # Injects the default SQL adapter implementation into the module that calls
  # `use Ecto.Adapters.SQL, driver: ...`.
  #
  # The quoted code declares the Ecto adapter behaviours listed below and
  # defines their callbacks, most of which delegate to the functions with the
  # same name in `Ecto.Adapters.SQL`. The callbacks named in `defoverridable`
  # at the end of the quote may be redefined by the adapter.
  @doc false
  defmacro __using__(opts) do
    quote do
      @behaviour Ecto.Adapter
      @behaviour Ecto.Adapter.Migration
      @behaviour Ecto.Adapter.Queryable
      @behaviour Ecto.Adapter.Schema
      @behaviour Ecto.Adapter.Transaction

      opts = unquote(opts)
      # The connection module is looked up under the using module's namespace,
      # i.e. `<Adapter>.Connection`.
      @conn __MODULE__.Connection
      # `Keyword.fetch!/2` raises at compile time when `:driver` is not given.
      @driver Keyword.fetch!(opts, :driver)

      @impl true
      defmacro __before_compile__(env) do
        Ecto.Adapters.SQL.__before_compile__(@driver, env)
      end

      @impl true
      def ensure_all_started(config, type) do
        Ecto.Adapters.SQL.ensure_all_started(@driver, config, type)
      end

      @impl true
      def init(config) do
        Ecto.Adapters.SQL.init(@conn, @driver, config)
      end

      @impl true
      def checkout(meta, opts, fun) do
        Ecto.Adapters.SQL.checkout(meta, opts, fun)
      end

      @impl true
      def checked_out?(meta) do
        Ecto.Adapters.SQL.checked_out?(meta)
      end

      # Loaders: `{:map, _}` types are loaded through
      # `Ecto.Type.embedded_load/3` with the `:json` format, `:binary_id`
      # values go through `Ecto.UUID` before the type itself, and everything
      # else uses the type unchanged.
      @impl true
      def loaders({:map, _}, type),   do: [&Ecto.Type.embedded_load(type, &1, :json)]
      def loaders(:binary_id, type),  do: [Ecto.UUID, type]
      def loaders(_, type),           do: [type]

      # Dumpers mirror the loaders; note that for `:binary_id` the order is
      # reversed (type first, then `Ecto.UUID`).
      @impl true
      def dumpers({:map, _}, type),   do: [&Ecto.Type.embedded_dump(type, &1, :json)]
      def dumpers(:binary_id, type),  do: [type, Ecto.UUID]
      def dumpers(_, type),           do: [type]

      ## Query

      # Each `prepare/2` clause asks the connection module to build the
      # statement, flattens it to a binary and returns it tagged with `:cache`
      # together with a unique positive integer id.
      @impl true
      def prepare(:all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
      end

      def prepare(:update_all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.update_all(query))}}
      end

      def prepare(:delete_all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}
      end

      @impl true
      def execute(adapter_meta, query_meta, query, params, opts) do
        # NOTE(review): `:named` presumably selects named prepared statements;
        # confirm against `Ecto.Adapters.SQL.execute/6`.
        Ecto.Adapters.SQL.execute(:named, adapter_meta, query_meta, query, params, opts)
      end

      @impl true
      def stream(adapter_meta, query_meta, query, params, opts) do
        Ecto.Adapters.SQL.stream(adapter_meta, query_meta, query, params, opts)
      end

      ## Schema

      # NOTE(review): `:id` yields `nil`, presumably because integer ids are
      # generated by the database; confirm.
      @impl true
      def autogenerate(:id),        do: nil
      def autogenerate(:embed_id),  do: Ecto.UUID.generate()
      def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()

      @impl true
      def insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) do
        Ecto.Adapters.SQL.insert_all(adapter_meta, schema_meta, @conn, header, rows, on_conflict, returning, placeholders, opts)
      end

      @impl true
      def insert(adapter_meta, %{source: source, prefix: prefix}, params,
                 {kind, conflict_params, _} = on_conflict, returning, opts) do
        {fields, values} = :lists.unzip(params)
        # The statement is built for a single row (`[fields]`); the bound
        # values are the field values followed by the conflict parameters.
        sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning, [])
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
      end

      @impl true
      def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
        {fields, field_values} = :lists.unzip(fields)
        filter_values = Keyword.values(params)
        sql = @conn.update(prefix, source, fields, params, returning)
        # Bound values: updated field values first, then the filter values.
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :update, source, params, field_values ++ filter_values, :raise, returning, opts)
      end

      @impl true
      def delete(adapter_meta, %{source: source, prefix: prefix}, params, opts) do
        filter_values = Keyword.values(params)
        sql = @conn.delete(prefix, source, params, [])
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :delete, source, params, filter_values, :raise, [], opts)
      end

      ## Transaction

      @impl true
      def transaction(meta, opts, fun) do
        Ecto.Adapters.SQL.transaction(meta, opts, fun)
      end

      @impl true
      def in_transaction?(meta) do
        Ecto.Adapters.SQL.in_transaction?(meta)
      end

      @impl true
      def rollback(meta, value) do
        Ecto.Adapters.SQL.rollback(meta, value)
      end

      ## Migration

      @impl true
      def execute_ddl(meta, definition, opts) do
        Ecto.Adapters.SQL.execute_ddl(meta, @conn, definition, opts)
      end

      # Only the callbacks listed here can be redefined by the using adapter.
      defoverridable [prepare: 2, execute: 5, insert: 6, update: 6, delete: 4, insert_all: 8,
                      execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1,
                      ensure_all_started: 2, __before_compile__: 1]
    end
  end
    236 
    237   @doc """
    238   Converts the given query to SQL according to its kind and the
    239   adapter in the given repository.
    240 
    241   ## Examples
    242 
    243   The examples below are meant for reference. Each adapter will
    244   return a different result:
    245 
    246       iex> Ecto.Adapters.SQL.to_sql(:all, Repo, Post)
    247       {"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
    248 
    249       iex> Ecto.Adapters.SQL.to_sql(:update_all, Repo,
    250                                     from(p in Post, update: [set: [title: ^"hello"]]))
    251       {"UPDATE posts AS p SET title = $1", ["hello"]}
    252 
    253   This function is also available under the repository with name `to_sql`:
    254 
    255       iex> Repo.to_sql(:all, Post)
    256       {"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}
    257 
    258   """
    259   @spec to_sql(:all | :update_all | :delete_all, Ecto.Repo.t, Ecto.Queryable.t) ::
    260                {String.t, [term]}
    261   def to_sql(kind, repo, queryable) do
    262     case Ecto.Adapter.Queryable.prepare_query(kind, repo, queryable) do
    263       {{:cached, _update, _reset, {_id, cached}}, params} ->
    264         {String.Chars.to_string(cached), params}
    265 
    266       {{:cache, _update, {_id, prepared}}, params} ->
    267         {prepared, params}
    268 
    269       {{:nocache, {_id, prepared}}, params} ->
    270         {prepared, params}
    271     end
    272   end
    273 
    274   @doc """
    275   Executes an EXPLAIN statement or similar for the given query according to its kind and the
    276   adapter in the given repository.
    277 
    278   ## Examples
    279 
    280       # Postgres
    281       iex> Ecto.Adapters.SQL.explain(Repo, :all, Post)
    282       "Seq Scan on posts p0  (cost=0.00..12.12 rows=1 width=443)"
    283 
    284       # MySQL
    285       iex> Ecto.Adapters.SQL.explain(Repo, :all, from(p in Post, where: p.title == "title")) |> IO.puts()
    286       +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
    287       | id | select_type | table | partitions | type | possible_keys | key  | key_len | ref  | rows | filtered | Extra       |
    288       +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
    289       |  1 | SIMPLE      | p0    | NULL       | ALL  | NULL          | NULL | NULL    | NULL |    1 |    100.0 | Using where |
    290       +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+
    291 
    292       # Shared opts
    293       iex> Ecto.Adapters.SQL.explain(Repo, :all, Post, analyze: true, timeout: 20_000)
    294       "Seq Scan on posts p0  (cost=0.00..11.70 rows=170 width=443) (actual time=0.013..0.013 rows=0 loops=1)\\nPlanning Time: 0.031 ms\\nExecution Time: 0.021 ms"
    295 
    296   It's safe to execute it for updates and deletes, no data change will be committed:
    297 
    298       iex> Ecto.Adapters.SQL.explain(Repo, :update_all, from(p in Post, update: [set: [title: "new title"]]))
    299       "Update on posts p0  (cost=0.00..11.70 rows=170 width=449)\\n  ->  Seq Scan on posts p0  (cost=0.00..11.70 rows=170 width=449)"
    300 
    301   This function is also available under the repository with name `explain`:
    302 
    303       iex> Repo.explain(:all, from(p in Post, where: p.title == "title"))
    304       "Seq Scan on posts p0  (cost=0.00..12.12 rows=1 width=443)\\n  Filter: ((title)::text = 'title'::text)"
    305 
    306   ### Options
    307 
    308   Built-in adapters support passing `opts` to the EXPLAIN statement according to the following:
    309 
    310   Adapter          | Supported opts
    311   ---------------- | --------------
    312   Postgrex         | `analyze`, `verbose`, `costs`, `settings`, `buffers`, `timing`, `summary`
    313   MyXQL            | None
    314 
    315   _Postgrex_: Check [PostgreSQL doc](https://www.postgresql.org/docs/current/sql-explain.html)
    316   for version compatibility.
    317 
    318   Also note that:
    319 
    320     * Currently `:map`, `:yaml`, and `:text` format options are supported
    321       for PostgreSQL. `:map` is the deserialized JSON encoding. The last two
    322       options return the result as a string;
    323 
    324     * Any other value passed to `opts` will be forwarded to the underlying
    325       adapter query function, including Repo shared options such as `:timeout`;
    326 
    327     * Non built-in adapters may have specific behavior and you should consult
    328       their own documentation.
    329 
    330   """
    331   @spec explain(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta,
    332                 :all | :update_all | :delete_all,
    333                 Ecto.Queryable.t, opts :: Keyword.t) :: String.t | Exception.t
    334   def explain(repo, operation, queryable, opts \\ [])
    335 
    336   def explain(repo, operation, queryable, opts) when is_atom(repo) or is_pid(repo) do
    337     explain(Ecto.Adapter.lookup_meta(repo), operation, queryable, opts)
    338   end
    339 
    340   def explain(%{repo: repo} = adapter_meta, operation, queryable, opts) do
    341     Ecto.Multi.new()
    342     |> Ecto.Multi.run(:explain, fn _, _ ->
    343       {prepared, prepared_params} = to_sql(operation, repo, queryable)
    344       sql_call(adapter_meta, :explain_query, [prepared], prepared_params, opts)
    345     end)
    346      |> Ecto.Multi.run(:rollback, fn _, _ ->
    347        {:error, :forced_rollback}
    348      end)
    349      |> repo.transaction(opts)
    350      |> case do
    351        {:error, :rollback, :forced_rollback, %{explain: result}} -> result
    352        {:error, :explain, error, _} -> raise error
    353        _ -> raise "unable to execute explain"
    354      end
    355   end
    356 
    357   @doc """
    358   Forces all connections in the repo pool to disconnect within the given interval.
    359 
    360   Once this function is called, the pool will disconnect all of its connections
    361   as they are checked in or as they are pinged. Checked in connections will be
    362   randomly disconnected within the given time interval. Pinged connections are
    363   immediately disconnected - as they are idle (according to `:idle_interval`).
    364 
    365   If the connection has a backoff configured (which is the case by default),
    366   disconnecting means an attempt at a new connection will be done immediately
    367   after, without starting a new process for each connection. However, if backoff
    368   has been disabled, the connection process will terminate. In such cases,
    369   disconnecting all connections may cause the pool supervisor to restart
    370   depending on the max_restarts/max_seconds configuration of the pool,
    371   so you will want to set those carefully.
    372 
    373   For convenience, this function is also available in the repository:
    374 
    375       iex> MyRepo.disconnect_all(60_000)
    376       :ok
    377   """
    378   @spec disconnect_all(pid | Ecto.Repo.t | Ecto.Adapter.adapter_meta, non_neg_integer, opts :: Keyword.t()) :: :ok
    379   def disconnect_all(repo, interval, opts \\ [])
    380 
    381   def disconnect_all(repo, interval, opts) when is_atom(repo) or is_pid(repo) do
    382     disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts)
    383   end
    384 
    385   def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do
    386     DBConnection.disconnect_all(pid, interval, opts)
    387   end
    388 
    389   @doc """
    390   Returns a stream that runs a custom SQL query on given repo when reduced.
    391 
    392   In case of success it is a enumerable containing maps with at least two keys:
    393 
    394     * `:num_rows` - the number of rows affected
    395 
    396     * `:rows` - the result set as a list. `nil` may be returned
    397       instead of the list if the command does not yield any row
    398       as result (but still yields the number of affected rows,
    399       like a `delete` command without returning would)
    400 
    401   In case of failure it raises an exception.
    402 
    403   If the adapter supports a collectable stream, the stream may also be used as
    404   the collectable in `Enum.into/3`. Behaviour depends on the adapter.
    405 
    406   ## Options
    407 
    408     * `:log` - When false, does not log the query
    409     * `:max_rows` - The number of rows to load from the database as we stream
    410 
    411   ## Examples
    412 
    413       iex> Ecto.Adapters.SQL.stream(MyRepo, "SELECT $1::integer + $2", [40, 2]) |> Enum.to_list()
    414       [%{rows: [[42]], num_rows: 1}]
    415 
    416   """
    417   @spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
    418   def stream(repo, sql, params \\ [], opts \\ []) do
    419     repo
    420     |> Ecto.Adapter.lookup_meta()
    421     |> Ecto.Adapters.SQL.Stream.build(sql, params, opts)
    422   end
    423 
    424   @doc """
    425   Same as `query/4` but raises on invalid queries.
    426   """
    427   @spec query!(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
    428                %{:rows => nil | [[term] | binary],
    429                  :num_rows => non_neg_integer,
    430                  optional(atom) => any}
    431   def query!(repo, sql, params \\ [], opts \\ []) do
    432     case query(repo, sql, params, opts) do
    433       {:ok, result} -> result
    434       {:error, err} -> raise_sql_call_error err
    435     end
    436   end
    437 
    438   @doc """
    439   Runs a custom SQL query on the given repo.
    440 
    441   In case of success, it must return an `:ok` tuple containing
    442   a map with at least two keys:
    443 
    444     * `:num_rows` - the number of rows affected
    445 
    446     * `:rows` - the result set as a list. `nil` may be returned
    447       instead of the list if the command does not yield any row
    448       as result (but still yields the number of affected rows,
    449       like a `delete` command without returning would)
    450 
    451   ## Options
    452 
    453     * `:log` - When false, does not log the query
    454 
    455   ## Examples
    456 
    457       iex> Ecto.Adapters.SQL.query(MyRepo, "SELECT $1::integer + $2", [40, 2])
    458       {:ok, %{rows: [[42]], num_rows: 1}}
    459 
    460   For convenience, this function is also available under the repository:
    461 
    462       iex> MyRepo.query("SELECT $1::integer + $2", [40, 2])
    463       {:ok, %{rows: [[42]], num_rows: 1}}
    464 
    465   """
    466   @spec query(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
    467               {:ok, %{:rows => nil | [[term] | binary],
    468                       :num_rows => non_neg_integer,
    469                       optional(atom) => any}}
    470               | {:error, Exception.t}
    471   def query(repo, sql, params \\ [], opts \\ [])
    472 
    473   def query(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
    474     query(Ecto.Adapter.lookup_meta(repo), sql, params, opts)
    475   end
    476 
    477   def query(adapter_meta, sql, params, opts) do
    478     sql_call(adapter_meta, :query, [sql], params, opts)
    479   end
    480 
    481   @doc """
    482   Same as `query_many/4` but raises on invalid queries.
    483   """
    484   @spec query_many!(Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
    485                [%{:rows => nil | [[term] | binary],
    486                  :num_rows => non_neg_integer,
    487                  optional(atom) => any}]
    488   def query_many!(repo, sql, params \\ [], opts \\ []) do
    489     case query_many(repo, sql, params, opts) do
    490       {:ok, result} -> result
    491       {:error, err} -> raise_sql_call_error err
    492     end
    493   end
    494 
    495   @doc """
    496   Runs a custom SQL query that returns multiple results on the given repo.
    497 
    498   In case of success, it must return an `:ok` tuple containing
    499   a list of maps with at least two keys:
    500 
    501     * `:num_rows` - the number of rows affected
    502 
    503     * `:rows` - the result set as a list. `nil` may be returned
    504       instead of the list if the command does not yield any row
    505       as result (but still yields the number of affected rows,
    506       like a `delete` command without returning would)
    507 
    508   ## Options
    509 
    510     * `:log` - When false, does not log the query
    511 
    512   ## Examples
    513 
    514       iex> Ecto.Adapters.SQL.query_many(MyRepo, "SELECT $1; SELECT $2;", [40, 2])
    515       {:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
    516 
    517   For convenience, this function is also available under the repository:
    518 
    519       iex> MyRepo.query_many(SELECT $1; SELECT $2;", [40, 2])
    520       {:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]}
    521 
    522   """
    523   @spec query_many(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) ::
    524               {:ok, [%{:rows => nil | [[term] | binary],
    525                       :num_rows => non_neg_integer,
    526                       optional(atom) => any}]}
    527               | {:error, Exception.t}
    528   def query_many(repo, sql, params \\ [], opts \\ [])
    529 
    530   def query_many(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do
    531     query_many(Ecto.Adapter.lookup_meta(repo), sql, params, opts)
    532   end
    533 
    534   def query_many(adapter_meta, sql, params, opts) do
    535     sql_call(adapter_meta, :query_many, [sql], params, opts)
    536   end
    537 
    538   defp sql_call(adapter_meta, callback, args, params, opts) do
    539     %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
    540     conn = get_conn_or_pool(pool)
    541     opts = with_log(telemetry, params, opts ++ default_opts)
    542     args = args ++ [params, opts]
    543     apply(sql, callback, [conn | args])
    544   end
    545 
    546   defp put_source(opts, %{sources: sources}) when is_binary(elem(elem(sources, 0), 0)) do
    547     {source, _, _} = elem(sources, 0)
    548     [source: source] ++ opts
    549   end
    550 
    551   defp put_source(opts, _) do
    552     opts
    553   end
    554 
    555   @doc """
    556   Check if the given `table` exists.
    557 
    558   Returns `true` if the `table` exists in the `repo`, otherwise `false`.
    559   The table is checked against the current database/schema in the connection.
    560   """
    561   @spec table_exists?(Ecto.Repo.t, table :: String.t) :: boolean
    562   def table_exists?(repo, table) when is_atom(repo) do
    563     %{sql: sql} = adapter_meta = Ecto.Adapter.lookup_meta(repo)
    564     {query, params} = sql.table_exists_query(table)
    565     query!(adapter_meta, query, params, []).num_rows != 0
    566   end
    567 
    568   # Returns a formatted table for a given query `result`.
    569   #
    570   # ## Examples
    571   #
    572   #     iex> Ecto.Adapters.SQL.format_table(query) |> IO.puts()
    573   #     +---------------+---------+--------+
    574   #     | title         | counter | public |
    575   #     +---------------+---------+--------+
    576   #     | My Post Title |       1 | NULL   |
    577   #     +---------------+---------+--------+
    578   @doc false
    579   @spec format_table(%{:columns => [String.t] | nil, :rows => [term()] | nil, optional(atom) => any()}) :: String.t
    580   def format_table(result)
    581 
    582   def format_table(nil), do: ""
    583   def format_table(%{columns: nil}), do: ""
    584   def format_table(%{columns: []}), do: ""
    585   def format_table(%{columns: columns, rows: nil}), do: format_table(%{columns: columns, rows: []})
    586 
    587   def format_table(%{columns: columns, rows: rows}) do
    588     column_widths =
    589       [columns | rows]
    590       |> List.zip()
    591       |> Enum.map(&Tuple.to_list/1)
    592       |> Enum.map(fn column_with_rows ->
    593         column_with_rows |> Enum.map(&binary_length/1) |> Enum.max()
    594       end)
    595 
    596     [
    597       separator(column_widths),
    598       "\n",
    599       cells(columns, column_widths),
    600       "\n",
    601       separator(column_widths),
    602       "\n",
    603       Enum.map(rows, &cells(&1, column_widths) ++ ["\n"]),
    604       separator(column_widths)
    605     ]
    606     |> IO.iodata_to_binary()
    607   end
    608 
    609   defp binary_length(nil), do: 4 # NULL
    610   defp binary_length(binary) when is_binary(binary), do: String.length(binary)
    611   defp binary_length(other), do: other |> inspect() |> String.length()
    612 
    613   defp separator(widths) do
    614     Enum.map(widths, & [?+, ?-, String.duplicate("-", &1), ?-]) ++ [?+]
    615   end
    616 
    617   defp cells(items, widths) do
    618     cell =
    619       [items, widths]
    620       |> List.zip()
    621       |> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width) , " "] end)
    622 
    623     [cell | [?|]]
    624   end
    625 
    626   defp format_item(nil, width), do: String.pad_trailing("NULL", width)
    627   defp format_item(item, width) when is_binary(item), do: String.pad_trailing(item, width)
    628   defp format_item(item, width) when is_number(item), do: item |> inspect() |> String.pad_leading(width)
    629   defp format_item(item, width), do: item |> inspect() |> String.pad_trailing(width)
    630 
    631   ## Callbacks
    632 
  # Returns the quoted convenience functions listed in the moduledoc under
  # "Additional functions" (`query`, `query!`, `query_many`, `query_many!`,
  # `to_sql`, `explain` and `disconnect_all`).
  #
  # Each generated function calls its `Ecto.Adapters.SQL` counterpart, passing
  # `get_dynamic_repo()` as the repo. That function is not defined in this
  # quote; it is expected to exist in the module the code is injected into.
  # Neither `_driver` nor `_env` is used.
  @doc false
  def __before_compile__(_driver, _env) do
    quote do
      @doc """
      A convenience function for SQL-based repositories that executes the given query.

      See `Ecto.Adapters.SQL.query/4` for more information.
      """
      def query(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that executes the given query.

      See `Ecto.Adapters.SQL.query!/4` for more information.
      """
      def query!(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query!(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that executes the given multi-result query.

      See `Ecto.Adapters.SQL.query_many/4` for more information.
      """
      def query_many(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query_many(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that executes the given multi-result query.

      See `Ecto.Adapters.SQL.query_many!/4` for more information.
      """
      def query_many!(sql, params \\ [], opts \\ []) do
        Ecto.Adapters.SQL.query_many!(get_dynamic_repo(), sql, params, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that translates the given query to SQL.

      See `Ecto.Adapters.SQL.to_sql/3` for more information.
      """
      def to_sql(operation, queryable) do
        Ecto.Adapters.SQL.to_sql(operation, get_dynamic_repo(), queryable)
      end

      @doc """
      A convenience function for SQL-based repositories that executes an EXPLAIN statement or similar
      depending on the adapter to obtain statistics for the given query.

      See `Ecto.Adapters.SQL.explain/4` for more information.
      """
      def explain(operation, queryable, opts \\ []) do
        Ecto.Adapters.SQL.explain(get_dynamic_repo(), operation, queryable, opts)
      end

      @doc """
      A convenience function for SQL-based repositories that forces all connections in the
      pool to disconnect within the given interval.

      See `Ecto.Adapters.SQL.disconnect_all/3` for more information.
      """
      def disconnect_all(interval, opts \\ []) do
        Ecto.Adapters.SQL.disconnect_all(get_dynamic_repo(), interval, opts)
      end
    end
  end
    702 
    703   @doc false
    704   def ensure_all_started(driver, _config, type) do
    705     Application.ensure_all_started(driver, type)
    706   end
    707 
    708   @pool_opts [:timeout, :pool, :pool_size] ++
    709                [:queue_target, :queue_interval, :ownership_timeout, :repo]
    710 
    711   @doc false
    712   def init(connection, driver, config) do
    713     unless Code.ensure_loaded?(connection) do
    714       raise """
    715       could not find #{inspect connection}.
    716 
    717       Please verify you have added #{inspect driver} as a dependency:
    718 
    719           {#{inspect driver}, ">= 0.0.0"}
    720 
    721       And remember to recompile Ecto afterwards by cleaning the current build:
    722 
    723           mix deps.clean --build ecto
    724       """
    725     end
    726 
    727     log = Keyword.get(config, :log, :debug)
    728     stacktrace = Keyword.get(config, :stacktrace, nil)
    729     telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
    730     telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}
    731 
    732     config = adapter_config(config)
    733     opts = Keyword.take(config, @pool_opts)
    734     meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts}
    735     {:ok, connection.child_spec(config), meta}
    736   end
    737 
    738   defp adapter_config(config) do
    739     if Keyword.has_key?(config, :pool_timeout) do
    740       message = """
    741       :pool_timeout option no longer has an effect and has been replaced with an improved queuing system.
    742       See \"Queue config\" in DBConnection.start_link/2 documentation for more information.
    743       """
    744 
    745       IO.warn(message)
    746     end
    747 
    748     config
    749     |> Keyword.delete(:name)
    750     |> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1)
    751   end
    752 
    753   defp normalize_pool(pool) do
    754     if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
    755       DBConnection.Ownership
    756     else
    757       pool
    758     end
    759   end
    760 
    761   @doc false
    762   def checkout(adapter_meta, opts, callback) do
    763     checkout_or_transaction(:run, adapter_meta, opts, callback)
    764   end
    765 
    766   @doc false
    767   def checked_out?(adapter_meta) do
    768     %{pid: pool} = adapter_meta
    769     get_conn(pool) != nil
    770   end
    771 
    772   ## Query
    773 
    774   @doc false
    775   def insert_all(adapter_meta, schema_meta, conn, header, rows, on_conflict, returning, placeholders, opts) do
    776     %{source: source, prefix: prefix} = schema_meta
    777     {_, conflict_params, _} = on_conflict
    778 
    779     {rows, params} =
    780       case rows do
    781         {%Ecto.Query{} = query, params} -> {query, Enum.reverse(params)}
    782         rows -> unzip_inserts(header, rows)
    783       end
    784 
    785     sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders)
    786 
    787     opts = if is_nil(Keyword.get(opts, :cache_statement)) do
    788       [{:cache_statement, "ecto_insert_all_#{source}"} | opts]
    789     else
    790       opts
    791     end
    792 
    793     all_params = placeholders ++ Enum.reverse(params, conflict_params)
    794 
    795     %{num_rows: num, rows: rows} = query!(adapter_meta, sql, all_params, opts)
    796     {num, rows}
    797   end
    798 
    799   defp unzip_inserts(header, rows) do
    800     Enum.map_reduce rows, [], fn fields, params ->
    801       Enum.map_reduce header, params, fn key, acc ->
    802         case :lists.keyfind(key, 1, fields) do
    803           {^key, {%Ecto.Query{} = query, query_params}} ->
    804             {{query, length(query_params)}, Enum.reverse(query_params, acc)}
    805 
    806           {^key, {:placeholder, placeholder_index}} ->
    807             {{:placeholder, Integer.to_string(placeholder_index)}, acc}
    808 
    809           {^key, value} -> {key, [value | acc]}
    810 
    811           false -> {nil, acc}
    812         end
    813       end
    814     end
    815   end
    816 
    817   @doc false
    818   def execute(prepare, adapter_meta, query_meta, prepared, params, opts) do
    819     %{num_rows: num, rows: rows} =
    820       execute!(prepare, adapter_meta, prepared, params, put_source(opts, query_meta))
    821 
    822     {num, rows}
    823   end
    824 
    825   defp execute!(prepare, adapter_meta, {:cache, update, {id, prepared}}, params, opts) do
    826     name = prepare_name(prepare, id)
    827 
    828     case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do
    829       {:ok, query, result} ->
    830         maybe_update_cache(prepare, update, {id, query})
    831         result
    832       {:error, err} ->
    833         raise_sql_call_error err
    834     end
    835   end
    836 
    837   defp execute!(:unnamed = prepare, adapter_meta, {:cached, _update, _reset, {id, cached}}, params, opts) do
    838     name = prepare_name(prepare, id)
    839     prepared = String.Chars.to_string(cached)
    840 
    841     case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do
    842       {:ok, _query, result} ->
    843         result
    844       {:error, err} ->
    845         raise_sql_call_error err
    846     end
    847   end
    848 
    849   defp execute!(:named = _prepare, adapter_meta, {:cached, update, reset, {id, cached}}, params, opts) do
    850     case sql_call(adapter_meta, :execute, [cached], params, opts) do
    851       {:ok, query, result} ->
    852         update.({id, query})
    853         result
    854       {:ok, result} ->
    855         result
    856       {:error, err} ->
    857         raise_sql_call_error err
    858       {:reset, err} ->
    859         reset.({id, String.Chars.to_string(cached)})
    860         raise_sql_call_error err
    861     end
    862   end
    863 
    864   defp execute!(_prepare, adapter_meta, {:nocache, {_id, prepared}}, params, opts) do
    865     case sql_call(adapter_meta, :query, [prepared], params, opts) do
    866       {:ok, res} -> res
    867       {:error, err} -> raise_sql_call_error err
    868     end
    869   end
    870 
    871   defp prepare_name(:named, id), do: "ecto_" <> Integer.to_string(id)
    872   defp prepare_name(:unnamed, _id), do: ""
    873 
    874   defp maybe_update_cache(:named = _prepare, update, value), do: update.(value)
    875   defp maybe_update_cache(:unnamed = _prepare, _update, _value), do: :noop
    876 
    877   @doc false
    878   def stream(adapter_meta, query_meta, prepared, params, opts) do
    879     do_stream(adapter_meta, prepared, params, put_source(opts, query_meta))
    880   end
    881 
    882   defp do_stream(adapter_meta, {:cache, _, {_, prepared}}, params, opts) do
    883     prepare_stream(adapter_meta, prepared, params, opts)
    884   end
    885 
    886   defp do_stream(adapter_meta, {:cached, _, _, {_, cached}}, params, opts) do
    887     prepare_stream(adapter_meta, String.Chars.to_string(cached), params, opts)
    888   end
    889 
    890   defp do_stream(adapter_meta, {:nocache, {_id, prepared}}, params, opts) do
    891     prepare_stream(adapter_meta, prepared, params, opts)
    892   end
    893 
    894   defp prepare_stream(adapter_meta, prepared, params, opts) do
    895     adapter_meta
    896     |> Ecto.Adapters.SQL.Stream.build(prepared, params, opts)
    897     |> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end)
    898   end
    899 
    900   defp raise_sql_call_error(%DBConnection.OwnershipError{} = err) do
    901     message = err.message <> "\nSee Ecto.Adapters.SQL.Sandbox docs for more information."
    902     raise %{err | message: message}
    903   end
    904 
    905   defp raise_sql_call_error(err), do: raise err
    906 
    907   @doc false
    908   def reduce(adapter_meta, statement, params, opts, acc, fun) do
    909     %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
    910     opts = with_log(telemetry, params, opts ++ default_opts)
    911 
    912     case get_conn(pool) do
    913       %DBConnection{conn_mode: :transaction} = conn ->
    914         sql
    915         |> apply(:stream, [conn, statement, params, opts])
    916         |> Enumerable.reduce(acc, fun)
    917 
    918       _ ->
    919         raise "cannot reduce stream outside of transaction"
    920     end
    921   end
    922 
    923   @doc false
    924   def into(adapter_meta, statement, params, opts) do
    925     %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
    926     opts = with_log(telemetry, params, opts ++ default_opts)
    927 
    928     case get_conn(pool) do
    929       %DBConnection{conn_mode: :transaction} = conn ->
    930         sql
    931         |> apply(:stream, [conn, statement, params, opts])
    932         |> Collectable.into()
    933 
    934       _ ->
    935         raise "cannot collect into stream outside of transaction"
    936     end
    937   end
    938 
    939   @doc false
    940   def struct(adapter_meta, conn, sql, operation, source, params, values, on_conflict, returning, opts) do
    941     opts = if is_nil(Keyword.get(opts, :cache_statement)) do
    942       [{:cache_statement, "ecto_#{operation}_#{source}_#{length(params)}"} | opts]
    943     else
    944       opts
    945     end
    946 
    947     case query(adapter_meta, sql, values, opts) do
    948       {:ok, %{rows: nil, num_rows: 1}} ->
    949         {:ok, []}
    950 
    951       {:ok, %{rows: [values], num_rows: 1}} ->
    952         {:ok, Enum.zip(returning, values)}
    953 
    954       {:ok, %{num_rows: 0}} ->
    955         if on_conflict == :nothing, do: {:ok, []}, else: {:error, :stale}
    956 
    957       {:ok, %{num_rows: num_rows}} when num_rows > 1 ->
    958         raise Ecto.MultiplePrimaryKeyError,
    959               source: source, params: params, count: num_rows, operation: operation
    960 
    961       {:error, err} ->
    962         case conn.to_constraints(err, source: source) do
    963           [] -> raise_sql_call_error err
    964           constraints -> {:invalid, constraints}
    965         end
    966     end
    967   end
    968 
    969   ## Transactions
    970 
    971   @doc false
    972   def transaction(adapter_meta, opts, callback) do
    973     checkout_or_transaction(:transaction, adapter_meta, opts, callback)
    974   end
    975 
    976   @doc false
    977   def in_transaction?(%{pid: pool}) do
    978     match?(%DBConnection{conn_mode: :transaction}, get_conn(pool))
    979   end
    980 
    981   @doc false
    982   def rollback(%{pid: pool}, value) do
    983     case get_conn(pool) do
    984       %DBConnection{conn_mode: :transaction} = conn -> DBConnection.rollback(conn, value)
    985       _ -> raise "cannot call rollback outside of transaction"
    986     end
    987   end
    988 
    989   ## Migrations
    990 
    991   @doc false
    992   def execute_ddl(meta, conn, definition, opts) do
    993     ddl_logs =
    994       definition
    995       |> conn.execute_ddl()
    996       |> List.wrap()
    997       |> Enum.map(&query!(meta, &1, [], opts))
    998       |> Enum.flat_map(&conn.ddl_logs/1)
    999 
   1000     {:ok, ddl_logs}
   1001   end
   1002 
   1003   @doc false
   1004   def raise_migration_pool_size_error do
   1005     raise Ecto.MigrationError, """
   1006     Migrations failed to run because the connection pool size is less than 2.
   1007 
   1008     Ecto requires a pool size of at least 2 to support concurrent migrators.
   1009     When migrations run, Ecto uses one connection to maintain a lock and
   1010     another to run migrations.
   1011 
   1012     If you are running migrations with Mix, you can increase the number
   1013     of connections via the pool size option:
   1014 
   1015         mix ecto.migrate --pool-size 2
   1016 
   1017     If you are running the Ecto.Migrator programmatically, you can configure
   1018     the pool size via your application config:
   1019 
   1020         config :my_app, Repo,
   1021           ...,
   1022           pool_size: 2 # at least
   1023     """
   1024   end
   1025 
   1026   ## Log
   1027 
   1028   defp with_log(telemetry, params, opts) do
   1029     [log: &log(telemetry, params, &1, opts)] ++ opts
   1030   end
   1031 
   1032   defp log({repo, log, event_name}, params, entry, opts) do
   1033     %{
   1034       connection_time: query_time,
   1035       decode_time: decode_time,
   1036       pool_time: queue_time,
   1037       idle_time: idle_time,
   1038       result: result,
   1039       query: query
   1040     } = entry
   1041 
   1042     source = Keyword.get(opts, :source)
   1043     query = String.Chars.to_string(query)
   1044     result = with {:ok, _query, res} <- result, do: {:ok, res}
   1045     stacktrace = Keyword.get(opts, :stacktrace)
   1046 
   1047     acc =
   1048       if idle_time, do: [idle_time: idle_time], else: []
   1049 
   1050     measurements =
   1051       log_measurements(
   1052         [query_time: query_time, decode_time: decode_time, queue_time: queue_time],
   1053         0,
   1054         acc
   1055       )
   1056 
   1057     metadata = %{
   1058       type: :ecto_sql_query,
   1059       repo: repo,
   1060       result: result,
   1061       params: params,
   1062       query: query,
   1063       source: source,
   1064       stacktrace: stacktrace,
   1065       options: Keyword.get(opts, :telemetry_options, [])
   1066     }
   1067 
   1068     if event_name = Keyword.get(opts, :telemetry_event, event_name) do
   1069       :telemetry.execute(event_name, measurements, metadata)
   1070     end
   1071 
   1072     case Keyword.get(opts, :log, log) do
   1073       true ->
   1074         Logger.log(
   1075           log,
   1076           fn -> log_iodata(measurements, repo, source, query, opts[:cast_params] || params, result, stacktrace) end,
   1077           ansi_color: sql_color(query)
   1078         )
   1079 
   1080       false ->
   1081         :ok
   1082 
   1083       level ->
   1084         Logger.log(
   1085           level,
   1086           fn -> log_iodata(measurements, repo, source, query, opts[:cast_params] || params, result, stacktrace) end,
   1087           ansi_color: sql_color(query)
   1088         )
   1089     end
   1090 
   1091     :ok
   1092   end
   1093 
   1094   defp log_measurements([{_, nil} | rest], total, acc),
   1095     do: log_measurements(rest, total, acc)
   1096 
   1097   defp log_measurements([{key, value} | rest], total, acc),
   1098     do: log_measurements(rest, total + value, [{key, value} | acc])
   1099 
   1100   defp log_measurements([], total, acc),
   1101     do: Map.new([total_time: total] ++ acc)
   1102 
   1103   defp log_iodata(measurements, repo, source, query, params, result, stacktrace) do
   1104     [
   1105       "QUERY",
   1106       ?\s,
   1107       log_ok_error(result),
   1108       log_ok_source(source),
   1109       log_time("db", measurements, :query_time, true),
   1110       log_time("decode", measurements, :decode_time, false),
   1111       log_time("queue", measurements, :queue_time, false),
   1112       log_time("idle", measurements, :idle_time, true),
   1113       ?\n,
   1114       query,
   1115       ?\s,
   1116       inspect(params, charlists: false),
   1117       log_stacktrace(stacktrace, repo)
   1118     ]
   1119   end
   1120 
   1121   defp log_ok_error({:ok, _res}), do: "OK"
   1122   defp log_ok_error({:error, _err}), do: "ERROR"
   1123 
   1124   defp log_ok_source(nil), do: ""
   1125   defp log_ok_source(source), do: " source=#{inspect(source)}"
   1126 
   1127   defp log_time(label, measurements, key, force) do
   1128     case measurements do
   1129       %{^key => time} ->
   1130         us = System.convert_time_unit(time, :native, :microsecond)
   1131         ms = div(us, 100) / 10
   1132 
   1133         if force or ms > 0 do
   1134           [?\s, label, ?=, :io_lib_format.fwrite_g(ms), ?m, ?s]
   1135         else
   1136           []
   1137         end
   1138 
   1139       %{} ->
   1140         []
   1141     end
   1142   end
   1143 
   1144   defp log_stacktrace(stacktrace, repo) do
   1145     with [_ | _] <- stacktrace,
   1146          {module, function, arity, info} <- last_non_ecto(Enum.reverse(stacktrace), repo, nil) do
   1147       [
   1148         ?\n,
   1149         IO.ANSI.light_black(),
   1150         "↳ ",
   1151         Exception.format_mfa(module, function, arity),
   1152         log_stacktrace_info(info),
   1153         IO.ANSI.reset(),
   1154       ]
   1155     else
   1156       _ -> []
   1157     end
   1158   end
   1159 
   1160   defp log_stacktrace_info([file: file, line: line] ++ _) do
   1161     [", at: ", file, ?:, Integer.to_string(line)]
   1162   end
   1163 
   1164   defp log_stacktrace_info(_) do
   1165     []
   1166   end
   1167 
   1168   @repo_modules [Ecto.Repo.Queryable, Ecto.Repo.Schema, Ecto.Repo.Transaction]
   1169 
   1170   defp last_non_ecto([{mod, _, _, _} | _stacktrace], repo, last)
   1171        when mod == repo or mod in @repo_modules,
   1172        do: last
   1173 
   1174   defp last_non_ecto([last | stacktrace], repo, _last),
   1175     do: last_non_ecto(stacktrace, repo, last)
   1176 
   1177   defp last_non_ecto([], _repo, last),
   1178     do: last
   1179 
   1180   ## Connection helpers
   1181 
   1182   defp checkout_or_transaction(fun, adapter_meta, opts, callback) do
   1183     %{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta
   1184     opts = with_log(telemetry, [], opts ++ default_opts)
   1185 
   1186     callback = fn conn ->
   1187       previous_conn = put_conn(pool, conn)
   1188 
   1189       try do
   1190         callback.()
   1191       after
   1192         reset_conn(pool, previous_conn)
   1193       end
   1194     end
   1195 
   1196     apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts])
   1197   end
   1198 
   1199   defp get_conn_or_pool(pool) do
   1200     Process.get(key(pool), pool)
   1201   end
   1202 
   1203   defp get_conn(pool) do
   1204     Process.get(key(pool))
   1205   end
   1206 
   1207   defp put_conn(pool, conn) do
   1208     Process.put(key(pool), conn)
   1209   end
   1210 
   1211   defp reset_conn(pool, conn) do
   1212     if conn do
   1213       put_conn(pool, conn)
   1214     else
   1215       Process.delete(key(pool))
   1216     end
   1217   end
   1218 
   1219   defp key(pool), do: {__MODULE__, pool}
   1220 
   1221   defp sql_color("SELECT" <> _), do: :cyan
   1222   defp sql_color("ROLLBACK" <> _), do: :red
   1223   defp sql_color("LOCK" <> _), do: :white
   1224   defp sql_color("INSERT" <> _), do: :green
   1225   defp sql_color("UPDATE" <> _), do: :yellow
   1226   defp sql_color("DELETE" <> _), do: :red
   1227   defp sql_color("begin" <> _), do: :magenta
   1228   defp sql_color("commit" <> _), do: :magenta
   1229   defp sql_color(_), do: nil
   1230 end