zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

planner.ex (74637B)


      1 defmodule Ecto.Query.Planner do
      2   # Normalizes a query and its parameters.
      3   @moduledoc false
      4 
      5   alias Ecto.Query.{BooleanExpr, DynamicExpr, FromExpr, JoinExpr, QueryExpr, SelectExpr}
      6 
  # Compile-time guard: compilation fails when %Ecto.Query{} no longer has
  # exactly 21 keys (the count includes :__struct__), as a reminder to
  # revisit the code that depends on the struct's shape.
  if map_size(%Ecto.Query{}) != 21 do
    raise "Ecto.Query match out of date in builder"
  end

  # Key under which the parent query is stored in a subquery's aliases
  # while that subquery is being planned.
  @parent_as __MODULE__
  # Aggregate and window function names.
  # NOTE(review): not referenced in the visible part of this file; presumably
  # used by the select handling further down - confirm.
  @aggs ~w(count avg min max sum row_number rank dense_rank percent_rank cume_dist ntile lag lead first_value last_value nth_value)a
     13 
     14   @doc """
     15   Converts a query to a list of joins.
     16 
     17   The from is moved as last join with the where conditions as its "on"
     18   in order to keep proper binding order.
     19   """
     20   def query_to_joins(qual, source, %{wheres: wheres, joins: joins}, position) do
     21     on = %QueryExpr{file: __ENV__.file, line: __ENV__.line, expr: true, params: []}
     22 
     23     on =
     24       Enum.reduce(wheres, on, fn %BooleanExpr{op: op, expr: expr, params: params}, acc ->
     25         merge_expr_and_params(op, acc, expr, params)
     26       end)
     27 
     28     join = %JoinExpr{qual: qual, source: source, file: __ENV__.file, line: __ENV__.line, on: on}
     29     last = length(joins) + position
     30 
     31     mapping = fn
     32       0  -> last
     33       ix -> ix + position - 1
     34     end
     35 
     36     for {%{on: on} = join, ix} <- Enum.with_index(joins ++ [join]) do
     37       %{join | on: rewrite_sources(on, mapping), ix: ix + position}
     38     end
     39   end
     40 
     41   defp merge_expr_and_params(op, %QueryExpr{expr: left_expr, params: left_params} = struct,
     42                              right_expr, right_params) do
     43     right_expr = Ecto.Query.Builder.bump_interpolations(right_expr, left_params)
     44     %{struct | expr: merge_expr(op, left_expr, right_expr), params: left_params ++ right_params}
     45   end
     46 
     47   defp merge_expr(_op, left, true), do: left
     48   defp merge_expr(_op, true, right), do: right
     49   defp merge_expr(op, left, right), do: {op, [], [left, right]}
     50 
     51   @doc """
     52   Rewrites the given query expression sources using the given mapping.
     53   """
     54   def rewrite_sources(%{expr: expr, params: params} = part, mapping) do
     55     expr =
     56       Macro.prewalk expr, fn
     57         %Ecto.Query.Tagged{type: type, tag: tag} = tagged ->
     58           %{tagged | type: rewrite_type(type, mapping), tag: rewrite_type(tag, mapping)}
     59         {:&, meta, [ix]} ->
     60           {:&, meta, [mapping.(ix)]}
     61         other ->
     62           other
     63       end
     64 
     65     params =
     66       Enum.map params, fn
     67         {val, type} ->
     68           {val, rewrite_type(type, mapping)}
     69         val ->
     70           val
     71       end
     72 
     73     %{part | expr: expr, params: params}
     74   end
     75 
     76   defp rewrite_type({composite, {ix, field}}, mapping) when is_integer(ix) do
     77     {composite, {mapping.(ix), field}}
     78   end
     79 
     80   defp rewrite_type({ix, field}, mapping) when is_integer(ix) do
     81     {mapping.(ix), field}
     82   end
     83 
     84   defp rewrite_type(other, _mapping) do
     85     other
     86   end
     87 
     88   @doc """
     89   Define the query cache table.
     90   """
     91   def new_query_cache(atom_name) do
     92     :ets.new(atom_name || __MODULE__, [:set, :public, read_concurrency: true])
     93   end
     94 
     95   @doc """
     96   Plans the query for execution.
     97 
     98   Planning happens in multiple steps:
     99 
    100     1. First the query is planned by retrieving
    101        its cache key, casting and merging parameters
    102 
    103     2. Then a cache lookup is done, if the query is
    104        cached, we are done
    105 
    106     3. If there is no cache, we need to actually
    107        normalize and validate the query, asking the
    108        adapter to prepare it
    109 
    110     4. The query is sent to the adapter to be generated
    111 
    112   ## Cache
    113 
    114   All entries in the query, except the preload and sources
    115   field, should be part of the cache key.
    116 
    117   The cache value is the compiled query by the adapter
    118   along-side the select expression.
    119   """
    120   def query(query, operation, cache, adapter, counter) do
    121     {query, params, key} = plan(query, operation, adapter)
    122     {cast_params, dump_params} = Enum.unzip(params)
    123     query_with_cache(key, query, operation, cache, adapter, counter, cast_params, dump_params)
    124   end
    125 
    126   defp query_with_cache(key, query, operation, cache, adapter, counter, cast_params, dump_params) do
    127     case query_lookup(key, query, operation, cache, adapter, counter) do
    128       {_, select, prepared} ->
    129         {build_meta(query, select), {:nocache, prepared}, cast_params, dump_params}
    130       {_key, :cached, select, cached} ->
    131         update = &cache_update(cache, key, &1)
    132         reset = &cache_reset(cache, key, &1)
    133         {build_meta(query, select), {:cached, update, reset, cached}, cast_params, dump_params}
    134       {_key, :cache, select, prepared} ->
    135         update = &cache_update(cache, key, &1)
    136         {build_meta(query, select), {:cache, update, prepared}, cast_params, dump_params}
    137     end
    138   end
    139 
    140   defp query_lookup(:nocache, query, operation, _cache, adapter, counter) do
    141     query_without_cache(query, operation, adapter, counter)
    142   end
    143 
    144   defp query_lookup(key, query, operation, cache, adapter, counter) do
    145     case :ets.lookup(cache, key) do
    146       [term] -> term
    147       [] -> query_prepare(query, operation, adapter, counter, cache, key)
    148     end
    149   end
    150 
    151   defp query_prepare(query, operation, adapter, counter, cache, key) do
    152     case query_without_cache(query, operation, adapter, counter) do
    153       {:cache, select, prepared} ->
    154         cache_insert(cache, key, {key, :cache, select, prepared})
    155       {:nocache, _, _} = nocache ->
    156         nocache
    157     end
    158   end
    159 
    160   defp cache_insert(cache, key, elem) do
    161     case :ets.insert_new(cache, elem) do
    162       true ->
    163         elem
    164       false ->
    165         [elem] = :ets.lookup(cache, key)
    166         elem
    167     end
    168   end
    169 
    170   defp cache_update(cache, key, cached) do
    171     _ = :ets.update_element(cache, key, [{2, :cached}, {4, cached}])
    172     :ok
    173   end
    174 
    175   defp cache_reset(cache, key, prepared) do
    176     _ = :ets.update_element(cache, key, [{2, :cache}, {4, prepared}])
    177     :ok
    178   end
    179 
    180   defp query_without_cache(query, operation, adapter, counter) do
    181     {query, select} = normalize(query, operation, adapter, counter)
    182     {cache, prepared} = adapter.prepare(operation, query)
    183     {cache, select, prepared}
    184   end
    185 
    186   defp build_meta(%{sources: sources, preloads: preloads}, select) do
    187     %{select: select, preloads: preloads, sources: sources}
    188   end
    189 
    190   @doc """
    191   Prepares the query for cache.
    192 
    193   This means all the parameters from query expressions are
    194   merged into a single value and their entries are pruned
    195   from the query.
    196 
    197   This function is called by the backend before invoking
    198   any cache mechanism.
    199   """
    200   @spec plan(Ecto.Query.t, atom, module) :: {planned_query :: Ecto.Query.t, parameters :: list, cache_key :: any}
    201   def plan(query, operation, adapter) do
    202     query
    203     |> plan_sources(adapter)
    204     |> plan_assocs()
    205     |> plan_combinations(adapter)
    206     |> plan_ctes(adapter)
    207     |> plan_wheres(adapter)
    208     |> plan_select(adapter)
    209     |> plan_cache(operation, adapter)
    210   rescue
    211     e ->
    212       # Reraise errors so we ignore the planner inner stacktrace
    213       filter_and_reraise e, __STACKTRACE__
    214   end
    215 
    216   @doc """
    217   Prepare all sources, by traversing and expanding from, joins, subqueries.
    218   """
    219   def plan_sources(query, adapter) do
    220     {from, source} = plan_from(query, adapter)
    221 
    222     # Set up the initial source so we can refer
    223     # to the parent in subqueries in joins
    224     query = %{query | sources: {source}}
    225 
    226     {joins, sources, tail_sources} = plan_joins(query, [source], length(query.joins), adapter)
    227 
    228     %{query | from: from,
    229               joins: joins |> Enum.reverse,
    230               sources: (tail_sources ++ sources) |> Enum.reverse |> List.to_tuple()}
    231   end
    232 
  # Plans the query's `from` expression.
  #
  # A query without a from is reported through error!/2.
  defp plan_from(%{from: nil} = query, _adapter) do
    error!(query, "query must have a from expression")
  end

  # A fragment source combined with a non-empty `assocs` or `preloads`
  # is reported through error!/2.
  defp plan_from(%{from: %{source: {:fragment, _, _}}, preloads: preloads, assocs: assocs} = query, _adapter)
       when assocs != [] or preloads != [] do
    error!(query, "cannot preload associations with a fragment source")
  end

  # Any other from expression is delegated to plan_source/3, whose
  # `{expr, source}` result is returned.
  defp plan_from(%{from: from} = query, adapter) do
    plan_source(query, from, adapter)
  end
    245 
  # Plans a single from/join expression and returns `{expr, source}`, where
  # `source` is the entry that goes into the query's sources.
  #
  # Subquery source: the subquery is planned and used both as the
  # expression's source and as the source entry.
  defp plan_source(query, %{source: %Ecto.SubQuery{} = subquery, prefix: prefix} = expr, adapter) do
    subquery = plan_subquery(subquery, query, prefix, adapter, true)
    {%{expr | source: subquery}, subquery}
  end

  # Schema-only source (`{nil, schema}`): the table name is read from the
  # schema, and the prefix falls back to the query prefix when
  # plan_source_schema_prefix/2 yields nil.
  defp plan_source(query, %{source: {nil, schema}} = expr, _adapter)
       when is_atom(schema) and schema != nil do
    source = schema.__schema__(:source)
    prefix = plan_source_schema_prefix(expr, schema) || query.prefix
    {%{expr | source: {source, schema}}, {source, schema, prefix}}
  end

  # Explicit binary source name: the expression is kept as is and the prefix
  # falls back to the query prefix.
  defp plan_source(query, %{source: {source, schema}, prefix: prefix} = expr, _adapter)
       when is_binary(source) and is_atom(schema),
       do: {expr, {source, schema, prefix || query.prefix}}

  # Fragment source without a prefix: the fragment itself is the source entry.
  defp plan_source(_query, %{source: {:fragment, _, _} = source, prefix: nil} = expr, _adapter),
       do: {expr, source}

  # Fragment source with a prefix is reported through error!/3.
  defp plan_source(query, %{source: {:fragment, _, _}, prefix: prefix} = expr, _adapter),
       do: error!(query, expr, "cannot set prefix: #{inspect(prefix)} option for fragment sources")
    267 
  # Plans a subquery against its parent `query` and returns the subquery
  # struct with `query`, `params`, `cache` and `select` filled in.
  #
  # `prefix` takes precedence over the subquery's own prefix, which in turn
  # takes precedence over the parent's. `source?` is forwarded to
  # normalize_subquery_select/3.
  #
  # Any exception raised along the way is re-raised as Ecto.SubQueryError
  # carrying the parent query and the original exception.
  defp plan_subquery(subquery, query, prefix, adapter, source?) do
    %{query: inner_query} = subquery

    # Expose the parent query to the inner query through a reserved alias
    # for the duration of planning.
    inner_query = %{
      inner_query
      | prefix: prefix || subquery.query.prefix || query.prefix,
        aliases: Map.put(inner_query.aliases, @parent_as, query)
    }

    {inner_query, params, key} = plan(inner_query, :all, adapter)
    assert_no_subquery_assocs!(inner_query)

    {inner_query, select} =
      inner_query
      |> ensure_select(true)
      |> normalize_subquery_select(adapter, source?)

    # Remove the temporary parent alias before storing the planned query.
    {_, inner_query} = pop_in(inner_query.aliases[@parent_as])
    %{subquery | query: inner_query, params: params, cache: key, select: select}
  rescue
    e -> raise Ecto.SubQueryError, query: query, exception: e
  end
    290 
  # The prefix of a from expression is computed upfront, but not the
  # prefix of joins: a from only uses its own prefix, while a join falls
  # back to the prefix declared by the schema.
  defp plan_source_schema_prefix(%FromExpr{prefix: prefix}, _schema),
    do: prefix

  defp plan_source_schema_prefix(%JoinExpr{prefix: prefix}, schema),
    do: prefix || schema.__schema__(:prefix)
    297 
    298   defp assert_no_subquery_assocs!(%{assocs: assocs, preloads: preloads} = query)
    299        when assocs != [] or preloads != [] do
    300     error!(query, "cannot preload associations in subquery")
    301   end
    302 
    303   defp assert_no_subquery_assocs!(query) do
    304     query
    305   end
    306 
  # Rewrites the subquery's select into a map expression, walks it with
  # prewalk/6 and collects its field types with collect_fields/7.
  #
  # Returns `{query, select}`, where `select` is the description built by
  # subquery_source/2 from the collected types.
  defp normalize_subquery_select(query, adapter, source?) do
    {schema_or_source, expr, %{select: select} = query} = rewrite_subquery_select_expr(query, source?)
    {expr, _} = prewalk(expr, :select, query, select, 0, adapter)
    {{:map, types}, _fields, _from} = collect_fields(expr, [], :never, query, select.take, true, %{})
    {query, subquery_source(schema_or_source, types)}
  end
    313 
  # Builds the select description of a subquery from the collected types:
  #
  #   * nil               -> `{:map, types}`
  #   * a struct name     -> `{:struct, name, types}`
  #   * a `:source` tuple -> the same tuple restricted to the collected
  #     fields (`only`); each field keeps the type declared by the source
  #     and falls back to the collected type otherwise
  defp subquery_source(nil, types), do: {:map, types}
  defp subquery_source(name, types) when is_atom(name), do: {:struct, name, types}
  defp subquery_source({:source, schema, prefix, types}, only) do
    types = Enum.map(only, fn {field, {:value, type}} -> {field, Keyword.get(types, field, type)} end)
    {:source, schema, prefix, types}
  end
    320 
  # Rewrites the select expression of a subquery into a map expression and
  # returns `{schema_or_source, expr, query}` with `expr` stored back in the
  # query's select.
  #
  # When subquery_select/3 cannot handle the expression, a subquery used as
  # a source (`source?` is true) is reported through error!/2; otherwise
  # the expression is wrapped in a map under the `:result` key.
  defp rewrite_subquery_select_expr(%{select: select} = query, source?) do
    %{expr: expr, take: take} = select

    case subquery_select(expr, take, query) do
      {schema_or_source, fields} ->
        expr = {:%{}, [], fields}
        {schema_or_source, expr, put_in(query.select.expr, expr)}

      :error when source? ->
        error!(query, "subquery/cte must select a source (t), a field (t.field) or a map, got: `#{Macro.to_string(expr)}`")

      :error ->
        expr = {:%{}, [], [result: expr]}
        {nil, expr, put_in(query.select.expr, expr)}
    end
  end
    337 
  # Translates a select expression into `{schema_or_source, fields}`, where
  # `fields` is a keyword list of field name to expression. Returns `:error`
  # for expressions that none of the clauses below handle.
  #
  # merge(left, right): both sides are translated and the right fields
  # override the left ones. Both sides must be translatable; an `:error`
  # from either side fails the tuple match.
  defp subquery_select({:merge, _, [left, right]}, take, query) do
    {left_struct, left_fields} = subquery_select(left, take, query)
    {right_struct, right_fields} = subquery_select(right, take, query)
    {left_struct || right_struct, Keyword.merge(left_fields, right_fields)}
  end
  # %Name{...}: the fields come from the inner map, tagged with the struct name.
  defp subquery_select({:%, _, [name, map]}, take, query) do
    {_, fields} = subquery_select(map, take, query)
    {name, fields}
  end
  # %{source | key: value, ...}: the source fields plus the given pairs.
  defp subquery_select({:%{}, _, [{:|, _, [{:&, [], [ix]}, pairs]}]} = expr, take, query) do
    assert_subquery_fields!(query, expr, pairs)
    drop = Map.new(pairs, fn {key, _} -> {key, nil} end)
    {source, _} = source_take!(:select, query, take, ix, ix, drop)

    # In case of map updates, we need to remove duplicated fields
    # at query time because we use the field names as aliases and
    # duplicate aliases will lead to invalid queries.
    kept_keys = subquery_source_fields(source) -- Keyword.keys(pairs)
    {keep_source_or_struct(source), subquery_fields(kept_keys, ix) ++ pairs}
  end
  # %{key: value, ...}: the pairs are used as they are, after validation.
  defp subquery_select({:%{}, _, pairs} = expr, _take, query) do
    assert_subquery_fields!(query, expr, pairs)
    {nil, pairs}
  end
  # A whole source: every field of that source is selected.
  defp subquery_select({:&, _, [ix]}, take, query) do
    {source, _} = source_take!(:select, query, take, ix, ix, %{})
    fields = subquery_source_fields(source)
    {keep_source_or_struct(source), subquery_fields(fields, ix)}
  end
  # A single field access on a source: selected under the field's own name.
  defp subquery_select({{:., _, [{:&, _, [_]}, field]}, _, []} = expr, _take, _query) do
    {nil, [{field, expr}]}
  end
  defp subquery_select(_expr, _take, _query) do
    :error
  end
    373 
    374   defp subquery_fields(fields, ix) do
    375     for field <- fields do
    376       {field, {{:., [], [{:&, [], [ix]}, field]}, [], []}}
    377     end
    378   end
    379 
    380   defp keep_source_or_struct({:source, _, _, _} = source), do: source
    381   defp keep_source_or_struct({:struct, name, _}), do: name
    382   defp keep_source_or_struct(_), do: nil
    383 
  # Returns the field names (the keys of the types keyword list) of a
  # `:source`, `:struct` or `:map` select description.
  defp subquery_source_fields({:source, _, _, types}), do: Keyword.keys(types)
  defp subquery_source_fields({:struct, _, types}), do: Keyword.keys(types)
  defp subquery_source_fields({:map, types}), do: Keyword.keys(types)
    387 
  # Looks up the type of `field` in a select description, returning
  # `{:ok, type}` or `:error`. `:source` descriptions store the type
  # directly; `:struct` and `:map` descriptions go through
  # subquery_type_for_value/2.
  defp subquery_type_for({:source, _, _, fields}, field), do: Keyword.fetch(fields, field)
  defp subquery_type_for({:struct, _name, types}, field), do: subquery_type_for_value(types, field)
  defp subquery_type_for({:map, types}, field), do: subquery_type_for_value(types, field)
    391 
    392   defp subquery_type_for_value(types, field) do
    393     case Keyword.fetch(types, field) do
    394       {:ok, {:value, type}} -> {:ok, type}
    395       {:ok, _} -> {:ok, :any}
    396       :error -> :error
    397     end
    398   end
    399 
    400   defp assert_subquery_fields!(query, expr, pairs) do
    401     Enum.each(pairs, fn
    402       {key, _} when not is_atom(key) ->
    403         error!(query, "only atom keys are allowed when selecting a map in subquery, got: `#{Macro.to_string(expr)}`")
    404 
    405       {key, value} ->
    406         if valid_subquery_value?(value) do
    407           {key, value}
    408         else
    409           error!(query, "atoms, structs, maps, lists, tuples and sources are not allowed as map values in subquery, got: `#{Macro.to_string(expr)}`")
    410         end
    411     end)
    412   end
    413 
  # Tells whether an expression may be used as a map value in a subquery
  # select. Rejected: two-element tuples, lists, the `{}`, `%{}`, `&` and `%`
  # AST nodes, and atoms other than nil and the booleans. Everything else
  # is accepted.
  defp valid_subquery_value?({_, _}), do: false
  defp valid_subquery_value?(args) when is_list(args), do: false
  defp valid_subquery_value?({container, _, args})
       when container in [:{}, :%{}, :&, :%] and is_list(args), do: false
  # nil is an atom but not a boolean, so it needs its own clause.
  defp valid_subquery_value?(nil), do: true
  defp valid_subquery_value?(arg) when is_atom(arg), do: is_boolean(arg)
  defp valid_subquery_value?(_), do: true
    421 
    422   defp plan_joins(query, sources, offset, adapter) do
    423     plan_joins(query.joins, query, [], sources, [], 1, offset, adapter)
    424   end
    425 
  # Walks the remaining joins, accumulating (by prepending) the planned
  # joins, their sources and the "tail" sources contributed by association
  # joins. `counter` is the index of the join being planned and `offset` is
  # passed to rewrite_join/6 as the increment for intermediate indexes.
  # Returns `{joins, sources, tail_sources}` once the list is exhausted.
  #
  # Association join: expanded into the joins of the association's own query.
  defp plan_joins([%JoinExpr{assoc: {ix, assoc}, qual: qual, on: on, prefix: prefix} = join|t],
                     query, joins, sources, tail_sources, counter, offset, adapter) do
    source = fetch_source!(sources, ix)
    schema = schema_for_association_join!(query, join, source)
    refl = schema.__schema__(:association, assoc)

    unless refl do
      error! query, join, "could not find association `#{assoc}` on schema #{inspect schema}"
    end

    # If we have the following join:
    #
    #     from p in Post,
    #       join: p in assoc(p, :comments)
    #
    # The callback below will return a query that contains only
    # joins in a way it starts with the Post and ends in the
    # Comment.
    #
    # This means we need to rewrite the joins below to properly
    # shift the &... identifier in a way that:
    #
    #    &0         -> becomes assoc ix
    #    &LAST_JOIN -> becomes counter
    #
    # All values in the middle should be shifted by offset,
    # all values after join are already correct.
    child = refl.__struct__.joins_query(refl)

    # Rewrite prefixes:
    # 1. the child query has the parent query prefix
    #    (note the child query should NEVER have a prefix)
    # 2. from and joins can have their prefixes explicitly
    #    overwritten by the join prefix
    child = rewrite_prefix(child, query.prefix)
    child = update_in child.from, &rewrite_prefix(&1, prefix)
    child = update_in child.joins, &Enum.map(&1, fn join -> rewrite_prefix(join, prefix) end)

    last_ix = length(child.joins)
    source_ix = counter

    {_, child_from_source} = plan_source(child, child.from, adapter)

    {child_joins, child_sources, child_tail} =
      plan_joins(child, [child_from_source], offset + last_ix - 1, adapter)

    # Rewrite joins indexes as mentioned above
    child_joins = Enum.map(child_joins, &rewrite_join(&1, qual, ix, last_ix, source_ix, offset))

    # Drop the last resource which is the association owner (it is reversed)
    child_sources = Enum.drop(child_sources, -1)

    # The head is the source of the association itself; the remaining
    # ones belong to intermediate joins and go into the tail sources.
    [current_source|child_sources] = child_sources
    child_sources = child_tail ++ child_sources

    plan_joins(t, query, attach_on(child_joins, on) ++ joins, [current_source|sources],
                  child_sources ++ tail_sources, counter + 1, offset + length(child_sources), adapter)
  end

  # Join on an interpolated query: only accepted when the query consists of
  # a from and `where` conditions, which query_to_joins/4 turns into the
  # join's `on` clause.
  defp plan_joins([%JoinExpr{source: %Ecto.Query{} = join_query, qual: qual, on: on, prefix: prefix} = join|t],
                      query, joins, sources, tail_sources, counter, offset, adapter) do
    case join_query do
      %{order_bys: [], limit: nil, offset: nil, group_bys: [], joins: [],
        havings: [], preloads: [], assocs: [], distinct: nil, lock: nil} ->
        join_query = rewrite_prefix(join_query, query.prefix)
        from = rewrite_prefix(join_query.from, prefix)
        {from, source} = plan_source(join_query, from, adapter)
        [join] = attach_on(query_to_joins(qual, from.source, join_query, counter), on)
        plan_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter)
      _ ->
        error! query, join, """
        invalid query was interpolated in a join.
        If you want to pass a query to a join, you must either:

          1. Make sure the query only has `where` conditions (which will be converted to ON clauses)
          2. Or wrap the query in a subquery by calling subquery(query)
        """
    end
  end

  # Any other join: planned as a plain source with `counter` as its index.
  defp plan_joins([%JoinExpr{} = join|t],
                      query, joins, sources, tail_sources, counter, offset, adapter) do
    {join, source} = plan_source(query, %{join | ix: counter}, adapter)
    plan_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter)
  end

  # No joins left: return the accumulators (still in reverse order).
  defp plan_joins([], _query, joins, sources, tail_sources, _counter, _offset, _adapter) do
    {joins, sources, tail_sources}
  end
    515 
    516   defp attach_on([%{on: on} = h | t], %{expr: expr, params: params}) do
    517     [%{h | on: merge_expr_and_params(:and, on, expr, params)} | t]
    518   end
    519 
    520   defp rewrite_prefix(expr, nil), do: expr
    521   defp rewrite_prefix(%{prefix: nil} = expr, prefix), do: %{expr | prefix: prefix}
    522   defp rewrite_prefix(expr, _prefix), do: expr
    523 
    524   defp rewrite_join(%{on: on, ix: join_ix} = join, qual, ix, last_ix, source_ix, inc_ix) do
    525     expr = Macro.prewalk on.expr, fn
    526         {:&, meta, [join_ix]} ->
    527           {:&, meta, [rewrite_ix(join_ix, ix, last_ix, source_ix, inc_ix)]}
    528         expr = %Ecto.Query.Tagged{type: {type_ix, type}} when is_integer(type_ix) ->
    529           %{expr | type: {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), type}}
    530         other ->
    531           other
    532       end
    533 
    534     params = Enum.map(on.params, &rewrite_param_ix(&1, ix, last_ix, source_ix, inc_ix))
    535 
    536     %{join | on: %{on | expr: expr, params: params}, qual: qual,
    537              ix: rewrite_ix(join_ix, ix, last_ix, source_ix, inc_ix)}
    538   end
    539 
    540   # We need to replace the source by the one from the assoc
    541   defp rewrite_ix(0, ix, _last_ix, _source_ix, _inc_x), do: ix
    542 
    543   # The last entry will have the current source index
    544   defp rewrite_ix(last_ix, _ix, last_ix, source_ix, _inc_x), do: source_ix
    545 
    546   # All above last are already correct
    547   defp rewrite_ix(join_ix, _ix, last_ix, _source_ix, _inc_ix) when join_ix > last_ix, do: join_ix
    548 
    549   # All others need to be incremented by the offset sources
    550   defp rewrite_ix(join_ix, _ix, _last_ix, _source_ix, inc_ix), do: join_ix + inc_ix
    551 
    552   defp rewrite_param_ix({value, {upper, {type_ix, field}}}, ix, last_ix, source_ix, inc_ix) when is_integer(type_ix) do
    553     {value, {upper, {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), field}}}
    554   end
    555 
    556   defp rewrite_param_ix({value, {type_ix, field}}, ix, last_ix, source_ix, inc_ix) when is_integer(type_ix) do
    557     {value, {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), field}}
    558   end
    559 
    560   defp rewrite_param_ix(param, _, _, _, _), do: param
    561 
    562   defp fetch_source!(sources, ix) when is_integer(ix) do
    563     case Enum.reverse(sources) |> Enum.fetch(ix) do
    564       {:ok, source} -> source
    565       :error -> raise ArgumentError, "could not find a source with index `#{ix}` in `#{inspect sources}"
    566     end
    567   end
    568 
    569   defp fetch_source!(_, ix) do
    570     raise ArgumentError, "invalid binding index: `#{inspect ix}` (check if you're binding using a valid :as atom)"
    571   end
    572 
    573   defp schema_for_association_join!(query, join, source) do
    574     case source do
    575       {:fragment, _, _} ->
    576         error! query, join, "cannot perform association joins on fragment sources"
    577 
    578       {source, nil, _} ->
    579           error! query, join, "cannot perform association join on #{inspect source} " <>
    580                               "because it does not have a schema"
    581 
    582       {_, schema, _} ->
    583         schema
    584 
    585       %Ecto.SubQuery{select: {:source, {_, schema}, _, _}} ->
    586         schema
    587 
    588       %Ecto.SubQuery{select: {:struct, schema, _}} ->
    589         schema
    590 
    591       %Ecto.SubQuery{} ->
    592         error! query, join, "can only perform association joins on subqueries " <>
    593                             "that return a source with schema in select"
    594 
    595       _ ->
    596         error! query, join, "can only perform association joins on sources with a schema"
    597     end
    598   end
    599 
    600   @spec plan_wheres(Ecto.Query.t, module) :: Ecto.Query.t
    601   defp plan_wheres(query, adapter) do
    602     wheres =
    603       Enum.map(query.wheres, fn
    604         %{subqueries: []} = where ->
    605           where
    606 
    607         %{subqueries: subqueries} = where ->
    608           %{where | subqueries: Enum.map(subqueries, &plan_subquery(&1, query, nil, adapter, false))}
    609       end)
    610 
    611     %{query | wheres: wheres}
    612   end
    613 
    614   @spec plan_select(Ecto.Query.t, module) :: Ecto.Query.t
    615   defp plan_select(query, adapter) do
    616     case query do
    617       %{select: %{subqueries: [_ | _] = subqueries}} ->
    618         subqueries = Enum.map(subqueries, &plan_subquery(&1, query, nil, adapter, false))
    619         put_in(query.select.subqueries, subqueries)
    620 
    621       query -> query
    622     end
    623   end
    624 
    625   @doc """
    626   Prepare the parameters by merging and casting them according to sources.
    627   """
    628   def plan_cache(query, operation, adapter) do
    629     {query, params, cache} = traverse_cache(query, operation, {[], []}, adapter)
    630     {query, Enum.reverse(params), cache}
    631   end
    632 
    633   defp traverse_cache(query, operation, cache_params, adapter) do
    634     fun = &{&3, merge_cache(&1, &2, &3, &4, operation, adapter)}
    635     {query, {cache, params}} = traverse_exprs(query, operation, cache_params, fun)
    636     {query, params, finalize_cache(query, operation, cache)}
    637   end
    638 
  # Folds one kind of query expression into the `{cache, params}`
  # accumulator and returns the updated accumulator. Each clause casts and
  # merges the expression's parameters with cast_and_merge_params/5 and adds
  # an entry to the cache key through merge_cache/3.
  # NOTE(review): merge_cache/3 is defined outside the visible part of this
  # file; its third argument looks like a "cacheable?" flag - confirm.
  #
  # from: keyed by the source cache key and the from hints.
  defp merge_cache(:from, query, from, {cache, params}, _operation, adapter) do
    {key, params} = source_cache(from, params)
    {params, source_cacheable?} = cast_and_merge_params(:from, query, from, params, adapter)
    {merge_cache({:from, key, from.hints}, cache, source_cacheable? and key != :nocache), params}
  end

  # Single optional expressions; a nil expression leaves the accumulator untouched.
  defp merge_cache(kind, query, expr, {cache, params}, _operation, adapter)
      when kind in ~w(select distinct limit offset)a do
    if expr do
      {params, cacheable?} = cast_and_merge_params(kind, query, expr, params, adapter)
      {merge_cache({kind, expr_to_cache(expr)}, cache, cacheable?), params}
    else
      {cache, params}
    end
  end

  # Lists of expressions; an empty list leaves the accumulator untouched.
  defp merge_cache(kind, query, exprs, {cache, params}, _operation, adapter)
      when kind in ~w(where update group_by having order_by)a do
    {expr_cache, {params, cacheable?}} =
      Enum.map_reduce exprs, {params, true}, fn expr, {params, cacheable?} ->
        {params, current_cacheable?} = cast_and_merge_params(kind, query, expr, params, adapter)
        {expr_to_cache(expr), {params, cacheable? and current_cacheable?}}
      end

    case expr_cache do
      [] -> {cache, params}
      _  -> {merge_cache({kind, expr_cache}, cache, cacheable?), params}
    end
  end

  # joins: each join contributes its qualifier, source key, `on` expression
  # and hints; parameters are merged for the join and then for its `on`.
  defp merge_cache(:join, query, exprs, {cache, params}, _operation, adapter) do
    {expr_cache, {params, cacheable?}} =
      Enum.map_reduce exprs, {params, true}, fn
        %JoinExpr{on: on, qual: qual, hints: hints} = join, {params, cacheable?} ->
          {key, params} = source_cache(join, params)
          {params, join_cacheable?} = cast_and_merge_params(:join, query, join, params, adapter)
          {params, on_cacheable?} = cast_and_merge_params(:join, query, on, params, adapter)
          {{qual, key, on.expr, hints},
           {params, cacheable? and join_cacheable? and on_cacheable? and key != :nocache}}
      end

    case expr_cache do
      [] -> {cache, params}
      _  -> {merge_cache({:join, expr_cache}, cache, cacheable?), params}
    end
  end

  # windows: a list of `{name, expr}` pairs.
  defp merge_cache(:windows, query, exprs, {cache, params}, _operation, adapter) do
    {expr_cache, {params, cacheable?}} =
      Enum.map_reduce exprs, {params, true}, fn {key, expr}, {params, cacheable?} ->
        {params, current_cacheable?} = cast_and_merge_params(:windows, query, expr, params, adapter)
        {{key, expr_to_cache(expr)}, {params, cacheable? and current_cacheable?}}
      end

    case expr_cache do
      [] -> {cache, params}
      _  -> {merge_cache({:windows, expr_cache}, cache, cacheable?), params}
    end
  end

  # combinations: each combined query is traversed on its own, sharing the
  # parameter accumulator with the outer query.
  defp merge_cache(:combination, _query, combinations, cache_and_params, operation, adapter) do
    # In here we add each combination as its own entry in the cache key.
    # We could group them to avoid multiple keys, but since they are uncommon, we keep it simple.
    Enum.reduce combinations, cache_and_params, fn {modifier, query}, {cache, params} ->
      {_, params, inner_cache} = traverse_cache(query, operation, {[], params}, adapter)
      {merge_cache({modifier, inner_cache}, cache, inner_cache != :nocache), params}
    end
  end

  # No CTEs: nothing to add.
  defp merge_cache(:with_cte, _query, nil, cache_and_params, _operation, _adapter) do
    cache_and_params
  end

  # CTEs: each entry is either a full query (traversed as an :all operation)
  # or a QueryExpr (such as a fragment).
  defp merge_cache(:with_cte, query, with_expr, cache_and_params, _operation, adapter) do
    %{queries: queries, recursive: recursive} = with_expr
    key = if recursive, do: :recursive_cte, else: :non_recursive_cte

    # In here we add each cte as its own entry in the cache key.
    # We could group them to avoid multiple keys, but since they are uncommon, we keep it simple.
    Enum.reduce queries, cache_and_params, fn
      {name, %Ecto.Query{} = query}, {cache, params} ->
        {_, params, inner_cache} = traverse_cache(query, :all, {[], params}, adapter)
        {merge_cache({key, name, inner_cache}, cache, inner_cache != :nocache), params}

      {name, %Ecto.Query.QueryExpr{} = query_expr}, {cache, params} ->
        {params, cacheable?} = cast_and_merge_params(:with_cte, query, query_expr, params, adapter)
        {merge_cache({key, name, expr_to_cache(query_expr)}, cache, cacheable?), params}
    end
  end
    728 
    729   defp expr_to_cache(%QueryExpr{expr: expr}), do: expr
    730   defp expr_to_cache(%SelectExpr{expr: expr}), do: expr
    731   defp expr_to_cache(%BooleanExpr{op: op, expr: expr, subqueries: []}), do: {op, expr}
    732   defp expr_to_cache(%BooleanExpr{op: op, expr: expr, subqueries: subqueries}) do
    733     # Alternate implementation could be replace {:subquery, i} expression in expr.
    734     # Current strategy appends [{:subquery, i, cache}], where cache is the cache key for this subquery.
    735     {op, expr, Enum.map(subqueries, fn %{cache: cache} -> {:subquery, cache} end)}
    736   end
    737 
    738   @spec cast_and_merge_params(atom, Ecto.Query.t, any, list, module) :: {params :: list, cacheable? :: boolean}
    739   defp cast_and_merge_params(kind, query, expr, params, adapter) do
    740     Enum.reduce expr.params, {params, true}, fn
    741       {:subquery, i}, {acc, cacheable?} ->
    742         # This is the place holder to intersperse subquery parameters.
    743         %Ecto.SubQuery{params: subparams, cache: cache} = Enum.fetch!(expr.subqueries, i)
    744         {Enum.reverse(subparams, acc), cacheable? and cache != :nocache}
    745 
    746       {v, type}, {acc, cacheable?} ->
    747         case cast_param(kind, query, expr, v, type, adapter) do
    748           {cast_v, {:in, dump_v}} -> {split_in_params(cast_v, dump_v, acc), false}
    749           cast_v_and_dump_v -> {[cast_v_and_dump_v | acc], cacheable?}
    750         end
    751     end
    752   end
    753 
    754   defp split_in_params(cast_v, dump_v, acc) do
    755     Enum.zip(cast_v, dump_v) |> Enum.reverse(acc)
    756   end
    757 
    758   defp merge_cache(_left, _right, false),  do: :nocache
    759   defp merge_cache(_left, :nocache, true), do: :nocache
    760   defp merge_cache(left, right, true),     do: [left|right]
    761 
    762   defp finalize_cache(_query, _operation, :nocache) do
    763     :nocache
    764   end
    765 
    766   defp finalize_cache(query, operation, cache) do
    767     %{assocs: assocs, prefix: prefix, lock: lock, select: select, aliases: aliases} = query
    768     aliases = Map.delete(aliases, @parent_as)
    769 
    770     cache =
    771       case select do
    772         %{take: take} when take != %{} ->
    773           [take: take] ++ cache
    774         _ ->
    775           cache
    776       end
    777 
    778     cache =
    779       cache
    780       |> prepend_if(assocs != [],   [assocs: assocs])
    781       |> prepend_if(prefix != nil,  [prefix: prefix])
    782       |> prepend_if(lock != nil,    [lock: lock])
    783       |> prepend_if(aliases != %{}, [aliases: aliases])
    784 
    785     [operation | cache]
    786   end
    787 
    788   defp prepend_if(cache, true, prepend), do: prepend ++ cache
    789   defp prepend_if(cache, false, _prepend), do: cache
    790 
    791   defp source_cache(%{source: {_, nil} = source, prefix: prefix}, params),
    792     do: {{source, prefix}, params}
    793   defp source_cache(%{source: {bin, schema}, prefix: prefix}, params),
    794     do: {{bin, schema, schema.__schema__(:hash), prefix}, params}
    795   defp source_cache(%{source: {:fragment, _, _} = source, prefix: prefix}, params),
    796     do: {{source, prefix}, params}
    797   defp source_cache(%{source: %Ecto.SubQuery{params: inner, cache: key}}, params),
    798     do: {key, Enum.reverse(inner, params)}
    799 
    800   defp cast_param(_kind, query, expr, %DynamicExpr{}, _type, _value) do
    801     error! query, expr, "invalid dynamic expression",
    802                         "dynamic expressions can only be interpolated at the top level of where, having, group_by, order_by, select, update or a join's on"
    803   end
    804   defp cast_param(_kind, query, expr, [{key, _} | _], _type, _value) when is_atom(key) do
    805     error! query, expr, "invalid keyword list",
    806                         "keyword lists are only allowed at the top level of where, having, distinct, order_by, update or a join's on"
    807   end
    808   defp cast_param(_kind, query, expr, %x{}, {:in, _type}, _value) when x in [Ecto.Query, Ecto.SubQuery] do
    809     error! query, expr, "an #{inspect(x)} struct is not supported as right-side value of `in` operator",
    810                         "Did you mean to write `expr in subquery(query)` instead?"
    811   end
    812   defp cast_param(kind, query, expr, v, type, adapter) do
    813     type = field_type!(kind, query, expr, type)
    814 
    815     try do
    816       case cast_param(kind, type, v, adapter) do
    817         {:ok, v} -> v
    818         {:error, error} -> error! query, expr, error
    819       end
    820     catch
    821       :error, %Ecto.QueryError{} = e ->
    822         raise Ecto.Query.CastError, value: v, type: type, message: Exception.message(e)
    823     end
    824   end
    825 
    826   defp cast_param(kind, type, v, adapter) do
    827     with {:ok, type} <- normalize_param(kind, type, v),
    828          {:ok, cast_v} <- cast_param(kind, type, v),
    829          {:ok, dump_v} <- dump_param(adapter, type, cast_v),
    830          do: {:ok, {cast_v, dump_v}}
    831   end
    832 
    833   @doc """
    834   Prepare association fields found in the query.
    835   """
    836   def plan_assocs(query) do
    837     plan_assocs(query, 0, query.assocs)
    838     query
    839   end
    840 
    841   defp plan_assocs(_query, _ix, []), do: :ok
    842   defp plan_assocs(query, ix, assocs) do
    843     # We validate the schema exists when preparing joins above
    844     {_, parent_schema, _} = get_preload_source!(query, ix)
    845 
    846     Enum.each assocs, fn {assoc, {child_ix, child_assocs}} ->
    847       refl = parent_schema.__schema__(:association, assoc)
    848 
    849       unless refl do
    850         error! query, "field `#{inspect parent_schema}.#{assoc}` " <>
    851                       "in preload is not an association"
    852       end
    853 
    854       case find_source_expr(query, child_ix) do
    855         %JoinExpr{qual: qual} when qual in [:inner, :left, :inner_lateral, :left_lateral] ->
    856           :ok
    857         %JoinExpr{qual: qual} ->
    858           error! query, "association `#{inspect parent_schema}.#{assoc}` " <>
    859                         "in preload requires an inner, left or lateral join, got #{qual} join"
    860         _ ->
    861           :ok
    862       end
    863 
    864       plan_assocs(query, child_ix, child_assocs)
    865     end
    866   end
    867 
    868   defp plan_combinations(query, adapter) do
    869     combinations =
    870       Enum.map query.combinations, fn {type, combination_query} ->
    871         {prepared_query, _params, _key} = combination_query |> attach_prefix(query) |> plan(:all, adapter)
    872         prepared_query = prepared_query |> ensure_select(true)
    873         {type, prepared_query}
    874       end
    875 
    876     %{query | combinations: combinations}
    877   end
    878 
    879   defp plan_ctes(%Ecto.Query{with_ctes: nil} = query, _adapter), do: query
    880   defp plan_ctes(%Ecto.Query{with_ctes: %{queries: queries}} = query, adapter) do
    881     queries =
    882       Enum.map queries, fn
    883         {name, %Ecto.Query{} = cte_query} ->
    884           {planned_query, _params, _key} = cte_query |> attach_prefix(query) |> plan(:all, adapter)
    885           planned_query = planned_query |> ensure_select(true)
    886           {name, planned_query}
    887 
    888         {name, other} ->
    889           {name, other}
    890       end
    891 
    892     put_in(query.with_ctes.queries, queries)
    893   end
    894 
    895   defp find_source_expr(query, 0) do
    896     query.from
    897   end
    898 
    899   defp find_source_expr(query, ix) do
    900     Enum.find(query.joins, & &1.ix == ix)
    901   end
    902 
    903   @doc """
    904   Used for customizing the query returning result.
    905   """
    906   def ensure_select(%{select: select} = query, _fields) when select != nil do
    907     query
    908   end
    909   def ensure_select(%{select: nil}, []) do
    910     raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
    911   end
    912   def ensure_select(%{select: nil} = query, fields) when is_list(fields) do
    913     %{query | select: %SelectExpr{expr: {:&, [], [0]}, take: %{0 => {:any, fields}},
    914                                   line: __ENV__.line, file: __ENV__.file}}
    915   end
    916   def ensure_select(%{select: nil, from: %{source: {_, nil}}} = query, true) do
    917     error! query, "queries that do not have a schema need to explicitly pass a :select clause"
    918   end
    919   def ensure_select(%{select: nil, from: %{source: {:fragment, _, _}}} = query, true) do
    920     error! query, "queries from a fragment need to explicitly pass a :select clause"
    921   end
    922   def ensure_select(%{select: nil} = query, true) do
    923     %{query | select: %SelectExpr{expr: {:&, [], [0]}, line: __ENV__.line, file: __ENV__.file}}
    924   end
    925   def ensure_select(%{select: nil} = query, false) do
    926     query
    927   end
    928 
    929   @doc """
    930   Normalizes and validates the query.
    931 
    932   After the query was planned and there is no cache
    933   entry, we need to update its interpolations and check
    934   its fields and associations exist and are valid.
    935   """
    936   def normalize(query, operation, adapter, counter) do
    937     query
    938     |> normalize_query(operation, adapter, counter)
    939     |> elem(0)
    940     |> normalize_select(keep_literals?(operation, query), true)
    941   rescue
    942     e ->
    943       # Reraise errors so we ignore the planner inner stacktrace
    944       filter_and_reraise e, __STACKTRACE__
    945   end
    946 
    947   defp keep_literals?(:insert_all, _), do: true
    948   defp keep_literals?(_, %{combinations: combinations}), do: combinations != []
    949 
    950   defp normalize_query(query, operation, adapter, counter) do
    951     case operation do
    952       :all ->
    953         assert_no_update!(query, operation)
    954       :insert_all ->
    955         assert_no_update!(query, operation)
    956       :update_all ->
    957         assert_update!(query, operation)
    958         assert_only_filter_expressions!(query, operation)
    959       :delete_all ->
    960         assert_no_update!(query, operation)
    961         assert_only_filter_expressions!(query, operation)
    962     end
    963 
    964     traverse_exprs(query, operation, counter,
    965                    &validate_and_increment(&1, &2, &3, &4, operation, adapter))
    966   end
    967 
    968   defp validate_and_increment(:from, query, %{source: %Ecto.SubQuery{}}, _counter, kind, _adapter) when kind not in ~w(all insert_all)a do
    969     error! query, "`#{kind}` does not allow subqueries in `from`"
    970   end
    971   defp validate_and_increment(:from, query, %{source: source} = expr, counter, _kind, adapter) do
    972     {source, acc} = prewalk_source(source, :from, query, expr, counter, adapter)
    973     {%{expr | source: source}, acc}
    974   end
    975 
    976   defp validate_and_increment(kind, query, expr, counter, _operation, adapter)
    977       when kind in ~w(select distinct limit offset)a do
    978     if expr do
    979       prewalk(kind, query, expr, counter, adapter)
    980     else
    981       {nil, counter}
    982     end
    983   end
    984 
    985   defp validate_and_increment(kind, query, exprs, counter, _operation, adapter)
    986        when kind in ~w(where group_by having order_by update)a do
    987 
    988     {exprs, counter} =
    989       Enum.reduce(exprs, {[], counter}, fn
    990         %{expr: []}, {list, acc} ->
    991           {list, acc}
    992         expr, {list, acc} ->
    993           {expr, acc} = prewalk(kind, query, expr, acc, adapter)
    994           {[expr|list], acc}
    995       end)
    996     {Enum.reverse(exprs), counter}
    997   end
    998 
  # No `with` expression: nothing to walk, the counter is unchanged.
  defp validate_and_increment(:with_cte, _query, nil, counter, _operation, _adapter) do
    {nil, counter}
  end

  # Walks every CTE of the `with` expression, threading the parameter counter
  # through them in order. Returns the `with` expression with its queries
  # replaced by the walked versions, plus the final counter.
  defp validate_and_increment(:with_cte, query, with_expr, counter, _operation, adapter) do
    # CTE queries are always traversed as `:all`, whatever the outer operation is.
    fun = &validate_and_increment(&1, &2, &3, &4, :all, adapter)

    {queries, counter} =
      Enum.reduce with_expr.queries, {[], counter}, fn
        {name, %Ecto.Query{} = inner_query}, {queries, counter} ->
          # Store the outer query under @parent_as while the CTE is processed;
          # the entry is popped again before the CTE is stored.
          inner_query = put_in(inner_query.aliases[@parent_as], query)

          # We don't want to use normalize_subquery_select because we are
          # going to prepare the whole query ourselves next.
          {_, _, inner_query} = rewrite_subquery_select_expr(inner_query, true)
          {inner_query, counter} = traverse_exprs(inner_query, :all, counter, fun)

          # Now compute the fields as keyword lists so we emit AS in Ecto query.
          %{select: %{expr: expr, take: take}} = inner_query
          {{:map, types}, fields, _from} = collect_fields(expr, [], :never, inner_query, take, true, %{})
          # collect_fields accumulates in reverse, hence the Enum.reverse/1.
          fields = cte_fields(Keyword.keys(types), Enum.reverse(fields), [])
          inner_query = put_in(inner_query.select.fields, fields)
          {_, inner_query} = pop_in(inner_query.aliases[@parent_as])

          {[{name, inner_query} | queries], counter}

        # A CTE given as a fragment expression only needs the fragment walked.
        {name, %QueryExpr{expr: {:fragment, _, _} = fragment} = query_expr}, {queries, counter} ->
          {fragment, counter} = prewalk_source(fragment, :with_cte, query, with_expr, counter, adapter)
          query_expr = %{query_expr | expr: fragment}
          {[{name, query_expr} | queries], counter}
      end

    # The reduce above prepends, so restore the original CTE order.
    {%{with_expr | queries: Enum.reverse(queries)}, counter}
  end
   1033 
   1034   defp validate_and_increment(:join, query, exprs, counter, _operation, adapter) do
   1035     Enum.map_reduce exprs, counter, fn join, acc ->
   1036       {source, acc} = prewalk_source(join.source, :join, query, join, acc, adapter)
   1037       {on, acc} = prewalk(:join, query, join.on, acc, adapter)
   1038       {%{join | on: on, source: source, params: nil}, acc}
   1039     end
   1040   end
   1041 
   1042   defp validate_and_increment(:windows, query, exprs, counter, _operation, adapter) do
   1043     {exprs, counter} =
   1044       Enum.reduce(exprs, {[], counter}, fn {name, expr}, {list, acc} ->
   1045         {expr, acc} = prewalk(:windows, query, expr, acc, adapter)
   1046         {[{name, expr}|list], acc}
   1047       end)
   1048 
   1049     {Enum.reverse(exprs), counter}
   1050   end
   1051 
   1052   defp validate_and_increment(:combination, _query, combinations, counter, operation, adapter) do
   1053     fun = &validate_and_increment(&1, &2, &3, &4, operation, adapter)
   1054 
   1055     {combinations, counter} =
   1056       Enum.reduce combinations, {[], counter}, fn {type, combination_query}, {combinations, counter} ->
   1057         {combination_query, counter} = traverse_exprs(combination_query, operation, counter, fun)
   1058         {combination_query, _} = combination_query |> normalize_select(true, true)
   1059         {[{type, combination_query} | combinations], counter}
   1060       end
   1061 
   1062     {Enum.reverse(combinations), counter}
   1063   end
   1064 
   1065   defp validate_json_path!([path_field | rest], field, embed) do
   1066     case embed do
   1067       %{related: related, cardinality: :one} ->
   1068         unless Enum.any?(related.__schema__(:fields), &Atom.to_string(&1) == path_field) do
   1069           raise "field `#{path_field}` does not exist in #{inspect(related)}"
   1070         end
   1071 
   1072         path_embed = related.__schema__(:embed, String.to_atom(path_field))
   1073         validate_json_path!(rest, path_field, path_embed)
   1074 
   1075       %{related: _, cardinality: :many} ->
   1076         unless is_integer(path_field) do
   1077           raise "cannot use `#{path_field}` to refer to an item in `embeds_many`"
   1078         end
   1079 
   1080         validate_json_path!(rest, path_field, %{embed | cardinality: :one})
   1081 
   1082       other ->
   1083         raise "expected field `#{field}` to be of type embed, got: `#{inspect(other)}`"
   1084     end
   1085   end
   1086 
   1087   defp validate_json_path!([], _field, _type) do
   1088     :ok
   1089   end
   1090 
  # Walks a source (of a `from`, join, CTE or subquery placeholder) and returns
  # `{source, counter}` with interpolations renumbered.

  # Fragment source: only the fragment parts need to be walked.
  defp prewalk_source({:fragment, meta, fragments}, kind, query, expr, acc, adapter) do
    {fragments, acc} = prewalk(fragments, kind, query, expr, acc, adapter)
    {{:fragment, meta, fragments}, acc}
  end
  # Subquery source: the inner query is normalized as an `:all` query, sharing
  # the counter with the outer query. Any exception raised along the way is
  # wrapped in Ecto.SubQueryError together with the outer query.
  defp prewalk_source(%Ecto.SubQuery{query: inner_query} = subquery, kind, query, _expr, counter, adapter) do
    try do
      # Store the outer query under @parent_as while the inner query is
      # normalized; the entry is popped again below.
      inner_query = put_in inner_query.aliases[@parent_as], query
      {inner_query, counter} = normalize_query(inner_query, :all, adapter, counter)
      # Literals are kept and `selected_as` aliases are not allowed here.
      {inner_query, _} = normalize_select(inner_query, true, false)
      {_, inner_query} = pop_in(inner_query.aliases[@parent_as])

      inner_query =
        # For kind `:where` the select fields are left as computed; for every
        # other kind each field is paired with its name from the subquery select.
        # NOTE(review): the previous comment here mentioned subqueries coming
        # "from a select", while the condition tests for `:where` - confirm intent.
        if kind == :where do
          inner_query
        else
          update_in(inner_query.select.fields, fn fields ->
            subquery.select |> subquery_source_fields() |> Enum.zip(fields)
          end)
        end

      {%{subquery | query: inner_query}, counter}
    rescue
      e -> raise Ecto.SubQueryError, query: query, exception: e
    end
  end
  # Any other source is returned untouched.
  defp prewalk_source(source, _kind, _query, _expr, acc, _adapter) do
    {source, acc}
  end
   1120 
   1121   defp prewalk(:update, query, expr, counter, adapter) do
   1122     source = get_source!(:update, query, 0)
   1123 
   1124     {inner, acc} =
   1125       Enum.map_reduce expr.expr, counter, fn {op, kw}, counter ->
   1126         {kw, acc} =
   1127           Enum.map_reduce kw, counter, fn {field, value}, counter ->
   1128             {value, acc} = prewalk(value, :update, query, expr, counter, adapter)
   1129             {{field_source(source, field), value}, acc}
   1130           end
   1131         {{op, kw}, acc}
   1132       end
   1133 
   1134     {%{expr | expr: inner, params: nil}, acc}
   1135   end
   1136   defp prewalk(kind, query, expr, counter, adapter) do
   1137     {inner, acc} = prewalk(expr.expr, kind, query, expr, counter, adapter)
   1138     {%{expr | expr: inner, params: nil}, acc}
   1139   end
   1140 
   1141   defp prewalk({:subquery, i}, kind, query, expr, acc, adapter) do
   1142     prewalk_source(Enum.fetch!(expr.subqueries, i), kind, query, expr, acc, adapter)
   1143   end
   1144 
   1145   defp prewalk({:in, in_meta, [left, {:^, meta, [param]}]}, kind, query, expr, acc, adapter) do
   1146     {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
   1147     {right, acc} = validate_in(meta, expr, param, acc, adapter)
   1148     {{:in, in_meta, [left, right]}, acc}
   1149   end
   1150 
   1151   defp prewalk({:in, in_meta, [left, {:subquery, _} = right]}, kind, query, expr, acc, adapter) do
   1152     {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
   1153     {right, acc} = prewalk(right, kind, query, expr, acc, adapter)
   1154 
   1155     case right.query.select.fields do
   1156       [_] -> :ok
   1157       _ -> error!(query, "subquery must return a single field in order to be used on the right-side of `in`")
   1158     end
   1159 
   1160     {{:in, in_meta, [left, right]}, acc}
   1161   end
   1162 
   1163   defp prewalk({quantifier, meta, [{:subquery, _} = subquery]}, kind, query, expr, acc, adapter) when quantifier in [:exists, :any, :all] do
   1164     {subquery, acc} = prewalk(subquery, kind, query, expr, acc, adapter)
   1165 
   1166     case {quantifier, subquery.query.select.fields} do
   1167       {:exists, _} ->
   1168         :ok
   1169 
   1170       {_, [_]} ->
   1171         :ok
   1172 
   1173       _ ->
   1174         error!(
   1175           query,
   1176           "subquery must return a single field in order to be used with #{quantifier}"
   1177         )
   1178     end
   1179 
   1180     {{quantifier, meta, [subquery]}, acc}
   1181   end
   1182 
   1183   defp prewalk({{:., dot_meta, [left, field]}, meta, []},
   1184                kind, query, expr, acc, _adapter) do
   1185     {ix, ix_expr, ix_query} = get_ix!(left, kind, query)
   1186     extra = if kind == :select, do: [type: type!(kind, ix_query, expr, ix, field)], else: []
   1187     field = field_source(get_source!(kind, ix_query, ix), field)
   1188     {{{:., extra ++ dot_meta, [ix_expr, field]}, meta, []}, acc}
   1189   end
   1190 
   1191   defp prewalk({:^, meta, [ix]}, _kind, _query, _expr, acc, _adapter) when is_integer(ix) do
   1192     {{:^, meta, [acc]}, acc + 1}
   1193   end
   1194 
   1195   defp prewalk({:type, _, [arg, type]}, kind, query, expr, acc, adapter) do
   1196     {arg, acc} = prewalk(arg, kind, query, expr, acc, adapter)
   1197     type = field_type!(kind, query, expr, type, true)
   1198     {%Ecto.Query.Tagged{value: arg, tag: type, type: Ecto.Type.type(type)}, acc}
   1199   end
   1200 
   1201   defp prewalk({:json_extract_path, meta, [json_field, path]}, kind, query, expr, acc, _adapter) do
   1202     {{:., dot_meta, [{:&, amp_meta, [ix]}, field]}, expr_meta, []} = json_field
   1203 
   1204     case type!(kind, query, expr, ix, field) do
   1205       {:parameterized, Ecto.Embedded, embed} ->
   1206         validate_json_path!(path, field, embed)
   1207 
   1208       type ->
   1209         case Ecto.Type.type(type) do
   1210           :any ->
   1211             :ok
   1212 
   1213           :map ->
   1214             :ok
   1215 
   1216           {:map, _} ->
   1217             :ok
   1218 
   1219           _ ->
   1220             raise "expected field `#{field}` to be an embed or a map, got: `#{inspect(type)}`"
   1221         end
   1222     end
   1223 
   1224     field_source = kind |> get_source!(query, ix) |> field_source(field)
   1225 
   1226     json_field = {{:., dot_meta, [{:&, amp_meta, [ix]}, field_source]}, expr_meta, []}
   1227     {{:json_extract_path, meta, [json_field, path]}, acc}
   1228   end
   1229 
   1230   defp prewalk({:selected_as, [], [name]}, _kind, query, _expr, acc, _adapter) do
   1231     name = selected_as!(query.select.aliases, name)
   1232     {{:selected_as, [], [name]}, acc}
   1233   end
   1234 
   1235   defp prewalk(%Ecto.Query.Tagged{value: v, type: type} = tagged, kind, query, expr, acc, adapter) do
   1236     if Ecto.Type.base?(type) do
   1237       {tagged, acc}
   1238     else
   1239       {dump_param(kind, query, expr, v, type, adapter), acc}
   1240     end
   1241   end
   1242 
   1243   defp prewalk({left, right}, kind, query, expr, acc, adapter) do
   1244     {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
   1245     {right, acc} = prewalk(right, kind, query, expr, acc, adapter)
   1246     {{left, right}, acc}
   1247   end
   1248 
   1249   defp prewalk({left, meta, args}, kind, query, expr, acc, adapter) do
   1250     {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
   1251     {args, acc} = prewalk(args, kind, query, expr, acc, adapter)
   1252     {{left, meta, args}, acc}
   1253   end
   1254 
   1255   defp prewalk(list, kind, query, expr, acc, adapter) when is_list(list) do
   1256     Enum.map_reduce(list, acc, &prewalk(&1, kind, query, expr, &2, adapter))
   1257   end
   1258 
   1259   defp prewalk(other, _kind, _query, _expr, acc, _adapter) do
   1260     {other, acc}
   1261   end
   1262 
   1263   defp selected_as!(select_aliases, name) do
   1264     case select_aliases do
   1265       %{^name => _} ->
   1266         name
   1267 
   1268       _ ->
   1269         raise ArgumentError,
   1270               "invalid alias: `#{inspect(name)}`. Use `selected_as/2` to define aliases in the outer most `select` expression."
   1271     end
   1272   end
   1273 
   1274   defp dump_param(kind, query, expr, v, type, adapter) do
   1275     type = field_type!(kind, query, expr, type)
   1276 
   1277     case dump_param(kind, type, v, adapter) do
   1278       {:ok, v} ->
   1279         v
   1280       {:error, error} ->
   1281         error = error <> ". Or the value is incompatible or it must be " <>
   1282                          "interpolated (using ^) so it may be cast accordingly"
   1283         error! query, expr, error
   1284     end
   1285   end
   1286 
   1287   defp dump_param(kind, type, v, adapter) do
   1288     with {:ok, type} <- normalize_param(kind, type, v),
   1289          do: dump_param(adapter, type, v)
   1290   end
   1291 
   1292   defp validate_in(meta, expr, param, acc, adapter) do
   1293     {v, t} = Enum.fetch!(expr.params, param)
   1294     length = length(v)
   1295 
   1296     case adapter.dumpers(t, t) do
   1297       [{:in, _} | _] -> {{:^, meta, [acc, length]}, acc + length}
   1298       _ -> {{:^, meta, [acc, length]}, acc + 1}
   1299     end
   1300   end
   1301 
  # Without a select there is nothing to normalize: the query is returned
  # together with `nil` in place of the select metadata.
  defp normalize_select(%{select: nil} = query, _keep_literals?, _allow_alias?) do
    {query, nil}
  end

  # Computes the flat list of selected fields, stores it in
  # `query.select.fields` and returns `{query, select}`, where `select` is a
  # map with the :preprocess, :postprocess, :take, :assocs and :from keys
  # built below.
  #
  # `keep_literals?` is forwarded to collect_fields/7. `allow_alias?` says
  # whether `selected_as/2` aliases may be used in this select (see
  # normalize_selected_as/3).
  defp normalize_select(query, keep_literals?, allow_alias?) do
    %{assocs: assocs, preloads: preloads, select: select} = query
    %{take: take, expr: expr} = select
    # What was asked to be taken from the `from` binding (index 0), if anything.
    {tag, from_take} = Map.get(take, 0, {:any, []})
    source = get_source!(:select, query, 0)
    assocs = merge_assocs(assocs, query)

    # In from, if there is a schema and we have a map tag with preloads,
    # it needs to be converted to a map in a later pass.
    {take, from_tag} =
      case source do
        # Note: `source` is rebound by this pattern to the first tuple element.
        {source, schema, _}
        when tag == :map and preloads != [] and is_binary(source) and schema != nil ->
          {Map.put(take, 0, {:struct, from_take}), :map}

        _ ->
          {take, :any}
      end

    # `fields` comes back in reverse order; `from` is `:none` unless the
    # `from` binding itself was selected.
    {postprocess, fields, from} =
      collect_fields(expr, [], :none, query, take, keep_literals?, %{})

    # Convert selected_as/2 to a tuple so it can be aliased by the adapters.
    # Don't convert if the select expression belongs to a CTE or subquery
    # because those fields are already automatically aliased.
    fields = normalize_selected_as(fields, allow_alias?, select.aliases)

    {fields, preprocess, from} =
      case from do
        {:ok, from_pre, from_expr, from_taken} ->
          {assoc_exprs, assoc_fields} = collect_assocs([], [], query, tag, from_take, assocs)
          # Final order: fields of the from source, then association fields,
          # then every other selected field.
          fields = from_taken ++ Enum.reverse(assoc_fields, Enum.reverse(fields))
          preprocess = [from_pre | Enum.reverse(assoc_exprs)]
          {fields, preprocess, {from_tag, from_expr}}

        :none when preloads != [] or assocs != [] ->
          error! query, "the binding used in `from` must be selected in `select` when using `preload`"

        :none ->
          {Enum.reverse(fields), [], :none}
      end

    select = %{
      preprocess: preprocess,
      postprocess: postprocess,
      take: from_take,
      assocs: assocs,
      from: from
    }

    {put_in(query.select.fields, fields), select}
  end
   1358 
   1359   defp normalize_selected_as(fields, _allow_alias?, aliases) when aliases == %{}, do: fields
   1360 
   1361   defp normalize_selected_as(_fields, false, aliases) do
   1362     raise ArgumentError,
   1363           "`selected_as/2` can only be used in the outer most `select` expression. " <>
   1364             "If you are attempting to alias a field from a subquery or cte, it is not allowed " <>
   1365             "because the fields are automatically aliased by the corresponding map/struct key. " <>
   1366             "The following field aliases were specified: #{inspect(Map.keys(aliases))}."
   1367   end
   1368 
   1369   defp normalize_selected_as(fields, true, _aliases) do
   1370     Enum.map(fields, fn
   1371       {:selected_as, _, [select_expr, name]} -> {name, select_expr}
   1372       field -> field
   1373     end)
   1374   end
   1375 
   1376   # Handling of source
   1377 
   1378   defp collect_fields({:merge, _, [{:&, _, [0]}, right]}, fields, :none, query, take, keep_literals?, _drop) do
   1379     {expr, taken} = source_take!(:select, query, take, 0, 0, %{})
   1380     from = {:ok, {:source, :from}, expr, taken}
   1381 
   1382     {right, right_fields, _from} = collect_fields(right, [], from, query, take, keep_literals?, %{})
   1383     from = {:ok, {:merge, {:source, :from}, right}, expr, taken ++ Enum.reverse(right_fields)}
   1384 
   1385     {{:source, :from}, fields, from}
   1386   end
   1387 
   1388   defp collect_fields({:&, _, [0]}, fields, :none, query, take, _keep_literals?, drop) do
   1389     {expr, taken} = source_take!(:select, query, take, 0, 0, drop)
   1390     {{:source, :from}, fields, {:ok, {:source, :from}, expr, taken}}
   1391   end
   1392 
   1393   defp collect_fields({:&, _, [0]}, fields, from, _query, _take, _keep_literals?, _drop)
   1394        when from != :never do
   1395     {{:source, :from}, fields, from}
   1396   end
   1397 
   1398   defp collect_fields({:&, _, [ix]}, fields, from, query, take, _keep_literals?, drop) do
   1399     {expr, taken} = source_take!(:select, query, take, ix, ix, drop)
   1400     {expr, Enum.reverse(taken, fields), from}
   1401   end
   1402 
   1403   # Expression handling
   1404 
   1405   defp collect_fields({agg, _, [{{:., dot_meta, [{:&, _, [_]}, _]}, _, []} | _]} = expr,
   1406                       fields, from, _query, _take, _keep_literals?, _drop)
   1407        when agg in @aggs do
   1408     type =
   1409       case agg do
   1410         :count -> :integer
   1411         :row_number -> :integer
   1412         :rank -> :integer
   1413         :dense_rank -> :integer
   1414         :ntile -> :integer
   1415         # If it is possible to upcast, we do it, otherwise keep the DB value.
   1416         # For example, an average of integers will return a decimal, which can't be cast
   1417         # as an integer. But an average of "moneys" should be upcast.
   1418         _ -> {:maybe, Keyword.fetch!(dot_meta, :type)}
   1419       end
   1420 
   1421     {{:value, type}, [expr | fields], from}
   1422   end
   1423 
   1424   defp collect_fields({:filter, _, [call, _]} = expr, fields, from, query, take, keep_literals?, _drop) do
   1425     case call do
   1426       {agg, _, _} when agg in @aggs -> :ok
   1427       {:fragment, _, [_ | _]} -> :ok
   1428       _ -> error!(query, "filter(...) expects the first argument to be an aggregate expression, got: `#{Macro.to_string(expr)}`")
   1429     end
   1430 
   1431     {type, _, _} = collect_fields(call, fields, from, query, take, keep_literals?, %{})
   1432     {type, [expr | fields], from}
   1433   end
   1434 
   1435   defp collect_fields({:coalesce, _, [left, right]} = expr, fields, from, query, take, _keep_literals?, _drop) do
   1436     {left_type, _, _} = collect_fields(left, fields, from, query, take, true, %{})
   1437     {right_type, _, _} = collect_fields(right, fields, from, query, take, true, %{})
   1438 
   1439     type = if left_type == right_type, do: left_type, else: {:value, :any}
   1440     {type, [expr | fields], from}
   1441   end
   1442 
   1443   defp collect_fields({:over, _, [call, window]} = expr, fields, from, query, take, keep_literals?, _drop) do
   1444     if is_atom(window) and not Keyword.has_key?(query.windows, window) do
   1445       error!(query, "unknown window #{inspect window} given to over/2")
   1446     end
   1447 
   1448     {type, _, _} = collect_fields(call, fields, from, query, take, keep_literals?, %{})
   1449     {type, [expr | fields], from}
   1450   end
   1451 
   1452   defp collect_fields({{:., dot_meta, [{:&, _, [_]}, _]}, _, []} = expr,
   1453                       fields, from, _query, _take, _keep_literals?, _drop) do
   1454     {{:value, Keyword.fetch!(dot_meta, :type)}, [expr | fields], from}
   1455   end
   1456 
   1457   defp collect_fields({left, right}, fields, from, query, take, keep_literals?, _drop) do
   1458     {args, fields, from} = collect_args([left, right], fields, from, query, take, keep_literals?, [])
   1459     {{:tuple, args}, fields, from}
   1460   end
   1461 
   1462   defp collect_fields({:{}, _, args}, fields, from, query, take, keep_literals?, _drop) do
   1463     {args, fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
   1464     {{:tuple, args}, fields, from}
   1465   end
   1466 
   1467   defp collect_fields({:%{}, _, [{:|, _, [data, args]}]}, fields, from, query, take, keep_literals?, _drop) do
   1468     drop = Map.new(args, fn {key, _} -> {key, nil} end)
   1469     {data, fields, from} = collect_fields(data, fields, from, query, take, keep_literals?, drop)
   1470     {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
   1471     {{:map, data, args}, fields, from}
   1472   end
   1473 
   1474   defp collect_fields({:%{}, _, args}, fields, from, query, take, keep_literals?, _drop) do
   1475     {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
   1476     {{:map, args}, fields, from}
   1477   end
   1478 
   1479   defp collect_fields({:%, _, [name, {:%{}, _, [{:|, _, [data, args]}]}]},
   1480                       fields, from, query, take, keep_literals?, _drop) do
   1481     drop = Map.new(args, fn {key, _} -> {key, nil} end)
   1482     {data, fields, from} = collect_fields(data, fields, from, query, take, keep_literals?, drop)
   1483     {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
   1484     struct!(name, args)
   1485     {{:struct, name, data, args}, fields, from}
   1486   end
   1487 
   1488   defp collect_fields({:%, _, [name, {:%{}, _, args}]}, fields, from, query, take, keep_literals?, _drop) do
   1489     {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
   1490     struct!(name, args)
   1491     {{:struct, name, args}, fields, from}
   1492   end
   1493 
   1494   defp collect_fields({:merge, _, args}, fields, from, query, take, keep_literals?, _drop) do
   1495     {[left, right], fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
   1496     {{:merge, left, right}, fields, from}
   1497   end
   1498 
   1499   defp collect_fields({:date_add, _, [arg | _]} = expr, fields, from, query, take, keep_literals?, _drop) do
   1500     case collect_fields(arg, fields, from, query, take, keep_literals?, %{}) do
   1501       {{:value, :any}, _, _} -> {{:value, :date}, [expr | fields], from}
   1502       {type, _, _} -> {type, [expr | fields], from}
   1503     end
   1504   end
   1505 
   1506   defp collect_fields({:datetime_add, _, [arg | _]} = expr, fields, from, query, take, keep_literals?, _drop) do
   1507     case collect_fields(arg, fields, from, query, take, keep_literals?, %{}) do
   1508       {{:value, :any}, _, _} -> {{:value, :naive_datetime}, [expr | fields], from}
   1509       {type, _, _} -> {type, [expr | fields], from}
   1510     end
   1511   end
   1512 
   1513   defp collect_fields(args, fields, from, query, take, keep_literals?, _drop) when is_list(args) do
   1514     {args, fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
   1515     {{:list, args}, fields, from}
   1516   end
   1517 
   1518   defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_binary(expr) do
   1519     {{:value, :binary}, [expr | fields], from}
   1520   end
   1521 
   1522   defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_integer(expr) do
   1523     {{:value, :integer}, [expr | fields], from}
   1524   end
   1525 
   1526   defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_float(expr) do
   1527     {{:value, :float}, [expr | fields], from}
   1528   end
   1529 
   1530   defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_boolean(expr) do
   1531     {{:value, :boolean}, [expr | fields], from}
   1532   end
   1533 
   1534   defp collect_fields(nil, fields, from, _query, _take, true, _drop) do
   1535     {{:value, :any}, [nil | fields], from}
   1536   end
   1537 
   1538   defp collect_fields(expr, fields, from, _query, _take, _keep_literals?, _drop) when is_atom(expr) do
   1539     {expr, fields, from}
   1540   end
   1541 
   1542   defp collect_fields(expr, fields, from, _query, _take, false, _drop)
   1543        when is_binary(expr) or is_number(expr) do
   1544     {expr, fields, from}
   1545   end
   1546 
   1547   defp collect_fields(%Ecto.Query.Tagged{tag: tag} = expr, fields, from, _query, _take, _keep_literals?, _drop) do
   1548     {{:value, tag}, [expr | fields], from}
   1549   end
   1550 
   1551   defp collect_fields({op, _, [_]} = expr, fields, from, _query, _take, _keep_literals?, _drop)
   1552        when op in ~w(not is_nil)a do
   1553     {{:value, :boolean}, [expr | fields], from}
   1554   end
   1555 
   1556   defp collect_fields({op, _, [_, _]} = expr, fields, from, _query, _take, _keep_literals?, _drop)
   1557        when op in ~w(< > <= >= == != and or like ilike)a do
   1558     {{:value, :boolean}, [expr | fields], from}
   1559   end
   1560 
   1561   defp collect_fields({:selected_as, _, [select_expr, _name]} = expr, fields, from, query, take, keep_literals?, _drop) do
   1562     {type, _, _} = collect_fields(select_expr, fields, from, query, take, keep_literals?, %{})
   1563     {type, [expr | fields], from}
   1564   end
   1565 
   1566   defp collect_fields(expr, fields, from, _query, _take, _keep_literals?, _drop) do
   1567     {{:value, :any}, [expr | fields], from}
   1568   end
   1569 
   1570   defp collect_kv([{key, value} | elems], fields, from, query, take, keep_literals?, acc) do
   1571     {key, fields, from} = collect_fields(key, fields, from, query, take, keep_literals?, %{})
   1572     {value, fields, from} = collect_fields(value, fields, from, query, take, keep_literals?, %{})
   1573     collect_kv(elems, fields, from, query, take, keep_literals?, [{key, value} | acc])
   1574   end
   1575 
   1576   defp collect_kv([], fields, from, _query, _take, _keep_literals?, acc) do
   1577     {Enum.reverse(acc), fields, from}
   1578   end
   1579 
   1580   defp collect_args([elem | elems], fields, from, query, take, keep_literals?, acc) do
   1581     {elem, fields, from} = collect_fields(elem, fields, from, query, take, keep_literals?, %{})
   1582     collect_args(elems, fields, from, query, take, keep_literals?, [elem | acc])
   1583   end
   1584   defp collect_args([], fields, from, _query, _take, _keep_literals?, acc) do
   1585     {Enum.reverse(acc), fields, from}
   1586   end
   1587 
   1588   defp merge_assocs(assocs, query) do
   1589     assocs
   1590     |> Enum.reduce(%{}, fn {field, {index, children}}, acc ->
   1591       children = merge_assocs(children, query)
   1592 
   1593       Map.update(acc, field, {index, children}, fn
   1594         {^index, current_children} ->
   1595           {index, merge_assocs(children ++ current_children, query)}
   1596         {other_index, _} ->
   1597           error! query, "association `#{field}` is being set to binding at position #{index} " <>
   1598                         "and at position #{other_index} at the same time"
   1599       end)
   1600     end)
   1601     |> Map.to_list()
   1602   end
   1603 
   1604   defp collect_assocs(exprs, fields, query, tag, take, [{assoc, {ix, children}}|tail]) do
   1605     to_take = get_preload_source!(query, ix)
   1606     {fetch, take_children} = fetch_assoc(tag, take, assoc)
   1607     {expr, taken} = take!(to_take, query, fetch, assoc, ix, %{})
   1608     exprs = [expr | exprs]
   1609     fields = Enum.reverse(taken, fields)
   1610     {exprs, fields} = collect_assocs(exprs, fields, query, tag, take_children, children)
   1611     {exprs, fields} = collect_assocs(exprs, fields, query, tag, take, tail)
   1612     {exprs, fields}
   1613   end
   1614   defp collect_assocs(exprs, fields, _query, _tag, _take, []) do
   1615     {exprs, fields}
   1616   end
   1617 
   1618   defp fetch_assoc(tag, take, assoc) do
   1619     case Access.fetch(take, assoc) do
   1620       {:ok, value} -> {{:ok, {tag, value}}, value}
   1621       :error -> {:error, []}
   1622     end
   1623   end
   1624 
   1625   defp source_take!(kind, query, take, field, ix, drop) do
   1626     source = get_source!(kind, query, ix)
   1627     take!(source, query, Access.fetch(take, field), field, ix, drop)
   1628   end
   1629 
   1630   defp take!(source, query, fetched, field, ix, drop) do
   1631     case {fetched, source} do
   1632       {{:ok, {:struct, _}}, {:fragment, _, _}} ->
   1633         error! query, "it is not possible to return a struct subset of a fragment"
   1634 
   1635       {{:ok, {:struct, _}}, %Ecto.SubQuery{}} ->
   1636         error! query, "it is not possible to return a struct subset of a subquery"
   1637 
   1638       {{:ok, {_, []}}, {_, _, _}} ->
   1639         error! query, "at least one field must be selected for binding `#{field}`, got an empty list"
   1640 
   1641       {{:ok, {:struct, _}}, {_, nil, _}} ->
   1642         error! query, "struct/2 in select expects a source with a schema"
   1643 
   1644       {{:ok, {kind, fields}}, {source, schema, prefix}} when is_binary(source) ->
   1645         dumper = if schema, do: schema.__schema__(:dump), else: %{}
   1646         schema = if kind == :map, do: nil, else: schema
   1647         {types, fields} = select_dump(List.wrap(fields), dumper, ix, drop)
   1648         {{:source, {source, schema}, prefix || query.prefix, types}, fields}
   1649 
   1650       {{:ok, {_, fields}}, _} ->
   1651         {{:map, Enum.map(fields, &{&1, {:value, :any}})}, Enum.map(fields, &select_field(&1, ix))}
   1652 
   1653       {:error, {:fragment, _, _}} ->
   1654         {{:value, :map}, [{:&, [], [ix]}]}
   1655 
   1656       {:error, {_, nil, _}} ->
   1657         {{:value, :map}, [{:&, [], [ix]}]}
   1658 
   1659       {:error, {source, schema, prefix}} ->
   1660         {types, fields} = select_dump(schema.__schema__(:query_fields), schema.__schema__(:dump), ix, drop)
   1661 
   1662         {{:source, {source, schema}, prefix || query.prefix, types}, fields}
   1663 
   1664       {:error, %Ecto.SubQuery{select: select}} ->
   1665         fields = subquery_source_fields(select)
   1666         {select, Enum.map(fields, &select_field(&1, ix))}
   1667     end
   1668   end
   1669 
   1670   defp select_dump(fields, dumper, ix, drop) do
   1671     fields
   1672     |> Enum.reverse
   1673     |> Enum.reduce({[], []}, fn
   1674       field, {types, exprs} when is_atom(field) and not is_map_key(drop, field) ->
   1675         {source, type} = Map.get(dumper, field, {field, :any})
   1676         {[{field, type} | types], [select_field(source, ix) | exprs]}
   1677       _field, acc ->
   1678         acc
   1679     end)
   1680   end
   1681 
   1682   defp select_field(field, ix) do
   1683     {{:., [], [{:&, [], [ix]}, field]}, [], []}
   1684   end
   1685 
   1686   defp get_ix!({:&, _, [ix]} = expr, _kind, query) do
   1687     {ix, expr, query}
   1688   end
   1689 
   1690   defp get_ix!({:as, meta, [as]}, _kind, query) do
   1691     case query.aliases do
   1692       %{^as => ix} -> {ix, {:&, meta, [ix]}, query}
   1693       %{} -> error!(query, "could not find named binding `as(#{inspect(as)})`")
   1694     end
   1695   end
   1696 
   1697   defp get_ix!({:parent_as, meta, [as]}, kind, query) do
   1698     case query.aliases[@parent_as] do
   1699       %{aliases: %{^as => ix}, sources: sources} = query ->
   1700         if kind == :select and not (ix < tuple_size(sources)) do
   1701           error!(query, "the parent_as in a subquery select used as a join can only access the `from` binding")
   1702         else
   1703           {ix, {:parent_as, [], [as]}, query}
   1704         end
   1705 
   1706       %{} = parent ->
   1707         get_ix!({:parent_as, meta, [as]}, kind, parent)
   1708 
   1709       nil ->
   1710         error!(query, "could not find named binding `parent_as(#{inspect(as)})`")
   1711     end
   1712   end
   1713 
   1714   defp get_source!(where, %{sources: sources} = query, ix) do
   1715     elem(sources, ix)
   1716   rescue
   1717     ArgumentError ->
   1718       error! query, "invalid query has specified more bindings than bindings available " <>
   1719                     "in `#{where}` (look for `unknown_binding!` in the printed query below)"
   1720   end
   1721 
   1722   defp get_preload_source!(query, ix) do
   1723     case get_source!(:preload, query, ix) do
   1724       {source, schema, _} = all when is_binary(source) and schema != nil ->
   1725         all
   1726       _ ->
   1727         error! query, "can only preload sources with a schema " <>
   1728                       "(fragments, binary and subqueries are not supported)"
   1729     end
   1730   end
   1731 
   1732   @doc """
   1733   Puts the prefix given via `opts` into the given query, if available.
   1734   """
   1735   def attach_prefix(%{prefix: nil} = query, opts) when is_list(opts) do
   1736     case Keyword.fetch(opts, :prefix) do
   1737       {:ok, prefix} -> %{query | prefix: prefix}
   1738       :error -> query
   1739     end
   1740   end
   1741 
   1742   def attach_prefix(%{prefix: nil} = query, %{prefix: prefix}) do
   1743     %{query | prefix: prefix}
   1744   end
   1745 
   1746   def attach_prefix(query, _), do: query
   1747 
   1748   ## Helpers
   1749 
   1750   @all_exprs [with_cte: :with_ctes, distinct: :distinct, select: :select, from: :from, join: :joins,
   1751               where: :wheres, group_by: :group_bys, having: :havings, windows: :windows,
   1752               combination: :combinations, order_by: :order_bys, limit: :limit, offset: :offset]
   1753 
   1754   # Although joins come before updates in the actual query,
   1755   # the on fields are moved to where, so they effectively
   1756   # need to come later for MySQL. This means subqueries
   1757   # with parameters are not supported as a join on MySQL.
   1758   # The only way to address it is by splitting how join
   1759   # and their on expressions are processed.
   1760   @update_all_exprs [with_cte: :with_ctes, from: :from, update: :updates,
   1761                      join: :joins, where: :wheres, select: :select]
   1762 
   1763   @delete_all_exprs [with_cte: :with_ctes, from: :from, join: :joins,
   1764                      where: :wheres, select: :select]
   1765 
   1766   # Traverse all query components with expressions.
   1767   # Therefore from, preload, assocs and lock are not traversed.
   1768   defp traverse_exprs(query, operation, acc, fun) do
   1769     exprs =
   1770       case operation do
   1771         :all -> @all_exprs
   1772         :insert_all -> @all_exprs
   1773         :update_all -> @update_all_exprs
   1774         :delete_all -> @delete_all_exprs
   1775       end
   1776 
   1777     Enum.reduce exprs, {query, acc}, fn {kind, key}, {query, acc} ->
   1778       {traversed, acc} = fun.(kind, query, Map.fetch!(query, key), acc)
   1779       {%{query | key => traversed}, acc}
   1780     end
   1781   end
   1782 
   1783   defp field_type!(kind, query, expr, type, allow_virtuals? \\ false)
   1784 
   1785   defp field_type!(kind, query, expr, {composite, {ix, field}}, allow_virtuals?) when is_integer(ix) do
   1786     {composite, type!(kind, query, expr, ix, field, allow_virtuals?)}
   1787   end
   1788 
   1789   defp field_type!(kind, query, expr, {ix, field}, allow_virtuals?) when is_integer(ix) do
   1790     type!(kind, query, expr, ix, field, allow_virtuals?)
   1791   end
   1792 
   1793   defp field_type!(_kind, _query, _expr, type, _) do
   1794     type
   1795   end
   1796 
   1797   defp type!(kind, query, expr, schema, field, allow_virtuals? \\ false)
   1798 
   1799   defp type!(_kind, _query, _expr, nil, _field, _allow_virtuals?), do: :any
   1800 
   1801   defp type!(kind, query, expr, ix, field, allow_virtuals?) when is_integer(ix) do
   1802     case get_source!(kind, query, ix) do
   1803       {:fragment, _, _} ->
   1804         :any
   1805 
   1806       {_, schema, _} ->
   1807         type!(kind, query, expr, schema, field, allow_virtuals?)
   1808 
   1809       %Ecto.SubQuery{select: select} ->
   1810         case subquery_type_for(select, field) do
   1811           {:ok, type} -> type
   1812           :error -> error!(query, expr, "field `#{field}` does not exist in subquery")
   1813         end
   1814     end
   1815   end
   1816 
   1817   defp type!(kind, query, expr, schema, field, allow_virtuals?) when is_atom(schema) do
   1818     cond do
   1819       type = schema.__schema__(:type, field) ->
   1820         type
   1821 
   1822       type = allow_virtuals? && schema.__schema__(:virtual_type, field) ->
   1823         type
   1824 
   1825       Map.has_key?(schema.__struct__(), field) ->
   1826         case schema.__schema__(:association, field) do
   1827           %Ecto.Association.BelongsTo{owner_key: owner_key} ->
   1828             error! query, expr, "field `#{field}` in `#{kind}` is an association in schema #{inspect schema}. " <>
   1829                                 "Did you mean to use `#{owner_key}`?"
   1830           %_{} ->
   1831             error! query, expr, "field `#{field}` in `#{kind}` is an association in schema #{inspect schema}"
   1832 
   1833           _ ->
   1834             error! query, expr, "field `#{field}` in `#{kind}` is a virtual field in schema #{inspect schema}"
   1835         end
   1836 
   1837       true ->
   1838         hint = closest_fields_hint(field, schema)
   1839         error! query, expr, "field `#{field}` in `#{kind}` does not exist in schema #{inspect schema}", hint
   1840     end
   1841   end
   1842 
   1843   defp closest_fields_hint(input, schema) do
   1844     input_string = Atom.to_string(input)
   1845 
   1846     schema.__schema__(:fields)
   1847     |> Enum.map(fn field -> {field, String.jaro_distance(input_string, Atom.to_string(field))} end)
   1848     |> Enum.filter(fn {_field, score} -> score >= 0.77 end)
   1849     |> Enum.sort(& elem(&1, 0) >= elem(&2, 0))
   1850     |> Enum.take(5)
   1851     |> Enum.map(&elem(&1, 0))
   1852     |> case do
   1853       [] ->
   1854         nil
   1855 
   1856       [suggestion] ->
   1857         "Did you mean `#{suggestion}`?"
   1858 
   1859       suggestions ->
   1860         Enum.reduce(suggestions, "Did you mean one of: \n", fn suggestion, acc ->
   1861           acc <> "\n      * `#{suggestion}`"
   1862         end)
   1863     end
   1864   end
   1865 
   1866   defp normalize_param(_kind, {:out, {:array, type}}, _value) do
   1867     {:ok, type}
   1868   end
   1869   defp normalize_param(_kind, {:out, :any}, _value) do
   1870     {:ok, :any}
   1871   end
   1872   defp normalize_param(kind, {:out, other}, value) do
   1873     {:error, "value `#{inspect value}` in `#{kind}` expected to be part of an array " <>
   1874              "but matched type is #{inspect other}"}
   1875   end
   1876   defp normalize_param(_kind, type, _value) do
   1877     {:ok, type}
   1878   end
   1879 
   1880   defp cast_param(kind, type, v) do
   1881     case Ecto.Type.cast(type, v) do
   1882       {:ok, v} ->
   1883         {:ok, v}
   1884       _ ->
   1885         {:error, "value `#{inspect v}` in `#{kind}` cannot be cast to type #{inspect type}"}
   1886     end
   1887   end
   1888 
   1889   defp dump_param(adapter, type, v) do
   1890     case Ecto.Type.adapter_dump(adapter, type, v) do
   1891       {:ok, v} ->
   1892         {:ok, v}
   1893       :error ->
   1894         {:error, "value `#{inspect v}` cannot be dumped to type #{inspect type}"}
   1895     end
   1896   end
   1897 
   1898   defp field_source({source, schema, _}, field) when is_binary(source) and schema != nil do
   1899     # If the field is not found we return the field itself
   1900     # which will be checked and raise later.
   1901     schema.__schema__(:field_source, field) || field
   1902   end
   1903   defp field_source(_, field) do
   1904     field
   1905   end
   1906 
   1907   defp cte_fields([_key | _rest_keys], [{:selected_as, _, [_, _]} | _rest_fields], _acc) do
   1908     raise ArgumentError,
   1909           "`selected_as/2` can only be used in the outer most `select` expression. " <>
   1910             "If you are attempting to alias a field from a subquery or cte, it is not allowed " <>
   1911             "because the fields are automatically aliased by the corresponding map/struct key."
   1912   end
   1913 
   1914   defp cte_fields([key | rest_keys], [field | rest_fields], acc) do
   1915     cte_fields(rest_keys, rest_fields, [{key, field} | acc])
   1916   end
   1917 
   1918   defp cte_fields(_keys, [], acc), do: :lists.reverse(acc)
   1919   defp cte_fields([], _fields, acc), do: :lists.reverse(acc)
   1920 
   1921   defp assert_update!(%Ecto.Query{updates: updates} = query, operation) do
   1922     changes =
   1923       Enum.reduce(updates, %{}, fn update, acc ->
   1924         Enum.reduce(update.expr, acc, fn {_op, kw}, acc ->
   1925           Enum.reduce(kw, acc, fn {k, v}, acc ->
   1926             if Map.has_key?(acc, k) do
   1927               error! query, "duplicate field `#{k}` for `#{operation}`"
   1928             else
   1929               Map.put(acc, k, v)
   1930             end
   1931           end)
   1932         end)
   1933       end)
   1934 
   1935     if changes == %{} do
   1936       error! query, "`#{operation}` requires at least one field to be updated"
   1937     end
   1938   end
   1939 
   1940   defp assert_no_update!(query, operation) do
   1941     case query do
   1942       %Ecto.Query{updates: []} -> query
   1943       _ ->
   1944         error! query, "`#{operation}` does not allow `update` expressions"
   1945     end
   1946   end
   1947 
   1948   defp assert_only_filter_expressions!(query, operation) do
   1949     case query do
   1950       %Ecto.Query{order_bys: [], limit: nil, offset: nil, group_bys: [],
   1951                   havings: [], preloads: [], assocs: [], distinct: nil, lock: nil,
   1952                   windows: [], combinations: []} ->
   1953         query
   1954       _ ->
   1955         error! query, "`#{operation}` allows only `with_cte`, `where` and `join` expressions. " <>
   1956                       "You can exclude unwanted expressions from a query by using " <>
   1957                       "Ecto.Query.exclude/2. Error found"
   1958     end
   1959   end
   1960 
   1961   defp filter_and_reraise(exception, stacktrace) do
   1962     reraise exception, Enum.reject(stacktrace, &match?({__MODULE__, _, _, _}, &1))
   1963   end
   1964 
   1965   defp error!(query, message) do
   1966     raise Ecto.QueryError, message: message, query: query
   1967   end
   1968 
   1969   defp error!(query, expr, message) do
   1970     raise Ecto.QueryError, message: message, query: query, file: expr.file, line: expr.line
   1971   end
   1972 
   1973   defp error!(query, expr, message, hint) do
   1974     raise Ecto.QueryError, message: message, query: query, file: expr.file, line: expr.line, hint: hint
   1975   end
   1976 end