zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

preloader.ex (21789B)


      1 defmodule Ecto.Repo.Preloader do
      2   # The module invoked by user defined repo_names
      3   # for preload related functionality.
      4   @moduledoc false
      5 
      6   require Ecto.Query
      7   require Logger
      8 
      9   @doc """
     10   Transforms a result set based on query preloads, loading
     11   the associations onto their parent schema.
     12   """
     13   @spec query([list], Ecto.Repo.t, list, Access.t, fun, {adapter_meta :: map, opts :: Keyword.t}) :: [list]
     14   def query([], _repo_name, _preloads, _take, _fun, _tuplet), do: []
     15   def query(rows, _repo_name, [], _take, fun, _tuplet), do: Enum.map(rows, fun)
     16 
     17   def query(rows, repo_name, preloads, take, fun, tuplet) do
     18     rows
     19     |> extract()
     20     |> normalize_and_preload_each(repo_name, preloads, take, tuplet)
     21     |> unextract(rows, fun)
     22   end
     23 
     24   defp extract([[nil|_]|t2]), do: extract(t2)
     25   defp extract([[h|_]|t2]), do: [h|extract(t2)]
     26   defp extract([]), do: []
     27 
     28   defp unextract(structs, [[nil|_] = h2|t2], fun), do: [fun.(h2)|unextract(structs, t2, fun)]
     29   defp unextract([h1|structs], [[_|t1]|t2], fun), do: [fun.([h1|t1])|unextract(structs, t2, fun)]
     30   defp unextract([], [], _fun), do: []
     31 
     32   @doc """
     33   Implementation for `Ecto.Repo.preload/2`.
     34   """
     35   @spec preload(structs, atom, atom | list, {adapter_meta :: map, opts :: Keyword.t}) ::
     36                 structs when structs: [Ecto.Schema.t] | Ecto.Schema.t | nil
     37   def preload(nil, _repo_name, _preloads, _tuplet) do
     38     nil
     39   end
     40 
     41   def preload(structs, repo_name, preloads, {_adapter_meta, opts} = tuplet) when is_list(structs) do
     42     normalize_and_preload_each(structs, repo_name, preloads, opts[:take], tuplet)
     43   end
     44 
     45   def preload(struct, repo_name, preloads, {_adapter_meta, opts} = tuplet) when is_map(struct) do
     46     normalize_and_preload_each([struct], repo_name, preloads, opts[:take], tuplet) |> hd()
     47   end
     48 
     49   defp normalize_and_preload_each(structs, repo_name, preloads, take, tuplet) do
     50     preloads = normalize(preloads, take, preloads)
     51     preload_each(structs, repo_name, preloads, tuplet)
     52   rescue
     53     e ->
     54       # Reraise errors so we ignore the preload inner stacktrace
     55       filter_and_reraise e, __STACKTRACE__
     56   end
     57 
     58   ## Preloading
     59 
     60   defp preload_each(structs, _repo_name, [], _tuplet),   do: structs
     61   defp preload_each([], _repo_name, _preloads, _tuplet), do: []
     62   defp preload_each(structs, repo_name, preloads, tuplet) do
     63     if sample = Enum.find(structs, & &1) do
     64       module = sample.__struct__
     65       prefix = preload_prefix(tuplet, sample)
     66       {assocs, throughs, embeds} = expand(module, preloads, {%{}, %{}, []})
     67       structs = preload_embeds(structs, embeds, repo_name, tuplet)
     68 
     69       {fetched_assocs, to_fetch_queries} =
     70         prepare_queries(structs, module, assocs, prefix, repo_name, tuplet)
     71 
     72       fetched_queries = maybe_pmap(to_fetch_queries, repo_name, tuplet)
     73       assocs = preload_assocs(fetched_assocs, fetched_queries, repo_name, tuplet)
     74       throughs = Map.values(throughs)
     75 
     76       for struct <- structs do
     77         struct = Enum.reduce assocs, struct, &load_assoc/2
     78         struct = Enum.reduce throughs, struct, &load_through/2
     79         struct
     80       end
     81     else
     82       structs
     83     end
     84   end
     85 
     86   defp preload_prefix({_adapter_meta, opts}, sample) do
     87     case Keyword.fetch(opts, :prefix) do
     88       {:ok, prefix} ->
     89         prefix
     90 
     91       :error ->
     92         case sample do
     93           %{__meta__: %{prefix: prefix}} -> prefix
     94           # Must be an embedded schema
     95           _ -> nil
     96         end
     97     end
     98   end
     99 
    100   ## Association preloading
    101 
    102   # First we traverse all assocs and find which queries we need to run.
    103   defp prepare_queries(structs, module, assocs, prefix, repo_name, tuplet) do
    104     Enum.reduce(assocs, {[], []}, fn
    105       {_key, {{:assoc, assoc, related_key}, take, query, preloads}}, {assocs, queries} ->
    106         {fetch_ids, loaded_ids, loaded_structs} = fetch_ids(structs, module, assoc, tuplet)
    107 
    108         queries =
    109           if fetch_ids != [] do
    110             [
    111               fn tuplet ->
    112                 fetch_query(fetch_ids, assoc, repo_name, query, prefix, related_key, take, tuplet)
    113               end
    114               | queries
    115             ]
    116           else
    117             queries
    118           end
    119 
    120         {[{assoc, fetch_ids != [], loaded_ids, loaded_structs, preloads} | assocs], queries}
    121     end)
    122   end
    123 
    124   # Then we execute queries in parallel
    125   defp maybe_pmap(preloaders, _repo_name, {adapter_meta, opts}) do
    126     if match?([_,_|_], preloaders) and not adapter_meta.adapter.checked_out?(adapter_meta) and
    127          Keyword.get(opts, :in_parallel, true) do
    128       # We pass caller: self() so the ownership pool knows where
    129       # to fetch the connection from and set the proper timeouts.
    130       # Note while the ownership pool uses '$callers' from pdict,
    131       # it does not do so in automatic mode, hence this line is
    132       # still necessary.
    133       opts = Keyword.put_new(opts, :caller, self())
    134 
    135       preloaders
    136       |> Task.async_stream(&(&1.({adapter_meta, opts})), timeout: :infinity)
    137       |> Enum.map(fn {:ok, assoc} -> assoc end)
    138     else
    139       Enum.map(preloaders, &(&1.({adapter_meta, opts})))
    140     end
    141   end
    142 
    143   # Then we unpack the query results, merge them, and preload recursively
    144   defp preload_assocs(
    145          [{assoc, query?, loaded_ids, loaded_structs, preloads} | assocs],
    146          queries,
    147          repo_name,
    148          tuplet
    149        ) do
    150     {fetch_ids, fetch_structs, queries} = maybe_unpack_query(query?, queries)
    151     all = preload_each(Enum.reverse(loaded_structs, fetch_structs), repo_name, preloads, tuplet)
    152     entry = {:assoc, assoc, assoc_map(assoc.cardinality, Enum.reverse(loaded_ids, fetch_ids), all)}
    153     [entry | preload_assocs(assocs, queries, repo_name, tuplet)]
    154   end
    155 
    156   defp preload_assocs([], [], _repo_name, _tuplet), do: []
    157 
    158   defp preload_embeds(structs, [], _repo_name, _tuplet), do: structs
    159 
    160   defp preload_embeds(structs, [embed | embeds], repo_name, tuplet) do
    161 
    162     {%{field: field, cardinality: card}, sub_preloads} = embed
    163 
    164     {embed_structs, counts} =
    165       Enum.flat_map_reduce(structs, [], fn
    166         %{^field => embeds}, counts when is_list(embeds) -> {embeds, [length(embeds) | counts]}
    167         %{^field => nil}, counts -> {[], [0 | counts]}
    168         %{^field => embed}, counts -> {[embed], [1 | counts]}
    169         nil, counts -> {[], [0 | counts]}
    170         struct, _counts -> raise ArgumentError, "expected #{inspect(struct)} to contain embed `#{field}`"
    171       end)
    172 
    173     embed_structs = preload_each(embed_structs, repo_name, sub_preloads, tuplet)
    174     structs = load_embeds(card, field, structs, embed_structs, Enum.reverse(counts), [])
    175     preload_embeds(structs, embeds, repo_name, tuplet)
    176   end
    177 
    178   defp load_embeds(_card, _field, [], [], [], acc), do: Enum.reverse(acc)
    179 
    180   defp load_embeds(card, field, [struct | structs], embed_structs, [0 | counts], acc),
    181     do: load_embeds(card, field, structs, embed_structs, counts, [struct | acc])
    182 
    183   defp load_embeds(:one, field, [struct | structs], [embed_struct | embed_structs], [1 | counts], acc),
    184     do: load_embeds(:one, field, structs, embed_structs, counts, [Map.put(struct, field, embed_struct) | acc])
    185 
    186   defp load_embeds(:many, field, [struct | structs], embed_structs, [count | counts], acc) do
    187     {current_embeds, rest_embeds} = split_n(embed_structs, count, [])
    188     acc = [Map.put(struct, field, Enum.reverse(current_embeds)) | acc]
    189     load_embeds(:many, field, structs, rest_embeds, counts, acc)
    190   end
    191 
    192   defp maybe_unpack_query(false, queries), do: {[], [], queries}
    193   defp maybe_unpack_query(true, [{ids, structs} | queries]), do: {ids, structs, queries}
    194 
    195   defp fetch_ids(structs, module, assoc, {_adapter_meta, opts}) do
    196     %{field: field, owner_key: owner_key, cardinality: card} = assoc
    197     force? = Keyword.get(opts, :force, false)
    198 
    199     Enum.reduce structs, {[], [], []}, fn
    200       nil, acc ->
    201         acc
    202       struct, {fetch_ids, loaded_ids, loaded_structs} ->
    203         assert_struct!(module, struct)
    204         %{^owner_key => id, ^field => value} = struct
    205         loaded? = Ecto.assoc_loaded?(value) and not force?
    206 
    207         if loaded? and is_nil(id) and not Ecto.Changeset.Relation.empty?(assoc, value) do
    208           Logger.warn """
    209           association `#{field}` for `#{inspect(module)}` has a loaded value but \
    210           its association key `#{owner_key}` is nil. This usually means one of:
    211 
    212             * `#{owner_key}` was not selected in a query
    213             * the struct was set with default values for `#{field}` which now you want to override
    214 
    215           If this is intentional, set force: true to disable this warning
    216           """
    217         end
    218 
    219         cond do
    220           card == :one and loaded? ->
    221             {fetch_ids, [id | loaded_ids], [value | loaded_structs]}
    222           card == :many and loaded? ->
    223             {fetch_ids, [{id, length(value)} | loaded_ids], value ++ loaded_structs}
    224           is_nil(id) ->
    225             {fetch_ids, loaded_ids, loaded_structs}
    226           true ->
    227             {[id | fetch_ids], loaded_ids, loaded_structs}
    228         end
    229     end
    230   end
    231 
    232   defp fetch_query(ids, assoc, _repo_name, query, _prefix, related_key, _take, _tuplet) when is_function(query, 1) do
    233     # Note we use an explicit sort because we don't want
    234     # to reorder based on the struct. Only the ID.
    235     ids
    236     |> Enum.uniq
    237     |> query.()
    238     |> fetched_records_to_tuple_ids(assoc, related_key)
    239     |> Enum.sort(fn {id1, _}, {id2, _} -> id1 <= id2 end)
    240     |> unzip_ids([], [])
    241   end
    242 
    243   defp fetch_query(ids, %{cardinality: card} = assoc, repo_name, query, prefix, related_key, take, tuplet) do
    244     query = assoc.__struct__.assoc_query(assoc, query, Enum.uniq(ids))
    245     field = related_key_to_field(query, related_key)
    246 
    247     # Normalize query
    248     query = %{Ecto.Query.Planner.ensure_select(query, take || true) | prefix: prefix}
    249 
    250     # Add the related key to the query results
    251     query = update_in query.select.expr, &{:{}, [], [field, &1]}
    252 
    253     # If we are returning many results, we must sort by the key too
    254     query =
    255       case {card, query.combinations} do
    256         {:many, [{kind, _} | []]} ->
    257           raise ArgumentError,
    258                 "`#{kind}` queries must be wrapped inside of a subquery " <>
    259                   "when preloading a `has_many` or `many_to_many` association. " <>
    260                   "You must also ensure that all members of the `#{kind}` query " <>
    261                   "select the parent's foreign key"
    262 
    263         {:many, _} ->
    264           update_in query.order_bys, fn order_bys ->
    265             [%Ecto.Query.QueryExpr{expr: preload_order(assoc, query, field), params: [],
    266                                    file: __ENV__.file, line: __ENV__.line}|order_bys]
    267           end
    268         {:one, _} ->
    269           query
    270       end
    271 
    272     unzip_ids Ecto.Repo.Queryable.all(repo_name, query, tuplet), [], []
    273   end
    274 
    275   defp fetched_records_to_tuple_ids([], _assoc, _related_key),
    276     do: []
    277 
    278   defp fetched_records_to_tuple_ids([%{} | _] = entries, _assoc, {0, key}),
    279     do: Enum.map(entries, &{Map.fetch!(&1, key), &1})
    280 
    281   defp fetched_records_to_tuple_ids([{_, %{}} | _] = entries, _assoc, _related_key),
    282     do: entries
    283 
    284   defp fetched_records_to_tuple_ids([entry | _], assoc, _),
    285     do: raise """
    286     invalid custom preload for `#{assoc.field}` on `#{inspect assoc.owner}`.
    287 
    288     For many_to_many associations, the custom function given to preload should \
    289     return a tuple with the associated key as first element and the record as \
    290     second element.
    291 
    292     For example, imagine posts has many to many tags through a posts_tags table. \
    293     When preloading the tags, you may write:
    294 
    295         custom_tags = fn post_ids ->
    296           Repo.all(
    297             from t in Tag,
    298                  join: pt in "posts_tags",
    299                  where: t.custom and pt.post_id in ^post_ids and pt.tag_id == t.id
    300           )
    301         end
    302 
    303         from Post, preload: [tags: ^custom_tags]
    304 
    305     Unfortunately the query above is not enough because Ecto won't know how to \
    306     associate the posts with the tags. In those cases, you need to return a tuple \
    307     with the `post_id` as first element and the tag record as second. The new query \
    308     will have a select field as follows:
    309 
    310         from t in Tag,
    311              join: pt in "posts_tags",
    312              where: t.custom and pt.post_id in ^post_ids and pt.tag_id == t.id,
    313              select: {pt.post_id, t}
    314 
    315     We expected a tuple but we got: #{inspect(entry)}
    316     """
    317 
    318   defp preload_order(assoc, query, related_field) do
    319     custom_order_by = Enum.map(assoc.preload_order, fn
    320       {direction, field} ->
    321         {direction, related_key_to_field(query, {0, field})}
    322       field ->
    323         {:asc, related_key_to_field(query, {0, field})}
    324     end)
    325 
    326     [{:asc, related_field} | custom_order_by]
    327   end
    328 
    329   defp related_key_to_field(query, {pos, key, field_type}) do
    330     field_ast = related_key_to_field(query, {pos, key})
    331 
    332     {:type, [], [field_ast, field_type]}
    333   end
    334 
    335   defp related_key_to_field(query, {pos, key}) do
    336     {{:., [], [{:&, [], [related_key_pos(query, pos)]}, key]}, [], []}
    337   end
    338 
    339   defp related_key_pos(_query, pos) when pos >= 0, do: pos
    340   defp related_key_pos(query, pos), do: Ecto.Query.Builder.count_binds(query) + pos
    341 
    342   defp unzip_ids([{k, v}|t], acc1, acc2), do: unzip_ids(t, [k|acc1], [v|acc2])
    343   defp unzip_ids([], acc1, acc2), do: {acc1, acc2}
    344 
    345   defp assert_struct!(mod, %{__struct__: mod}), do: true
    346   defp assert_struct!(mod, %{__struct__: struct}) do
    347     raise ArgumentError, "expected a homogeneous list containing the same struct, " <>
    348                          "got: #{inspect mod} and #{inspect struct}"
    349   end
    350 
    351   defp assoc_map(:one, ids, structs) do
    352     one_assoc_map(ids, structs, %{})
    353   end
    354   defp assoc_map(:many, ids, structs) do
    355     many_assoc_map(ids, structs, %{})
    356   end
    357 
    358   defp one_assoc_map([id|ids], [struct|structs], map) do
    359     one_assoc_map(ids, structs, Map.put(map, id, struct))
    360   end
    361   defp one_assoc_map([], [], map) do
    362     map
    363   end
    364 
    365   defp many_assoc_map([{id, n}|ids], structs, map) do
    366     {acc, structs} = split_n(structs, n, [])
    367     many_assoc_map(ids, structs, Map.put(map, id, acc))
    368   end
    369   defp many_assoc_map([id|ids], [struct|structs], map) do
    370     {ids, structs, acc} = split_while(ids, structs, id, [struct])
    371     many_assoc_map(ids, structs, Map.put(map, id, acc))
    372   end
    373   defp many_assoc_map([], [], map) do
    374     map
    375   end
    376 
    377   defp split_n(structs, 0, acc), do: {acc, structs}
    378   defp split_n([struct | structs], n, acc), do: split_n(structs, n - 1, [struct | acc])
    379 
    380   defp split_while([id|ids], [struct|structs], id, acc),
    381     do: split_while(ids, structs, id, [struct|acc])
    382   defp split_while(ids, structs, _id, acc),
    383     do: {ids, structs, acc}
    384 
    385   ## Load preloaded data
    386 
    387   defp load_assoc({:assoc, _assoc, _ids}, nil) do
    388     nil
    389   end
    390 
    391   defp load_assoc({:assoc, assoc, ids}, struct) do
    392     %{field: field, owner_key: owner_key, cardinality: cardinality} = assoc
    393     key = Map.fetch!(struct, owner_key)
    394 
    395     loaded =
    396       case ids do
    397         %{^key => value} -> value
    398         _ when cardinality == :many -> []
    399         _ -> nil
    400       end
    401 
    402     Map.put(struct, field, loaded)
    403   end
    404 
    405   defp load_through({:through, assoc, throughs}, struct) do
    406     %{cardinality: cardinality, field: field, owner: owner} = assoc
    407     {loaded, _} = Enum.reduce(throughs, {[struct], owner}, &recur_through/2)
    408     Map.put(struct, field, maybe_first(loaded, cardinality))
    409   end
    410 
    411   defp maybe_first(list, :one), do: List.first(list)
    412   defp maybe_first(list, _), do: list
    413 
    414   defp recur_through(field, {structs, owner}) do
    415     assoc = owner.__schema__(:association, field)
    416     case assoc.__struct__.preload_info(assoc) do
    417       {:assoc, %{related: related}, _} ->
    418         pk_fields =
    419           related.__schema__(:primary_key)
    420           |> validate_has_pk_field!(related, assoc)
    421 
    422         {children, _} =
    423           Enum.reduce(structs, {[], %{}}, fn struct, acc ->
    424             struct
    425             |> Map.fetch!(field)
    426             |> List.wrap()
    427             |> Enum.reduce(acc, fn child, {fresh, set} ->
    428               pk_values =
    429                 child
    430                 |> through_pks(pk_fields, assoc)
    431                 |> validate_non_null_pk!(child, pk_fields, assoc)
    432 
    433               case set do
    434                 %{^pk_values => true} ->
    435                   {fresh, set}
    436                 _ ->
    437                   {[child|fresh], Map.put(set, pk_values, true)}
    438               end
    439             end)
    440           end)
    441 
    442         {Enum.reverse(children), related}
    443 
    444       {:through, _, through} ->
    445         Enum.reduce(through, {structs, owner}, &recur_through/2)
    446     end
    447   end
    448 
    449   defp validate_has_pk_field!([], related, assoc) do
    450     raise ArgumentError,
    451           "cannot preload through association `#{assoc.field}` on " <>
    452             "`#{inspect assoc.owner}`. Ecto expected the #{inspect related} schema " <>
    453             "to have at least one primary key field"
    454   end
    455 
    456   defp validate_has_pk_field!(pk_fields, _related, _assoc), do: pk_fields
    457 
    458   defp through_pks(map, pks, assoc) do
    459     Enum.map(pks, fn pk ->
    460       case map do
    461         %{^pk => value} ->
    462           value
    463 
    464         _ ->
    465           raise ArgumentError,
    466                "cannot preload through association `#{assoc.field}` on " <>
    467                  "`#{inspect assoc.owner}`. Ecto expected a map/struct with " <>
    468                  "the key `#{pk}` but got: #{inspect map}"
    469       end
    470     end)
    471   end
    472 
    473   defp validate_non_null_pk!(values, map, pks, assoc) do
    474     case values do
    475       [nil | _] ->
    476         raise ArgumentError,
    477               "cannot preload through association `#{assoc.field}` on " <>
    478                 "`#{inspect assoc.owner}` because the primary key `#{hd(pks)}` " <>
    479                 "is nil for map/struct: #{inspect map}"
    480 
    481       _ ->
    482         values
    483     end
    484   end
    485 
    486   ## Normalizer
    487 
    488   def normalize(preload, take, original) do
    489     normalize_each(wrap(preload, original), [], take, original)
    490   end
    491 
    492   defp normalize_each({atom, {query, list}}, acc, take, original)
    493        when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
    494     fields = take(take, atom)
    495     [{atom, {fields, query!(query), normalize_each(wrap(list, original), [], fields, original)}}|acc]
    496   end
    497 
    498   defp normalize_each({atom, query}, acc, take, _original)
    499        when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
    500     [{atom, {take(take, atom), query!(query), []}}|acc]
    501   end
    502 
    503   defp normalize_each({atom, list}, acc, take, original) when is_atom(atom) do
    504     fields = take(take, atom)
    505     [{atom, {fields, nil, normalize_each(wrap(list, original), [], fields, original)}}|acc]
    506   end
    507 
    508   defp normalize_each(atom, acc, take, _original) when is_atom(atom) do
    509     [{atom, {take(take, atom), nil, []}}|acc]
    510   end
    511 
    512   defp normalize_each(other, acc, take, original) do
    513     Enum.reduce(wrap(other, original), acc, &normalize_each(&1, &2, take, original))
    514   end
    515 
    516   defp query!(query) when is_function(query, 1), do: query
    517   defp query!(%Ecto.Query{} = query), do: query
    518 
    519   defp take(take, field) do
    520     case Access.fetch(take, field) do
    521       {:ok, fields} -> List.wrap(fields)
    522       :error -> nil
    523     end
    524   end
    525 
    526   defp wrap(list, _original) when is_list(list),
    527     do: list
    528   defp wrap(atom, _original) when is_atom(atom),
    529     do: atom
    530   defp wrap(other, original) do
    531     raise ArgumentError, "invalid preload `#{inspect other}` in `#{inspect original}`. " <>
    532                          "preload expects an atom, a (nested) keyword or a (nested) list of atoms"
    533   end
    534 
    535   ## Expand
    536 
    537   def expand(schema, preloads, acc) do
    538     Enum.reduce(preloads, acc, fn {preload, {fields, query, sub_preloads}},
    539                                   {assocs, throughs, embeds} ->
    540       assoc_or_embed = association_or_embed!(schema, preload)
    541 
    542       info = assoc_or_embed.__struct__.preload_info(assoc_or_embed)
    543 
    544       case info do
    545         {:assoc, _, _} ->
    546           value = {info, fields, query, sub_preloads}
    547           assocs = Map.update(assocs, preload, value, &merge_preloads(preload, value, &1))
    548           {assocs, throughs, embeds}
    549 
    550         {:through, _, through} ->
    551           through =
    552             through
    553             |> Enum.reverse()
    554             |> Enum.reduce({fields, query, sub_preloads}, &{nil, nil, [{&1, &2}]})
    555             |> elem(2)
    556 
    557           expand(schema, through, {assocs, Map.put(throughs, preload, info), embeds})
    558 
    559         :embed ->
    560           if sub_preloads == [] do
    561             raise ArgumentError,
    562                   "cannot preload embedded field #{inspect(assoc_or_embed.field)} " <>
    563                     "without also preloading one of its associations as it has no effect"
    564           end
    565 
    566           embeds = [{assoc_or_embed, sub_preloads} | embeds]
    567           {assocs, throughs, embeds}
    568       end
    569     end)
    570   end
    571 
    572   defp merge_preloads(_preload, {info, _, nil, left}, {info, take, query, right}),
    573     do: {info, take, query, left ++ right}
    574   defp merge_preloads(_preload, {info, take, query, left}, {info, _, nil, right}),
    575     do: {info, take, query, left ++ right}
    576   defp merge_preloads(preload, {info, _, left, _}, {info, _, right, _}) do
    577     raise ArgumentError, "cannot preload `#{preload}` as it has been supplied more than once " <>
    578                          "with different queries: #{inspect left} and #{inspect right}"
    579   end
    580 
    581   defp association_or_embed!(schema, preload) do
    582     schema.__schema__(:association, preload) || schema.__schema__(:embed, preload) ||
    583       raise ArgumentError, "schema #{inspect schema} does not have association or embed #{inspect preload}#{maybe_module(preload)}"
    584   end
    585 
    586   defp maybe_module(assoc) do
    587     case Atom.to_string(assoc) do
    588       "Elixir." <> _ ->
    589         " (if you were trying to pass a schema as a query to preload, " <>
    590           "you have to explicitly convert it to a query by doing `from x in #{inspect assoc}` " <>
    591           "or by calling Ecto.Queryable.to_query/1)"
    592 
    593       _ ->
    594         ""
    595     end
    596   end
    597 
    598   defp filter_and_reraise(exception, stacktrace) do
    599     reraise exception, Enum.reject(stacktrace, &match?({__MODULE__, _, _, _}, &1))
    600   end
    601 end