zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

queryable.ex (16586B)


      1 defmodule Ecto.Repo.Queryable do
      2   @moduledoc false
      3 
      4   alias Ecto.Queryable
      5   alias Ecto.Query
      6   alias Ecto.Query.Planner
      7   alias Ecto.Query.SelectExpr
      8 
      9   import Ecto.Query.Planner, only: [attach_prefix: 2]
     10 
     11   require Ecto.Query
     12 
     13   def all(name, queryable, tuplet) do
     14     query =
     15       queryable
     16       |> Ecto.Queryable.to_query()
     17       |> Ecto.Query.Planner.ensure_select(true)
     18 
     19     execute(:all, name, query, tuplet) |> elem(1)
     20   end
     21 
     22   def stream(_name, queryable, {adapter_meta, opts}) do
     23     %{adapter: adapter, cache: cache, repo: repo} = adapter_meta
     24 
     25     query =
     26       queryable
     27       |> Ecto.Queryable.to_query()
     28       |> Ecto.Query.Planner.ensure_select(true)
     29 
     30     {query, opts} = repo.prepare_query(:stream, query, opts)
     31     query = attach_prefix(query, opts)
     32 
     33     {query_meta, prepared, cast_params, dump_params} =
     34       Planner.query(query, :all, cache, adapter, 0)
     35 
     36     opts = Keyword.put(opts, :cast_params, cast_params)
     37 
     38     case query_meta do
     39       %{select: nil} ->
     40         adapter_meta
     41         |> adapter.stream(query_meta, prepared, dump_params, opts)
     42         |> Stream.flat_map(fn {_, nil} -> [] end)
     43 
     44       %{select: select, preloads: preloads} ->
     45         %{
     46           assocs: assocs,
     47           preprocess: preprocess,
     48           postprocess: postprocess,
     49           take: take,
     50           from: from
     51         } = select
     52 
     53         if preloads != [] or assocs != [] do
     54           raise Ecto.QueryError, query: query, message: "preloads are not supported on streams"
     55         end
     56 
     57         preprocessor = preprocessor(from, preprocess, adapter)
     58         stream = adapter.stream(adapter_meta, query_meta, prepared, dump_params, opts)
     59         postprocessor = postprocessor(from, postprocess, take, adapter)
     60 
     61         stream
     62         |> Stream.flat_map(fn {_, rows} -> rows end)
     63         |> Stream.map(preprocessor)
     64         |> Stream.map(postprocessor)
     65     end
     66   end
     67 
     68   def get(name, queryable, id, opts) do
     69     one(name, query_for_get(queryable, id), opts)
     70   end
     71 
     72   def get!(name, queryable, id, opts) do
     73     one!(name, query_for_get(queryable, id), opts)
     74   end
     75 
     76   def get_by(name, queryable, clauses, opts) do
     77     one(name, query_for_get_by(queryable, clauses), opts)
     78   end
     79 
     80   def get_by!(name, queryable, clauses, opts) do
     81     one!(name, query_for_get_by(queryable, clauses), opts)
     82   end
     83 
     84   def reload(name, [head | _] = structs, opts) when is_list(structs) do
     85     results = all(name, query_for_reload(structs), opts)
     86 
     87     [pk] = head.__struct__.__schema__(:primary_key)
     88 
     89     for struct <- structs do
     90       struct_pk = Map.fetch!(struct, pk)
     91       Enum.find(results, &Map.fetch!(&1, pk) == struct_pk)
     92     end
     93   end
     94 
     95   def reload(name, struct, opts) do
     96     one(name, query_for_reload([struct]), opts)
     97   end
     98 
     99   def reload!(name, [head | _] = structs, opts) when is_list(structs) do
    100     query = query_for_reload(structs)
    101     results = all(name, query, opts)
    102 
    103     [pk] = head.__struct__.__schema__(:primary_key)
    104 
    105     for struct <- structs do
    106       struct_pk = Map.fetch!(struct, pk)
    107       Enum.find(results, &Map.fetch!(&1, pk) == struct_pk) || raise "could not reload #{inspect(struct)}, maybe it doesn't exist or was deleted"
    108     end
    109   end
    110 
    111   def reload!(name, struct, opts) do
    112     query = query_for_reload([struct])
    113     one!(name, query, opts)
    114   end
    115 
    116   def aggregate(name, queryable, aggregate, opts) do
    117     one!(name, query_for_aggregate(queryable, aggregate), opts)
    118   end
    119 
    120   def aggregate(name, queryable, aggregate, field, opts) do
    121     one!(name, query_for_aggregate(queryable, aggregate, field), opts)
    122   end
    123 
    124   def exists?(name, queryable, opts) do
    125     queryable =
    126       Query.exclude(queryable, :select)
    127       |> Query.exclude(:preload)
    128       |> Query.exclude(:order_by)
    129       |> Query.exclude(:distinct)
    130       |> Query.select(1)
    131       |> Query.limit(1)
    132       |> rewrite_combinations()
    133 
    134     case all(name, queryable, opts) do
    135       [1] -> true
    136       [] -> false
    137     end
    138   end
    139 
    140   defp rewrite_combinations(%{combinations: []} = query), do: query
    141 
    142   defp rewrite_combinations(%{combinations: combinations} = query) do
    143     combinations = Enum.map(combinations, fn {type, query} ->
    144       {type, query |> Query.exclude(:select) |> Query.select(1)}
    145     end)
    146 
    147     %{query | combinations: combinations}
    148   end
    149 
    150   def one(name, queryable, tuplet) do
    151     case all(name, queryable, tuplet) do
    152       [one] -> one
    153       [] -> nil
    154       other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
    155     end
    156   end
    157 
    158   def one!(name, queryable, tuplet) do
    159     case all(name, queryable, tuplet) do
    160       [one] -> one
    161       [] -> raise Ecto.NoResultsError, queryable: queryable
    162       other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
    163     end
    164   end
    165 
    166   def update_all(name, queryable, [], tuplet) do
    167     update_all(name, queryable, tuplet)
    168   end
    169 
    170   def update_all(name, queryable, updates, tuplet) do
    171     query = Query.from(queryable, update: ^updates)
    172     update_all(name, query, tuplet)
    173   end
    174 
    175   defp update_all(name, queryable, tuplet) do
    176     query = Ecto.Queryable.to_query(queryable)
    177     execute(:update_all, name, query, tuplet)
    178   end
    179 
    180   def delete_all(name, queryable, tuplet) do
    181     query = Ecto.Queryable.to_query(queryable)
    182     execute(:delete_all, name, query, tuplet)
    183   end
    184 
    185   @doc """
    186   Load structs from query.
    187   """
    188   def struct_load!([{field, type} | types], [value | values], acc, all_nil?, struct, adapter) do
    189     all_nil? = all_nil? and value == nil
    190     value = load!(type, value, field, struct, adapter)
    191     struct_load!(types, values, [{field, value} | acc], all_nil?, struct, adapter)
    192   end
    193 
    194   def struct_load!([], values, _acc, true, _struct, _adapter) do
    195     {nil, values}
    196   end
    197 
    198   def struct_load!([], values, acc, false, struct, _adapter) do
    199     {Map.merge(struct, Map.new(acc)), values}
    200   end
    201 
    202   ## Helpers
    203 
    204   defp execute(operation, name, query, {adapter_meta, opts} = tuplet) do
    205     %{adapter: adapter, cache: cache, repo: repo} = adapter_meta
    206 
    207     {query, opts} = repo.prepare_query(operation, query, opts)
    208     query = attach_prefix(query, opts)
    209 
    210     {query_meta, prepared, cast_params, dump_params} =
    211       Planner.query(query, operation, cache, adapter, 0)
    212 
    213     opts = Keyword.put(opts, :cast_params, cast_params)
    214 
    215     case query_meta do
    216       %{select: nil} ->
    217         adapter.execute(adapter_meta, query_meta, prepared, dump_params, opts)
    218 
    219       %{select: select, sources: sources, preloads: preloads} ->
    220         %{
    221           preprocess: preprocess,
    222           postprocess: postprocess,
    223           take: take,
    224           assocs: assocs,
    225           from: from
    226         } = select
    227 
    228         preprocessor = preprocessor(from, preprocess, adapter)
    229         {count, rows} = adapter.execute(adapter_meta, query_meta, prepared, dump_params, opts)
    230         postprocessor = postprocessor(from, postprocess, take, adapter)
    231 
    232         {count,
    233          rows
    234          |> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
    235          |> Ecto.Repo.Preloader.query(name, preloads, take, postprocessor, tuplet)}
    236     end
    237   end
    238 
    239   defp preprocessor({_, {:source, {source, schema}, prefix, types}}, preprocess, adapter) do
    240     struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
    241 
    242     fn row ->
    243       {entry, rest} = struct_load!(types, row, [], false, struct, adapter)
    244       preprocess(rest, preprocess, entry, adapter)
    245     end
    246   end
    247 
    248   defp preprocessor({_, from}, preprocess, adapter) do
    249     fn row ->
    250       {entry, rest} = process(row, from, nil, adapter)
    251       preprocess(rest, preprocess, entry, adapter)
    252     end
    253   end
    254 
    255   defp preprocessor(:none, preprocess, adapter) do
    256     fn row ->
    257       preprocess(row, preprocess, nil, adapter)
    258     end
    259   end
    260 
    261   defp preprocess(row, [], _from, _adapter) do
    262     row
    263   end
    264 
    265   defp preprocess(row, [source | sources], from, adapter) do
    266     {entry, rest} = process(row, source, from, adapter)
    267     [entry | preprocess(rest, sources, from, adapter)]
    268   end
    269 
    270   defp postprocessor({:any, _}, postprocess, _take, adapter) do
    271     fn [from | row] ->
    272       row |> process(postprocess, from, adapter) |> elem(0)
    273     end
    274   end
    275 
    276   defp postprocessor({:map, _}, postprocess, take, adapter) do
    277     fn [from | row] ->
    278       row |> process(postprocess, to_map(from, take), adapter) |> elem(0)
    279     end
    280   end
    281 
    282   defp postprocessor(:none, postprocess, _take, adapter) do
    283     fn row -> row |> process(postprocess, nil, adapter) |> elem(0) end
    284   end
    285 
    286   defp process(row, {:source, :from}, from, _adapter) do
    287     {from, row}
    288   end
    289 
    290   defp process(row, {:source, {source, schema}, prefix, types}, _from, adapter) do
    291     struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
    292     struct_load!(types, row, [], true, struct, adapter)
    293   end
    294 
    295   defp process(row, {:merge, left, right}, from, adapter) do
    296     {left, row} = process(row, left, from, adapter)
    297     {right, row} = process(row, right, from, adapter)
    298 
    299     data =
    300       case {left, right} do
    301         {%{__struct__: s}, %{__struct__: s}} ->
    302           Map.merge(left, right)
    303 
    304         {%{__struct__: left_struct}, %{__struct__: right_struct}} ->
    305           raise ArgumentError,
    306                 "cannot merge structs of different types, " <>
    307                   "got: #{inspect(left_struct)} and #{inspect(right_struct)}"
    308 
    309         {%{__struct__: name}, %{}} ->
    310           for {key, _} <- right, not Map.has_key?(left, key) do
    311             raise ArgumentError, "struct #{inspect(name)} does not have the key #{inspect(key)}"
    312           end
    313 
    314           Map.merge(left, right)
    315 
    316         {%{}, %{}} ->
    317           Map.merge(left, right)
    318 
    319         {%{}, nil} ->
    320           left
    321 
    322         {_, %{}} ->
    323           raise ArgumentError,
    324                 "cannot merge because the left side is not a map, got: #{inspect(left)}"
    325 
    326         {%{}, _} ->
    327           raise ArgumentError,
    328                 "cannot merge because the right side is not a map, got: #{inspect(right)}"
    329       end
    330 
    331     {data, row}
    332   end
    333 
    334   defp process(row, {:struct, struct, data, args}, from, adapter) do
    335     case process(row, data, from, adapter) do
    336       {%{__struct__: ^struct} = data, row} ->
    337         process_update(data, args, row, from, adapter)
    338 
    339       {data, _row} ->
    340         raise BadStructError, struct: struct, term: data
    341     end
    342   end
    343 
    344   defp process(row, {:struct, struct, args}, from, adapter) do
    345     {fields, row} = process_kv(args, row, from, adapter)
    346 
    347     case Map.merge(struct.__struct__(), Map.new(fields)) do
    348       %{__meta__: %Ecto.Schema.Metadata{state: state} = metadata} = struct
    349       when state != :loaded ->
    350         {Map.replace!(struct, :__meta__, %{metadata | state: :loaded}), row}
    351 
    352       map ->
    353         {map, row}
    354     end
    355   end
    356 
    357   defp process(row, {:map, data, args}, from, adapter) do
    358     {data, row} = process(row, data, from, adapter)
    359     process_update(data, args, row, from, adapter)
    360   end
    361 
    362   defp process(row, {:map, args}, from, adapter) do
    363     {args, row} = process_kv(args, row, from, adapter)
    364     {Map.new(args), row}
    365   end
    366 
    367   defp process(row, {:list, args}, from, adapter) do
    368     process_args(args, row, from, adapter)
    369   end
    370 
    371   defp process(row, {:tuple, args}, from, adapter) do
    372     {args, row} = process_args(args, row, from, adapter)
    373     {List.to_tuple(args), row}
    374   end
    375 
    376   defp process([value | row], {:value, :any}, _from, _adapter) do
    377     {value, row}
    378   end
    379 
    380   defp process([value | row], {:value, type}, _from, adapter) do
    381     {load!(type, value, nil, nil, adapter), row}
    382   end
    383 
    384   defp process(row, value, _from, _adapter)
    385        when is_binary(value) or is_number(value) or is_atom(value) do
    386     {value, row}
    387   end
    388 
    389   defp process_update(data, args, row, from, adapter) do
    390     {args, row} = process_kv(args, row, from, adapter)
    391     data = Enum.reduce(args, data, fn {key, value}, acc -> %{acc | key => value} end)
    392     {data, row}
    393   end
    394 
    395   defp process_args(args, row, from, adapter) do
    396     Enum.map_reduce(args, row, fn arg, row ->
    397       process(row, arg, from, adapter)
    398     end)
    399   end
    400 
    401   defp process_kv(kv, row, from, adapter) do
    402     Enum.map_reduce(kv, row, fn {key, value}, row ->
    403       {key, row} = process(row, key, from, adapter)
    404       {value, row} = process(row, value, from, adapter)
    405       {{key, value}, row}
    406     end)
    407   end
    408 
    409   @compile {:inline, load!: 5}
    410   defp load!(type, value, field, struct, adapter) do
    411     case Ecto.Type.adapter_load(adapter, type, value) do
    412       {:ok, value} ->
    413         value
    414 
    415       :error ->
    416         field = field && " for field #{inspect(field)}"
    417         struct = struct && " in #{inspect(struct)}"
    418 
    419         raise ArgumentError,
    420               "cannot load `#{inspect(value)}` as type #{inspect(type)}#{field}#{struct}"
    421     end
    422   end
    423 
    424   defp to_map(nil, _fields) do
    425     nil
    426   end
    427 
    428   defp to_map(value, fields) when is_list(value) do
    429     Enum.map(value, &to_map(&1, fields))
    430   end
    431 
    432   defp to_map(value, fields) do
    433     for field <- fields, into: %{} do
    434       case field do
    435         {k, v} -> {k, to_map(Map.fetch!(value, k), List.wrap(v))}
    436         k -> {k, Map.fetch!(value, k)}
    437       end
    438     end
    439   end
    440 
    441   defp query_for_get(_queryable, nil) do
    442     raise ArgumentError, "cannot perform Ecto.Repo.get/2 because the given value is nil"
    443   end
    444 
    445   defp query_for_get(queryable, id) do
    446     query = Queryable.to_query(queryable)
    447     schema = assert_schema!(query)
    448 
    449     case schema.__schema__(:primary_key) do
    450       [pk] ->
    451         Query.from(x in query, where: field(x, ^pk) == ^id)
    452 
    453       pks ->
    454         raise ArgumentError,
    455               "Ecto.Repo.get/2 requires the schema #{inspect(schema)} " <>
    456                 "to have exactly one primary key, got: #{inspect(pks)}"
    457     end
    458   end
    459 
    460   defp query_for_get_by(queryable, clauses) do
    461     Query.where(queryable, [], ^Enum.to_list(clauses))
    462   end
    463 
    464   defp query_for_reload([head| _] = structs) do
    465     assert_structs!(structs)
    466 
    467     schema = head.__struct__
    468     prefix = head.__meta__.prefix
    469 
    470     case schema.__schema__(:primary_key) do
    471       [pk] ->
    472         keys = Enum.map(structs, &get_pk!(&1, pk))
    473         query = Query.from(x in schema, where: field(x, ^pk) in ^keys)
    474         %{query | prefix: prefix}
    475 
    476       pks ->
    477         raise ArgumentError,
    478               "Ecto.Repo.reload/2 requires the schema #{inspect(schema)} " <>
    479                 "to have exactly one primary key, got: #{inspect(pks)}"
    480     end
    481   end
    482 
    483   defp query_for_aggregate(queryable, aggregate) do
    484     query =
    485       case prepare_for_aggregate(queryable) do
    486         %{distinct: nil, limit: nil, offset: nil, combinations: []} = query ->
    487           %{query | order_bys: []}
    488 
    489         query ->
    490           query
    491           |> Query.subquery()
    492           |> Queryable.Ecto.SubQuery.to_query()
    493       end
    494 
    495     select = %SelectExpr{expr: {aggregate, [], []}, file: __ENV__.file, line: __ENV__.line}
    496     %{query | select: select}
    497   end
    498 
    499   defp query_for_aggregate(queryable, aggregate, field) do
    500     ast = field(0, field)
    501 
    502     query =
    503       case prepare_for_aggregate(queryable) do
    504         %{distinct: nil, limit: nil, offset: nil, combinations: []} = query ->
    505           %{query | order_bys: []}
    506 
    507         query ->
    508           select = %SelectExpr{expr: ast, file: __ENV__.file, line: __ENV__.line}
    509 
    510           %{query | select: select}
    511           |> Query.subquery()
    512           |> Queryable.Ecto.SubQuery.to_query()
    513       end
    514 
    515     select = %SelectExpr{expr: {aggregate, [], [ast]}, file: __ENV__.file, line: __ENV__.line}
    516     %{query | select: select}
    517   end
    518 
    519   defp prepare_for_aggregate(queryable) do
    520     case %{Queryable.to_query(queryable) | preloads: [], assocs: []} do
    521       %{group_bys: [_ | _]} = query ->
    522         raise Ecto.QueryError, message: "cannot aggregate on query with group_by", query: query
    523 
    524       %{} = query ->
    525         query
    526     end
    527   end
    528 
    529   defp field(ix, field) when is_integer(ix) and is_atom(field) do
    530     {{:., [], [{:&, [], [ix]}, field]}, [], []}
    531   end
    532 
    533   defp assert_schema!(%{from: %{source: {_source, schema}}}) when schema != nil, do: schema
    534 
    535   defp assert_schema!(query) do
    536     raise Ecto.QueryError,
    537       query: query,
    538       message: "expected a from expression with a schema"
    539   end
    540 
    541   defp assert_structs!([head | _] = structs) when is_list(structs) do
    542     unless Enum.all?(structs, &schema?/1) do
    543       raise ArgumentError, "expected a struct or a list of structs, received #{inspect(structs)}"
    544     end
    545 
    546     unless Enum.all?(structs, &(&1.__struct__ == head.__struct__)) do
    547       raise ArgumentError, "expected an homogenous list, received different struct types"
    548     end
    549 
    550     :ok
    551   end
    552 
    553   defp schema?(%{__meta__: _}), do: true
    554   defp schema?(_), do: false
    555 
    556   defp get_pk!(struct, pk) do
    557     struct
    558     |> Map.fetch!(pk)
    559     |> case do
    560       nil ->
    561         raise ArgumentError, "Ecto.Repo.reload/2 expects existent structs, found a `nil` primary key"
    562       key ->
    563         key
    564     end
    565   end
    566 end