planner.ex (74637B)
defmodule Ecto.Query.Planner do
  # Normalizes a query and its parameters.
  @moduledoc false

  alias Ecto.Query.{BooleanExpr, DynamicExpr, FromExpr, JoinExpr, QueryExpr, SelectExpr}

  # Compile-time guard: when the number of keys in %Ecto.Query{} changes,
  # compilation fails so this module is reviewed against the new struct shape.
  if map_size(%Ecto.Query{}) != 21 do
    raise "Ecto.Query match out of date in planner"
  end

  # Key under which the parent query is stored in a subquery's aliases
  # while that subquery is being planned.
  @parent_as __MODULE__
  @aggs ~w(count avg min max sum row_number rank dense_rank percent_rank cume_dist ntile lag lead first_value last_value nth_value)a

  @doc """
  Converts a query to a list of joins.

  The from is moved as last join with the where conditions as its "on"
  in order to keep proper binding order.
  """
  def query_to_joins(qual, source, %{wheres: wheres, joins: joins}, position) do
    # Start from a neutral `true` condition and fold every where into it.
    on = %QueryExpr{file: __ENV__.file, line: __ENV__.line, expr: true, params: []}

    on =
      Enum.reduce(wheres, on, fn %BooleanExpr{op: op, expr: expr, params: params}, acc ->
        merge_expr_and_params(op, acc, expr, params)
      end)

    join = %JoinExpr{qual: qual, source: source, file: __ENV__.file, line: __ENV__.line, on: on}
    last = length(joins) + position

    # The original from (&0) becomes the last join; every other binding is
    # shifted so it lines up with `position` in the outer query.
    mapping = fn
      0 -> last
      ix -> ix + position - 1
    end

    for {%{on: on} = join, ix} <- Enum.with_index(joins ++ [join]) do
      %{join | on: rewrite_sources(on, mapping), ix: ix + position}
    end
  end

  # Merges `right_expr`/`right_params` into the given query expression using
  # `op`. Interpolation indexes on the right side are bumped past the
  # parameters already present on the left side.
  defp merge_expr_and_params(op, %QueryExpr{expr: left_expr, params: left_params} = struct,
                             right_expr, right_params) do
    right_expr = Ecto.Query.Builder.bump_interpolations(right_expr, left_params)
    %{struct | expr: merge_expr(op, left_expr, right_expr), params: left_params ++ right_params}
  end

  # `true` is the neutral element: merging with it returns the other side.
  defp merge_expr(_op, left, true), do: left
  defp merge_expr(_op, true, right), do: right
  defp merge_expr(op, left, right), do: {op, [], [left, right]}

  @doc """
  Rewrites the given query expression sources using the given mapping.

  `mapping` is a one-arity function from a source index to its new index.
  It is applied to `&ix` nodes, to tagged types/tags and to parameter
  types that reference a source index.
  """
  def rewrite_sources(%{expr: expr, params: params} = part, mapping) do
    expr =
      Macro.prewalk(expr, fn
        %Ecto.Query.Tagged{type: type, tag: tag} = tagged ->
          %{tagged | type: rewrite_type(type, mapping), tag: rewrite_type(tag, mapping)}

        {:&, meta, [ix]} ->
          {:&, meta, [mapping.(ix)]}

        other ->
          other
      end)

    params =
      Enum.map(params, fn
        {val, type} ->
          {val, rewrite_type(type, mapping)}

        val ->
          val
      end)

    %{part | expr: expr, params: params}
  end

  # Applies `mapping` to the source index embedded in a type, either
  # directly (`{ix, field}`) or wrapped (`{composite, {ix, field}}`).
  defp rewrite_type({composite, {ix, field}}, mapping) when is_integer(ix) do
    {composite, {mapping.(ix), field}}
  end

  defp rewrite_type({ix, field}, mapping) when is_integer(ix) do
    {mapping.(ix), field}
  end

  defp rewrite_type(other, _mapping) do
    other
  end

  @doc """
  Define the query cache table.

  Falls back to this module's name when `atom_name` is `nil`.
  """
  def new_query_cache(atom_name) do
    :ets.new(atom_name || __MODULE__, [:set, :public, read_concurrency: true])
  end

  @doc """
  Plans the query for execution.

  Planning happens in multiple steps:

    1. First the query is planned by retrieving
       its cache key, casting and merging parameters

    2. Then a cache lookup is done, if the query is
       cached, we are done

    3. If there is no cache, we need to actually
       normalize and validate the query, asking the
       adapter to prepare it

    4. The query is sent to the adapter to be generated

  ## Cache

  All entries in the query, except the preload and sources
  field, should be part of the cache key.

  The cache value is the compiled query by the adapter
  along-side the select expression.
  """
  def query(query, operation, cache, adapter, counter) do
    {query, params, key} = plan(query, operation, adapter)
    {cast_params, dump_params} = Enum.unzip(params)
    query_with_cache(key, query, operation, cache, adapter, counter, cast_params, dump_params)
  end

  # Looks the query up and builds the result handed back to the caller.
  # Three-element tuples come from the uncached path; four-element tuples
  # are cache table entries tagged with :cached or :cache.
  defp query_with_cache(key, query, operation, cache, adapter, counter, cast_params, dump_params) do
    case query_lookup(key, query, operation, cache, adapter, counter) do
      {_, select, prepared} ->
        {build_meta(query, select), {:nocache, prepared}, cast_params, dump_params}

      {_key, :cached, select, cached} ->
        update = &cache_update(cache, key, &1)
        reset = &cache_reset(cache, key, &1)
        {build_meta(query, select), {:cached, update, reset, cached}, cast_params, dump_params}

      {_key, :cache, select, prepared} ->
        update = &cache_update(cache, key, &1)
        {build_meta(query, select), {:cache, update, prepared}, cast_params, dump_params}
    end
  end

  # A :nocache key skips the cache table entirely.
  defp query_lookup(:nocache, query, operation, _cache, adapter, counter) do
    query_without_cache(query, operation, adapter, counter)
  end

  defp query_lookup(key, query, operation, cache, adapter, counter) do
    case :ets.lookup(cache, key) do
      [term] -> term
      [] -> query_prepare(query, operation, adapter, counter, cache, key)
    end
  end

  # Prepares the query and stores it in the cache table only when the
  # adapter reports it as cacheable.
  defp query_prepare(query, operation, adapter, counter, cache, key) do
    case query_without_cache(query, operation, adapter, counter) do
      {:cache, select, prepared} ->
        cache_insert(cache, key, {key, :cache, select, prepared})

      {:nocache, _, _} = nocache ->
        nocache
    end
  end

  # Inserts `elem` unless the key is already present, in which case the
  # existing entry is returned instead.
  defp cache_insert(cache, key, elem) do
    case :ets.insert_new(cache, elem) do
      true ->
        elem

      false ->
        [elem] = :ets.lookup(cache, key)
        elem
    end
  end

  # Marks the entry as :cached and stores the adapter's cached value.
  defp cache_update(cache, key, cached) do
    _ = :ets.update_element(cache, key, [{2, :cached}, {4, cached}])
    :ok
  end

  # Moves the entry back to the :cache state with a freshly prepared value.
  defp cache_reset(cache, key, prepared) do
    _ = :ets.update_element(cache, key, [{2, :cache}, {4, prepared}])
    :ok
  end

  # Normalizes the query and asks the adapter to prepare it.
  defp query_without_cache(query, operation, adapter, counter) do
    {query, select} = normalize(query, operation, adapter, counter)
    {cache, prepared} = adapter.prepare(operation, query)
    {cache, select, prepared}
  end

  defp build_meta(%{sources: sources, preloads: preloads}, select) do
    %{select: select, preloads: preloads, sources: sources}
  end

  @doc """
  Prepares the query for cache.

  This means all the parameters from query expressions are
  merged into a single value and their entries are pruned
  from the query.

  This function is called by the backend before invoking
  any cache mechanism.
  """
  @spec plan(Ecto.Query.t, atom, module) :: {planned_query :: Ecto.Query.t, parameters :: list, cache_key :: any}
  def plan(query, operation, adapter) do
    query
    |> plan_sources(adapter)
    |> plan_assocs()
    |> plan_combinations(adapter)
    |> plan_ctes(adapter)
    |> plan_wheres(adapter)
    |> plan_select(adapter)
    |> plan_cache(operation, adapter)
  rescue
    e ->
      # Reraise errors so we ignore the planner inner stacktrace
      filter_and_reraise e, __STACKTRACE__
  end

  @doc """
  Prepare all sources, by traversing and expanding from, joins, subqueries.
  """
  def plan_sources(query, adapter) do
    {from, source} = plan_from(query, adapter)

    # Set up the initial source so we can refer
    # to the parent in subqueries in joins
    query = %{query | sources: {source}}

    {joins, sources, tail_sources} = plan_joins(query, [source], length(query.joins), adapter)

    # Joins and sources are accumulated in reverse order, so both are
    # reversed here before being stored back in the query.
    %{
      query
      | from: from,
        joins: Enum.reverse(joins),
        sources: (tail_sources ++ sources) |> Enum.reverse() |> List.to_tuple()
    }
  end

  defp plan_from(%{from: nil} = query, _adapter) do
    error!(query, "query must have a from expression")
  end

  defp plan_from(%{from: %{source: {:fragment, _, _}}, preloads: preloads, assocs: assocs} = query, _adapter)
       when assocs != [] or preloads != [] do
    error!(query, "cannot preload associations with a fragment source")
  end

  defp plan_from(%{from: from} = query, adapter) do
    plan_source(query, from, adapter)
  end

  # Returns `{expr, source}` for a from/join expression. The clauses handle,
  # in order: subqueries, schema-only sources, {binary, schema} sources and
  # fragments (which cannot carry a prefix).
  defp plan_source(query, %{source: %Ecto.SubQuery{} = subquery, prefix: prefix} = expr, adapter) do
    subquery = plan_subquery(subquery, query, prefix, adapter, true)
    {%{expr | source: subquery}, subquery}
  end

  defp plan_source(query, %{source: {nil, schema}} = expr, _adapter)
       when is_atom(schema) and schema != nil do
    source = schema.__schema__(:source)
    prefix = plan_source_schema_prefix(expr, schema) || query.prefix
    {%{expr | source: {source, schema}}, {source, schema, prefix}}
  end

  defp plan_source(query, %{source: {source, schema}, prefix: prefix} = expr, _adapter)
       when is_binary(source) and is_atom(schema),
       do: {expr, {source, schema, prefix || query.prefix}}

  defp plan_source(_query, %{source: {:fragment, _, _} = source, prefix: nil} = expr, _adapter),
    do: {expr, source}

  defp plan_source(query, %{source: {:fragment, _, _}, prefix: prefix} = expr, _adapter),
    do: error!(query, expr, "cannot set prefix: #{inspect(prefix)} option for fragment sources")

