preloader.ex (21789B)
defmodule Ecto.Repo.Preloader do
  # The module invoked by user defined repos
  # for preload related functionality.
  #
  # Entry points are `query/6` (preloads declared in a query) and
  # `preload/4` (`Repo.preload`). Both normalize the preload spec with
  # `normalize/3` and then walk it with `preload_each/4`.
  @moduledoc false

  require Ecto.Query
  require Logger

  @doc """
  Transforms a result set based on query preloads, loading
  the associations onto their parent schema.

  Each row is a list whose first element is the struct to preload
  into. Rows whose first element is `nil` are not preloaded. Every
  row, preloaded or not, is finally passed through `fun`.
  """
  @spec query([list], Ecto.Repo.t, list, Access.t, fun, {adapter_meta :: map, opts :: Keyword.t}) :: [list]
  def query([], _repo_name, _preloads, _take, _fun, _tuplet), do: []
  def query(rows, _repo_name, [], _take, fun, _tuplet), do: Enum.map(rows, fun)

  def query(rows, repo_name, preloads, take, fun, tuplet) do
    rows
    |> extract()
    |> normalize_and_preload_each(repo_name, preloads, take, tuplet)
    |> unextract(rows, fun)
  end

  # Collects the head of every row, skipping rows whose head is nil.
  defp extract([[nil|_]|t2]), do: extract(t2)
  defp extract([[h|_]|t2]), do: [h|extract(t2)]
  defp extract([]), do: []

  # Inverse of extract/1: puts each preloaded struct back as the head of
  # its row (nil-headed rows consume no struct) and applies `fun` to every
  # row, keeping the original row order.
  defp unextract(structs, [[nil|_] = h2|t2], fun), do: [fun.(h2)|unextract(structs, t2, fun)]
  defp unextract([h1|structs], [[_|t1]|t2], fun), do: [fun.([h1|t1])|unextract(structs, t2, fun)]
  defp unextract([], [], _fun), do: []

  @doc """
  Implementation for `Ecto.Repo.preload/2`.

  Accepts `nil` (returned as is), a list of structs or a single struct.
  `opts[:take]` is used when normalizing the preloads.
  """
  @spec preload(structs, atom, atom | list, {adapter_meta :: map, opts :: Keyword.t}) ::
                structs when structs: [Ecto.Schema.t] | Ecto.Schema.t | nil
  def preload(nil, _repo_name, _preloads, _tuplet) do
    nil
  end

  def preload(structs, repo_name, preloads, {_adapter_meta, opts} = tuplet) when is_list(structs) do
    normalize_and_preload_each(structs, repo_name, preloads, opts[:take], tuplet)
  end

  def preload(struct, repo_name, preloads, {_adapter_meta, opts} = tuplet) when is_map(struct) do
    normalize_and_preload_each([struct], repo_name, preloads, opts[:take], tuplet) |> hd()
  end

  # Normalizes the user preload spec and preloads it into `structs`.
  # Any error is re-raised with this module's frames filtered out.
  defp normalize_and_preload_each(structs, repo_name, preloads, take, tuplet) do
    preloads = normalize(preloads, take, preloads)
    preload_each(structs, repo_name, preloads, tuplet)
  rescue
    e ->
      # Reraise errors so we ignore the preload inner stacktrace
      filter_and_reraise e, __STACKTRACE__
  end

  ## Preloading

  # Preloads the normalized `preloads` into `structs`. nil entries are kept
  # as nil. The first non-nil entry decides the schema module, and
  # fetch_ids/4 asserts every other entry has the same struct.
  #
  # The steps are, in order:
  #   1. Embeds are preloaded first.
  #   2. Association queries are prepared and run (possibly in parallel).
  #   3. The results are loaded into each struct.
  #   4. Through associations are assembled from the loaded intermediate
  #      associations.
  defp preload_each(structs, _repo_name, [], _tuplet), do: structs
  defp preload_each([], _repo_name, _preloads, _tuplet), do: []
  defp preload_each(structs, repo_name, preloads, tuplet) do
    if sample = Enum.find(structs, & &1) do
      module = sample.__struct__
      prefix = preload_prefix(tuplet, sample)
      {assocs, throughs, embeds} = expand(module, preloads, {%{}, %{}, []})
      structs = preload_embeds(structs, embeds, repo_name, tuplet)

      {fetched_assocs, to_fetch_queries} =
        prepare_queries(structs, module, assocs, prefix, repo_name, tuplet)

      fetched_queries = maybe_pmap(to_fetch_queries, repo_name, tuplet)
      assocs = preload_assocs(fetched_assocs, fetched_queries, repo_name, tuplet)
      throughs = Map.values(throughs)

      for struct <- structs do
        struct = Enum.reduce assocs, struct, &load_assoc/2
        struct = Enum.reduce throughs, struct, &load_through/2
        struct
      end
    else
      structs
    end
  end

  # Query prefix for the preload queries. An explicit `:prefix` option
  # wins; otherwise the sample struct's `__meta__` prefix is used; structs
  # without `__meta__` get nil.
  defp preload_prefix({_adapter_meta, opts}, sample) do
    case Keyword.fetch(opts, :prefix) do
      {:ok, prefix} ->
        prefix

      :error ->
        case sample do
          %{__meta__: %{prefix: prefix}} -> prefix
          # Must be an embedded schema
          _ -> nil
        end
    end
  end

  ## Association preloading

  # First we traverse all assocs and find which queries we need to run.
  #
  # Returns `{entries, queries}`:
  #   * `entries` has one `{assoc, query?, loaded_ids, loaded_structs, preloads}`
  #     per association. `query?` tells whether a query was scheduled for it.
  #   * `queries` holds one function per scheduled query; the function is
  #     called later with the tuplet.
  # Both lists are built by prepending in the same traversal, so they stay
  # aligned for preload_assocs/4.
  defp prepare_queries(structs, module, assocs, prefix, repo_name, tuplet) do
    Enum.reduce(assocs, {[], []}, fn
      {_key, {{:assoc, assoc, related_key}, take, query, preloads}}, {assocs, queries} ->
        {fetch_ids, loaded_ids, loaded_structs} = fetch_ids(structs, module, assoc, tuplet)

        queries =
          if fetch_ids != [] do
            [
              fn tuplet ->
                fetch_query(fetch_ids, assoc, repo_name, query, prefix, related_key, take, tuplet)
              end
              | queries
            ]
          else
            queries
          end

        {[{assoc, fetch_ids != [], loaded_ids, loaded_structs, preloads} | assocs], queries}
    end)
  end

  # Then we execute queries in parallel
  #
  # Queries run in parallel only when all of these hold:
  #   * there are at least two of them;
  #   * the adapter reports no checked-out connection;
  #   * `in_parallel: false` was not given.
  # Otherwise they run sequentially. Results keep the input order either
  # way: Task.async_stream/3 is ordered by default.
  defp maybe_pmap(preloaders, _repo_name, {adapter_meta, opts}) do
    if match?([_,_|_], preloaders) and not adapter_meta.adapter.checked_out?(adapter_meta) and
         Keyword.get(opts, :in_parallel, true) do
      # We pass caller: self() so the ownership pool knows where
      # to fetch the connection from and set the proper timeouts.
      # Note while the ownership pool uses '$callers' from pdict,
      # it does not do so in automatic mode, hence this line is
      # still necessary.
      opts = Keyword.put_new(opts, :caller, self())

      preloaders
      |> Task.async_stream(&(&1.({adapter_meta, opts})), timeout: :infinity)
      |> Enum.map(fn {:ok, assoc} -> assoc end)
    else
      Enum.map(preloaders, &(&1.({adapter_meta, opts})))
    end
  end

  # Then we unpack the query results, merge them, and preload recursively
  #
  # For each entry:
  #   1. Already-loaded values are combined with the fetched ones.
  #   2. Nested preloads run on the combined list.
  #   3. The result becomes an `{:assoc, assoc, id => value(s)}` entry,
  #      consumed by load_assoc/2.
  defp preload_assocs(
         [{assoc, query?, loaded_ids, loaded_structs, preloads} | assocs],
         queries,
         repo_name,
         tuplet
       ) do
    {fetch_ids, fetch_structs, queries} = maybe_unpack_query(query?, queries)
    all = preload_each(Enum.reverse(loaded_structs, fetch_structs), repo_name, preloads, tuplet)
    entry = {:assoc, assoc, assoc_map(assoc.cardinality, Enum.reverse(loaded_ids, fetch_ids), all)}
    [entry | preload_assocs(assocs, queries, repo_name, tuplet)]
  end

  defp preload_assocs([], [], _repo_name, _tuplet), do: []

  # Preloads associations inside embeds, one embed field at a time:
  #   1. Flatten the embedded structs of every parent into one list,
  #      remembering how many came from each parent.
  #   2. Preload the sub-preloads on that list in a single pass.
  #   3. Put the embeds back with load_embeds/6.
  # Raises ArgumentError when a parent has no such field.
  defp preload_embeds(structs, [], _repo_name, _tuplet), do: structs

  defp preload_embeds(structs, [embed | embeds], repo_name, tuplet) do

    {%{field: field, cardinality: card}, sub_preloads} = embed

    {embed_structs, counts} =
      Enum.flat_map_reduce(structs, [], fn
        %{^field => embeds}, counts when is_list(embeds) -> {embeds, [length(embeds) | counts]}
        %{^field => nil}, counts -> {[], [0 | counts]}
        %{^field => embed}, counts -> {[embed], [1 | counts]}
        nil, counts -> {[], [0 | counts]}
        struct, _counts -> raise ArgumentError, "expected #{inspect(struct)} to contain embed `#{field}`"
      end)

    embed_structs = preload_each(embed_structs, repo_name, sub_preloads, tuplet)
    structs = load_embeds(card, field, structs, embed_structs, Enum.reverse(counts), [])
    preload_embeds(structs, embeds, repo_name, tuplet)
  end

  # Puts the preloaded embeds back into their parents. `counts` holds, per
  # parent and in parent order, how many embeds it contributed. A count of
  # 0 leaves the parent untouched.
  defp load_embeds(_card, _field, [], [], [], acc), do: Enum.reverse(acc)

  defp load_embeds(card, field, [struct | structs], embed_structs, [0 | counts], acc),
    do: load_embeds(card, field, structs, embed_structs, counts, [struct | acc])

  defp load_embeds(:one, field, [struct | structs], [embed_struct | embed_structs], [1 | counts], acc),
    do: load_embeds(:one, field, structs, embed_structs, counts, [Map.put(struct, field, embed_struct) | acc])

  defp load_embeds(:many, field, [struct | structs], embed_structs, [count | counts], acc) do
    {current_embeds, rest_embeds} = split_n(embed_structs, count, [])
    acc = [Map.put(struct, field, Enum.reverse(current_embeds)) | acc]
    load_embeds(:many, field, structs, rest_embeds, counts, acc)
  end

  # Pops the next query result when a query was scheduled for this entry.
  defp maybe_unpack_query(false, queries), do: {[], [], queries}
  defp maybe_unpack_query(true, [{ids, structs} | queries]), do: {ids, structs, queries}

  # Splits the parents of `assoc` into ids that still need fetching and
  # values that are already loaded. With `force: true`, loaded values are
  # treated as not loaded.
  #
  # Returns `{fetch_ids, loaded_ids, loaded_structs}`, all accumulated in
  # reverse. Details:
  #   * For `:many` each loaded id is stored as `{id, count}`, so the flat
  #     `loaded_structs` list can be split back per parent by
  #     many_assoc_map/3.
  #   * Parents with a nil owner key and nothing loaded are skipped.
  #   * nil parents are ignored.
  defp fetch_ids(structs, module, assoc, {_adapter_meta, opts}) do
    %{field: field, owner_key: owner_key, cardinality: card} = assoc
    force? = Keyword.get(opts, :force, false)

    Enum.reduce structs, {[], [], []}, fn
      nil, acc ->
        acc
      struct, {fetch_ids, loaded_ids, loaded_structs} ->
        assert_struct!(module, struct)
        %{^owner_key => id, ^field => value} = struct
        loaded? = Ecto.assoc_loaded?(value) and not force?

        if loaded? and is_nil(id) and not Ecto.Changeset.Relation.empty?(assoc, value) do
          # NOTE(review): Logger.warn/1 is deprecated in recent Elixir
          # versions in favor of Logger.warning/1 (available since Elixir
          # 1.11); switch once the minimum supported Elixir allows it.
          Logger.warn """
          association `#{field}` for `#{inspect(module)}` has a loaded value but \
          its association key `#{owner_key}` is nil. This usually means one of:

            * `#{owner_key}` was not selected in a query
            * the struct was set with default values for `#{field}` which now you want to override

          If this is intentional, set force: true to disable this warning
          """
        end

        cond do
          card == :one and loaded? ->
            {fetch_ids, [id | loaded_ids], [value | loaded_structs]}
          card == :many and loaded? ->
            {fetch_ids, [{id, length(value)} | loaded_ids], value ++ loaded_structs}
          is_nil(id) ->
            {fetch_ids, loaded_ids, loaded_structs}
          true ->
            {[id | fetch_ids], loaded_ids, loaded_structs}
        end
    end
  end

  # Custom preload function. It is called with the unique ids. Its results
  # are normalized to `{id, record}` tuples and sorted by id, so records for
  # the same parent are adjacent. Returns `{ids, records}` (reversed, see
  # unzip_ids/3).
  defp fetch_query(ids, assoc, _repo_name, query, _prefix, related_key, _take, _tuplet) when is_function(query, 1) do
    # Note we use an explicit sort because we don't want
    # to reorder based on the struct. Only the ID.
    ids
    |> Enum.uniq
    |> query.()
    |> fetched_records_to_tuple_ids(assoc, related_key)
    |> Enum.sort(fn {id1, _}, {id2, _} -> id1 <= id2 end)
    |> unzip_ids([], [])
  end

  # Query-based preload. The steps are:
  #   1. Build the association query for the unique ids.
  #   2. Select `{related_key, record}` instead of just the record.
  #   3. For `:many`, order by the related key (plus `preload_order`).
  #   4. Run the query through the repo.
  # Returns `{ids, records}` (reversed, see unzip_ids/3).
  defp fetch_query(ids, %{cardinality: card} = assoc, repo_name, query, prefix, related_key, take, tuplet) do
    query = assoc.__struct__.assoc_query(assoc, query, Enum.uniq(ids))
    field = related_key_to_field(query, related_key)

    # Normalize query
    query = %{Ecto.Query.Planner.ensure_select(query, take || true) | prefix: prefix}

    # Add the related key to the query results
    query = update_in query.select.expr, &{:{}, [], [field, &1]}

    # If we are returning many results, we must sort by the key too
    query =
      case {card, query.combinations} do
        # NOTE(review): this clause only matches a query with exactly one
        # combination. Queries with two or more combinations fall through to
        # the next clause; confirm whether they should raise as well.
        {:many, [{kind, _} | []]} ->
          raise ArgumentError,
                "`#{kind}` queries must be wrapped inside of a subquery " <>
                  "when preloading a `has_many` or `many_to_many` association. " <>
                  "You must also ensure that all members of the `#{kind}` query " <>
                  "select the parent's foreign key"

        {:many, _} ->
          update_in query.order_bys, fn order_bys ->
            [%Ecto.Query.QueryExpr{expr: preload_order(assoc, query, field), params: [],
                                   file: __ENV__.file, line: __ENV__.line}|order_bys]
          end
        {:one, _} ->
          query
      end

    unzip_ids Ecto.Repo.Queryable.all(repo_name, query, tuplet), [], []
  end

  # Normalizes the results of a custom preload function into `{id, record}`
  # tuples:
  #   * Plain records are keyed by reading `key` from each record. This only
  #     applies when the related key refers to the first binding (`{0, key}`).
  #   * Results that already are `{id, record}` tuples pass through.
  #   * Anything else raises with an explanation.
  defp fetched_records_to_tuple_ids([], _assoc, _related_key),
    do: []

  defp fetched_records_to_tuple_ids([%{} | _] = entries, _assoc, {0, key}),
    do: Enum.map(entries, &{Map.fetch!(&1, key), &1})

  defp fetched_records_to_tuple_ids([{_, %{}} | _] = entries, _assoc, _related_key),
    do: entries

  defp fetched_records_to_tuple_ids([entry | _], assoc, _),
    do: raise """
    invalid custom preload for `#{assoc.field}` on `#{inspect assoc.owner}`.

    For many_to_many associations, the custom function given to preload should \
    return a tuple with the associated key as first element and the record as \
    second element.

    For example, imagine posts has many to many tags through a posts_tags table. \
    When preloading the tags, you may write:

        custom_tags = fn post_ids ->
          Repo.all(
            from t in Tag,
              join: pt in "posts_tags",
              where: t.custom and pt.post_id in ^post_ids and pt.tag_id == t.id
          )
        end

        from Post, preload: [tags: ^custom_tags]

    Unfortunately the query above is not enough because Ecto won't know how to \
    associate the posts with the tags. In those cases, you need to return a tuple \
    with the `post_id` as first element and the tag record as second. The new query \
    will have a select field as follows:

        from t in Tag,
          join: pt in "posts_tags",
          where: t.custom and pt.post_id in ^post_ids and pt.tag_id == t.id,
          select: {pt.post_id, t}

    We expected a tuple but we got: #{inspect(entry)}
    """

  # Builds the order_by expression for `:many` preloads. The related key
  # comes first, ascending, so rows for the same parent are adjacent. The
  # association's `preload_order` follows; a bare field there means `:asc`.
  defp preload_order(assoc, query, related_field) do
    custom_order_by = Enum.map(assoc.preload_order, fn
      {direction, field} ->
        {direction, related_key_to_field(query, {0, field})}
      field ->
        {:asc, related_key_to_field(query, {0, field})}
    end)

    [{:asc, related_field} | custom_order_by]
  end

  # Builds the AST for the field `&pos.key`, wrapped in `type/2` when a
  # field type is given. A negative `pos` counts back from the number of
  # bindings in the query.
  defp related_key_to_field(query, {pos, key, field_type}) do
    field_ast = related_key_to_field(query, {pos, key})

    {:type, [], [field_ast, field_type]}
  end

  defp related_key_to_field(query, {pos, key}) do
    {{:., [], [{:&, [], [related_key_pos(query, pos)]}, key]}, [], []}
  end

  defp related_key_pos(_query, pos) when pos >= 0, do: pos
  defp related_key_pos(query, pos), do: Ecto.Query.Builder.count_binds(query) + pos

  # Unzips `{id, struct}` pairs into `{ids, structs}`; both come out reversed.
  defp unzip_ids([{k, v}|t], acc1, acc2), do: unzip_ids(t, [k|acc1], [v|acc2])
  defp unzip_ids([], acc1, acc2), do: {acc1, acc2}

  # Raises ArgumentError unless `struct` is an instance of `mod`.
  defp assert_struct!(mod, %{__struct__: mod}), do: true
  defp assert_struct!(mod, %{__struct__: struct}) do
    raise ArgumentError, "expected a homogeneous list containing the same struct, " <>
                         "got: #{inspect mod} and #{inspect struct}"
  end

  # Builds a map from owner key to the loaded value: a single struct for
  # `:one`, a list of structs for `:many`.
  defp assoc_map(:one, ids, structs) do
    one_assoc_map(ids, structs, %{})
  end
  defp assoc_map(:many, ids, structs) do
    many_assoc_map(ids, structs, %{})
  end

  # Pairs ids and structs positionally; a repeated id keeps the last struct.
  defp one_assoc_map([id|ids], [struct|structs], map) do
    one_assoc_map(ids, structs, Map.put(map, id, struct))
  end
  defp one_assoc_map([], [], map) do
    map
  end

  # Groups structs per parent id:
  #   * `{id, n}` entries (already-loaded values) take the next `n` structs.
  #   * Plain ids (fetched results) take the run of consecutive structs
  #     sharing that id.
  # Both accumulate in reverse. This restores each group's order, because
  # the flat list was reversed upstream (fetch_ids/4, unzip_ids/3).
  defp many_assoc_map([{id, n}|ids], structs, map) do
    {acc, structs} = split_n(structs, n, [])
    many_assoc_map(ids, structs, Map.put(map, id, acc))
  end
  defp many_assoc_map([id|ids], [struct|structs], map) do
    {ids, structs, acc} = split_while(ids, structs, id, [struct])
    many_assoc_map(ids, structs, Map.put(map, id, acc))
  end
  defp many_assoc_map([], [], map) do
    map
  end

  # Moves the first `n` structs onto `acc` (reversing them); returns `{acc, rest}`.
  defp split_n(structs, 0, acc), do: {acc, structs}
  defp split_n([struct | structs], n, acc), do: split_n(structs, n - 1, [struct | acc])

  # Moves structs onto `acc` while the next id equals `id`; returns
  # `{rest_ids, rest_structs, acc}`.
  defp split_while([id|ids], [struct|structs], id, acc),
    do: split_while(ids, structs, id, [struct|acc])
  defp split_while(ids, structs, _id, acc),
    do: {ids, structs, acc}

  ## Load preloaded data

  # Puts the preloaded value into the struct's association field, looked up
  # by the struct's owner key. A missing key yields `[]` for `:many` and
  # nil for `:one`. nil structs stay nil.
  defp load_assoc({:assoc, _assoc, _ids}, nil) do
    nil
  end

  defp load_assoc({:assoc, assoc, ids}, struct) do
    %{field: field, owner_key: owner_key, cardinality: cardinality} = assoc
    key = Map.fetch!(struct, owner_key)

    loaded =
      case ids do
        %{^key => value} -> value
        _ when cardinality == :many -> []
        _ -> nil
      end

    Map.put(struct, field, loaded)
  end

  # Assembles a through association by walking the already-loaded chain of
  # intermediate associations.
  #
  # preload_each/4 allows nil entries, and load_assoc/2 already passes them
  # through. Without this clause a nil entry would crash here:
  # recur_through/2 calls Map.fetch!(nil, field) and Map.put/3 gets nil.
  defp load_through({:through, _assoc, _throughs}, nil) do
    nil
  end

  defp load_through({:through, assoc, throughs}, struct) do
    %{cardinality: cardinality, field: field, owner: owner} = assoc
    {loaded, _} = Enum.reduce(throughs, {[struct], owner}, &recur_through/2)
    Map.put(struct, field, maybe_first(loaded, cardinality))
  end

  defp maybe_first(list, :one), do: List.first(list)
  defp maybe_first(list, _), do: list

  # Follows one hop of a through chain. It collects the children stored
  # under `field` on every struct, de-duplicated by primary key and kept in
  # first-seen order. Nested through hops are followed recursively.
  defp recur_through(field, {structs, owner}) do
    assoc = owner.__schema__(:association, field)
    case assoc.__struct__.preload_info(assoc) do
      {:assoc, %{related: related}, _} ->
        pk_fields =
          related.__schema__(:primary_key)
          |> validate_has_pk_field!(related, assoc)

        {children, _} =
          Enum.reduce(structs, {[], %{}}, fn struct, acc ->
            struct
            |> Map.fetch!(field)
            |> List.wrap()
            |> Enum.reduce(acc, fn child, {fresh, set} ->
              pk_values =
                child
                |> through_pks(pk_fields, assoc)
                |> validate_non_null_pk!(child, pk_fields, assoc)

              case set do
                %{^pk_values => true} ->
                  {fresh, set}
                _ ->
                  {[child|fresh], Map.put(set, pk_values, true)}
              end
            end)
          end)

        {Enum.reverse(children), related}

      {:through, _, through} ->
        Enum.reduce(through, {structs, owner}, &recur_through/2)
    end
  end

  # Through preloading de-duplicates by primary key, so one is required.
  defp validate_has_pk_field!([], related, assoc) do
    raise ArgumentError,
          "cannot preload through association `#{assoc.field}` on " <>
            "`#{inspect assoc.owner}`. Ecto expected the #{inspect related} schema " <>
            "to have at least one primary key field"
  end

  defp validate_has_pk_field!(pk_fields, _related, _assoc), do: pk_fields

  # Reads the primary key values from `map`, raising when a key is missing.
  defp through_pks(map, pks, assoc) do
    Enum.map(pks, fn pk ->
      case map do
        %{^pk => value} ->
          value

        _ ->
          raise ArgumentError,
                "cannot preload through association `#{assoc.field}` on " <>
                  "`#{inspect assoc.owner}`. Ecto expected a map/struct with " <>
                  "the key `#{pk}` but got: #{inspect map}"
      end
    end)
  end

  # Raises when the first primary key value is nil.
  defp validate_non_null_pk!(values, map, pks, assoc) do
    case values do
      [nil | _] ->
        raise ArgumentError,
              "cannot preload through association `#{assoc.field}` on " <>
                "`#{inspect assoc.owner}` because the primary key `#{hd(pks)}` " <>
                "is nil for map/struct: #{inspect map}"

      _ ->
        values
    end
  end

  ## Normalizer

  # Normalizes a user preload spec (atom, keyword or nested list, with
  # optional custom queries) into a list of
  # `{field, {take_fields, query_or_nil, sub_preloads}}` entries.
  # `original` is only used in error messages.
  def normalize(preload, take, original) do
    normalize_each(wrap(preload, original), [], take, original)
  end

  defp normalize_each({atom, {query, list}}, acc, take, original)
       when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
    fields = take(take, atom)
    [{atom, {fields, query!(query), normalize_each(wrap(list, original), [], fields, original)}}|acc]
  end

  defp normalize_each({atom, query}, acc, take, _original)
       when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
    [{atom, {take(take, atom), query!(query), []}}|acc]
  end

  defp normalize_each({atom, list}, acc, take, original) when is_atom(atom) do
    fields = take(take, atom)
    [{atom, {fields, nil, normalize_each(wrap(list, original), [], fields, original)}}|acc]
  end

  defp normalize_each(atom, acc, take, _original) when is_atom(atom) do
    [{atom, {take(take, atom), nil, []}}|acc]
  end

  defp normalize_each(other, acc, take, original) do
    Enum.reduce(wrap(other, original), acc, &normalize_each(&1, &2, take, original))
  end

  # Custom preload queries must be an `%Ecto.Query{}` or a 1-arity
  # function; any other map fails with FunctionClauseError.
  defp query!(query) when is_function(query, 1), do: query
  defp query!(%Ecto.Query{} = query), do: query

  # The `take` fields configured for `field`, as a list, or nil when none.
  defp take(take, field) do
    case Access.fetch(take, field) do
      {:ok, fields} -> List.wrap(fields)
      :error -> nil
    end
  end

  # Lists and atoms pass through; anything else is an invalid preload.
  defp wrap(list, _original) when is_list(list),
    do: list
  defp wrap(atom, _original) when is_atom(atom),
    do: atom
  defp wrap(other, original) do
    raise ArgumentError, "invalid preload `#{inspect other}` in `#{inspect original}`. " <>
                         "preload expects an atom, a (nested) keyword or a (nested) list of atoms"
  end

  ## Expand

  # Splits normalized preloads for `schema` into `{assocs, throughs, embeds}`:
  #   * `assocs` maps a field to `{preload_info, take, query, sub_preloads}`.
  #     Repeated fields are merged with merge_preloads/3.
  #   * `throughs` maps a through field to its preload info. Its chain is
  #     expanded into nested association preloads on `assocs`.
  #   * `embeds` is a list of `{embed, sub_preloads}`. An embed without
  #     sub-preloads raises, since preloading it alone has no effect.
  def expand(schema, preloads, acc) do
    Enum.reduce(preloads, acc, fn {preload, {fields, query, sub_preloads}},
                                  {assocs, throughs, embeds} ->
      assoc_or_embed = association_or_embed!(schema, preload)

      info = assoc_or_embed.__struct__.preload_info(assoc_or_embed)

      case info do
        {:assoc, _, _} ->
          value = {info, fields, query, sub_preloads}
          assocs = Map.update(assocs, preload, value, &merge_preloads(preload, value, &1))
          {assocs, throughs, embeds}

        {:through, _, through} ->
          through =
            through
            |> Enum.reverse()
            |> Enum.reduce({fields, query, sub_preloads}, &{nil, nil, [{&1, &2}]})
            |> elem(2)

          expand(schema, through, {assocs, Map.put(throughs, preload, info), embeds})

        :embed ->
          if sub_preloads == [] do
            raise ArgumentError,
                  "cannot preload embedded field #{inspect(assoc_or_embed.field)} " <>
                    "without also preloading one of its associations as it has no effect"
          end

          embeds = [{assoc_or_embed, sub_preloads} | embeds]
          {assocs, throughs, embeds}
      end
    end)
  end

  # Merges two preload entries for the same field. At most one may carry a
  # custom query; its take and query are kept and the nested preloads are
  # concatenated. Two custom queries raise ArgumentError.
  defp merge_preloads(_preload, {info, _, nil, left}, {info, take, query, right}),
    do: {info, take, query, left ++ right}
  defp merge_preloads(_preload, {info, take, query, left}, {info, _, nil, right}),
    do: {info, take, query, left ++ right}
  defp merge_preloads(preload, {info, _, left, _}, {info, _, right, _}) do
    raise ArgumentError, "cannot preload `#{preload}` as it has been supplied more than once " <>
                         "with different queries: #{inspect left} and #{inspect right}"
  end

  # Fetches the association or embed reflection named `preload`, raising
  # ArgumentError when the schema has neither.
  defp association_or_embed!(schema, preload) do
    schema.__schema__(:association, preload) || schema.__schema__(:embed, preload) ||
      raise ArgumentError, "schema #{inspect schema} does not have association or embed #{inspect preload}#{maybe_module(preload)}"
  end

  # Hint appended to the error above when the preload looks like a module
  # name (an atom starting with "Elixir.").
  defp maybe_module(assoc) do
    case Atom.to_string(assoc) do
      "Elixir." <> _ ->
        " (if you were trying to pass a schema as a query to preload, " <>
          "you have to explicitly convert it to a query by doing `from x in #{inspect assoc}` " <>
          "or by calling Ecto.Queryable.to_query/1)"

      _ ->
        ""
    end
  end

  # Reraises `exception` with this module's frames removed from the stacktrace.
  defp filter_and_reraise(exception, stacktrace) do
    reraise exception, Enum.reject(stacktrace, &match?({__MODULE__, _, _, _}, &1))
  end
end