queryable.ex (16586B)
defmodule Ecto.Repo.Queryable do
  @moduledoc false

  alias Ecto.Queryable
  alias Ecto.Query
  alias Ecto.Query.Planner
  alias Ecto.Query.SelectExpr

  import Ecto.Query.Planner, only: [attach_prefix: 2]

  require Ecto.Query

  # Converts `queryable` into a query, passes it through
  # `Planner.ensure_select/2`, executes it as an `:all` operation and
  # returns the second element of the result tuple (the rows).
  def all(name, queryable, tuplet) do
    query =
      queryable
      |> Queryable.to_query()
      |> Planner.ensure_select(true)

    execute(:all, name, query, tuplet) |> elem(1)
  end

  # Builds a stream over the adapter's `stream/5` callback.
  #
  # The query is prepared with the `:stream` operation but planned as `:all`.
  # Raises `Ecto.QueryError` when the planned query carries preloads or assocs.
  def stream(_name, queryable, {adapter_meta, opts}) do
    %{adapter: adapter, cache: cache, repo: repo} = adapter_meta

    query =
      queryable
      |> Queryable.to_query()
      |> Planner.ensure_select(true)

    {query, opts} = repo.prepare_query(:stream, query, opts)
    query = attach_prefix(query, opts)

    {query_meta, prepared, cast_params, dump_params} =
      Planner.query(query, :all, cache, adapter, 0)

    opts = Keyword.put(opts, :cast_params, cast_params)

    case query_meta do
      %{select: nil} ->
        # Without a select, every chunk is expected to carry `nil` rows,
        # so the resulting stream emits nothing.
        adapter_meta
        |> adapter.stream(query_meta, prepared, dump_params, opts)
        |> Stream.flat_map(fn {_, nil} -> [] end)

      %{select: select, preloads: preloads} ->
        %{
          assocs: assocs,
          preprocess: preprocess,
          postprocess: postprocess,
          take: take,
          from: from
        } = select

        if preloads != [] or assocs != [] do
          raise Ecto.QueryError, query: query, message: "preloads are not supported on streams"
        end

        preprocessor = preprocessor(from, preprocess, adapter)
        stream = adapter.stream(adapter_meta, query_meta, prepared, dump_params, opts)
        postprocessor = postprocessor(from, postprocess, take, adapter)

        stream
        |> Stream.flat_map(fn {_, rows} -> rows end)
        |> Stream.map(preprocessor)
        |> Stream.map(postprocessor)
    end
  end

  # Fetches a single entry by primary key; returns `nil` when nothing matches.
  def get(name, queryable, id, opts) do
    one(name, query_for_get(queryable, id), opts)
  end

  # Same as `get/4` but raises `Ecto.NoResultsError` when nothing matches.
  def get!(name, queryable, id, opts) do
    one!(name, query_for_get(queryable, id), opts)
  end

  # Fetches a single entry matching all of the given `clauses`.
  def get_by(name, queryable, clauses, opts) do
    one(name, query_for_get_by(queryable, clauses), opts)
  end

  # Same as `get_by/4` but raises `Ecto.NoResultsError` when nothing matches.
  def get_by!(name, queryable, clauses, opts) do
    one!(name, query_for_get_by(queryable, clauses), opts)
  end

  # Reloads a list of structs with a single query. The result has one element
  # per given struct, in the same order: the first fetched entry with the same
  # primary key, or `nil` when none was fetched.
  def reload(name, [head | _] = structs, opts) do
    results = all(name, query_for_reload(structs), opts)

    [pk] = head.__struct__.__schema__(:primary_key)

    for struct <- structs do
      struct_pk = Map.fetch!(struct, pk)
      Enum.find(results, &(Map.fetch!(&1, pk) == struct_pk))
    end
  end

  # Reloads a single struct; returns `nil` when it is not found.
  def reload(name, struct, opts) do
    one(name, query_for_reload([struct]), opts)
  end

  # Same as the list clause of `reload/3`, but raises a `RuntimeError` for the
  # first struct that has no matching fetched entry.
  def reload!(name, [head | _] = structs, opts) do
    query = query_for_reload(structs)
    results = all(name, query, opts)

    [pk] = head.__struct__.__schema__(:primary_key)

    for struct <- structs do
      struct_pk = Map.fetch!(struct, pk)

      Enum.find(results, &(Map.fetch!(&1, pk) == struct_pk)) ||
        raise "could not reload #{inspect(struct)}, maybe it doesn't exist or was deleted"
    end
  end

  # Reloads a single struct; raises `Ecto.NoResultsError` when it is not found.
  def reload!(name, struct, opts) do
    query = query_for_reload([struct])
    one!(name, query, opts)
  end

  # Runs an aggregate that takes no field argument.
  def aggregate(name, queryable, aggregate, opts) do
    one!(name, query_for_aggregate(queryable, aggregate), opts)
  end

  # Runs an aggregate over `field`.
  def aggregate(name, queryable, aggregate, field, opts) do
    one!(name, query_for_aggregate(queryable, aggregate, field), opts)
  end

  # Returns `true` when the query yields at least one row.
  #
  # The select, preload, order_by and distinct parts are dropped and replaced
  # by `select 1 limit 1`; combination queries get the same select rewrite.
  def exists?(name, queryable, opts) do
    queryable =
      queryable
      |> Query.exclude(:select)
      |> Query.exclude(:preload)
      |> Query.exclude(:order_by)
      |> Query.exclude(:distinct)
      |> Query.select(1)
      |> Query.limit(1)
      |> rewrite_combinations()

    case all(name, queryable, opts) do
      [1] -> true
      [] -> false
    end
  end

  defp rewrite_combinations(%{combinations: []} = query), do: query

  # Replaces the select of every combination query with the literal `1`.
  defp rewrite_combinations(%{combinations: combinations} = query) do
    combinations =
      Enum.map(combinations, fn {type, query} ->
        {type, query |> Query.exclude(:select) |> Query.select(1)}
      end)

    %{query | combinations: combinations}
  end

  # Returns the single result, `nil` when there is none, and raises
  # `Ecto.MultipleResultsError` when there is more than one.
  def one(name, queryable, tuplet) do
    case all(name, queryable, tuplet) do
      [one] -> one
      [] -> nil
      other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
    end
  end

  # Same as `one/3` but raises `Ecto.NoResultsError` when there is no result.
  def one!(name, queryable, tuplet) do
    case all(name, queryable, tuplet) do
      [one] -> one
      [] -> raise Ecto.NoResultsError, queryable: queryable
      other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
    end
  end

  # With an empty update list the queryable is executed as given.
  def update_all(name, queryable, [], tuplet) do
    update_all(name, queryable, tuplet)
  end

  # Otherwise the updates are attached to the query before executing it.
  def update_all(name, queryable, updates, tuplet) do
    query = Query.from(queryable, update: ^updates)
    update_all(name, query, tuplet)
  end

  defp update_all(name, queryable, tuplet) do
    query = Queryable.to_query(queryable)
    execute(:update_all, name, query, tuplet)
  end

  def delete_all(name, queryable, tuplet) do
    query = Queryable.to_query(queryable)
    execute(:delete_all, name, query, tuplet)
  end

  @doc """
  Load structs from query.

  Consumes one value from `values` for each `{field, type}` pair in the first
  argument, loading it through the adapter, and returns
  `{struct_or_nil, remaining_values}`.

  `nil` is returned in place of the struct when `all_nil?` was given as `true`
  and every consumed value was `nil`. Raises `ArgumentError` when a value
  cannot be loaded as its type.
  """
  def struct_load!([{field, type} | types], [value | values], acc, all_nil?, struct, adapter) do
    all_nil? = all_nil? and value == nil
    value = load!(type, value, field, struct, adapter)
    struct_load!(types, values, [{field, value} | acc], all_nil?, struct, adapter)
  end

  def struct_load!([], values, _acc, true, _struct, _adapter) do
    {nil, values}
  end

  def struct_load!([], values, acc, false, struct, _adapter) do
    {Map.merge(struct, Map.new(acc)), values}
  end

  ## Helpers

  # Shared execution path for `:all`, `:update_all` and `:delete_all`.
  #
  # The query goes through `repo.prepare_query/3`, gets its prefix attached and
  # is planned. When the planned query has no select, the adapter result is
  # returned untouched; otherwise the rows go through association and preload
  # processing and `{count, entries}` is returned.
  defp execute(operation, name, query, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter, cache: cache, repo: repo} = adapter_meta

    {query, opts} = repo.prepare_query(operation, query, opts)
    query = attach_prefix(query, opts)

    {query_meta, prepared, cast_params, dump_params} =
      Planner.query(query, operation, cache, adapter, 0)

    opts = Keyword.put(opts, :cast_params, cast_params)

    case query_meta do
      %{select: nil} ->
        adapter.execute(adapter_meta, query_meta, prepared, dump_params, opts)

      %{select: select, sources: sources, preloads: preloads} ->
        %{
          preprocess: preprocess,
          postprocess: postprocess,
          take: take,
          assocs: assocs,
          from: from
        } = select

        preprocessor = preprocessor(from, preprocess, adapter)
        {count, rows} = adapter.execute(adapter_meta, query_meta, prepared, dump_params, opts)
        postprocessor = postprocessor(from, postprocess, take, adapter)

        {count,
         rows
         |> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
         |> Ecto.Repo.Preloader.query(name, preloads, take, postprocessor, tuplet)}
    end
  end

  # Builds the per-row function that loads the `from` entry (when there is
  # one) and then the `preprocess` sources that follow it in the row.
  #
  # For a schema `from`, the struct is built once outside the closure and
  # loaded with `all_nil?` set to `false`, so it is never collapsed to `nil`.
  defp preprocessor({_, {:source, {source, schema}, prefix, types}}, preprocess, adapter) do
    struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)

    fn row ->
      {entry, rest} = struct_load!(types, row, [], false, struct, adapter)
      preprocess(rest, preprocess, entry, adapter)
    end
  end

  defp preprocessor({_, from}, preprocess, adapter) do
    fn row ->
      {entry, rest} = process(row, from, nil, adapter)
      preprocess(rest, preprocess, entry, adapter)
    end
  end

  defp preprocessor(:none, preprocess, adapter) do
    fn row ->
      preprocess(row, preprocess, nil, adapter)
    end
  end

  # Processes each source in turn, prepending the resulting entries to
  # whatever is left of the row once all sources are consumed.
  defp preprocess(row, [], _from, _adapter) do
    row
  end

  defp preprocess(row, [source | sources], from, adapter) do
    {entry, rest} = process(row, source, from, adapter)
    [entry | preprocess(rest, sources, from, adapter)]
  end

  # Builds the per-row function that evaluates the `postprocess` expression.
  # When there is a `from`, it is the head of the row; for `{:map, _}` it is
  # first converted to a plain map restricted to the `take` fields.
  defp postprocessor({:any, _}, postprocess, _take, adapter) do
    fn [from | row] ->
      row |> process(postprocess, from, adapter) |> elem(0)
    end
  end

  defp postprocessor({:map, _}, postprocess, take, adapter) do
    fn [from | row] ->
      row |> process(postprocess, to_map(from, take), adapter) |> elem(0)
    end
  end

  defp postprocessor(:none, postprocess, _take, adapter) do
    fn row -> row |> process(postprocess, nil, adapter) |> elem(0) end
  end

  # `process/4` evaluates one select expression against the row and returns
  # `{value, remaining_row}`.

  defp process(row, {:source, :from}, from, _adapter) do
    {from, row}
  end

  # Non-`from` sources are loaded with `all_nil?` set to `true`, so a source
  # whose columns are all `nil` yields `nil` instead of a struct.
  defp process(row, {:source, {source, schema}, prefix, types}, _from, adapter) do
    struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
    struct_load!(types, row, [], true, struct, adapter)
  end

  defp process(row, {:merge, left, right}, from, adapter) do
    {left, row} = process(row, left, from, adapter)
    {right, row} = process(row, right, from, adapter)

    data =
      case {left, right} do
        {%{__struct__: s}, %{__struct__: s}} ->
          Map.merge(left, right)

        {%{__struct__: left_struct}, %{__struct__: right_struct}} ->
          raise ArgumentError,
                "cannot merge structs of different types, " <>
                  "got: #{inspect(left_struct)} and #{inspect(right_struct)}"

        {%{__struct__: name}, %{}} ->
          # A plain map may only override keys the struct already has.
          for {key, _} <- right, not Map.has_key?(left, key) do
            raise ArgumentError, "struct #{inspect(name)} does not have the key #{inspect(key)}"
          end

          Map.merge(left, right)

        {%{}, %{}} ->
          Map.merge(left, right)

        {%{}, nil} ->
          left

        {_, %{}} ->
          raise ArgumentError,
                "cannot merge because the left side is not a map, got: #{inspect(left)}"

        {%{}, _} ->
          raise ArgumentError,
                "cannot merge because the right side is not a map, got: #{inspect(right)}"

        {_, _} ->
          # Neither side is a map (for example both are nil). Raise the same
          # ArgumentError as above instead of an opaque CaseClauseError.
          raise ArgumentError,
                "cannot merge because the left side is not a map, got: #{inspect(left)}"
      end

    {data, row}
  end

  # Struct update: the base value must already be a struct of the given type.
  defp process(row, {:struct, struct, data, args}, from, adapter) do
    case process(row, data, from, adapter) do
      {%{__struct__: ^struct} = data, row} ->
        process_update(data, args, row, from, adapter)

      {data, _row} ->
        raise BadStructError, struct: struct, term: data
    end
  end

  # Struct construction: when the built struct has Ecto metadata in a state
  # other than `:loaded`, the state is set to `:loaded`.
  defp process(row, {:struct, struct, args}, from, adapter) do
    {fields, row} = process_kv(args, row, from, adapter)

    case Map.merge(struct.__struct__(), Map.new(fields)) do
      %{__meta__: %Ecto.Schema.Metadata{state: state} = metadata} = struct
      when state != :loaded ->
        {Map.replace!(struct, :__meta__, %{metadata | state: :loaded}), row}

      map ->
        {map, row}
    end
  end

  defp process(row, {:map, data, args}, from, adapter) do
    {data, row} = process(row, data, from, adapter)
    process_update(data, args, row, from, adapter)
  end

  defp process(row, {:map, args}, from, adapter) do
    {args, row} = process_kv(args, row, from, adapter)
    {Map.new(args), row}
  end

  defp process(row, {:list, args}, from, adapter) do
    process_args(args, row, from, adapter)
  end

  defp process(row, {:tuple, args}, from, adapter) do
    {args, row} = process_args(args, row, from, adapter)
    {List.to_tuple(args), row}
  end

  # `:any` values are taken from the row as-is, without adapter loading.
  defp process([value | row], {:value, :any}, _from, _adapter) do
    {value, row}
  end

  defp process([value | row], {:value, type}, _from, adapter) do
    {load!(type, value, nil, nil, adapter), row}
  end

  # Literals consume nothing from the row.
  defp process(row, value, _from, _adapter)
       when is_binary(value) or is_number(value) or is_atom(value) do
    {value, row}
  end

  # Applies the processed key-value pairs to `data` using update syntax,
  # which raises if a key is not already present in `data`.
  defp process_update(data, args, row, from, adapter) do
    {args, row} = process_kv(args, row, from, adapter)
    data = Enum.reduce(args, data, fn {key, value}, acc -> %{acc | key => value} end)
    {data, row}
  end

  defp process_args(args, row, from, adapter) do
    Enum.map_reduce(args, row, fn arg, row ->
      process(row, arg, from, adapter)
    end)
  end

  # Both keys and values are expressions and are processed against the row.
  defp process_kv(kv, row, from, adapter) do
    Enum.map_reduce(kv, row, fn {key, value}, row ->
      {key, row} = process(row, key, from, adapter)
      {value, row} = process(row, value, from, adapter)
      {{key, value}, row}
    end)
  end

  # Loads `value` through the adapter, raising `ArgumentError` on failure.
  # `field` and `struct` are only used to enrich the error message; when they
  # are `nil` the corresponding part of the message is omitted.
  @compile {:inline, load!: 5}
  defp load!(type, value, field, struct, adapter) do
    case Ecto.Type.adapter_load(adapter, type, value) do
      {:ok, value} ->
        value

      :error ->
        field = field && " for field #{inspect(field)}"
        struct = struct && " in #{inspect(struct)}"

        raise ArgumentError,
              "cannot load `#{inspect(value)}` as type #{inspect(type)}#{field}#{struct}"
    end
  end

  # Converts a loaded value into a plain map holding only `fields`.
  # Lists are converted element-wise; `{key, nested_fields}` entries recurse
  # into the value stored under `key`.
  defp to_map(nil, _fields) do
    nil
  end

  defp to_map(value, fields) when is_list(value) do
    Enum.map(value, &to_map(&1, fields))
  end

  defp to_map(value, fields) do
    for field <- fields, into: %{} do
      case field do
        {k, v} -> {k, to_map(Map.fetch!(value, k), List.wrap(v))}
        k -> {k, Map.fetch!(value, k)}
      end
    end
  end

  defp query_for_get(_queryable, nil) do
    raise ArgumentError, "cannot perform Ecto.Repo.get/2 because the given value is nil"
  end

  # Requires a schema in the `from` expression with exactly one primary key.
  defp query_for_get(queryable, id) do
    query = Queryable.to_query(queryable)
    schema = assert_schema!(query)

    case schema.__schema__(:primary_key) do
      [pk] ->
        Query.from(x in query, where: field(x, ^pk) == ^id)

      pks ->
        raise ArgumentError,
              "Ecto.Repo.get/2 requires the schema #{inspect(schema)} " <>
                "to have exactly one primary key, got: #{inspect(pks)}"
    end
  end

  defp query_for_get_by(queryable, clauses) do
    Query.where(queryable, [], ^Enum.to_list(clauses))
  end

  # Builds a query fetching all given structs by primary key, using the schema
  # and the metadata prefix of the first struct.
  defp query_for_reload([head | _] = structs) do
    assert_structs!(structs)

    schema = head.__struct__
    prefix = head.__meta__.prefix

    case schema.__schema__(:primary_key) do
      [pk] ->
        keys = Enum.map(structs, &get_pk!(&1, pk))
        query = Query.from(x in schema, where: field(x, ^pk) in ^keys)
        %{query | prefix: prefix}

      pks ->
        raise ArgumentError,
              "Ecto.Repo.reload/2 requires the schema #{inspect(schema)} " <>
                "to have exactly one primary key, got: #{inspect(pks)}"
    end
  end

  # When the query has no distinct, limit, offset or combinations, the
  # aggregate is applied directly (dropping order_bys); otherwise the query is
  # wrapped in a subquery and the aggregate is applied on top of it.
  defp query_for_aggregate(queryable, aggregate) do
    query =
      case prepare_for_aggregate(queryable) do
        %{distinct: nil, limit: nil, offset: nil, combinations: []} = query ->
          %{query | order_bys: []}

        query ->
          query
          |> Query.subquery()
          |> Queryable.Ecto.SubQuery.to_query()
      end

    select = %SelectExpr{expr: {aggregate, [], []}, file: __ENV__.file, line: __ENV__.line}
    %{query | select: select}
  end

  # Same as `query_for_aggregate/2`, but aggregates over `field`. In the
  # subquery case the inner query selects just that field.
  defp query_for_aggregate(queryable, aggregate, field) do
    ast = field(0, field)

    query =
      case prepare_for_aggregate(queryable) do
        %{distinct: nil, limit: nil, offset: nil, combinations: []} = query ->
          %{query | order_bys: []}

        query ->
          select = %SelectExpr{expr: ast, file: __ENV__.file, line: __ENV__.line}

          %{query | select: select}
          |> Query.subquery()
          |> Queryable.Ecto.SubQuery.to_query()
      end

    select = %SelectExpr{expr: {aggregate, [], [ast]}, file: __ENV__.file, line: __ENV__.line}
    %{query | select: select}
  end

  # Drops preloads and assocs, and rejects queries that have a group_by.
  defp prepare_for_aggregate(queryable) do
    case %{Queryable.to_query(queryable) | preloads: [], assocs: []} do
      %{group_bys: [_ | _]} = query ->
        raise Ecto.QueryError, message: "cannot aggregate on query with group_by", query: query

      %{} = query ->
        query
    end
  end

  # Builds the AST for accessing `field` on the binding at index `ix`.
  defp field(ix, field) when is_integer(ix) and is_atom(field) do
    {{:., [], [{:&, [], [ix]}, field]}, [], []}
  end

  defp assert_schema!(%{from: %{source: {_source, schema}}}) when schema != nil, do: schema

  defp assert_schema!(query) do
    raise Ecto.QueryError,
      query: query,
      message: "expected a from expression with a schema"
  end

  # Ensures every element has a `__meta__` key and that all elements share
  # the struct module of the first one.
  defp assert_structs!([head | _] = structs) do
    if not Enum.all?(structs, &schema?/1) do
      raise ArgumentError, "expected a struct or a list of structs, received #{inspect(structs)}"
    end

    if not Enum.all?(structs, &(&1.__struct__ == head.__struct__)) do
      raise ArgumentError, "expected an homogenous list, received different struct types"
    end

    :ok
  end

  defp schema?(%{__meta__: _}), do: true
  defp schema?(_), do: false

  # Returns the primary key value of `struct`, raising when it is `nil`.
  defp get_pk!(struct, pk) do
    case Map.fetch!(struct, pk) do
      nil ->
        raise ArgumentError, "Ecto.Repo.reload/2 expects existent structs, found a `nil` primary key"

      key ->
        key
    end
  end
end