zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

connection.ex (51604B)


      1 if Code.ensure_loaded?(Postgrex) do
      2   defmodule Ecto.Adapters.Postgres.Connection do
      3     @moduledoc false
      4 
      5     @default_port 5432
      6     @behaviour Ecto.Adapters.SQL.Connection
      7 
      8     ## Module and Options
      9 
     10     @impl true
     11     def child_spec(opts) do
     12       opts
     13       |> Keyword.put_new(:port, @default_port)
     14       |> Postgrex.child_spec()
     15     end
     16 
     17     @impl true
     18     def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}, _opts),
     19       do: [unique: constraint]
     20     def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}, _opts),
     21       do: [foreign_key: constraint]
     22     def to_constraints(%Postgrex.Error{postgres: %{code: :exclusion_violation, constraint: constraint}}, _opts),
     23       do: [exclusion: constraint]
     24     def to_constraints(%Postgrex.Error{postgres: %{code: :check_violation, constraint: constraint}}, _opts),
     25       do: [check: constraint]
     26 
     27     # Postgres 9.2 and earlier does not provide the constraint field
     28     @impl true
     29     def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, message: message}}, _opts) do
     30       case :binary.split(message, " unique constraint ") do
     31         [_, quoted] -> [unique: strip_quotes(quoted)]
     32         _ -> []
     33       end
     34     end
     35     def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, message: message}}, _opts) do
     36       case :binary.split(message, " foreign key constraint ") do
     37         [_, quoted] ->
     38           [quoted | _] = :binary.split(quoted, " on table ")
     39           [foreign_key: strip_quotes(quoted)]
     40         _ ->
     41           []
     42       end
     43     end
     44     def to_constraints(%Postgrex.Error{postgres: %{code: :exclusion_violation, message: message}}, _opts) do
     45       case :binary.split(message, " exclusion constraint ") do
     46         [_, quoted] -> [exclusion: strip_quotes(quoted)]
     47         _ -> []
     48       end
     49     end
     50     def to_constraints(%Postgrex.Error{postgres: %{code: :check_violation, message: message}}, _opts) do
     51       case :binary.split(message, " check constraint ") do
     52         [_, quoted] -> [check: strip_quotes(quoted)]
     53         _ -> []
     54       end
     55     end
     56 
     57     def to_constraints(_, _opts),
     58       do: []
     59 
     60     defp strip_quotes(quoted) do
     61       size = byte_size(quoted) - 2
     62       <<_, unquoted::binary-size(size), _>> = quoted
     63       unquoted
     64     end
     65 
     66     ## Query
     67 
     68     @impl true
     69     def prepare_execute(conn, name, sql, params, opts) do
     70       case Postgrex.prepare_execute(conn, name, sql, params, opts) do
     71         {:error, %Postgrex.Error{postgres: %{pg_code: "22P02", message: message}} = error} ->
     72           context = """
     73           . If you are trying to query a JSON field, the parameter may need to be interpolated. \
     74           Instead of
     75 
     76               p.json["field"] != "value"
     77 
     78           do
     79 
     80               p.json["field"] != ^"value"
     81           """
     82 
     83           {:error, put_in(error.postgres.message, message <> context)}
     84         other ->
     85           other
     86         end
     87 
     88     end
     89 
     90     @impl true
     91     def query(conn, sql, params, opts) do
     92       Postgrex.query(conn, sql, params, opts)
     93     end
     94 
     95     @impl true
     96     def query_many(_conn, _sql, _params, _opts) do
     97       raise RuntimeError, "query_many is not supported in the Postgrex adapter"
     98     end
     99 
    100     @impl true
    101     def execute(conn, %{ref: ref} = query, params, opts) do
    102       case Postgrex.execute(conn, query, params, opts) do
    103         {:ok, %{ref: ^ref}, result} ->
    104           {:ok, result}
    105 
    106         {:ok, _, _} = ok ->
    107           ok
    108 
    109         {:error, %Postgrex.QueryError{} = err} ->
    110           {:reset, err}
    111 
    112         {:error, %Postgrex.Error{postgres: %{code: :feature_not_supported}} = err} ->
    113           {:reset, err}
    114 
    115         {:error, _} = error ->
    116           error
    117       end
    118     end
    119 
    120     @impl true
    121     def stream(conn, sql, params, opts) do
    122       Postgrex.stream(conn, sql, params, opts)
    123     end
    124 
    125     @parent_as __MODULE__
    126     alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr, WithExpr}
    127 
    128     @impl true
    129     def all(query, as_prefix \\ []) do
    130       sources = create_names(query, as_prefix)
    131       {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query)
    132 
    133       cte = cte(query, sources)
    134       from = from(query, sources)
    135       select = select(query, select_distinct, sources)
    136       join = join(query, sources)
    137       where = where(query, sources)
    138       group_by = group_by(query, sources)
    139       having = having(query, sources)
    140       window = window(query, sources)
    141       combinations = combinations(query)
    142       order_by = order_by(query, order_by_distinct, sources)
    143       limit = limit(query, sources)
    144       offset = offset(query, sources)
    145       lock = lock(query, sources)
    146 
    147       [cte, select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    148     end
    149 
    150     @impl true
    151     def update_all(%{from: %{source: source}} = query, prefix \\ nil) do
    152       sources = create_names(query, [])
    153       cte = cte(query, sources)
    154       {from, name} = get_source(query, sources, 0, source)
    155 
    156       prefix = prefix || ["UPDATE ", from, " AS ", name | " SET "]
    157       fields = update_fields(query, sources)
    158       {join, wheres} = using_join(query, :update_all, "FROM", sources)
    159       where = where(%{query | wheres: wheres ++ query.wheres}, sources)
    160 
    161       [cte, prefix, fields, join, where | returning(query, sources)]
    162     end
    163 
    164     @impl true
    165     def delete_all(%{from: from} = query) do
    166       sources = create_names(query, [])
    167       cte = cte(query, sources)
    168       {from, name} = get_source(query, sources, 0, from)
    169 
    170       {join, wheres} = using_join(query, :delete_all, "USING", sources)
    171       where = where(%{query | wheres: wheres ++ query.wheres}, sources)
    172 
    173       [cte, "DELETE FROM ", from, " AS ", name, join, where | returning(query, sources)]
    174     end
    175 
    176     @impl true
    177     def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
    178       counter_offset = length(placeholders) + 1
    179       values =
    180         if header == [] do
    181           [" VALUES " | intersperse_map(rows, ?,, fn _ -> "(DEFAULT)" end)]
    182         else
    183           [" (", quote_names(header), ") " | insert_all(rows, counter_offset)]
    184         end
    185 
    186       ["INSERT INTO ", quote_table(prefix, table), insert_as(on_conflict),
    187        values, on_conflict(on_conflict, header) | returning(returning)]
    188     end
    189 
    190     defp insert_as({%{sources: sources}, _, _}) do
    191       {_expr, name, _schema} = create_name(sources, 0, [])
    192       [" AS " | name]
    193     end
    194     defp insert_as({_, _, _}) do
    195       []
    196     end
    197 
    198     defp on_conflict({:raise, _, []}, _header),
    199       do: []
    200     defp on_conflict({:nothing, _, targets}, _header),
    201       do: [" ON CONFLICT ", conflict_target(targets) | "DO NOTHING"]
    202     defp on_conflict({fields, _, targets}, _header) when is_list(fields),
    203       do: [" ON CONFLICT ", conflict_target!(targets), "DO " | replace(fields)]
    204     defp on_conflict({query, _, targets}, _header),
    205       do: [" ON CONFLICT ", conflict_target!(targets), "DO " | update_all(query, "UPDATE SET ")]
    206 
    207     defp conflict_target!([]),
    208       do: error!(nil, "the :conflict_target option is required on upserts by PostgreSQL")
    209     defp conflict_target!(target),
    210       do: conflict_target(target)
    211 
    212     defp conflict_target({:unsafe_fragment, fragment}),
    213       do: [fragment, ?\s]
    214     defp conflict_target([]),
    215       do: []
    216     defp conflict_target(targets),
    217       do: [?(, quote_names(targets), ?), ?\s]
    218 
    219     defp replace(fields) do
    220       ["UPDATE SET " |
    221        intersperse_map(fields, ?,, fn field ->
    222          quoted = quote_name(field)
    223          [quoted, " = ", "EXCLUDED." | quoted]
    224        end)]
    225     end
    226 
    227     defp insert_all(query = %Ecto.Query{}, _counter) do
    228       [?(, all(query), ?)]
    229     end
    230 
    231     defp insert_all(rows, counter) do
    232       ["VALUES ", intersperse_reduce(rows, ?,, counter, fn row, counter ->
    233         {row, counter} = insert_each(row, counter)
    234         {[?(, row, ?)], counter}
    235       end)
    236       |> elem(0)]
    237     end
    238 
    239     defp insert_each(values, counter) do
    240       intersperse_reduce(values, ?,, counter, fn
    241         nil, counter ->
    242           {"DEFAULT", counter}
    243 
    244         {%Ecto.Query{} = query, params_counter}, counter ->
    245           {[?(, all(query), ?)], counter + params_counter}
    246 
    247         {:placeholder, placeholder_index}, counter ->
    248           {[?$ | placeholder_index], counter}
    249 
    250         _, counter ->
    251           {[?$ | Integer.to_string(counter)], counter + 1}
    252       end)
    253     end
    254 
    255     @impl true
    256     def update(prefix, table, fields, filters, returning) do
    257       {fields, count} = intersperse_reduce(fields, ", ", 1, fn field, acc ->
    258         {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
    259       end)
    260 
    261       {filters, _count} = intersperse_reduce(filters, " AND ", count, fn
    262         {field, nil}, acc ->
    263           {[quote_name(field), " IS NULL"], acc}
    264 
    265         {field, _value}, acc ->
    266           {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
    267       end)
    268 
    269       ["UPDATE ", quote_table(prefix, table), " SET ",
    270        fields, " WHERE ", filters | returning(returning)]
    271     end
    272 
    273     @impl true
    274     def delete(prefix, table, filters, returning) do
    275       {filters, _} = intersperse_reduce(filters, " AND ", 1, fn
    276         {field, nil}, acc ->
    277           {[quote_name(field), " IS NULL"], acc}
    278 
    279         {field, _value}, acc ->
    280           {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
    281       end)
    282 
    283       ["DELETE FROM ", quote_table(prefix, table), " WHERE ", filters | returning(returning)]
    284     end
    285 
    286     @impl true
    287     def explain_query(conn, query, params, opts) do
    288       {explain_opts, opts} =
    289         Keyword.split(opts, ~w[analyze verbose costs settings buffers timing summary format]a)
    290 
    291       map_format? = {:format, :map} in explain_opts
    292 
    293       case query(conn, build_explain_query(query, explain_opts), params, opts) do
    294         {:ok, %Postgrex.Result{rows: rows}} when map_format? ->
    295           {:ok, List.flatten(rows)}
    296         {:ok, %Postgrex.Result{rows: rows}} ->
    297           {:ok, Enum.map_join(rows, "\n", & &1)}
    298         error -> error
    299       end
    300     end
    301 
    302     def build_explain_query(query, []) do
    303       ["EXPLAIN ", query]
    304       |> IO.iodata_to_binary()
    305     end
    306 
    307     def build_explain_query(query, opts) do
    308       {analyze, opts} = Keyword.pop(opts, :analyze)
    309       {verbose, opts} = Keyword.pop(opts, :verbose)
    310 
    311       # Given only ANALYZE or VERBOSE opts we assume the legacy format
    312       # to support all Postgres versions, otherwise assume the new
    313       # syntax supported since v9.0
    314       case opts do
    315         [] ->
    316           [
    317             "EXPLAIN ",
    318             if_do(quote_boolean(analyze) == "TRUE", "ANALYZE "),
    319             if_do(quote_boolean(verbose) == "TRUE", "VERBOSE "),
    320             query
    321           ]
    322 
    323         opts ->
    324           opts =
    325             ([analyze: analyze, verbose: verbose] ++ opts)
    326             |> Enum.reduce([], fn
    327               {_, nil}, acc ->
    328                 acc
    329 
    330               {:format, value}, acc ->
    331                 [String.upcase("#{format_to_sql(value)}") | acc]
    332 
    333               {opt, value}, acc ->
    334                 [String.upcase("#{opt} #{quote_boolean(value)}") | acc]
    335             end)
    336             |> Enum.reverse()
    337             |> Enum.join(", ")
    338 
    339           ["EXPLAIN ( ", opts, " ) ", query]
    340       end
    341       |> IO.iodata_to_binary()
    342     end
    343 
    344     ## Query generation
    345 
    346     binary_ops =
    347       [==: " = ", !=: " != ", <=: " <= ", >=: " >= ", <: " < ", >: " > ",
    348        +: " + ", -: " - ", *: " * ", /: " / ",
    349        and: " AND ", or: " OR ", ilike: " ILIKE ", like: " LIKE "]
    350 
    351     @binary_ops Keyword.keys(binary_ops)
    352 
    353     Enum.map(binary_ops, fn {op, str} ->
    354       defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)}
    355     end)
    356 
    357     defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)}
    358 
    359     defp select(%{select: %{fields: fields}} = query, select_distinct, sources) do
    360       ["SELECT", select_distinct, ?\s | select_fields(fields, sources, query)]
    361     end
    362 
    363     defp select_fields([], _sources, _query),
    364       do: "TRUE"
    365     defp select_fields(fields, sources, query) do
    366       intersperse_map(fields, ", ", fn
    367         {:&, _, [idx]} ->
    368           case elem(sources, idx) do
    369             {nil, source, nil} ->
    370               error!(query, "PostgreSQL adapter does not support selecting all fields from fragment #{source}. " <>
    371                             "Please specify exactly which fields you want to select")
    372             {source, _, nil} ->
    373               error!(query, "PostgreSQL adapter does not support selecting all fields from #{source} without a schema. " <>
    374                             "Please specify a schema or specify exactly which fields you want to select")
    375             {_, source, _} ->
    376               source
    377           end
    378         {key, value} ->
    379           [expr(value, sources, query), " AS " | quote_name(key)]
    380         value ->
    381           expr(value, sources, query)
    382       end)
    383     end
    384 
    385     defp distinct(nil, _, _), do: {[], []}
    386     defp distinct(%QueryExpr{expr: []}, _, _), do: {[], []}
    387     defp distinct(%QueryExpr{expr: true}, _, _), do: {" DISTINCT", []}
    388     defp distinct(%QueryExpr{expr: false}, _, _), do: {[], []}
    389     defp distinct(%QueryExpr{expr: exprs}, sources, query) do
    390       {[" DISTINCT ON (",
    391         intersperse_map(exprs, ", ", fn {_, expr} -> expr(expr, sources, query) end), ?)],
    392        exprs}
    393     end
    394 
    395     defp from(%{from: %{hints: [_ | _]}} = query, _sources) do
    396       error!(query, "table hints are not supported by PostgreSQL")
    397     end
    398 
    399     defp from(%{from: %{source: source}} = query, sources) do
    400       {from, name} = get_source(query, sources, 0, source)
    401       [" FROM ", from, " AS " | name]
    402     end
    403 
    404     defp cte(%{with_ctes: %WithExpr{recursive: recursive, queries: [_ | _] = queries}} = query, sources) do
    405       recursive_opt = if recursive, do: "RECURSIVE ", else: ""
    406       ctes = intersperse_map(queries, ", ", &cte_expr(&1, sources, query))
    407       ["WITH ", recursive_opt, ctes, " "]
    408     end
    409 
    410     defp cte(%{with_ctes: _}, _), do: []
    411 
    412     defp cte_expr({name, cte}, sources, query) do
    413       [quote_name(name), " AS ", cte_query(cte, sources, query)]
    414     end
    415 
    416     defp cte_query(%Ecto.Query{} = query, sources, parent_query) do
    417       query = put_in(query.aliases[@parent_as], {parent_query, sources})
    418       ["(", all(query, subquery_as_prefix(sources)), ")"]
    419     end
    420 
    421     defp cte_query(%QueryExpr{expr: expr}, sources, query) do
    422       expr(expr, sources, query)
    423     end
    424 
    425     defp update_fields(%{updates: updates} = query, sources) do
    426       for(%{expr: expr} <- updates,
    427           {op, kw} <- expr,
    428           {key, value} <- kw,
    429           do: update_op(op, key, value, sources, query)) |> Enum.intersperse(", ")
    430     end
    431 
    432     defp update_op(:set, key, value, sources, query) do
    433       [quote_name(key), " = " | expr(value, sources, query)]
    434     end
    435 
    436     defp update_op(:inc, key, value, sources, query) do
    437       [quote_name(key), " = ", quote_qualified_name(key, sources, 0), " + " |
    438        expr(value, sources, query)]
    439     end
    440 
    441     defp update_op(:push, key, value, sources, query) do
    442       [quote_name(key), " = array_append(", quote_qualified_name(key, sources, 0),
    443        ", ", expr(value, sources, query), ?)]
    444     end
    445 
    446     defp update_op(:pull, key, value, sources, query) do
    447       [quote_name(key), " = array_remove(", quote_qualified_name(key, sources, 0),
    448        ", ", expr(value, sources, query), ?)]
    449     end
    450 
    451     defp update_op(command, _key, _value, _sources, query) do
    452       error!(query, "unknown update operation #{inspect command} for PostgreSQL")
    453     end
    454 
    455     defp using_join(%{joins: []}, _kind, _prefix, _sources), do: {[], []}
    456     defp using_join(%{joins: joins} = query, kind, prefix, sources) do
    457       froms =
    458         intersperse_map(joins, ", ", fn
    459           %JoinExpr{qual: :inner, ix: ix, source: source} ->
    460             {join, name} = get_source(query, sources, ix, source)
    461             [join, " AS " | name]
    462           %JoinExpr{qual: qual} ->
    463             error!(query, "PostgreSQL supports only inner joins on #{kind}, got: `#{qual}`")
    464         end)
    465 
    466       wheres =
    467         for %JoinExpr{on: %QueryExpr{expr: value} = expr} <- joins,
    468             value != true,
    469             do: expr |> Map.put(:__struct__, BooleanExpr) |> Map.put(:op, :and)
    470 
    471       {[?\s, prefix, ?\s | froms], wheres}
    472     end
    473 
    474     defp join(%{joins: []}, _sources), do: []
    475     defp join(%{joins: joins} = query, sources) do
    476       [?\s | intersperse_map(joins, ?\s, fn
    477         %JoinExpr{on: %QueryExpr{expr: expr}, qual: qual, ix: ix, source: source, hints: hints} ->
    478           if hints != [] do
    479             error!(query, "table hints are not supported by PostgreSQL")
    480           end
    481 
    482           {join, name} = get_source(query, sources, ix, source)
    483           [join_qual(qual), join, " AS ", name | join_on(qual, expr, sources, query)]
    484       end)]
    485     end
    486 
    487     defp join_on(:cross, true, _sources, _query), do: []
    488     defp join_on(_qual, expr, sources, query), do: [" ON " | expr(expr, sources, query)]
    489 
    490     defp join_qual(:inner), do: "INNER JOIN "
    491     defp join_qual(:inner_lateral), do: "INNER JOIN LATERAL "
    492     defp join_qual(:left),  do: "LEFT OUTER JOIN "
    493     defp join_qual(:left_lateral),  do: "LEFT OUTER JOIN LATERAL "
    494     defp join_qual(:right), do: "RIGHT OUTER JOIN "
    495     defp join_qual(:full),  do: "FULL OUTER JOIN "
    496     defp join_qual(:cross), do: "CROSS JOIN "
    497 
    498     defp where(%{wheres: wheres} = query, sources) do
    499       boolean(" WHERE ", wheres, sources, query)
    500     end
    501 
    502     defp having(%{havings: havings} = query, sources) do
    503       boolean(" HAVING ", havings, sources, query)
    504     end
    505 
    506     defp group_by(%{group_bys: []}, _sources), do: []
    507     defp group_by(%{group_bys: group_bys} = query, sources) do
    508       [" GROUP BY " |
    509        intersperse_map(group_bys, ", ", fn
    510          %QueryExpr{expr: expr} ->
    511            intersperse_map(expr, ", ", &expr(&1, sources, query))
    512        end)]
    513     end
    514 
    515     defp window(%{windows: []}, _sources), do: []
    516     defp window(%{windows: windows} = query, sources) do
    517       [" WINDOW " |
    518        intersperse_map(windows, ", ", fn {name, %{expr: kw}} ->
    519          [quote_name(name), " AS " | window_exprs(kw, sources, query)]
    520        end)]
    521     end
    522 
    523     defp window_exprs(kw, sources, query) do
    524       [?(, intersperse_map(kw, ?\s, &window_expr(&1, sources, query)), ?)]
    525     end
    526 
    527     defp window_expr({:partition_by, fields}, sources, query) do
    528       ["PARTITION BY " | intersperse_map(fields, ", ", &expr(&1, sources, query))]
    529     end
    530 
    531     defp window_expr({:order_by, fields}, sources, query) do
    532       ["ORDER BY " | intersperse_map(fields, ", ", &order_by_expr(&1, sources, query))]
    533     end
    534 
    535     defp window_expr({:frame, {:fragment, _, _} = fragment}, sources, query) do
    536       expr(fragment, sources, query)
    537     end
    538 
    539     defp order_by(%{order_bys: []}, _distinct, _sources), do: []
    540     defp order_by(%{order_bys: order_bys} = query, distinct, sources) do
    541       order_bys = Enum.flat_map(order_bys, & &1.expr)
    542       order_bys = order_by_concat(distinct, order_bys)
    543       [" ORDER BY " | intersperse_map(order_bys, ", ", &order_by_expr(&1, sources, query))]
    544     end
    545 
    546     defp order_by_concat([head | left], [head | right]), do: [head | order_by_concat(left, right)]
    547     defp order_by_concat(left, right), do: left ++ right
    548 
    549     defp order_by_expr({dir, expr}, sources, query) do
    550       str = expr(expr, sources, query)
    551 
    552       case dir do
    553         :asc  -> str
    554         :asc_nulls_last -> [str | " ASC NULLS LAST"]
    555         :asc_nulls_first -> [str | " ASC NULLS FIRST"]
    556         :desc -> [str | " DESC"]
    557         :desc_nulls_last -> [str | " DESC NULLS LAST"]
    558         :desc_nulls_first -> [str | " DESC NULLS FIRST"]
    559       end
    560     end
    561 
    562     defp limit(%{limit: nil}, _sources), do: []
    563     defp limit(%{limit: %QueryExpr{expr: expr}} = query, sources) do
    564       [" LIMIT " | expr(expr, sources, query)]
    565     end
    566 
    567     defp offset(%{offset: nil}, _sources), do: []
    568     defp offset(%{offset: %QueryExpr{expr: expr}} = query, sources) do
    569       [" OFFSET " | expr(expr, sources, query)]
    570     end
    571 
    572     defp combinations(%{combinations: combinations}) do
    573       Enum.map(combinations, fn
    574         {:union, query} -> [" UNION (", all(query), ")"]
    575         {:union_all, query} -> [" UNION ALL (", all(query), ")"]
    576         {:except, query} -> [" EXCEPT (", all(query), ")"]
    577         {:except_all, query} -> [" EXCEPT ALL (", all(query), ")"]
    578         {:intersect, query} -> [" INTERSECT (", all(query), ")"]
    579         {:intersect_all, query} -> [" INTERSECT ALL (", all(query), ")"]
    580       end)
    581     end
    582 
    583     defp lock(%{lock: nil}, _sources), do: []
    584     defp lock(%{lock: binary}, _sources) when is_binary(binary), do: [?\s | binary]
    585     defp lock(%{lock: expr} = query, sources), do: [?\s | expr(expr, sources, query)]
    586 
    587     defp boolean(_name, [], _sources, _query), do: []
    588     defp boolean(name, [%{expr: expr, op: op} | query_exprs], sources, query) do
    589       [name |
    590        Enum.reduce(query_exprs, {op, paren_expr(expr, sources, query)}, fn
    591          %BooleanExpr{expr: expr, op: op}, {op, acc} ->
    592            {op, [acc, operator_to_boolean(op), paren_expr(expr, sources, query)]}
    593          %BooleanExpr{expr: expr, op: op}, {_, acc} ->
    594            {op, [?(, acc, ?), operator_to_boolean(op), paren_expr(expr, sources, query)]}
    595        end) |> elem(1)]
    596     end
    597 
    598     defp operator_to_boolean(:and), do: " AND "
    599     defp operator_to_boolean(:or), do: " OR "
    600 
    601     defp parens_for_select([first_expr | _] = expr) do
    602       if is_binary(first_expr) and String.match?(first_expr, ~r/^\s*select/i) do
    603         [?(, expr, ?)]
    604       else
    605         expr
    606       end
    607     end
    608 
    609     defp paren_expr(expr, sources, query) do
    610       [?(, expr(expr, sources, query), ?)]
    611     end
    612 
    613     defp expr({:^, [], [ix]}, _sources, _query) do
    614       [?$ | Integer.to_string(ix + 1)]
    615     end
    616 
    617     defp expr({{:., _, [{:parent_as, _, [as]}, field]}, _, []}, _sources, query)
    618          when is_atom(field) do
    619       {ix, sources} = get_parent_sources_ix(query, as)
    620       quote_qualified_name(field, sources, ix)
    621     end
    622 
    623     defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query) when is_atom(field) do
    624       quote_qualified_name(field, sources, idx)
    625     end
    626 
    627     defp expr({:&, _, [idx]}, sources, _query) do
    628       {_, source, _} = elem(sources, idx)
    629       source
    630     end
    631 
    632     defp expr({:in, _, [_left, []]}, _sources, _query) do
    633       "false"
    634     end
    635 
    636     defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do
    637       args = intersperse_map(right, ?,, &expr(&1, sources, query))
    638       [expr(left, sources, query), " IN (", args, ?)]
    639     end
    640 
    641     defp expr({:in, _, [left, {:^, _, [ix, _]}]}, sources, query) do
    642       [expr(left, sources, query), " = ANY($", Integer.to_string(ix + 1), ?)]
    643     end
    644 
    645     defp expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}, sources, query) do
    646       [expr(left, sources, query), " IN ", expr(subquery, sources, query)]
    647     end
    648 
    649     defp expr({:in, _, [left, right]}, sources, query) do
    650       [expr(left, sources, query), " = ANY(", expr(right, sources, query), ?)]
    651     end
    652 
    653     defp expr({:is_nil, _, [arg]}, sources, query) do
    654       [expr(arg, sources, query) | " IS NULL"]
    655     end
    656 
    657     defp expr({:not, _, [expr]}, sources, query) do
    658       ["NOT (", expr(expr, sources, query), ?)]
    659     end
    660 
    661     defp expr(%Ecto.SubQuery{query: query}, sources, parent_query) do
    662       query = put_in(query.aliases[@parent_as], {parent_query, sources})
    663       [?(, all(query, subquery_as_prefix(sources)), ?)]
    664     end
    665 
    666     defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3 do
    667       error!(query, "PostgreSQL adapter does not support keyword or interpolated fragments")
    668     end
    669 
    670     defp expr({:fragment, _, parts}, sources, query) do
    671       Enum.map(parts, fn
    672         {:raw, part}  -> part
    673         {:expr, expr} -> expr(expr, sources, query)
    674       end)
    675       |> parens_for_select
    676     end
    677 
    678     defp expr({:literal, _, [literal]}, _sources, _query) do
    679       quote_name(literal)
    680     end
    681 
    682     defp expr({:selected_as, _, [name]}, _sources, _query) do
    683       [quote_name(name)]
    684     end
    685 
    686     defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do
    687       [expr(datetime, sources, query), type_unless_typed(datetime, "timestamp"), " + ",
    688        interval(count, interval, sources, query)]
    689     end
    690 
    691     defp expr({:date_add, _, [date, count, interval]}, sources, query) do
    692       [?(, expr(date, sources, query), type_unless_typed(date, "date"), " + ",
    693        interval(count, interval, sources, query) | ")::date"]
    694     end
    695 
    696     defp expr({:json_extract_path, _, [expr, path]}, sources, query) do
    697       json_extract_path(expr, path, sources, query)
    698     end
    699 
    700     defp expr({:filter, _, [agg, filter]}, sources, query) do
    701       aggregate = expr(agg, sources, query)
    702       [aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)]
    703     end
    704 
    705     defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
    706       aggregate = expr(agg, sources, query)
    707       [aggregate, " OVER " | quote_name(name)]
    708     end
    709 
    710     defp expr({:over, _, [agg, kw]}, sources, query) do
    711       aggregate = expr(agg, sources, query)
    712       [aggregate, " OVER ", window_exprs(kw, sources, query)]
    713     end
    714 
    715     defp expr({:{}, _, elems}, sources, query) do
    716       [?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
    717     end
    718 
    719     defp expr({:count, _, []}, _sources, _query), do: "count(*)"
    720 
    721     defp expr({:==, _, [{:json_extract_path, _, [expr, path]} = left, right]}, sources, query)
    722          when is_binary(right) or is_integer(right) or is_boolean(right) do
    723       case Enum.split(path, -1) do
    724         {path, [last]} when is_binary(last) ->
    725           extracted = json_extract_path(expr, path, sources, query)
    726           [?(, extracted, "@>'{", escape_json(last), ": ", escape_json(right) | "}')"]
    727 
    728         _ ->
    729           [maybe_paren(left, sources, query), " = " | maybe_paren(right, sources, query)]
    730       end
    731     end
    732 
    733     defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do
    734       {modifier, args} =
    735         case args do
    736           [rest, :distinct] -> {"DISTINCT ", [rest]}
    737           _ -> {[], args}
    738         end
    739 
    740       case handle_call(fun, length(args)) do
    741         {:binary_op, op} ->
    742           [left, right] = args
    743           [maybe_paren(left, sources, query), op | maybe_paren(right, sources, query)]
    744         {:fun, fun} ->
    745           [fun, ?(, modifier, intersperse_map(args, ", ", &expr(&1, sources, query)), ?)]
    746       end
    747     end
    748 
    749     defp expr(list, sources, query) when is_list(list) do
    750       ["ARRAY[", intersperse_map(list, ?,, &expr(&1, sources, query)), ?]]
    751     end
    752 
    753     defp expr(%Decimal{} = decimal, _sources, _query) do
    754       Decimal.to_string(decimal, :normal)
    755     end
    756 
    757     defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, _query)
    758         when is_binary(binary) do
    759       ["'\\x", Base.encode16(binary, case: :lower) | "'::bytea"]
    760     end
    761 
    762     defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do
    763       [maybe_paren(other, sources, query), ?:, ?: | tagged_to_db(type)]
    764     end
    765 
    766     defp expr(nil, _sources, _query),   do: "NULL"
    767     defp expr(true, _sources, _query),  do: "TRUE"
    768     defp expr(false, _sources, _query), do: "FALSE"
    769 
    770     defp expr(literal, _sources, _query) when is_binary(literal) do
    771       [?\', escape_string(literal), ?\']
    772     end
    773 
    774     defp expr(literal, _sources, _query) when is_integer(literal) do
    775       Integer.to_string(literal)
    776     end
    777 
    778     defp expr(literal, _sources, _query) when is_float(literal) do
    779       [Float.to_string(literal) | "::float"]
    780     end
    781 
    782     defp expr(expr, _sources, query) do
    783       error!(query, "unsupported expression: #{inspect(expr)}")
    784     end
    785 
    786     defp json_extract_path(expr, [], sources, query) do
    787       expr(expr, sources, query)
    788     end
    789 
    790     defp json_extract_path(expr, path, sources, query) do
    791       path = intersperse_map(path, ?,, &escape_json/1)
    792       [?(, expr(expr, sources, query), "#>'{", path, "}')"]
    793     end
    794 
    795     defp type_unless_typed(%Ecto.Query.Tagged{}, _type), do: []
    796     defp type_unless_typed(_, type), do: [?:, ?: | type]
    797 
    798     # Always use the largest possible type for integers
    799     defp tagged_to_db(:id), do: "bigint"
    800     defp tagged_to_db(:integer), do: "bigint"
    801     defp tagged_to_db({:array, type}), do: [tagged_to_db(type), ?[, ?]]
    802     defp tagged_to_db(type), do: ecto_to_db(type)
    803 
    804     defp interval(count, interval, _sources, _query) when is_integer(count) do
    805       ["interval '", String.Chars.Integer.to_string(count), ?\s, interval, ?\']
    806     end
    807 
    808     defp interval(count, interval, _sources, _query) when is_float(count) do
    809       count = :erlang.float_to_binary(count, [:compact, decimals: 16])
    810       ["interval '", count, ?\s, interval, ?\']
    811     end
    812 
    813     defp interval(count, interval, sources, query) do
    814       [?(, expr(count, sources, query), "::numeric * ",
    815        interval(1, interval, sources, query), ?)]
    816     end
    817 
    818     defp maybe_paren({op, _, [_, _]} = expr, sources, query) when op in @binary_ops,
    819       do: paren_expr(expr, sources, query)
    820 
    821     defp maybe_paren({:is_nil, _, [_]} = expr, sources, query),
    822       do: paren_expr(expr, sources, query)
    823 
    824     defp maybe_paren(expr, sources, query),
    825       do: expr(expr, sources, query)
    826 
    827     defp returning(%{select: nil}, _sources),
    828       do: []
    829     defp returning(%{select: %{fields: fields}} = query, sources),
    830       do: [" RETURNING " | select_fields(fields, sources, query)]
    831 
    832     defp returning([]),
    833       do: []
    834     defp returning(returning),
    835       do: [" RETURNING " | quote_names(returning)]
    836 
    837     defp create_names(%{sources: sources}, as_prefix) do
    838       create_names(sources, 0, tuple_size(sources), as_prefix) |> List.to_tuple()
    839     end
    840 
    841     defp create_names(sources, pos, limit, as_prefix) when pos < limit do
    842       [create_name(sources, pos, as_prefix) | create_names(sources, pos + 1, limit, as_prefix)]
    843     end
    844 
    845     defp create_names(_sources, pos, pos, as_prefix) do
    846       [as_prefix]
    847     end
    848 
    849     defp subquery_as_prefix(sources) do
    850       [?s | :erlang.element(tuple_size(sources), sources)]
    851     end
    852 
    853     defp create_name(sources, pos, as_prefix) do
    854       case elem(sources, pos) do
    855         {:fragment, _, _} ->
    856           {nil, as_prefix ++ [?f | Integer.to_string(pos)], nil}
    857 
    858         {table, schema, prefix} ->
    859           name = as_prefix ++ [create_alias(table) | Integer.to_string(pos)]
    860           {quote_table(prefix, table), name, schema}
    861 
    862         %Ecto.SubQuery{} ->
    863           {nil, as_prefix ++ [?s | Integer.to_string(pos)], nil}
    864       end
    865     end
    866 
    867     defp create_alias(<<first, _rest::binary>>) when first in ?a..?z when first in ?A..?Z do
    868       first
    869     end
    870     defp create_alias(_) do
    871       ?t
    872     end
    873 
    874     # DDL
    875 
    876     alias Ecto.Migration.{Table, Index, Reference, Constraint}
    877 
    878     @creates [:create, :create_if_not_exists]
    879     @drops [:drop, :drop_if_exists]
    880 
    881     @impl true
    882     def execute_ddl({command, %Table{} = table, columns}) when command in @creates do
    883       table_name = quote_table(table.prefix, table.name)
    884       query = ["CREATE TABLE ",
    885                if_do(command == :create_if_not_exists, "IF NOT EXISTS "),
    886                table_name, ?\s, ?(,
    887                column_definitions(table, columns), pk_definition(columns, ", "), ?),
    888                options_expr(table.options)]
    889 
    890       [query] ++
    891         comments_on("TABLE", table_name, table.comment) ++
    892         comments_for_columns(table_name, columns)
    893     end
    894 
    895     def execute_ddl({command, %Table{} = table, mode}) when command in @drops do
    896       [["DROP TABLE ", if_do(command == :drop_if_exists, "IF EXISTS "),
    897         quote_table(table.prefix, table.name), drop_mode(mode)]]
    898     end
    899 
    900     def execute_ddl({:alter, %Table{} = table, changes}) do
    901       table_name = quote_table(table.prefix, table.name)
    902       query = ["ALTER TABLE ", table_name, ?\s,
    903                column_changes(table, changes), pk_definition(changes, ", ADD ")]
    904 
    905       [query] ++
    906         comments_on("TABLE", table_name, table.comment) ++
    907         comments_for_columns(table_name, changes)
    908     end
    909 
    910     def execute_ddl({command, %Index{} = index}) when command in @creates do
    911       fields = intersperse_map(index.columns, ", ", &index_expr/1)
    912       include_fields = intersperse_map(index.include, ", ", &index_expr/1)
    913 
    914       maybe_nulls_distinct =
    915         case index.nulls_distinct do
    916           nil -> []
    917           true -> " NULLS DISTINCT"
    918           false -> " NULLS NOT DISTINCT"
    919         end
    920 
    921       queries = [["CREATE ",
    922                   if_do(index.unique, "UNIQUE "),
    923                   "INDEX ",
    924                   if_do(index.concurrently, "CONCURRENTLY "),
    925                   if_do(command == :create_if_not_exists, "IF NOT EXISTS "),
    926                   quote_name(index.name),
    927                   " ON ",
    928                   quote_table(index.prefix, index.table),
    929                   if_do(index.using, [" USING " , to_string(index.using)]),
    930                   ?\s, ?(, fields, ?),
    931                   if_do(include_fields != [], [" INCLUDE ", ?(, include_fields, ?)]),
    932                   maybe_nulls_distinct,
    933                   if_do(index.where, [" WHERE ", to_string(index.where)])]]
    934 
    935       queries ++ comments_on("INDEX", quote_table(index.prefix, index.name), index.comment)
    936     end
    937 
    938     def execute_ddl({command, %Index{} = index, mode}) when command in @drops do
    939       [["DROP INDEX ",
    940         if_do(index.concurrently, "CONCURRENTLY "),
    941         if_do(command == :drop_if_exists, "IF EXISTS "),
    942         quote_table(index.prefix, index.name),
    943         drop_mode(mode)]]
    944     end
    945 
    946     def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do
    947       [["ALTER TABLE ", quote_table(current_table.prefix, current_table.name),
    948         " RENAME TO ", quote_table(nil, new_table.name)]]
    949     end
    950 
    951     def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do
    952       [["ALTER TABLE ", quote_table(table.prefix, table.name), " RENAME ",
    953         quote_name(current_column), " TO ", quote_name(new_column)]]
    954     end
    955 
    956     def execute_ddl({:create, %Constraint{} = constraint}) do
    957       table_name = quote_table(constraint.prefix, constraint.table)
    958       queries = [["ALTER TABLE ", table_name,
    959                   " ADD ", new_constraint_expr(constraint)]]
    960 
    961       queries ++ comments_on("CONSTRAINT", constraint.name, constraint.comment, table_name)
    962     end
    963 
    964     def execute_ddl({command, %Constraint{}, :cascade}) when command in @drops,
    965       do: error!(nil, "PostgreSQL does not support `CASCADE` in DROP CONSTRAINT commands")
    966 
    967     def execute_ddl({command, %Constraint{} = constraint, :restrict}) when command in @drops do
    968       [["ALTER TABLE ", quote_table(constraint.prefix, constraint.table),
    969         " DROP CONSTRAINT ", if_do(command == :drop_if_exists, "IF EXISTS "), quote_name(constraint.name)]]
    970     end
    971 
    972     def execute_ddl(string) when is_binary(string), do: [string]
    973 
    974     def execute_ddl(keyword) when is_list(keyword),
    975       do: error!(nil, "PostgreSQL adapter does not support keyword lists in execute")
    976 
    977     @impl true
    978     def ddl_logs(%Postgrex.Result{} = result) do
    979       %{messages: messages} = result
    980 
    981       for message <- messages do
    982         %{message: message, severity: severity} = message
    983 
    984         {ddl_log_level(severity), message, []}
    985       end
    986     end
    987 
    988     @impl true
    989     def table_exists_query(table) do
    990       {"SELECT true FROM information_schema.tables WHERE table_name = $1 AND table_schema = current_schema() LIMIT 1", [table]}
    991     end
    992 
    993     defp drop_mode(:cascade), do: " CASCADE"
    994     defp drop_mode(:restrict), do: []
    995 
    996     # From https://www.postgresql.org/docs/current/protocol-error-fields.html.
    997     defp ddl_log_level("DEBUG"), do: :debug
    998     defp ddl_log_level("LOG"), do: :info
    999     defp ddl_log_level("INFO"), do: :info
   1000     defp ddl_log_level("NOTICE"), do: :info
   1001     defp ddl_log_level("WARNING"), do: :warn
   1002     defp ddl_log_level("ERROR"), do: :error
   1003     defp ddl_log_level("FATAL"), do: :error
   1004     defp ddl_log_level("PANIC"), do: :error
   1005     defp ddl_log_level(_severity), do: :info
   1006 
   1007     defp pk_definition(columns, prefix) do
   1008       pks =
   1009         for {_, name, _, opts} <- columns,
   1010             opts[:primary_key],
   1011             do: name
   1012 
   1013       case pks do
   1014         [] -> []
   1015         _  -> [prefix, "PRIMARY KEY (", quote_names(pks), ")"]
   1016       end
   1017     end
   1018 
   1019     defp comments_on(_object, _name, nil), do: []
   1020     defp comments_on(object, name, comment) do
   1021       [["COMMENT ON ", object, ?\s, name, " IS ", single_quote(comment)]]
   1022     end
   1023 
   1024     defp comments_on(_object, _name, nil, _table_name), do:  []
   1025     defp comments_on(object, name, comment, table_name) do
   1026       [["COMMENT ON ", object, ?\s, quote_name(name), " ON ", table_name,
   1027         " IS ", single_quote(comment)]]
   1028     end
   1029 
   1030     defp comments_for_columns(table_name, columns) do
   1031       Enum.flat_map(columns, fn
   1032         {_operation, column_name, _column_type, opts} ->
   1033           column_name = [table_name, ?. | quote_name(column_name)]
   1034           comments_on("COLUMN", column_name, opts[:comment])
   1035         _ -> []
   1036       end)
   1037     end
   1038 
   1039     defp column_definitions(table, columns) do
   1040       intersperse_map(columns, ", ", &column_definition(table, &1))
   1041     end
   1042 
   1043     defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do
   1044       [quote_name(name), ?\s, reference_column_type(ref.type, opts),
   1045        column_options(ref.type, opts), ", ", reference_expr(ref, table, name)]
   1046     end
   1047 
   1048     defp column_definition(_table, {:add, name, type, opts}) do
   1049       [quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)]
   1050     end
   1051 
   1052     defp column_changes(table, columns) do
   1053       intersperse_map(columns, ", ", &column_change(table, &1))
   1054     end
   1055 
   1056     defp column_change(table, {:add, name, %Reference{} = ref, opts}) do
   1057       ["ADD COLUMN ", quote_name(name), ?\s, reference_column_type(ref.type, opts),
   1058        column_options(ref.type, opts), ", ADD ", reference_expr(ref, table, name)]
   1059     end
   1060 
   1061     defp column_change(_table, {:add, name, type, opts}) do
   1062       ["ADD COLUMN ", quote_name(name), ?\s, column_type(type, opts),
   1063        column_options(type, opts)]
   1064     end
   1065 
   1066     defp column_change(table, {:add_if_not_exists, name, %Reference{} = ref, opts}) do
   1067       ["ADD COLUMN IF NOT EXISTS ", quote_name(name), ?\s, reference_column_type(ref.type, opts),
   1068        column_options(ref.type, opts), ", ADD ", reference_expr(ref, table, name)]
   1069     end
   1070 
   1071     defp column_change(_table, {:add_if_not_exists, name, type, opts}) do
   1072       ["ADD COLUMN IF NOT EXISTS ", quote_name(name), ?\s, column_type(type, opts),
   1073        column_options(type, opts)]
   1074     end
   1075 
   1076     defp column_change(table, {:modify, name, %Reference{} = ref, opts}) do
   1077       [drop_reference_expr(opts[:from], table, name), "ALTER COLUMN ", quote_name(name),
   1078       " TYPE ", reference_column_type(ref.type, opts),
   1079        ", ADD ", reference_expr(ref, table, name),
   1080        modify_null(name, opts), modify_default(name, ref.type, opts)]
   1081     end
   1082 
   1083     defp column_change(table, {:modify, name, type, opts}) do
   1084       [drop_reference_expr(opts[:from], table, name), "ALTER COLUMN ", quote_name(name), " TYPE ",
   1085        column_type(type, opts), modify_null(name, opts), modify_default(name, type, opts)]
   1086     end
   1087 
   1088     defp column_change(_table, {:remove, name}), do: ["DROP COLUMN ", quote_name(name)]
   1089     defp column_change(table, {:remove, name, %Reference{} = ref, _opts}) do
   1090       [drop_reference_expr(ref, table, name), "DROP COLUMN ", quote_name(name)]
   1091     end
   1092     defp column_change(_table, {:remove, name, _type, _opts}), do: ["DROP COLUMN ", quote_name(name)]
   1093 
   1094     defp column_change(table, {:remove_if_exists, name, %Reference{} = ref}) do
   1095       [drop_reference_if_exists_expr(ref, table, name), "DROP COLUMN IF EXISTS ", quote_name(name)]
   1096     end
   1097     defp column_change(_table, {:remove_if_exists, name, _type}), do: ["DROP COLUMN IF EXISTS ", quote_name(name)]
   1098 
   1099     defp modify_null(name, opts) do
   1100       case Keyword.get(opts, :null) do
   1101         true  -> [", ALTER COLUMN ", quote_name(name), " DROP NOT NULL"]
   1102         false -> [", ALTER COLUMN ", quote_name(name), " SET NOT NULL"]
   1103         nil   -> []
   1104       end
   1105     end
   1106 
   1107     defp modify_default(name, type, opts) do
   1108       case Keyword.fetch(opts, :default) do
   1109         {:ok, val} -> [", ALTER COLUMN ", quote_name(name), " SET", default_expr({:ok, val}, type)]
   1110         :error -> []
   1111       end
   1112     end
   1113 
   1114     defp column_options(type, opts) do
   1115       default = Keyword.fetch(opts, :default)
   1116       null    = Keyword.get(opts, :null)
   1117 
   1118       [default_expr(default, type), null_expr(null)]
   1119     end
   1120 
   1121     defp null_expr(false), do: " NOT NULL"
   1122     defp null_expr(true), do: " NULL"
   1123     defp null_expr(_), do: []
   1124 
   1125     defp new_constraint_expr(%Constraint{check: check} = constraint) when is_binary(check) do
   1126       ["CONSTRAINT ", quote_name(constraint.name), " CHECK (", check, ")", validate(constraint.validate)]
   1127     end
   1128     defp new_constraint_expr(%Constraint{exclude: exclude} = constraint) when is_binary(exclude) do
   1129       ["CONSTRAINT ", quote_name(constraint.name), " EXCLUDE USING ", exclude, validate(constraint.validate)]
   1130     end
   1131 
   1132     defp default_expr({:ok, nil}, _type),    do: " DEFAULT NULL"
   1133     defp default_expr({:ok, literal}, type), do: [" DEFAULT ", default_type(literal, type)]
   1134     defp default_expr(:error, _),            do: []
   1135 
   1136     defp default_type(list, {:array, inner} = type) when is_list(list) do
   1137       ["ARRAY[",  Enum.map(list, &default_type(&1, inner)) |> Enum.intersperse(?,), "]::", ecto_to_db(type)]
   1138     end
   1139     defp default_type(literal, _type) when is_binary(literal) do
   1140       if :binary.match(literal, <<0>>) == :nomatch and String.valid?(literal) do
   1141         single_quote(literal)
   1142       else
   1143         encoded = "\\x" <> Base.encode16(literal, case: :lower)
   1144         raise ArgumentError, "default values are interpolated as UTF-8 strings and cannot contain null bytes. " <>
   1145                              "`#{inspect literal}` is invalid. If you want to write it as a binary, use \"#{encoded}\", " <>
   1146                              "otherwise refer to PostgreSQL documentation for instructions on how to escape this SQL type"
   1147       end
   1148     end
   1149     defp default_type(literal, _type) when is_number(literal),  do: to_string(literal)
   1150     defp default_type(literal, _type) when is_boolean(literal), do: to_string(literal)
   1151     defp default_type(%{} = map, :map) do
   1152       library = Application.get_env(:postgrex, :json_library, Jason)
   1153       default = IO.iodata_to_binary(library.encode_to_iodata!(map))
   1154       [single_quote(default)]
   1155     end
   1156     defp default_type({:fragment, expr}, _type),
   1157       do: [expr]
   1158     defp default_type(expr, type),
   1159       do: raise(ArgumentError, "unknown default `#{inspect expr}` for type `#{inspect type}`. " <>
   1160                                ":default may be a string, number, boolean, list of strings, list of integers, map (when type is Map), or a fragment(...)")
   1161 
   1162     defp index_expr(literal) when is_binary(literal),
   1163       do: literal
   1164     defp index_expr(literal),
   1165       do: quote_name(literal)
   1166 
   1167     defp options_expr(nil),
   1168       do: []
   1169     defp options_expr(keyword) when is_list(keyword),
   1170       do: error!(nil, "PostgreSQL adapter does not support keyword lists in :options")
   1171     defp options_expr(options),
   1172       do: [?\s, options]
   1173 
   1174     defp column_type({:array, type}, opts),
   1175       do: [column_type(type, opts), "[]"]
   1176 
   1177     defp column_type(type, _opts) when type in ~w(time utc_datetime naive_datetime)a,
   1178       do: [ecto_to_db(type), "(0)"]
   1179 
   1180     defp column_type(type, opts) when type in ~w(time_usec utc_datetime_usec naive_datetime_usec)a do
   1181       precision = Keyword.get(opts, :precision)
   1182       type_name = ecto_to_db(type)
   1183 
   1184       if precision do
   1185         [type_name, ?(, to_string(precision), ?)]
   1186       else
   1187         type_name
   1188       end
   1189     end
   1190 
   1191     defp column_type(:identity, opts) do
   1192       start_value = [Keyword.get(opts, :start_value)]
   1193       increment   = [Keyword.get(opts, :increment)]
   1194       type_name   = ecto_to_db(:identity)
   1195 
   1196       cleanup = fn v -> is_integer(v) and v > 0 end
   1197 
   1198       sequence =
   1199         start_value
   1200         |> Enum.filter(cleanup)
   1201         |> Enum.map(&"START WITH #{&1}")
   1202         |> Kernel.++(
   1203           increment
   1204           |> Enum.filter(cleanup)
   1205           |> Enum.map(&"INCREMENT BY #{&1}")
   1206         )
   1207 
   1208       case sequence do
   1209         [] -> [type_name, " GENERATED BY DEFAULT AS IDENTITY"]
   1210         _ ->  [type_name, " GENERATED BY DEFAULT AS IDENTITY(", Enum.join(sequence, " "), ") "]
   1211       end
   1212     end
   1213 
   1214     defp column_type(type, opts) do
   1215       size      = Keyword.get(opts, :size)
   1216       precision = Keyword.get(opts, :precision)
   1217       scale     = Keyword.get(opts, :scale)
   1218       type_name = ecto_to_db(type)
   1219 
   1220       cond do
   1221         size            -> [type_name, ?(, to_string(size), ?)]
   1222         precision       -> [type_name, ?(, to_string(precision), ?,, to_string(scale || 0), ?)]
   1223         type == :string -> [type_name, "(255)"]
   1224         true            -> type_name
   1225       end
   1226     end
   1227 
   1228     defp reference_expr(%Reference{} = ref, table, name) do
   1229       {current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with])
   1230 
   1231       ["CONSTRAINT ", reference_name(ref, table, name), ?\s,
   1232        "FOREIGN KEY (", quote_names(current_columns), ") REFERENCES ",
   1233        quote_table(ref.prefix || table.prefix, ref.table), ?(, quote_names(reference_columns), ?),
   1234        reference_match(ref.match), reference_on_delete(ref.on_delete),
   1235        reference_on_update(ref.on_update), validate(ref.validate)]
   1236     end
   1237 
   1238     defp drop_reference_expr(%Reference{} = ref, table, name),
   1239       do: ["DROP CONSTRAINT ", reference_name(ref, table, name), ", "]
   1240     defp drop_reference_expr(_, _, _),
   1241       do: []
   1242 
   1243     defp drop_reference_if_exists_expr(%Reference{} = ref, table, name),
   1244       do: ["DROP CONSTRAINT IF EXISTS ", reference_name(ref, table, name), ", "]
   1245     defp drop_reference_if_exists_expr(_, _, _),
   1246       do: []
   1247 
   1248     defp reference_name(%Reference{name: nil}, table, column),
   1249       do: quote_name("#{table.name}_#{column}_fkey")
   1250     defp reference_name(%Reference{name: name}, _table, _column),
   1251       do: quote_name(name)
   1252 
   1253     defp reference_column_type(:serial, _opts), do: "integer"
   1254     defp reference_column_type(:bigserial, _opts), do: "bigint"
   1255     defp reference_column_type(:identity, _opts), do: "bigint"
   1256     defp reference_column_type(type, opts), do: column_type(type, opts)
   1257 
   1258     defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL"
   1259     defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE"
   1260     defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT"
   1261     defp reference_on_delete(_), do: []
   1262 
   1263     defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL"
   1264     defp reference_on_update(:update_all), do: " ON UPDATE CASCADE"
   1265     defp reference_on_update(:restrict), do: " ON UPDATE RESTRICT"
   1266     defp reference_on_update(_), do: []
   1267 
   1268     defp reference_match(nil), do: []
   1269     defp reference_match(:full), do: " MATCH FULL"
   1270     defp reference_match(:simple), do: " MATCH SIMPLE"
   1271     defp reference_match(:partial), do: " MATCH PARTIAL"
   1272 
   1273     defp validate(false), do: " NOT VALID"
   1274     defp validate(_), do: []
   1275 
   1276     ## Helpers
   1277 
   1278     defp get_source(query, sources, ix, source) do
   1279       {expr, name, _schema} = elem(sources, ix)
   1280       {expr || expr(source, sources, query), name}
   1281     end
   1282 
   1283     defp get_parent_sources_ix(query, as) do
   1284       case query.aliases[@parent_as] do
   1285         {%{aliases: %{^as => ix}}, sources} -> {ix, sources}
   1286         {%{} = parent, _sources} -> get_parent_sources_ix(parent, as)
   1287       end
   1288     end
   1289 
   1290     defp quote_qualified_name(name, sources, ix) do
   1291       {_, source, _} = elem(sources, ix)
   1292       [source, ?. | quote_name(name)]
   1293     end
   1294 
   1295     defp quote_names(names) do
   1296       intersperse_map(names, ?,, &quote_name/1)
   1297     end
   1298 
   1299     defp quote_name(name) when is_atom(name) do
   1300       quote_name(Atom.to_string(name))
   1301     end
   1302 
   1303     defp quote_name(name) when is_binary(name) do
   1304       if String.contains?(name, "\"") do
   1305         error!(nil, "bad literal/field/table name #{inspect name} (\" is not permitted)")
   1306       end
   1307 
   1308       [?", name, ?"]
   1309     end
   1310 
   1311     defp quote_table(nil, name),    do: quote_table(name)
   1312     defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)]
   1313 
   1314     defp quote_table(name) when is_atom(name),
   1315       do: quote_table(Atom.to_string(name))
   1316     defp quote_table(name) do
   1317       if String.contains?(name, "\"") do
   1318         error!(nil, "bad table name #{inspect name}")
   1319       end
   1320       [?", name, ?"]
   1321     end
   1322 
   1323     # TRUE, ON, or 1 to enable the option, and FALSE, OFF, or 0 to disable it
   1324     defp quote_boolean(nil), do: nil
   1325     defp quote_boolean(true), do: "TRUE"
   1326     defp quote_boolean(false), do: "FALSE"
   1327     defp quote_boolean(value), do: error!(nil, "bad boolean value #{value}")
   1328 
   1329     defp format_to_sql(:text), do: "FORMAT TEXT"
   1330     defp format_to_sql(:map), do: "FORMAT JSON"
   1331     defp format_to_sql(:yaml), do: "FORMAT YAML"
   1332 
   1333     defp single_quote(value), do: [?', escape_string(value), ?']
   1334 
   1335     defp intersperse_map(list, separator, mapper, acc \\ [])
   1336     defp intersperse_map([], _separator, _mapper, acc),
   1337       do: acc
   1338     defp intersperse_map([elem], _separator, mapper, acc),
   1339       do: [acc | mapper.(elem)]
   1340     defp intersperse_map([elem | rest], separator, mapper, acc),
   1341       do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator])
   1342 
   1343     defp intersperse_reduce(list, separator, user_acc, reducer, acc \\ [])
   1344     defp intersperse_reduce([], _separator, user_acc, _reducer, acc),
   1345       do: {acc, user_acc}
   1346     defp intersperse_reduce([elem], _separator, user_acc, reducer, acc) do
   1347       {elem, user_acc} = reducer.(elem, user_acc)
   1348       {[acc | elem], user_acc}
   1349     end
   1350     defp intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do
   1351       {elem, user_acc} = reducer.(elem, user_acc)
   1352       intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator])
   1353     end
   1354 
   1355     defp if_do(condition, value) do
   1356       if condition, do: value, else: []
   1357     end
   1358 
   1359     defp escape_string(value) when is_binary(value) do
   1360       :binary.replace(value, "'", "''", [:global])
   1361     end
   1362 
   1363     defp escape_json(value) when is_binary(value) do
   1364       escaped =
   1365         value
   1366         |> escape_string()
   1367         |> :binary.replace("\"", "\\\"", [:global])
   1368 
   1369       [?", escaped, ?"]
   1370     end
   1371 
   1372     defp escape_json(value) when is_integer(value) do
   1373       Integer.to_string(value)
   1374     end
   1375 
   1376     defp escape_json(true), do: ["true"]
   1377     defp escape_json(false), do: ["false"]
   1378 
   1379     defp ecto_to_db({:array, t}),          do: [ecto_to_db(t), ?[, ?]]
   1380     defp ecto_to_db(:id),                  do: "integer"
   1381     defp ecto_to_db(:identity),            do: "bigint"
   1382     defp ecto_to_db(:serial),              do: "serial"
   1383     defp ecto_to_db(:bigserial),           do: "bigserial"
   1384     defp ecto_to_db(:binary_id),           do: "uuid"
   1385     defp ecto_to_db(:string),              do: "varchar"
   1386     defp ecto_to_db(:binary),              do: "bytea"
   1387     defp ecto_to_db(:map),                 do: Application.fetch_env!(:ecto_sql, :postgres_map_type)
   1388     defp ecto_to_db({:map, _}),            do: Application.fetch_env!(:ecto_sql, :postgres_map_type)
   1389     defp ecto_to_db(:time_usec),           do: "time"
   1390     defp ecto_to_db(:utc_datetime),        do: "timestamp"
   1391     defp ecto_to_db(:utc_datetime_usec),   do: "timestamp"
   1392     defp ecto_to_db(:naive_datetime),      do: "timestamp"
   1393     defp ecto_to_db(:naive_datetime_usec), do: "timestamp"
   1394     defp ecto_to_db(atom) when is_atom(atom),  do: Atom.to_string(atom)
   1395     defp ecto_to_db(type) do
   1396       raise ArgumentError,
   1397             "unsupported type `#{inspect(type)}`. The type can either be an atom, a string " <>
   1398               "or a tuple of the form `{:map, t}` or `{:array, t}` where `t` itself follows the same conditions."
   1399     end
   1400 
   1401     defp error!(nil, message) do
   1402       raise ArgumentError, message
   1403     end
   1404     defp error!(query, message) do
   1405       raise Ecto.QueryError, query: query, message: message
   1406     end
   1407   end
   1408 end