defp plan_subquery(subquery, query, prefix, adapter, source?) do 269 %{query: inner_query} = subquery 270 271 inner_query = %{ 272 inner_query 273 | prefix: prefix || subquery.query.prefix || query.prefix, 274 aliases: Map.put(inner_query.aliases, @parent_as, query) 275 } 276 277 {inner_query, params, key} = plan(inner_query, :all, adapter) 278 assert_no_subquery_assocs!(inner_query) 279 280 {inner_query, select} = 281 inner_query 282 |> ensure_select(true) 283 |> normalize_subquery_select(adapter, source?) 284 285 {_, inner_query} = pop_in(inner_query.aliases[@parent_as]) 286 %{subquery | query: inner_query, params: params, cache: key, select: select} 287 rescue 288 e -> raise Ecto.SubQueryError, query: query, exception: e 289 end 290 291 # The prefix for form are computed upfront, but not for joins 292 defp plan_source_schema_prefix(%FromExpr{prefix: prefix}, _schema), 293 do: prefix 294 295 defp plan_source_schema_prefix(%JoinExpr{prefix: prefix}, schema), 296 do: prefix || schema.__schema__(:prefix) 297 298 defp assert_no_subquery_assocs!(%{assocs: assocs, preloads: preloads} = query) 299 when assocs != [] or preloads != [] do 300 error!(query, "cannot preload associations in subquery") 301 end 302 303 defp assert_no_subquery_assocs!(query) do 304 query 305 end 306 307 defp normalize_subquery_select(query, adapter, source?) do 308 {schema_or_source, expr, %{select: select} = query} = rewrite_subquery_select_expr(query, source?) 
309 {expr, _} = prewalk(expr, :select, query, select, 0, adapter) 310 {{:map, types}, _fields, _from} = collect_fields(expr, [], :never, query, select.take, true, %{}) 311 {query, subquery_source(schema_or_source, types)} 312 end 313 314 defp subquery_source(nil, types), do: {:map, types} 315 defp subquery_source(name, types) when is_atom(name), do: {:struct, name, types} 316 defp subquery_source({:source, schema, prefix, types}, only) do 317 types = Enum.map(only, fn {field, {:value, type}} -> {field, Keyword.get(types, field, type)} end) 318 {:source, schema, prefix, types} 319 end 320 321 defp rewrite_subquery_select_expr(%{select: select} = query, source?) do 322 %{expr: expr, take: take} = select 323 324 case subquery_select(expr, take, query) do 325 {schema_or_source, fields} -> 326 expr = {:%{}, [], fields} 327 {schema_or_source, expr, put_in(query.select.expr, expr)} 328 329 :error when source? -> 330 error!(query, "subquery/cte must select a source (t), a field (t.field) or a map, got: `#{Macro.to_string(expr)}`") 331 332 :error -> 333 expr = {:%{}, [], [result: expr]} 334 {nil, expr, put_in(query.select.expr, expr)} 335 end 336 end 337 338 defp subquery_select({:merge, _, [left, right]}, take, query) do 339 {left_struct, left_fields} = subquery_select(left, take, query) 340 {right_struct, right_fields} = subquery_select(right, take, query) 341 {left_struct || right_struct, Keyword.merge(left_fields, right_fields)} 342 end 343 defp subquery_select({:%, _, [name, map]}, take, query) do 344 {_, fields} = subquery_select(map, take, query) 345 {name, fields} 346 end 347 defp subquery_select({:%{}, _, [{:|, _, [{:&, [], [ix]}, pairs]}]} = expr, take, query) do 348 assert_subquery_fields!(query, expr, pairs) 349 drop = Map.new(pairs, fn {key, _} -> {key, nil} end) 350 {source, _} = source_take!(:select, query, take, ix, ix, drop) 351 352 # In case of map updates, we need to remove duplicated fields 353 # at query time because we use the field names as aliases 
and 354 # duplicate aliases will lead to invalid queries. 355 kept_keys = subquery_source_fields(source) -- Keyword.keys(pairs) 356 {keep_source_or_struct(source), subquery_fields(kept_keys, ix) ++ pairs} 357 end 358 defp subquery_select({:%{}, _, pairs} = expr, _take, query) do 359 assert_subquery_fields!(query, expr, pairs) 360 {nil, pairs} 361 end 362 defp subquery_select({:&, _, [ix]}, take, query) do 363 {source, _} = source_take!(:select, query, take, ix, ix, %{}) 364 fields = subquery_source_fields(source) 365 {keep_source_or_struct(source), subquery_fields(fields, ix)} 366 end 367 defp subquery_select({{:., _, [{:&, _, [_]}, field]}, _, []} = expr, _take, _query) do 368 {nil, [{field, expr}]} 369 end 370 defp subquery_select(_expr, _take, _query) do 371 :error 372 end 373 374 defp subquery_fields(fields, ix) do 375 for field <- fields do 376 {field, {{:., [], [{:&, [], [ix]}, field]}, [], []}} 377 end 378 end 379 380 defp keep_source_or_struct({:source, _, _, _} = source), do: source 381 defp keep_source_or_struct({:struct, name, _}), do: name 382 defp keep_source_or_struct(_), do: nil 383 384 defp subquery_source_fields({:source, _, _, types}), do: Keyword.keys(types) 385 defp subquery_source_fields({:struct, _, types}), do: Keyword.keys(types) 386 defp subquery_source_fields({:map, types}), do: Keyword.keys(types) 387 388 defp subquery_type_for({:source, _, _, fields}, field), do: Keyword.fetch(fields, field) 389 defp subquery_type_for({:struct, _name, types}, field), do: subquery_type_for_value(types, field) 390 defp subquery_type_for({:map, types}, field), do: subquery_type_for_value(types, field) 391 392 defp subquery_type_for_value(types, field) do 393 case Keyword.fetch(types, field) do 394 {:ok, {:value, type}} -> {:ok, type} 395 {:ok, _} -> {:ok, :any} 396 :error -> :error 397 end 398 end 399 400 defp assert_subquery_fields!(query, expr, pairs) do 401 Enum.each(pairs, fn 402 {key, _} when not is_atom(key) -> 403 error!(query, "only atom keys are 
allowed when selecting a map in subquery, got: `#{Macro.to_string(expr)}`") 404 405 {key, value} -> 406 if valid_subquery_value?(value) do 407 {key, value} 408 else 409 error!(query, "atoms, structs, maps, lists, tuples and sources are not allowed as map values in subquery, got: `#{Macro.to_string(expr)}`") 410 end 411 end) 412 end 413 414 defp valid_subquery_value?({_, _}), do: false 415 defp valid_subquery_value?(args) when is_list(args), do: false 416 defp valid_subquery_value?({container, _, args}) 417 when container in [:{}, :%{}, :&, :%] and is_list(args), do: false 418 defp valid_subquery_value?(nil), do: true 419 defp valid_subquery_value?(arg) when is_atom(arg), do: is_boolean(arg) 420 defp valid_subquery_value?(_), do: true 421 422 defp plan_joins(query, sources, offset, adapter) do 423 plan_joins(query.joins, query, [], sources, [], 1, offset, adapter) 424 end 425 426 defp plan_joins([%JoinExpr{assoc: {ix, assoc}, qual: qual, on: on, prefix: prefix} = join|t], 427 query, joins, sources, tail_sources, counter, offset, adapter) do 428 source = fetch_source!(sources, ix) 429 schema = schema_for_association_join!(query, join, source) 430 refl = schema.__schema__(:association, assoc) 431 432 unless refl do 433 error! query, join, "could not find association `#{assoc}` on schema #{inspect schema}" 434 end 435 436 # If we have the following join: 437 # 438 # from p in Post, 439 # join: p in assoc(p, :comments) 440 # 441 # The callback below will return a query that contains only 442 # joins in a way it starts with the Post and ends in the 443 # Comment. 444 # 445 # This means we need to rewrite the joins below to properly 446 # shift the &... identifier in a way that: 447 # 448 # &0 -> becomes assoc ix 449 # &LAST_JOIN -> becomes counter 450 # 451 # All values in the middle should be shifted by offset, 452 # all values after join are already correct. 453 child = refl.__struct__.joins_query(refl) 454 455 # Rewrite prefixes: 456 # 1. 
the child query has the parent query prefix 457 # (note the child query should NEVER have a prefix) 458 # 2. from and joins can have their prefixes explicitly 459 # overwritten by the join prefix 460 child = rewrite_prefix(child, query.prefix) 461 child = update_in child.from, &rewrite_prefix(&1, prefix) 462 child = update_in child.joins, &Enum.map(&1, fn join -> rewrite_prefix(join, prefix) end) 463 464 last_ix = length(child.joins) 465 source_ix = counter 466 467 {_, child_from_source} = plan_source(child, child.from, adapter) 468 469 {child_joins, child_sources, child_tail} = 470 plan_joins(child, [child_from_source], offset + last_ix - 1, adapter) 471 472 # Rewrite joins indexes as mentioned above 473 child_joins = Enum.map(child_joins, &rewrite_join(&1, qual, ix, last_ix, source_ix, offset)) 474 475 # Drop the last resource which is the association owner (it is reversed) 476 child_sources = Enum.drop(child_sources, -1) 477 478 [current_source|child_sources] = child_sources 479 child_sources = child_tail ++ child_sources 480 481 plan_joins(t, query, attach_on(child_joins, on) ++ joins, [current_source|sources], 482 child_sources ++ tail_sources, counter + 1, offset + length(child_sources), adapter) 483 end 484 485 defp plan_joins([%JoinExpr{source: %Ecto.Query{} = join_query, qual: qual, on: on, prefix: prefix} = join|t], 486 query, joins, sources, tail_sources, counter, offset, adapter) do 487 case join_query do 488 %{order_bys: [], limit: nil, offset: nil, group_bys: [], joins: [], 489 havings: [], preloads: [], assocs: [], distinct: nil, lock: nil} -> 490 join_query = rewrite_prefix(join_query, query.prefix) 491 from = rewrite_prefix(join_query.from, prefix) 492 {from, source} = plan_source(join_query, from, adapter) 493 [join] = attach_on(query_to_joins(qual, from.source, join_query, counter), on) 494 plan_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter) 495 _ -> 496 error! 
query, join, """ 497 invalid query was interpolated in a join. 498 If you want to pass a query to a join, you must either: 499 500 1. Make sure the query only has `where` conditions (which will be converted to ON clauses) 501 2. Or wrap the query in a subquery by calling subquery(query) 502 """ 503 end 504 end 505 506 defp plan_joins([%JoinExpr{} = join|t], 507 query, joins, sources, tail_sources, counter, offset, adapter) do 508 {join, source} = plan_source(query, %{join | ix: counter}, adapter) 509 plan_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter) 510 end 511 512 defp plan_joins([], _query, joins, sources, tail_sources, _counter, _offset, _adapter) do 513 {joins, sources, tail_sources} 514 end 515 516 defp attach_on([%{on: on} = h | t], %{expr: expr, params: params}) do 517 [%{h | on: merge_expr_and_params(:and, on, expr, params)} | t] 518 end 519 520 defp rewrite_prefix(expr, nil), do: expr 521 defp rewrite_prefix(%{prefix: nil} = expr, prefix), do: %{expr | prefix: prefix} 522 defp rewrite_prefix(expr, _prefix), do: expr 523 524 defp rewrite_join(%{on: on, ix: join_ix} = join, qual, ix, last_ix, source_ix, inc_ix) do 525 expr = Macro.prewalk on.expr, fn 526 {:&, meta, [join_ix]} -> 527 {:&, meta, [rewrite_ix(join_ix, ix, last_ix, source_ix, inc_ix)]} 528 expr = %Ecto.Query.Tagged{type: {type_ix, type}} when is_integer(type_ix) -> 529 %{expr | type: {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), type}} 530 other -> 531 other 532 end 533 534 params = Enum.map(on.params, &rewrite_param_ix(&1, ix, last_ix, source_ix, inc_ix)) 535 536 %{join | on: %{on | expr: expr, params: params}, qual: qual, 537 ix: rewrite_ix(join_ix, ix, last_ix, source_ix, inc_ix)} 538 end 539 540 # We need to replace the source by the one from the assoc 541 defp rewrite_ix(0, ix, _last_ix, _source_ix, _inc_x), do: ix 542 543 # The last entry will have the current source index 544 defp rewrite_ix(last_ix, _ix, last_ix, source_ix, _inc_x), 
do: source_ix 545 546 # All above last are already correct 547 defp rewrite_ix(join_ix, _ix, last_ix, _source_ix, _inc_ix) when join_ix > last_ix, do: join_ix 548 549 # All others need to be incremented by the offset sources 550 defp rewrite_ix(join_ix, _ix, _last_ix, _source_ix, inc_ix), do: join_ix + inc_ix 551 552 defp rewrite_param_ix({value, {upper, {type_ix, field}}}, ix, last_ix, source_ix, inc_ix) when is_integer(type_ix) do 553 {value, {upper, {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), field}}} 554 end 555 556 defp rewrite_param_ix({value, {type_ix, field}}, ix, last_ix, source_ix, inc_ix) when is_integer(type_ix) do 557 {value, {rewrite_ix(type_ix, ix, last_ix, source_ix, inc_ix), field}} 558 end 559 560 defp rewrite_param_ix(param, _, _, _, _), do: param 561 562 defp fetch_source!(sources, ix) when is_integer(ix) do 563 case Enum.reverse(sources) |> Enum.fetch(ix) do 564 {:ok, source} -> source 565 :error -> raise ArgumentError, "could not find a source with index `#{ix}` in `#{inspect sources}" 566 end 567 end 568 569 defp fetch_source!(_, ix) do 570 raise ArgumentError, "invalid binding index: `#{inspect ix}` (check if you're binding using a valid :as atom)" 571 end 572 573 defp schema_for_association_join!(query, join, source) do 574 case source do 575 {:fragment, _, _} -> 576 error! query, join, "cannot perform association joins on fragment sources" 577 578 {source, nil, _} -> 579 error! query, join, "cannot perform association join on #{inspect source} " <> 580 "because it does not have a schema" 581 582 {_, schema, _} -> 583 schema 584 585 %Ecto.SubQuery{select: {:source, {_, schema}, _, _}} -> 586 schema 587 588 %Ecto.SubQuery{select: {:struct, schema, _}} -> 589 schema 590 591 %Ecto.SubQuery{} -> 592 error! query, join, "can only perform association joins on subqueries " <> 593 "that return a source with schema in select" 594 595 _ -> 596 error! 
query, join, "can only perform association joins on sources with a schema" 597 end 598 end 599 600 @spec plan_wheres(Ecto.Query.t, module) :: Ecto.Query.t 601 defp plan_wheres(query, adapter) do 602 wheres = 603 Enum.map(query.wheres, fn 604 %{subqueries: []} = where -> 605 where 606 607 %{subqueries: subqueries} = where -> 608 %{where | subqueries: Enum.map(subqueries, &plan_subquery(&1, query, nil, adapter, false))} 609 end) 610 611 %{query | wheres: wheres} 612 end 613 614 @spec plan_select(Ecto.Query.t, module) :: Ecto.Query.t 615 defp plan_select(query, adapter) do 616 case query do 617 %{select: %{subqueries: [_ | _] = subqueries}} -> 618 subqueries = Enum.map(subqueries, &plan_subquery(&1, query, nil, adapter, false)) 619 put_in(query.select.subqueries, subqueries) 620 621 query -> query 622 end 623 end 624 625 @doc """ 626 Prepare the parameters by merging and casting them according to sources. 627 """ 628 def plan_cache(query, operation, adapter) do 629 {query, params, cache} = traverse_cache(query, operation, {[], []}, adapter) 630 {query, Enum.reverse(params), cache} 631 end 632 633 defp traverse_cache(query, operation, cache_params, adapter) do 634 fun = &{&3, merge_cache(&1, &2, &3, &4, operation, adapter)} 635 {query, {cache, params}} = traverse_exprs(query, operation, cache_params, fun) 636 {query, params, finalize_cache(query, operation, cache)} 637 end 638 639 defp merge_cache(:from, query, from, {cache, params}, _operation, adapter) do 640 {key, params} = source_cache(from, params) 641 {params, source_cacheable?} = cast_and_merge_params(:from, query, from, params, adapter) 642 {merge_cache({:from, key, from.hints}, cache, source_cacheable? 
and key != :nocache), params} 643 end 644 645 defp merge_cache(kind, query, expr, {cache, params}, _operation, adapter) 646 when kind in ~w(select distinct limit offset)a do 647 if expr do 648 {params, cacheable?} = cast_and_merge_params(kind, query, expr, params, adapter) 649 {merge_cache({kind, expr_to_cache(expr)}, cache, cacheable?), params} 650 else 651 {cache, params} 652 end 653 end 654 655 defp merge_cache(kind, query, exprs, {cache, params}, _operation, adapter) 656 when kind in ~w(where update group_by having order_by)a do 657 {expr_cache, {params, cacheable?}} = 658 Enum.map_reduce exprs, {params, true}, fn expr, {params, cacheable?} -> 659 {params, current_cacheable?} = cast_and_merge_params(kind, query, expr, params, adapter) 660 {expr_to_cache(expr), {params, cacheable? and current_cacheable?}} 661 end 662 663 case expr_cache do 664 [] -> {cache, params} 665 _ -> {merge_cache({kind, expr_cache}, cache, cacheable?), params} 666 end 667 end 668 669 defp merge_cache(:join, query, exprs, {cache, params}, _operation, adapter) do 670 {expr_cache, {params, cacheable?}} = 671 Enum.map_reduce exprs, {params, true}, fn 672 %JoinExpr{on: on, qual: qual, hints: hints} = join, {params, cacheable?} -> 673 {key, params} = source_cache(join, params) 674 {params, join_cacheable?} = cast_and_merge_params(:join, query, join, params, adapter) 675 {params, on_cacheable?} = cast_and_merge_params(:join, query, on, params, adapter) 676 {{qual, key, on.expr, hints}, 677 {params, cacheable? and join_cacheable? and on_cacheable? 
and key != :nocache}} 678 end 679 680 case expr_cache do 681 [] -> {cache, params} 682 _ -> {merge_cache({:join, expr_cache}, cache, cacheable?), params} 683 end 684 end 685 686 defp merge_cache(:windows, query, exprs, {cache, params}, _operation, adapter) do 687 {expr_cache, {params, cacheable?}} = 688 Enum.map_reduce exprs, {params, true}, fn {key, expr}, {params, cacheable?} -> 689 {params, current_cacheable?} = cast_and_merge_params(:windows, query, expr, params, adapter) 690 {{key, expr_to_cache(expr)}, {params, cacheable? and current_cacheable?}} 691 end 692 693 case expr_cache do 694 [] -> {cache, params} 695 _ -> {merge_cache({:windows, expr_cache}, cache, cacheable?), params} 696 end 697 end 698 699 defp merge_cache(:combination, _query, combinations, cache_and_params, operation, adapter) do 700 # In here we add each combination as its own entry in the cache key. 701 # We could group them to avoid multiple keys, but since they are uncommon, we keep it simple. 702 Enum.reduce combinations, cache_and_params, fn {modifier, query}, {cache, params} -> 703 {_, params, inner_cache} = traverse_cache(query, operation, {[], params}, adapter) 704 {merge_cache({modifier, inner_cache}, cache, inner_cache != :nocache), params} 705 end 706 end 707 708 defp merge_cache(:with_cte, _query, nil, cache_and_params, _operation, _adapter) do 709 cache_and_params 710 end 711 712 defp merge_cache(:with_cte, query, with_expr, cache_and_params, _operation, adapter) do 713 %{queries: queries, recursive: recursive} = with_expr 714 key = if recursive, do: :recursive_cte, else: :non_recursive_cte 715 716 # In here we add each cte as its own entry in the cache key. 717 # We could group them to avoid multiple keys, but since they are uncommon, we keep it simple. 
718 Enum.reduce queries, cache_and_params, fn 719 {name, %Ecto.Query{} = query}, {cache, params} -> 720 {_, params, inner_cache} = traverse_cache(query, :all, {[], params}, adapter) 721 {merge_cache({key, name, inner_cache}, cache, inner_cache != :nocache), params} 722 723 {name, %Ecto.Query.QueryExpr{} = query_expr}, {cache, params} -> 724 {params, cacheable?} = cast_and_merge_params(:with_cte, query, query_expr, params, adapter) 725 {merge_cache({key, name, expr_to_cache(query_expr)}, cache, cacheable?), params} 726 end 727 end 728 729 defp expr_to_cache(%QueryExpr{expr: expr}), do: expr 730 defp expr_to_cache(%SelectExpr{expr: expr}), do: expr 731 defp expr_to_cache(%BooleanExpr{op: op, expr: expr, subqueries: []}), do: {op, expr} 732 defp expr_to_cache(%BooleanExpr{op: op, expr: expr, subqueries: subqueries}) do 733 # Alternate implementation could be replace {:subquery, i} expression in expr. 734 # Current strategy appends [{:subquery, i, cache}], where cache is the cache key for this subquery. 735 {op, expr, Enum.map(subqueries, fn %{cache: cache} -> {:subquery, cache} end)} 736 end 737 738 @spec cast_and_merge_params(atom, Ecto.Query.t, any, list, module) :: {params :: list, cacheable? :: boolean} 739 defp cast_and_merge_params(kind, query, expr, params, adapter) do 740 Enum.reduce expr.params, {params, true}, fn 741 {:subquery, i}, {acc, cacheable?} -> 742 # This is the place holder to intersperse subquery parameters. 743 %Ecto.SubQuery{params: subparams, cache: cache} = Enum.fetch!(expr.subqueries, i) 744 {Enum.reverse(subparams, acc), cacheable? 
and cache != :nocache}

      {v, type}, {acc, cacheable?} ->
        case cast_param(kind, query, expr, v, type, adapter) do
          {cast_v, {:in, dump_v}} -> {split_in_params(cast_v, dump_v, acc), false}
          cast_v_and_dump_v -> {[cast_v_and_dump_v | acc], cacheable?}
        end
    end
  end

  # Pairs every cast value with its dumped counterpart and pushes the pairs
  # onto `acc` in reverse order (`Enum.reverse/2` prepends to its second argument).
  defp split_in_params(cast_v, dump_v, acc) do
    cast_v |> Enum.zip(dump_v) |> Enum.reverse(acc)
  end

  # Combines two cache keys. The result is `:nocache` when the third argument
  # is `false` or when the right-hand key is already `:nocache`.
  defp merge_cache(_left, _right, false), do: :nocache
  defp merge_cache(_left, :nocache, true), do: :nocache
  defp merge_cache(left, right, true), do: [left | right]

  # Builds the final cache key: `:nocache` is passed through untouched,
  # otherwise the operation plus the non-default take, assocs, prefix, lock
  # and aliases are prepended to the accumulated key.
  defp finalize_cache(_query, _operation, :nocache) do
    :nocache
  end

  defp finalize_cache(query, operation, cache) do
    %{assocs: assocs, prefix: prefix, lock: lock, select: select, aliases: aliases} = query
    # The entry stored under @parent_as is dropped before aliases join the key.
    aliases = Map.delete(aliases, @parent_as)

    cache =
      case select do
        %{take: take} when take != %{} ->
          [take: take] ++ cache

        _ ->
          cache
      end

    cache =
      cache
      |> prepend_if(assocs != [], [assocs: assocs])
      |> prepend_if(prefix != nil, [prefix: prefix])
      |> prepend_if(lock != nil, [lock: lock])
      |> prepend_if(aliases != %{}, [aliases: aliases])

    [operation | cache]
  end

  # Prepends `prepend` to `cache` only when the condition is `true`.
  defp prepend_if(cache, true, prepend), do: prepend ++ cache
  defp prepend_if(cache, false, _prepend), do: cache

  # Returns `{cache_key, params}` for a source expression. Sources with a
  # schema include `schema.__schema__(:hash)` in the key; subqueries reuse
  # their own cache key and push their params (reversed) onto `params`.
  defp source_cache(%{source: {_, nil} = source, prefix: prefix}, params),
    do: {{source, prefix}, params}

  defp source_cache(%{source: {bin, schema}, prefix: prefix}, params),
    do: {{bin, schema, schema.__schema__(:hash), prefix}, params}

  defp source_cache(%{source: {:fragment, _, _} = source, prefix: prefix}, params),
    do: {{source, prefix}, params}

  defp source_cache(%{source: %Ecto.SubQuery{params: inner, cache: key}}, params),
    do: {key, Enum.reverse(inner, params)}

  # Casts and dumps one interpolated parameter. The first three clauses reject
  # values that cannot be used as parameters; the last one returns
  # `{cast_value, dumped_value}` or raises.
  defp cast_param(_kind, query, expr, %DynamicExpr{}, _type, _adapter) do
    error! query, expr, "invalid dynamic expression",
                        "dynamic expressions can only be interpolated at the top level of where, having, group_by, order_by, select, update or a join's on"
  end

  defp cast_param(_kind, query, expr, [{key, _} | _], _type, _adapter) when is_atom(key) do
    error! query, expr, "invalid keyword list",
                        "keyword lists are only allowed at the top level of where, having, distinct, order_by, update or a join's on"
  end

  defp cast_param(_kind, query, expr, %x{}, {:in, _type}, _adapter) when x in [Ecto.Query, Ecto.SubQuery] do
    error! query, expr, "an #{inspect(x)} struct is not supported as right-side value of `in` operator",
                        "Did you mean to write `expr in subquery(query)` instead?"
  end

  defp cast_param(kind, query, expr, v, type, adapter) do
    type = field_type!(kind, query, expr, type)

    try do
      case cast_param(kind, type, v, adapter) do
        {:ok, v} -> v
        {:error, error} -> error! query, expr, error
      end
    rescue
      # A query error raised while casting is re-raised as a cast error that
      # carries the offending value and type.
      e in Ecto.QueryError ->
        raise Ecto.Query.CastError, value: v, type: type, message: Exception.message(e)
    end
  end

  # Normalizes the type, casts the value and dumps it through the adapter.
  # Returns `{:ok, {cast_v, dump_v}}` or the first non-matching result.
  defp cast_param(kind, type, v, adapter) do
    with {:ok, type} <- normalize_param(kind, type, v),
         {:ok, cast_v} <- cast_param(kind, type, v),
         {:ok, dump_v} <- dump_param(adapter, type, cast_v),
         do: {:ok, {cast_v, dump_v}}
  end

  @doc """
  Prepare association fields found in the query.

  Every entry in `query.assocs` is checked, recursively, to be an association
  of its parent schema that is bound to an inner, left or lateral join.
  Returns the query unchanged.
  """
  def plan_assocs(query) do
    plan_assocs(query, 0, query.assocs)
    query
  end

  defp plan_assocs(_query, _ix, []), do: :ok
  defp plan_assocs(query, ix, assocs) do
    # We validate the schema exists when preparing joins above
    {_, parent_schema, _} = get_preload_source!(query, ix)

    Enum.each assocs, fn {assoc, {child_ix, child_assocs}} ->
      refl = parent_schema.__schema__(:association, assoc)

      unless refl do
        error! query, "field `#{inspect parent_schema}.#{assoc}` " <>
                      "in preload is not an association"
      end

      case find_source_expr(query, child_ix) do
        %JoinExpr{qual: qual} when qual in [:inner, :left, :inner_lateral, :left_lateral] ->
          :ok
        %JoinExpr{qual: qual} ->
          error! query, "association `#{inspect parent_schema}.#{assoc}` " <>
                        "in preload requires an inner, left or lateral join, got #{qual} join"
        _ ->
          :ok
      end

      plan_assocs(query, child_ix, child_assocs)
    end
  end

  # Plans every combination query as an `:all` operation, after it inherits
  # the parent prefix, and makes sure each one has a select.
  defp plan_combinations(query, adapter) do
    combinations =
      Enum.map query.combinations, fn {type, combination_query} ->
        {prepared_query, _params, _key} = combination_query |> attach_prefix(query) |> plan(:all, adapter)
        prepared_query = prepared_query |> ensure_select(true)
        {type, prepared_query}
      end

    %{query | combinations: combinations}
  end

  # Plans every CTE that is an `Ecto.Query`; other entries are kept as they are.
  defp plan_ctes(%Ecto.Query{with_ctes: nil} = query, _adapter), do: query
  defp plan_ctes(%Ecto.Query{with_ctes: %{queries: queries}} = query, adapter) do
    queries =
      Enum.map queries, fn
        {name, %Ecto.Query{} = cte_query} ->
          {planned_query, _params, _key} = cte_query |> attach_prefix(query) |> plan(:all, adapter)
          planned_query = planned_query |> ensure_select(true)
          {name, planned_query}

        {name, other} ->
          {name, other}
      end

    put_in(query.with_ctes.queries, queries)
  end

  # Index 0 is the `from`; any other index is looked up among the joins.
  defp find_source_expr(query, 0) do
    query.from
  end

  defp find_source_expr(query, ix) do
    Enum.find(query.joins, & &1.ix == ix)
  end

  @doc """
  Used for customizing the query returning result.
"""
  # A query that already has a select is returned untouched.
  def ensure_select(%{select: existing} = query, _fields) when not is_nil(existing), do: query

  def ensure_select(%{select: nil}, []) do
    raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
  end

  # A list of fields selects the `from` binding restricted to those fields.
  def ensure_select(%{select: nil} = query, fields) when is_list(fields) do
    select = %SelectExpr{
      expr: {:&, [], [0]},
      take: %{0 => {:any, fields}},
      line: __ENV__.line,
      file: __ENV__.file
    }

    %{query | select: select}
  end

  # `true` asks for a default select, which is refused for schemaless
  # sources and for fragments.
  def ensure_select(%{select: nil, from: %{source: {_, nil}}} = query, true) do
    error!(query, "queries that do not have a schema need to explicitly pass a :select clause")
  end

  def ensure_select(%{select: nil, from: %{source: {:fragment, _, _}}} = query, true) do
    error!(query, "queries from a fragment need to explicitly pass a :select clause")
  end

  def ensure_select(%{select: nil} = query, true) do
    select = %SelectExpr{expr: {:&, [], [0]}, line: __ENV__.line, file: __ENV__.file}
    %{query | select: select}
  end

  # `false` leaves the query without a select.
  def ensure_select(%{select: nil} = query, false), do: query

  @doc """
  Normalizes and validates the query.

  After the query was planned and there is no cache
  entry, we need to update its interpolations and check
  its fields and associations exist and are valid.
"""
  def normalize(query, operation, adapter, counter) do
    # normalize_query/4 returns `{query, counter}`; only the query goes on to
    # normalize_select/3, which returns a `{query, select}` tuple.
    query
    |> normalize_query(operation, adapter, counter)
    |> elem(0)
    |> normalize_select(keep_literals?(operation, query), true)
  rescue
    e ->
      # Reraise errors so we ignore the planner inner stacktrace
      filter_and_reraise e, __STACKTRACE__
  end

  # Literals are kept for `:insert_all` and for queries that have combinations.
  defp keep_literals?(:insert_all, _), do: true
  defp keep_literals?(_, %{combinations: combinations}), do: combinations != []

  # Runs the operation-specific assertions and then walks every expression of
  # the query, threading `counter` through validate_and_increment/6.
  # Returns `{query, counter}`.
  defp normalize_query(query, operation, adapter, counter) do
    case operation do
      :all ->
        assert_no_update!(query, operation)
      :insert_all ->
        assert_no_update!(query, operation)
      :update_all ->
        assert_update!(query, operation)
        assert_only_filter_expressions!(query, operation)
      :delete_all ->
        assert_no_update!(query, operation)
        assert_only_filter_expressions!(query, operation)
    end

    traverse_exprs(query, operation, counter,
                   &validate_and_increment(&1, &2, &3, &4, operation, adapter))
  end

  # Subqueries in `from` are only accepted for `:all` and `:insert_all`.
  defp validate_and_increment(:from, query, %{source: %Ecto.SubQuery{}}, _counter, kind, _adapter) when kind not in ~w(all insert_all)a do
    error! query, "`#{kind}` does not allow subqueries in `from`"
  end
  defp validate_and_increment(:from, query, %{source: source} = expr, counter, _kind, adapter) do
    {source, acc} = prewalk_source(source, :from, query, expr, counter, adapter)
    {%{expr | source: source}, acc}
  end

  # Components that hold a single expression, or nil when absent.
  defp validate_and_increment(kind, query, expr, counter, _operation, adapter)
       when kind in ~w(select distinct limit offset)a do
    if expr do
      prewalk(kind, query, expr, counter, adapter)
    else
      {nil, counter}
    end
  end

  # Components that hold a list of expressions. Entries whose `expr` is an
  # empty list are dropped; the rest keep their original order.
  defp validate_and_increment(kind, query, exprs, counter, _operation, adapter)
       when kind in ~w(where group_by having order_by update)a do

    {exprs, counter} =
      Enum.reduce(exprs, {[], counter}, fn
        %{expr: []}, {list, acc} ->
          {list, acc}
        expr, {list, acc} ->
          {expr, acc} = prewalk(kind, query, expr, acc, adapter)
          {[expr|list], acc}
      end)
    {Enum.reverse(exprs), counter}
  end

  defp validate_and_increment(:with_cte, _query, nil, counter, _operation, _adapter) do
    {nil, counter}
  end

  # CTEs are always traversed as `:all` queries, whatever the outer operation.
  defp validate_and_increment(:with_cte, query, with_expr, counter, _operation, adapter) do
    fun = &validate_and_increment(&1, &2, &3, &4, :all, adapter)

    {queries, counter} =
      Enum.reduce with_expr.queries, {[], counter}, fn
        {name, %Ecto.Query{} = inner_query}, {queries, counter} ->
          # The outer query is stored under @parent_as while the CTE is traversed.
          inner_query = put_in(inner_query.aliases[@parent_as], query)

          # We don't want to use normalize_subquery_select because we are
          # going to prepare the whole query ourselves next.
          {_, _, inner_query} = rewrite_subquery_select_expr(inner_query, true)
          {inner_query, counter} = traverse_exprs(inner_query, :all, counter, fun)

          # Now compute the fields as keyword lists so we emit AS in Ecto query.
%{select: %{expr: expr, take: take}} = inner_query
          {{:map, types}, fields, _from} = collect_fields(expr, [], :never, inner_query, take, true, %{})
          fields = cte_fields(Keyword.keys(types), Enum.reverse(fields), [])
          inner_query = put_in(inner_query.select.fields, fields)
          # Remove the parent entry that was stored under @parent_as.
          {_, inner_query} = pop_in(inner_query.aliases[@parent_as])

          {[{name, inner_query} | queries], counter}

        # Fragment CTEs only have their fragment walked.
        {name, %QueryExpr{expr: {:fragment, _, _} = fragment} = query_expr}, {queries, counter} ->
          {fragment, counter} = prewalk_source(fragment, :with_cte, query, with_expr, counter, adapter)
          query_expr = %{query_expr | expr: fragment}
          {[{name, query_expr} | queries], counter}
      end

    {%{with_expr | queries: Enum.reverse(queries)}, counter}
  end

  # Each join has its source walked first and its `on` expression second;
  # the join's params are then set to nil.
  defp validate_and_increment(:join, query, exprs, counter, _operation, adapter) do
    Enum.map_reduce exprs, counter, fn join, acc ->
      {source, acc} = prewalk_source(join.source, :join, query, join, acc, adapter)
      {on, acc} = prewalk(:join, query, join.on, acc, adapter)
      {%{join | on: on, source: source, params: nil}, acc}
    end
  end

  # Windows are `{name, expr}` pairs; names and order are preserved.
  defp validate_and_increment(:windows, query, exprs, counter, _operation, adapter) do
    {exprs, counter} =
      Enum.reduce(exprs, {[], counter}, fn {name, expr}, {list, acc} ->
        {expr, acc} = prewalk(:windows, query, expr, acc, adapter)
        {[{name, expr}|list], acc}
      end)

    {Enum.reverse(exprs), counter}
  end

  # Each combination query is traversed with the same operation and counter
  # as the outer query, then has its select normalized.
  defp validate_and_increment(:combination, _query, combinations, counter, operation, adapter) do
    fun = &validate_and_increment(&1, &2, &3, &4, operation, adapter)

    {combinations, counter} =
      Enum.reduce combinations, {[], counter}, fn {type, combination_query}, {combinations, counter} ->
        {combination_query, counter} = traverse_exprs(combination_query, operation, counter, fun)
        {combination_query, _} = combination_query |> normalize_select(true, true)
{[{type, combination_query} | combinations], counter}
      end

    {Enum.reverse(combinations), counter}
  end

  # Validates a JSON path against an embed definition, one segment at a time.
  #
  # For `cardinality: :one` the segment must name a field of the related
  # schema; validation then continues with that field's embed definition.
  # For `cardinality: :many` the segment must be an integer; validation then
  # continues against the same embed treated as `:one`.
  # Anything that is not an embed raises. Returns `:ok` once the path is consumed.
  defp validate_json_path!([path_field | rest], field, embed) do
    case embed do
      %{related: related, cardinality: :one} ->
        unless Enum.any?(related.__schema__(:fields), &(Atom.to_string(&1) == path_field)) do
          raise "field `#{path_field}` does not exist in #{inspect(related)}"
        end

        # The check above found a field atom with this exact name, so the atom
        # already exists and no new atom is created from the path segment.
        path_embed = related.__schema__(:embed, String.to_existing_atom(path_field))
        validate_json_path!(rest, path_field, path_embed)

      %{related: _, cardinality: :many} ->
        unless is_integer(path_field) do
          raise "cannot use `#{path_field}` to refer to an item in `embeds_many`"
        end

        validate_json_path!(rest, path_field, %{embed | cardinality: :one})

      other ->
        raise "expected field `#{field}` to be of type embed, got: `#{inspect(other)}`"
    end
  end

  defp validate_json_path!([], _field, _type) do
    :ok
  end

  # Walks a source expression. Fragments have their parts walked; subqueries
  # are normalized as `:all` queries with the outer query stored under
  # @parent_as for the duration of the normalization.
  defp prewalk_source({:fragment, meta, fragments}, kind, query, expr, acc, adapter) do
    {fragments, acc} = prewalk(fragments, kind, query, expr, acc, adapter)
    {{:fragment, meta, fragments}, acc}
  end
  defp prewalk_source(%Ecto.SubQuery{query: inner_query} = subquery, kind, query, _expr, counter, adapter) do
    try do
      inner_query = put_in inner_query.aliases[@parent_as], query
      {inner_query, counter} = normalize_query(inner_query, :all, adapter, counter)
      {inner_query, _} = normalize_select(inner_query, true, false)
      {_, inner_query} = pop_in(inner_query.aliases[@parent_as])

      inner_query =
        # For a subquery used in a `where` the select fields are left as they
        # are; otherwise each field is paired with its source field name.
        if kind == :where do
          inner_query
        else
          update_in(inner_query.select.fields, fn fields ->
            subquery.select |> subquery_source_fields() |> Enum.zip(fields)
          end)
        end

      {%{subquery | query: inner_query}, counter}
rescue
      # Any failure inside the subquery is wrapped together with the outer query.
      e -> raise Ecto.SubQueryError, query: query, exception: e
    end
  end
  # Any other source is returned as is.
  defp prewalk_source(source, _kind, _query, _expr, acc, _adapter) do
    {source, acc}
  end

  # prewalk/5 walks a whole query expression struct and returns
  # `{expr_struct, counter}` with the struct's params set to nil.
  #
  # Update expressions are `{op, keyword}` pairs: every value is walked and
  # every field name is replaced by the result of field_source/2.
  defp prewalk(:update, query, expr, counter, adapter) do
    source = get_source!(:update, query, 0)

    {inner, acc} =
      Enum.map_reduce expr.expr, counter, fn {op, kw}, counter ->
        {kw, acc} =
          Enum.map_reduce kw, counter, fn {field, value}, counter ->
            {value, acc} = prewalk(value, :update, query, expr, counter, adapter)
            {{field_source(source, field), value}, acc}
          end
        {{op, kw}, acc}
      end

    {%{expr | expr: inner, params: nil}, acc}
  end
  defp prewalk(kind, query, expr, counter, adapter) do
    {inner, acc} = prewalk(expr.expr, kind, query, expr, counter, adapter)
    {%{expr | expr: inner, params: nil}, acc}
  end

  # prewalk/6 walks a single AST node and returns `{node, counter}`.
  #
  # `{:subquery, i}` is replaced by the i-th subquery of the expression.
  defp prewalk({:subquery, i}, kind, query, expr, acc, adapter) do
    prewalk_source(Enum.fetch!(expr.subqueries, i), kind, query, expr, acc, adapter)
  end

  # `left in ^param`: the right side is rewritten by validate_in/5.
  defp prewalk({:in, in_meta, [left, {:^, meta, [param]}]}, kind, query, expr, acc, adapter) do
    {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
    {right, acc} = validate_in(meta, expr, param, acc, adapter)
    {{:in, in_meta, [left, right]}, acc}
  end

  # `left in subquery(...)`: the subquery must select exactly one field.
  defp prewalk({:in, in_meta, [left, {:subquery, _} = right]}, kind, query, expr, acc, adapter) do
    {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
    {right, acc} = prewalk(right, kind, query, expr, acc, adapter)

    case right.query.select.fields do
      [_] -> :ok
      _ -> error!(query, "subquery must return a single field in order to be used on the right-side of `in`")
    end

    {{:in, in_meta, [left, right]}, acc}
  end

  # `exists/any/all(subquery(...))`: only `exists` accepts a subquery that
  # selects something other than exactly one field.
  defp prewalk({quantifier, meta, [{:subquery, _} = subquery]}, kind, query, expr, acc, adapter) when quantifier in [:exists, :any, :all] do
    {subquery, acc} =
prewalk(subquery, kind, query, expr, acc, adapter)

    case {quantifier, subquery.query.select.fields} do
      {:exists, _} ->
        :ok

      {_, [_]} ->
        :ok

      _ ->
        error!(
          query,
          "subquery must return a single field in order to be used with #{quantifier}"
        )
    end

    {{quantifier, meta, [subquery]}, acc}
  end

  # Field access (`binding.field`): the binding is resolved with get_ix!/3 and
  # the field name is replaced by the result of field_source/2. In a select,
  # the field type is also stored in the dot metadata under `:type`.
  defp prewalk({{:., dot_meta, [left, field]}, meta, []},
               kind, query, expr, acc, _adapter) do
    {ix, ix_expr, ix_query} = get_ix!(left, kind, query)
    extra = if kind == :select, do: [type: type!(kind, ix_query, expr, ix, field)], else: []
    field = field_source(get_source!(kind, ix_query, ix), field)
    {{{:., extra ++ dot_meta, [ix_expr, field]}, meta, []}, acc}
  end

  # An interpolation gets the current counter as its index; the counter then
  # advances by one.
  defp prewalk({:^, meta, [ix]}, _kind, _query, _expr, acc, _adapter) when is_integer(ix) do
    {{:^, meta, [acc]}, acc + 1}
  end

  # `type(arg, type)` becomes a tagged value with the resolved type as tag.
  defp prewalk({:type, _, [arg, type]}, kind, query, expr, acc, adapter) do
    {arg, acc} = prewalk(arg, kind, query, expr, acc, adapter)
    type = field_type!(kind, query, expr, type, true)
    {%Ecto.Query.Tagged{value: arg, tag: type, type: Ecto.Type.type(type)}, acc}
  end

  # `json_extract_path(binding.field, path)`: the field must be an embed, whose
  # path is validated, or have an underlying type of `:any`, `:map` or
  # `{:map, _}`. The field name is then replaced by its source name.
  defp prewalk({:json_extract_path, meta, [json_field, path]}, kind, query, expr, acc, _adapter) do
    {{:., dot_meta, [{:&, amp_meta, [ix]}, field]}, expr_meta, []} = json_field

    case type!(kind, query, expr, ix, field) do
      {:parameterized, Ecto.Embedded, embed} ->
        validate_json_path!(path, field, embed)

      type ->
        case Ecto.Type.type(type) do
          :any ->
            :ok

          :map ->
            :ok

          {:map, _} ->
            :ok

          _ ->
            raise "expected field `#{field}` to be an embed or a map, got: `#{inspect(type)}`"
        end
    end

    field_source = kind |> get_source!(query, ix) |> field_source(field)

    json_field = {{:., dot_meta, [{:&, amp_meta, [ix]}, field_source]}, expr_meta, []}
    {{:json_extract_path,
meta, [json_field, path]}, acc}
  end

  # `selected_as(name)`: the name must be an alias defined in the select.
  defp prewalk({:selected_as, [], [name]}, _kind, query, _expr, acc, _adapter) do
    name = selected_as!(query.select.aliases, name)
    {{:selected_as, [], [name]}, acc}
  end

  # Tagged values with a base type are kept; any other is replaced by its
  # dumped value.
  defp prewalk(%Ecto.Query.Tagged{value: v, type: type} = tagged, kind, query, expr, acc, adapter) do
    if Ecto.Type.base?(type) do
      {tagged, acc}
    else
      {dump_param(kind, query, expr, v, type, adapter), acc}
    end
  end

  # Generic traversal: two-element tuples, three-element AST nodes and lists
  # are walked element by element; everything else is returned untouched.
  defp prewalk({left, right}, kind, query, expr, acc, adapter) do
    {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
    {right, acc} = prewalk(right, kind, query, expr, acc, adapter)
    {{left, right}, acc}
  end

  defp prewalk({left, meta, args}, kind, query, expr, acc, adapter) do
    {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
    {args, acc} = prewalk(args, kind, query, expr, acc, adapter)
    {{left, meta, args}, acc}
  end

  defp prewalk(list, kind, query, expr, acc, adapter) when is_list(list) do
    Enum.map_reduce(list, acc, &prewalk(&1, kind, query, expr, &2, adapter))
  end

  defp prewalk(other, _kind, _query, _expr, acc, _adapter) do
    {other, acc}
  end

  # Returns `name` when it is a key of `select_aliases`, raises otherwise.
  defp selected_as!(select_aliases, name) do
    case select_aliases do
      %{^name => _} ->
        name

      _ ->
        raise ArgumentError,
              "invalid alias: `#{inspect(name)}`. Use `selected_as/2` to define aliases in the outer most `select` expression."
    end
  end

  # Dumps a literal value for the given type and returns the dumped value.
  # A dump failure is reported through error!/3 with an extended message.
  defp dump_param(kind, query, expr, v, type, adapter) do
    type = field_type!(kind, query, expr, type)

    case dump_param(kind, type, v, adapter) do
      {:ok, v} ->
        v
      {:error, error} ->
        error = error <> ". Or the value is incompatible or it must be " <>
          "interpolated (using ^) so it may be cast accordingly"
        error! query, expr, error
    end
  end

  defp dump_param(kind, type, v, adapter) do
    with {:ok, type} <- normalize_param(kind, type, v),
         do: dump_param(adapter, type, v)
  end

  # Rewrites the interpolated right side of `in` to
  # `{:^, meta, [counter, length]}`. The counter advances by the list length
  # when the adapter's first dumper for the type is an `{:in, _}` tuple, and
  # by one otherwise.
  defp validate_in(meta, expr, param, acc, adapter) do
    {v, t} = Enum.fetch!(expr.params, param)
    length = length(v)

    case adapter.dumpers(t, t) do
      [{:in, _} | _] -> {{:^, meta, [acc, length]}, acc + length}
      _ -> {{:^, meta, [acc, length]}, acc + 1}
    end
  end

  # Returns `{query, select}`, where `select` is nil for a query without a
  # select and otherwise a map of processing instructions.
  defp normalize_select(%{select: nil} = query, _keep_literals?, _allow_alias?) do
    {query, nil}
  end

  defp normalize_select(query, keep_literals?, allow_alias?) do
    %{assocs: assocs, preloads: preloads, select: select} = query
    %{take: take, expr: expr} = select
    {tag, from_take} = Map.get(take, 0, {:any, []})
    source = get_source!(:select, query, 0)
    assocs = merge_assocs(assocs, query)

    # In from, if there is a schema and we have a map tag with preloads,
    # it needs to be converted to a map in a later pass.
    {take, from_tag} =
      case source do
        {source, schema, _}
            when tag == :map and preloads != [] and is_binary(source) and schema != nil ->
          {Map.put(take, 0, {:struct, from_take}), :map}

        _ ->
          {take, :any}
      end

    {postprocess, fields, from} =
      collect_fields(expr, [], :none, query, take, keep_literals?, %{})

    # Convert selected_as/2 to a tuple so it can be aliased by the adapters.
    # Don't convert if the select expression belongs to a CTE or subquery
    # because those fields are already automatically aliased.
fields = normalize_selected_as(fields, allow_alias?, select.aliases)

    {fields, preprocess, from} =
      case from do
        # The `from` binding was selected: its fields come first, followed by
        # the association fields and then the remaining select fields.
        {:ok, from_pre, from_expr, from_taken} ->
          {assoc_exprs, assoc_fields} = collect_assocs([], [], query, tag, from_take, assocs)
          fields = from_taken ++ Enum.reverse(assoc_fields, Enum.reverse(fields))
          preprocess = [from_pre | Enum.reverse(assoc_exprs)]
          {fields, preprocess, {from_tag, from_expr}}

        :none when preloads != [] or assocs != [] ->
          error! query, "the binding used in `from` must be selected in `select` when using `preload`"

        :none ->
          {Enum.reverse(fields), [], :none}
      end

    select = %{
      preprocess: preprocess,
      postprocess: postprocess,
      take: from_take,
      assocs: assocs,
      from: from
    }

    {put_in(query.select.fields, fields), select}
  end

  # With no aliases the fields are returned as is. With aliases, aliasing must
  # be allowed (next clauses); otherwise an ArgumentError is raised.
  defp normalize_selected_as(fields, _allow_alias?, aliases) when aliases == %{}, do: fields

  defp normalize_selected_as(_fields, false, aliases) do
    raise ArgumentError,
          "`selected_as/2` can only be used in the outer most `select` expression. " <>
            "If you are attempting to alias a field from a subquery or cte, it is not allowed " <>
            "because the fields are automatically aliased by the corresponding map/struct key. " <>
            "The following field aliases were specified: #{inspect(Map.keys(aliases))}."
end

  # Each `selected_as(expr, name)` field becomes a `{name, expr}` tuple.
  defp normalize_selected_as(fields, true, _aliases) do
    Enum.map(fields, fn
      {:selected_as, _, [select_expr, name]} -> {name, select_expr}
      field -> field
    end)
  end

  # Handling of source
  #
  # collect_fields/7 returns `{postprocess_instruction, fields, from}`.
  # `fields` is accumulated in reverse order. `from` is `:none` until the
  # `from` binding is selected, `{:ok, preprocess, expr, taken}` afterwards,
  # and `:never` when the caller passes it as such.

  # `merge(&0, right)` seen before the `from` binding was selected: the fields
  # collected from `right` are appended to the fields taken from the source.
  defp collect_fields({:merge, _, [{:&, _, [0]}, right]}, fields, :none, query, take, keep_literals?, _drop) do
    {expr, taken} = source_take!(:select, query, take, 0, 0, %{})
    from = {:ok, {:source, :from}, expr, taken}

    {right, right_fields, _from} = collect_fields(right, [], from, query, take, keep_literals?, %{})
    from = {:ok, {:merge, {:source, :from}, right}, expr, taken ++ Enum.reverse(right_fields)}

    {{:source, :from}, fields, from}
  end

  # First selection of the `from` binding.
  defp collect_fields({:&, _, [0]}, fields, :none, query, take, _keep_literals?, drop) do
    {expr, taken} = source_take!(:select, query, take, 0, 0, drop)
    {{:source, :from}, fields, {:ok, {:source, :from}, expr, taken}}
  end

  # The `from` binding was already selected: nothing new is collected.
  defp collect_fields({:&, _, [0]}, fields, from, _query, _take, _keep_literals?, _drop)
       when from != :never do
    {{:source, :from}, fields, from}
  end

  # Any other binding, or the `from` binding when `from` is `:never`.
  defp collect_fields({:&, _, [ix]}, fields, from, query, take, _keep_literals?, drop) do
    {expr, taken} = source_take!(:select, query, take, ix, ix, drop)
    {expr, Enum.reverse(taken, fields), from}
  end

  # Expression handling

  # Aggregates and window functions whose first argument is a field access.
  defp collect_fields({agg, _, [{{:., dot_meta, [{:&, _, [_]}, _]}, _, []} | _]} = expr,
                      fields, from, _query, _take, _keep_literals?, _drop)
       when agg in @aggs do
    type =
      case agg do
        :count -> :integer
        :row_number -> :integer
        :rank -> :integer
        :dense_rank -> :integer
        :ntile -> :integer
        # If it is possible to upcast, we do it, otherwise keep the DB value.
        # For example, an average of integers will return a decimal, which can't be cast
        # as an integer. But an average of "moneys" should be upcast.
_ -> {:maybe, Keyword.fetch!(dot_meta, :type)}
      end

    {{:value, type}, [expr | fields], from}
  end

  # `filter(call, condition)`: `call` must be an aggregate or a fragment with
  # at least one part. The type comes from `call`; the whole expression is the field.
  defp collect_fields({:filter, _, [call, _]} = expr, fields, from, query, take, keep_literals?, _drop) do
    case call do
      {agg, _, _} when agg in @aggs -> :ok
      {:fragment, _, [_ | _]} -> :ok
      _ -> error!(query, "filter(...) expects the first argument to be an aggregate expression, got: `#{Macro.to_string(expr)}`")
    end

    {type, _, _} = collect_fields(call, fields, from, query, take, keep_literals?, %{})
    {type, [expr | fields], from}
  end

  # `coalesce(left, right)`: typed only when both sides resolve to the same
  # type. Both sides are inspected with literals kept.
  defp collect_fields({:coalesce, _, [left, right]} = expr, fields, from, query, take, _keep_literals?, _drop) do
    {left_type, _, _} = collect_fields(left, fields, from, query, take, true, %{})
    {right_type, _, _} = collect_fields(right, fields, from, query, take, true, %{})

    type = if left_type == right_type, do: left_type, else: {:value, :any}
    {type, [expr | fields], from}
  end

  # `over(call, window)`: a window given as an atom must be defined in the query.
  defp collect_fields({:over, _, [call, window]} = expr, fields, from, query, take, keep_literals?, _drop) do
    if is_atom(window) and not Keyword.has_key?(query.windows, window) do
      error!(query, "unknown window #{inspect window} given to over/2")
    end

    {type, _, _} = collect_fields(call, fields, from, query, take, keep_literals?, %{})
    {type, [expr | fields], from}
  end

  # Field access: the type is read from the `:type` key of the dot metadata.
  defp collect_fields({{:., dot_meta, [{:&, _, [_]}, _]}, _, []} = expr,
                      fields, from, _query, _take, _keep_literals?, _drop) do
    {{:value, Keyword.fetch!(dot_meta, :type)}, [expr | fields], from}
  end

  # Two-element tuples and `{:{}, _, args}` nodes both produce `{:tuple, args}`.
  defp collect_fields({left, right}, fields, from, query, take, keep_literals?, _drop) do
    {args, fields, from} = collect_args([left, right], fields, from, query, take, keep_literals?, [])
    {{:tuple, args}, fields, from}
  end

  defp collect_fields({:{}, _, args}, fields, from, query, take, keep_literals?, _drop) do
    {args, fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
    {{:tuple, args}, fields, from}
  end

  # Map update `%{data | args}`: the updated keys are passed as `drop` when
  # collecting `data`.
  defp collect_fields({:%{}, _, [{:|, _, [data, args]}]}, fields, from, query, take, keep_literals?, _drop) do
    drop = Map.new(args, fn {key, _} -> {key, nil} end)
    {data, fields, from} = collect_fields(data, fields, from, query, take, keep_literals?, drop)
    {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
    {{:map, data, args}, fields, from}
  end

  defp collect_fields({:%{}, _, args}, fields, from, query, take, keep_literals?, _drop) do
    {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
    {{:map, args}, fields, from}
  end

  # Struct update `%Name{data | args}`. `struct!/2` is called only for its
  # validation of the keys; its result is discarded.
  defp collect_fields({:%, _, [name, {:%{}, _, [{:|, _, [data, args]}]}]},
                      fields, from, query, take, keep_literals?, _drop) do
    drop = Map.new(args, fn {key, _} -> {key, nil} end)
    {data, fields, from} = collect_fields(data, fields, from, query, take, keep_literals?, drop)
    {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
    struct!(name, args)
    {{:struct, name, data, args}, fields, from}
  end

  defp collect_fields({:%, _, [name, {:%{}, _, args}]}, fields, from, query, take, keep_literals?, _drop) do
    {args, fields, from} = collect_kv(args, fields, from, query, take, keep_literals?, [])
    struct!(name, args)
    {{:struct, name, args}, fields, from}
  end

  # Any `merge` not handled by the source clause; it must have two arguments.
  defp collect_fields({:merge, _, args}, fields, from, query, take, keep_literals?, _drop) do
    {[left, right], fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
    {{:merge, left, right}, fields, from}
  end

  # `date_add`: typed as `:date` when the first argument has no known type,
  # otherwise it takes the type of that argument.
  defp collect_fields({:date_add, _, [arg | _]} = expr, fields, from, query, take, keep_literals?, _drop) do
    case collect_fields(arg, fields, from, query, take, keep_literals?,
%{}) do
      {{:value, :any}, _, _} -> {{:value, :date}, [expr | fields], from}
      {type, _, _} -> {type, [expr | fields], from}
    end
  end

  # `datetime_add`: typed as `:naive_datetime` when the first argument has no
  # known type, otherwise it takes the type of that argument.
  defp collect_fields({:datetime_add, _, [arg | _]} = expr, fields, from, query, take, keep_literals?, _drop) do
    case collect_fields(arg, fields, from, query, take, keep_literals?, %{}) do
      {{:value, :any}, _, _} -> {{:value, :naive_datetime}, [expr | fields], from}
      {type, _, _} -> {type, [expr | fields], from}
    end
  end

  defp collect_fields(args, fields, from, query, take, keep_literals?, _drop) when is_list(args) do
    {args, fields, from} = collect_args(args, fields, from, query, take, keep_literals?, [])
    {{:list, args}, fields, from}
  end

  # Literals. When literals are kept (sixth argument `true`) they become typed
  # fields of the query; otherwise they are returned as the instruction itself
  # and no field is added.
  defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_binary(expr) do
    {{:value, :binary}, [expr | fields], from}
  end

  defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_integer(expr) do
    {{:value, :integer}, [expr | fields], from}
  end

  defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_float(expr) do
    {{:value, :float}, [expr | fields], from}
  end

  defp collect_fields(expr, fields, from, _query, _take, true, _drop) when is_boolean(expr) do
    {{:value, :boolean}, [expr | fields], from}
  end

  defp collect_fields(nil, fields, from, _query, _take, true, _drop) do
    {{:value, :any}, [nil | fields], from}
  end

  # Remaining atoms never add a field. This includes booleans and nil when
  # literals are not kept.
  defp collect_fields(expr, fields, from, _query, _take, _keep_literals?, _drop) when is_atom(expr) do
    {expr, fields, from}
  end

  defp collect_fields(expr, fields, from, _query, _take, false, _drop)
       when is_binary(expr) or is_number(expr) do
    {expr, fields, from}
  end

  # Tagged values are typed by their tag.
  defp collect_fields(%Ecto.Query.Tagged{tag: tag} = expr, fields, from, _query, _take, _keep_literals?, _drop) do
    {{:value, tag}, [expr | fields], from}
end

  # Unary boolean operators always yield a boolean field.
  defp collect_fields({op, _, [_]} = expr, fields, from, _query, _take, _keep_literals?, _drop)
       when op in [:not, :is_nil] do
    {{:value, :boolean}, [expr | fields], from}
  end

  # Binary comparison and logical operators always yield a boolean field.
  defp collect_fields({op, _, [_, _]} = expr, fields, from, _query, _take, _keep_literals?, _drop)
       when op in [:<, :>, :<=, :>=, :==, :!=, :and, :or, :like, :ilike] do
    {{:value, :boolean}, [expr | fields], from}
  end

  # `selected_as(expr, name)` takes the type of the aliased expression, while
  # the whole `selected_as` node is recorded as the field.
  defp collect_fields({:selected_as, _, [select_expr, _name]} = expr, fields, from, query, take, keep_literals?, _drop) do
    {type, _ignored_fields, _ignored_from} =
      collect_fields(select_expr, fields, from, query, take, keep_literals?, %{})

    {type, [expr | fields], from}
  end

  # Anything else is an untyped value.
  defp collect_fields(expr, fields, from, _query, _take, _keep_literals?, _drop) do
    {{:value, :any}, [expr | fields], from}
  end

  # Collects every key and every value of a keyword-like list, threading
  # `fields` and `from` from left to right. The collected pairs are pushed on
  # top of `acc` and the final list is returned reversed.
  defp collect_kv(pairs, fields, from, query, take, keep_literals?, acc) do
    {collected, fields, from} =
      Enum.reduce(pairs, {acc, fields, from}, fn {key, value}, {collected, fields, from} ->
        {key, fields, from} = collect_fields(key, fields, from, query, take, keep_literals?, %{})
        {value, fields, from} = collect_fields(value, fields, from, query, take, keep_literals?, %{})
        {[{key, value} | collected], fields, from}
      end)

    {Enum.reverse(collected), fields, from}
  end

  # Same as collect_kv/7 for a plain list of arguments.
  defp collect_args(args, fields, from, query, take, keep_literals?, acc) do
    {collected, fields, from} =
      Enum.reduce(args, {acc, fields, from}, fn arg, {collected, fields, from} ->
        {arg, fields, from} = collect_fields(arg, fields, from, query, take, keep_literals?, %{})
        {[arg | collected], fields, from}
      end)

    {Enum.reverse(collected), fields, from}
  end

  defp merge_assocs(assocs, query) do
    assocs
    |> Enum.reduce(%{}, fn {field, {index, children}}, acc ->
      children = merge_assocs(children, query)

      Map.update(acc, field, {index, children}, fn
{^index, current_children} ->
          {index, merge_assocs(children ++ current_children, query)}
        {other_index, _} ->
          error! query, "association `#{field}` is being set to binding at position #{index} " <>
                        "and at position #{other_index} at the same time"
      end)
    end)
    |> Map.to_list()
  end

  # Walks the association tree depth first: children are visited before the
  # remaining siblings. Both accumulators are built by prepending.
  defp collect_assocs(exprs, fields, query, tag, take, [{assoc, {ix, children}}|tail]) do
    to_take = get_preload_source!(query, ix)
    {fetch, take_children} = fetch_assoc(tag, take, assoc)
    {expr, taken} = take!(to_take, query, fetch, assoc, ix, %{})
    exprs = [expr | exprs]
    fields = Enum.reverse(taken, fields)
    {exprs, fields} = collect_assocs(exprs, fields, query, tag, take_children, children)
    {exprs, fields} = collect_assocs(exprs, fields, query, tag, take, tail)
    {exprs, fields}
  end
  defp collect_assocs(exprs, fields, _query, _tag, _take, []) do
    {exprs, fields}
  end

  # Looks up the take for an association. Returns the fetch result to hand to
  # take!/6 together with the take to use for the association's children.
  defp fetch_assoc(tag, take, assoc) do
    case Access.fetch(take, assoc) do
      {:ok, value} -> {{:ok, {tag, value}}, value}
      :error -> {:error, []}
    end
  end

  defp source_take!(kind, query, take, field, ix, drop) do
    source = get_source!(kind, query, ix)
    take!(source, query, Access.fetch(take, field), field, ix, drop)
  end

  # Returns `{instruction, select_fields}` for a source. `fetched` is
  # `{:ok, {tag, fields}}` when a subset of fields was requested for the
  # binding and `:error` otherwise. Clause order matters: the invalid
  # combinations are rejected before the general clauses.
  defp take!(source, query, fetched, field, ix, drop) do
    case {fetched, source} do
      {{:ok, {:struct, _}}, {:fragment, _, _}} ->
        error! query, "it is not possible to return a struct subset of a fragment"

      {{:ok, {:struct, _}}, %Ecto.SubQuery{}} ->
        error! query, "it is not possible to return a struct subset of a subquery"

      {{:ok, {_, []}}, {_, _, _}} ->
        error! query, "at least one field must be selected for binding `#{field}`, got an empty list"

      {{:ok, {:struct, _}}, {_, nil, _}} ->
        error! query, "struct/2 in select expects a source with a schema"

      # Field subset of a named source. With the `:map` tag the schema is
      # dropped from the instruction.
      {{:ok, {kind, fields}}, {source, schema, prefix}} when is_binary(source) ->
        dumper = if schema, do: schema.__schema__(:dump), else: %{}
        schema = if kind == :map, do: nil, else: schema
        {types, fields} = select_dump(List.wrap(fields), dumper, ix, drop)
        {{:source, {source, schema}, prefix || query.prefix, types}, fields}

      # Field subset of any other source: every field is an untyped value.
      {{:ok, {_, fields}}, _} ->
        {{:map, Enum.map(fields, &{&1, {:value, :any}})}, Enum.map(fields, &select_field(&1, ix))}

      # No subset requested: fragments and schemaless sources select the
      # binding itself as a map.
      {:error, {:fragment, _, _}} ->
        {{:value, :map}, [{:&, [], [ix]}]}

      {:error, {_, nil, _}} ->
        {{:value, :map}, [{:&, [], [ix]}]}

      # No subset requested on a schema source: all of the schema's query
      # fields are selected.
      {:error, {source, schema, prefix}} ->
        {types, fields} = select_dump(schema.__schema__(:query_fields), schema.__schema__(:dump), ix, drop)

        {{:source, {source, schema}, prefix || query.prefix, types}, fields}

      {:error, %Ecto.SubQuery{select: select}} ->
        fields = subquery_source_fields(select)
        {select, Enum.map(fields, &select_field(&1, ix))}
    end
  end

  # Builds `{types, select_exprs}` for the given fields, keeping their order.
  # Entries that are not atoms, and fields present in `drop`, are skipped.
  # A field missing from `dumper` keeps its own name and gets type `:any`.
  defp select_dump(fields, dumper, ix, drop) do
    fields
    |> Enum.reverse
    |> Enum.reduce({[], []}, fn
      field, {types, exprs} when is_atom(field) and not is_map_key(drop, field) ->
        {source, type} = Map.get(dumper, field, {field, :any})
        {[{field, type} | types], [select_field(source, ix) | exprs]}
      _field, acc ->
        acc
    end)
  end

  # AST for accessing `field` on the binding at position `ix`.
  defp select_field(field, ix) do
    {{:., [], [{:&, [], [ix]}, field]}, [], []}
  end

  # Resolves a binding reference to `{index, binding_expr, query}`, where
  # `query` is the query that owns the binding.
  defp get_ix!({:&, _, [ix]} = expr, _kind, query) do
    {ix, expr, query}
  end

  defp get_ix!({:as, meta, [as]}, _kind, query) do
    case query.aliases do
      %{^as => ix} -> {ix, {:&, meta, [ix]}, query}
      %{} -> error!(query, "could not find named binding `as(#{inspect(as)})`")
    end
  end

  # `parent_as` is looked up in the query stored under @parent_as, moving up
  # one parent at a time until the alias is found or no parent is left.
  defp get_ix!({:parent_as, meta, [as]}, kind, query) do
    case query.aliases[@parent_as] do
      %{aliases: %{^as => ix}, sources: sources} = query ->
        if kind == :select and not (ix < tuple_size(sources)) do
          error!(query, "the parent_as in a subquery select used as a join can only access the `from` binding")
        else
          {ix, {:parent_as, [], [as]}, query}
        end

      %{} = parent ->
        get_ix!({:parent_as, meta, [as]}, kind, parent)

      nil ->
        error!(query, "could not find named binding `parent_as(#{inspect(as)})`")
    end
  end

  # Returns the source at position `ix`. An ArgumentError from `elem/2` is
  # reported as a query error.
  defp get_source!(where, %{sources: sources} = query, ix) do
    elem(sources, ix)
  rescue
    ArgumentError ->
      error! query, "invalid query has specified more bindings than bindings available " <>
                    "in `#{where}` (look for `unknown_binding!` in the printed query below)"
  end

  # Only sources with a binary name and a schema can be preloaded.
  defp get_preload_source!(query, ix) do
    case get_source!(:preload, query, ix) do
      {source, schema, _} = all when is_binary(source) and schema != nil ->
        all
      _ ->
        error! query, "can only preload sources with a schema " <>
                      "(fragments, binary and subqueries are not supported)"
    end
  end

  @doc """
  Puts the prefix given via `opts` into the given query, if available.
1734 """ 1735 def attach_prefix(%{prefix: nil} = query, opts) when is_list(opts) do 1736 case Keyword.fetch(opts, :prefix) do 1737 {:ok, prefix} -> %{query | prefix: prefix} 1738 :error -> query 1739 end 1740 end 1741 1742 def attach_prefix(%{prefix: nil} = query, %{prefix: prefix}) do 1743 %{query | prefix: prefix} 1744 end 1745 1746 def attach_prefix(query, _), do: query 1747 1748 ## Helpers 1749 1750 @all_exprs [with_cte: :with_ctes, distinct: :distinct, select: :select, from: :from, join: :joins, 1751 where: :wheres, group_by: :group_bys, having: :havings, windows: :windows, 1752 combination: :combinations, order_by: :order_bys, limit: :limit, offset: :offset] 1753 1754 # Although joins come before updates in the actual query, 1755 # the on fields are moved to where, so they effectively 1756 # need to come later for MySQL. This means subqueries 1757 # with parameters are not supported as a join on MySQL. 1758 # The only way to address it is by splitting how join 1759 # and their on expressions are processed. 1760 @update_all_exprs [with_cte: :with_ctes, from: :from, update: :updates, 1761 join: :joins, where: :wheres, select: :select] 1762 1763 @delete_all_exprs [with_cte: :with_ctes, from: :from, join: :joins, 1764 where: :wheres, select: :select] 1765 1766 # Traverse all query components with expressions. 1767 # Therefore from, preload, assocs and lock are not traversed. 1768 defp traverse_exprs(query, operation, acc, fun) do 1769 exprs = 1770 case operation do 1771 :all -> @all_exprs 1772 :insert_all -> @all_exprs 1773 :update_all -> @update_all_exprs 1774 :delete_all -> @delete_all_exprs 1775 end 1776 1777 Enum.reduce exprs, {query, acc}, fn {kind, key}, {query, acc} -> 1778 {traversed, acc} = fun.(kind, query, Map.fetch!(query, key), acc) 1779 {%{query | key => traversed}, acc} 1780 end 1781 end 1782 1783 defp field_type!(kind, query, expr, type, allow_virtuals? 
\\ false) 1784 1785 defp field_type!(kind, query, expr, {composite, {ix, field}}, allow_virtuals?) when is_integer(ix) do 1786 {composite, type!(kind, query, expr, ix, field, allow_virtuals?)} 1787 end 1788 1789 defp field_type!(kind, query, expr, {ix, field}, allow_virtuals?) when is_integer(ix) do 1790 type!(kind, query, expr, ix, field, allow_virtuals?) 1791 end 1792 1793 defp field_type!(_kind, _query, _expr, type, _) do 1794 type 1795 end 1796 1797 defp type!(kind, query, expr, schema, field, allow_virtuals? \\ false) 1798 1799 defp type!(_kind, _query, _expr, nil, _field, _allow_virtuals?), do: :any 1800 1801 defp type!(kind, query, expr, ix, field, allow_virtuals?) when is_integer(ix) do 1802 case get_source!(kind, query, ix) do 1803 {:fragment, _, _} -> 1804 :any 1805 1806 {_, schema, _} -> 1807 type!(kind, query, expr, schema, field, allow_virtuals?) 1808 1809 %Ecto.SubQuery{select: select} -> 1810 case subquery_type_for(select, field) do 1811 {:ok, type} -> type 1812 :error -> error!(query, expr, "field `#{field}` does not exist in subquery") 1813 end 1814 end 1815 end 1816 1817 defp type!(kind, query, expr, schema, field, allow_virtuals?) when is_atom(schema) do 1818 cond do 1819 type = schema.__schema__(:type, field) -> 1820 type 1821 1822 type = allow_virtuals? && schema.__schema__(:virtual_type, field) -> 1823 type 1824 1825 Map.has_key?(schema.__struct__(), field) -> 1826 case schema.__schema__(:association, field) do 1827 %Ecto.Association.BelongsTo{owner_key: owner_key} -> 1828 error! query, expr, "field `#{field}` in `#{kind}` is an association in schema #{inspect schema}. " <> 1829 "Did you mean to use `#{owner_key}`?" 1830 %_{} -> 1831 error! query, expr, "field `#{field}` in `#{kind}` is an association in schema #{inspect schema}" 1832 1833 _ -> 1834 error! query, expr, "field `#{field}` in `#{kind}` is a virtual field in schema #{inspect schema}" 1835 end 1836 1837 true -> 1838 hint = closest_fields_hint(field, schema) 1839 error! 
query, expr, "field `#{field}` in `#{kind}` does not exist in schema #{inspect schema}", hint 1840 end 1841 end 1842 1843 defp closest_fields_hint(input, schema) do 1844 input_string = Atom.to_string(input) 1845 1846 schema.__schema__(:fields) 1847 |> Enum.map(fn field -> {field, String.jaro_distance(input_string, Atom.to_string(field))} end) 1848 |> Enum.filter(fn {_field, score} -> score >= 0.77 end) 1849 |> Enum.sort(& elem(&1, 0) >= elem(&2, 0)) 1850 |> Enum.take(5) 1851 |> Enum.map(&elem(&1, 0)) 1852 |> case do 1853 [] -> 1854 nil 1855 1856 [suggestion] -> 1857 "Did you mean `#{suggestion}`?" 1858 1859 suggestions -> 1860 Enum.reduce(suggestions, "Did you mean one of: \n", fn suggestion, acc -> 1861 acc <> "\n * `#{suggestion}`" 1862 end) 1863 end 1864 end 1865 1866 defp normalize_param(_kind, {:out, {:array, type}}, _value) do 1867 {:ok, type} 1868 end 1869 defp normalize_param(_kind, {:out, :any}, _value) do 1870 {:ok, :any} 1871 end 1872 defp normalize_param(kind, {:out, other}, value) do 1873 {:error, "value `#{inspect value}` in `#{kind}` expected to be part of an array " <> 1874 "but matched type is #{inspect other}"} 1875 end 1876 defp normalize_param(_kind, type, _value) do 1877 {:ok, type} 1878 end 1879 1880 defp cast_param(kind, type, v) do 1881 case Ecto.Type.cast(type, v) do 1882 {:ok, v} -> 1883 {:ok, v} 1884 _ -> 1885 {:error, "value `#{inspect v}` in `#{kind}` cannot be cast to type #{inspect type}"} 1886 end 1887 end 1888 1889 defp dump_param(adapter, type, v) do 1890 case Ecto.Type.adapter_dump(adapter, type, v) do 1891 {:ok, v} -> 1892 {:ok, v} 1893 :error -> 1894 {:error, "value `#{inspect v}` cannot be dumped to type #{inspect type}"} 1895 end 1896 end 1897 1898 defp field_source({source, schema, _}, field) when is_binary(source) and schema != nil do 1899 # If the field is not found we return the field itself 1900 # which will be checked and raise later. 
1901 schema.__schema__(:field_source, field) || field 1902 end 1903 defp field_source(_, field) do 1904 field 1905 end 1906 1907 defp cte_fields([_key | _rest_keys], [{:selected_as, _, [_, _]} | _rest_fields], _acc) do 1908 raise ArgumentError, 1909 "`selected_as/2` can only be used in the outer most `select` expression. " <> 1910 "If you are attempting to alias a field from a subquery or cte, it is not allowed " <> 1911 "because the fields are automatically aliased by the corresponding map/struct key." 1912 end 1913 1914 defp cte_fields([key | rest_keys], [field | rest_fields], acc) do 1915 cte_fields(rest_keys, rest_fields, [{key, field} | acc]) 1916 end 1917 1918 defp cte_fields(_keys, [], acc), do: :lists.reverse(acc) 1919 defp cte_fields([], _fields, acc), do: :lists.reverse(acc) 1920 1921 defp assert_update!(%Ecto.Query{updates: updates} = query, operation) do 1922 changes = 1923 Enum.reduce(updates, %{}, fn update, acc -> 1924 Enum.reduce(update.expr, acc, fn {_op, kw}, acc -> 1925 Enum.reduce(kw, acc, fn {k, v}, acc -> 1926 if Map.has_key?(acc, k) do 1927 error! query, "duplicate field `#{k}` for `#{operation}`" 1928 else 1929 Map.put(acc, k, v) 1930 end 1931 end) 1932 end) 1933 end) 1934 1935 if changes == %{} do 1936 error! query, "`#{operation}` requires at least one field to be updated" 1937 end 1938 end 1939 1940 defp assert_no_update!(query, operation) do 1941 case query do 1942 %Ecto.Query{updates: []} -> query 1943 _ -> 1944 error! query, "`#{operation}` does not allow `update` expressions" 1945 end 1946 end 1947 1948 defp assert_only_filter_expressions!(query, operation) do 1949 case query do 1950 %Ecto.Query{order_bys: [], limit: nil, offset: nil, group_bys: [], 1951 havings: [], preloads: [], assocs: [], distinct: nil, lock: nil, 1952 windows: [], combinations: []} -> 1953 query 1954 _ -> 1955 error! query, "`#{operation}` allows only `with_cte`, `where` and `join` expressions. 
" <> 1956 "You can exclude unwanted expressions from a query by using " <> 1957 "Ecto.Query.exclude/2. Error found" 1958 end 1959 end 1960 1961 defp filter_and_reraise(exception, stacktrace) do 1962 reraise exception, Enum.reject(stacktrace, &match?({__MODULE__, _, _, _}, &1)) 1963 end 1964 1965 defp error!(query, message) do 1966 raise Ecto.QueryError, message: message, query: query 1967 end 1968 1969 defp error!(query, expr, message) do 1970 raise Ecto.QueryError, message: message, query: query, file: expr.file, line: expr.line 1971 end 1972 1973 defp error!(query, expr, message, hint) do 1974 raise Ecto.QueryError, message: message, query: query, file: expr.file, line: expr.line, hint: hint 1975 end 1976 end