zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

schema.ex (37328B)


      1 defmodule Ecto.Repo.Schema do
      2   # The module invoked by user defined repos
      3   # for schema related functionality.
      4   @moduledoc false
      5 
      6   alias Ecto.Changeset
      7   alias Ecto.Changeset.Relation
      8   require Ecto.Query
      9 
     10   import Ecto.Query.Planner, only: [attach_prefix: 2]
     11 
     12   @doc """
     13   Implementation for `Ecto.Repo.insert_all/3`.
     14   """
     15   def insert_all(repo, name, schema, rows, tuplet) when is_atom(schema) do
     16     do_insert_all(repo, name, schema, schema.__schema__(:prefix),
     17                   schema.__schema__(:source), rows, tuplet)
     18   end
     19 
     20   def insert_all(repo, name, table, rows, tuplet) when is_binary(table) do
     21     do_insert_all(repo, name, nil, nil, table, rows, tuplet)
     22   end
     23 
     24   def insert_all(repo, name, {source, schema}, rows, tuplet) when is_atom(schema) do
     25     do_insert_all(repo, name, schema, schema.__schema__(:prefix), source, rows, tuplet)
     26   end
     27 
  # Short-circuit for an empty list of rows: the adapter is never invoked.
  # Returns `{0, []}` when the `:returning` option is truthy and `{0, nil}`
  # otherwise.
  defp do_insert_all(_repo, _name, _schema, _prefix, _source, [], {_adapter_meta, opts}) do
    if opts[:returning] do
      {0, []}
    else
      {0, nil}
    end
  end

  # Performs the bulk insert for a list of rows or a source query.
  #
  # `schema` may be `nil` (schemaless insert); in that case no dumper and no
  # autogenerated id are looked up. Returns `{count, result}`, where `count`
  # and the raw result come from `adapter.insert_all/8` and the result is
  # passed through `postprocess/5` before being returned.
  defp do_insert_all(repo, _name, schema, prefix, source, rows_or_query, {adapter_meta, opts}) do
    %{adapter: adapter} = adapter_meta
    # `schema && ...` leaves both values as `nil` when no schema is given.
    autogen_id = schema && schema.__schema__(:autogenerate_id)
    dumper = schema && schema.__schema__(:dump)
    placeholder_map = Keyword.get(opts, :placeholders, %{})

    # Resolve the `:returning` option into the fields/types used to build the
    # result and the sources requested from the adapter.
    {return_fields_or_types, return_sources} =
      schema
      |> returning(opts)
      |> fields_to_sources(dumper)

    # `counter` is a zero-arity function; it is forwarded to `on_conflict/5`
    # below.
    {rows_or_query, header, row_cast_params, placeholder_cast_params, placeholder_dump_params, counter} =
      extract_header_and_fields(repo, rows_or_query, schema, dumper, autogen_id, placeholder_map, adapter, opts)

    schema_meta = metadata(schema, prefix, source, autogen_id, nil, opts)

    on_conflict = Keyword.get(opts, :on_conflict, :raise)
    conflict_target = Keyword.get(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)
    {on_conflict, conflict_cast_params} = on_conflict(on_conflict, conflict_target, schema_meta, counter, adapter)
    # Cast params are stored in the order: placeholders, row values,
    # on_conflict query params.
    opts = Keyword.put(opts, :cast_params, placeholder_cast_params ++ row_cast_params ++ conflict_cast_params)

    {count, rows_or_query} =
      adapter.insert_all(adapter_meta, schema_meta, header, rows_or_query, on_conflict, return_sources, placeholder_dump_params, opts)

    {count, postprocess(rows_or_query, return_fields_or_types, adapter, schema, schema_meta)}
  end
     63 
     64   defp postprocess(nil, [], _adapter, _schema, _schema_meta) do
     65     nil
     66   end
     67 
     68   defp postprocess(rows, fields, _adapter, nil, _schema_meta) do
     69     for row <- rows, do: Map.new(Enum.zip(fields, row))
     70   end
     71 
     72   defp postprocess(rows, types, adapter, schema, %{prefix: prefix, source: source}) do
     73     struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
     74 
     75     for row <- rows do
     76       {loaded, _} = Ecto.Repo.Queryable.struct_load!(types, row, [], false, struct, adapter)
     77       loaded
     78     end
     79   end
     80 
  # Normalizes the `rows_or_query` argument of `insert_all` into the pieces
  # the adapter needs. Always returns the 6-tuple
  #
  #   {rows_or_query, header, row_cast_params, placeholder_cast_params,
  #    placeholder_dump_params, counter_fun}
  #
  # where `counter_fun` is a zero-arity function.

  # List of rows: every field of every row is run through the mapper built by
  # `init_mapper/4`, collecting the header (set of sources seen) and the
  # placeholder entries along the way.
  defp extract_header_and_fields(_repo, rows, schema, dumper, autogen_id, placeholder_map, adapter, _opts)
       when is_list(rows) do
    mapper = init_mapper(schema, dumper, adapter, placeholder_map)

    # Accumulator is {header_map, placeholder_dump_map, next_placeholder_index};
    # the index starts at 1.
    {rows, {header, placeholder_dump, _}} =
      Enum.map_reduce(rows, {%{}, %{}, 1}, fn fields, acc ->
        {fields, {header, placeholder_dump, counter}} = Enum.map_reduce(fields, acc, mapper)
        {fields, header} = autogenerate_id(autogen_id, fields, header, adapter)
        {fields, {header, placeholder_dump, counter}}
      end)

    header = Map.keys(header)

    placeholder_size = map_size(placeholder_dump)

    # Order placeholder values by their index, then split them into the cast
    # and the dumped lists.
    {placeholder_cast_params, placeholder_dump_params} =
      placeholder_dump
      |> Enum.map(fn {_, {idx, _, cast_value, dump_value}} -> {idx, cast_value, dump_value} end)
      |> Enum.sort
      |> Enum.map(&{elem(&1, 1), elem(&1, 2)})
      |> Enum.unzip

    # The number of placeholders is the starting value of the parameter
    # counter used while planning the rows.
    {rows, row_cast_params, counter} = plan_query_in_rows(rows, header, adapter, placeholder_size)
    {rows, header, row_cast_params, placeholder_cast_params, placeholder_dump_params, fn -> counter end}
  end

  # Source query: the query is prepared by the repo, prefixed and planned; the
  # header is taken from the keys of the map used in its select clause.
  defp extract_header_and_fields(repo, %Ecto.Query{} = query, _schema, _dumper, _autogen_id, _placeholder_map, adapter, opts) do
    {query, opts} = repo.prepare_query(:insert_all, query, opts)
    query = attach_prefix(query, opts)

    {query, cast_params, dump_params} = Ecto.Adapter.Queryable.plan_query(:insert_all, adapter, query)

    header = case query.select do
      %Ecto.Query.SelectExpr{expr: {:%{}, _ctx, args}} ->
        Enum.map(args, &elem(&1, 0))

      _ ->
        raise ArgumentError, """
        cannot generate a fields list for insert_all from the given source query
        because it does not have a select clause that uses a map:

          #{inspect query}

        Please add a select clause that selects into a map, like this:

          from x in Source,
            ...,
            select: %{
              field_a: x.bar,
              field_b: x.foo
            }

        The keys must exist in the schema that is being inserted into
        """
    end

    # Evaluated lazily: the count is only computed if the function is called.
    counter = fn -> length(dump_params) end

    {{query, dump_params}, header, cast_params, [], [], counter}
  end

  # Anything that is neither a list nor an `Ecto.Query` is rejected.
  defp extract_header_and_fields(_repo, rows_or_query, _schema, _dumper, _autogen_id, _placeholder_map, _adapter, _opts) do
    raise ArgumentError, "expected a list of rows or a query, but got #{inspect rows_or_query} as rows_or_query argument in insert_all"
  end
    145 
    146   defp init_mapper(nil, _dumper, _adapter, placeholder_map) do
    147     fn {field, value}, acc ->
    148       extract_value(field, value, :any, placeholder_map, acc, & &1)
    149     end
    150   end
    151 
    152   defp init_mapper(schema, dumper, adapter, placeholder_map) do
    153     fn {field, value}, acc ->
    154       case dumper do
    155         %{^field => {source, type}} ->
    156           extract_value(source, value, type, placeholder_map, acc, fn val ->
    157             dump_field!(:insert_all, schema, field, type, val, adapter)
    158           end)
    159 
    160         %{} ->
    161           raise ArgumentError,
    162                 "unknown field `#{inspect(field)}` in schema #{inspect(schema)} given to " <>
    163                   "insert_all. Note virtual fields and associations are not supported"
    164       end
    165     end
    166   end
    167 
    168   defp extract_value(source, value, type, placeholder_map, acc, dumper) do
    169     {header, placeholder_dump, counter} = acc
    170 
    171     case value do
    172       %Ecto.Query{} = query ->
    173         {{source, query}, {Map.put(header, source, true), placeholder_dump, counter}}
    174 
    175       {:placeholder, key} ->
    176         {value, placeholder_dump, counter} =
    177           extract_placeholder(key, type, placeholder_map, placeholder_dump, counter, dumper)
    178 
    179         {{source, value},
    180           {Map.put(header, source, true), placeholder_dump, counter}}
    181 
    182       cast_value ->
    183         {{source, cast_value, dumper.(value)},
    184          {Map.put(header, source, true), placeholder_dump, counter}}
    185     end
    186   end
    187 
    188   defp extract_placeholder(key, type, placeholder_map, placeholder_dump, counter, dumper) do
    189     case placeholder_dump do
    190       %{^key => {idx, ^type, _, _}} ->
    191         {{:placeholder, idx}, placeholder_dump, counter}
    192 
    193       %{^key => {_, type, _}} ->
    194         raise ArgumentError,
    195               "a placeholder key can only be used with columns of the same type. " <>
    196                 "The key #{inspect(key)} has already been dumped as a #{inspect(type)}"
    197 
    198       %{} ->
    199         {cast_value, dump_value} =
    200           case placeholder_map do
    201             %{^key => cast_value} ->
    202               {cast_value, dumper.(cast_value)}
    203 
    204             _ ->
    205               raise KeyError,
    206                     "placeholder key #{inspect(key)} not found in #{inspect(placeholder_map)}"
    207           end
    208 
    209         placeholder_dump = Map.put(placeholder_dump, key, {counter, type, cast_value, dump_value})
    210         {{:placeholder, counter}, placeholder_dump, counter + 1}
    211     end
    212   end
    213 
  # Rewrites every row into a list of `{key, value}` pairs ordered by
  # `header`, planning any `Ecto.Query` found as a value.
  #
  # `counter` is the parameter count to start from. Returns
  # `{rows, cast_params, counter}` with the cast params in row/header order
  # and the counter advanced by every plain value and query parameter seen.
  defp plan_query_in_rows(rows, header, adapter, counter) do
    {rows, {cast_params, counter}} =
      Enum.map_reduce(rows, {[], counter}, fn fields, {cast_param_acc, counter} ->
        Enum.flat_map_reduce(header, {cast_param_acc, counter}, fn key, {cast_param_acc, counter} ->
          case :lists.keyfind(key, 1, fields) do
            # Query value: plan it, then normalize it using the current
            # counter; its cast params join the accumulator.
            {^key, %Ecto.Query{} = query} ->
              {query, params, _} = Ecto.Query.Planner.plan(query, :all, adapter)
              {cast_params, dump_params} = Enum.unzip(params)
              {query, _} = Ecto.Query.Planner.normalize(query, :all, adapter, counter)
              num_params = length(dump_params)

              # `Enum.reverse/2` prepends the reversed params, keeping the
              # accumulator in reverse order overall.
              {[{key, {query, dump_params}}], {Enum.reverse(cast_params, cast_param_acc), counter + num_params}}

            # Placeholders pass through untouched and do not advance the counter.
            {^key, {:placeholder, _} = value} ->
              {[{key, value}], {cast_param_acc, counter}}

            # Plain value: the dumped value goes into the row, the cast value
            # into the accumulator.
            {^key, cast_value, dump_value} ->
              {[{key, dump_value}], {[cast_value | cast_param_acc], counter + 1}}

            # The row has no entry for this header key: emit nothing.
            false ->
              {[], {cast_param_acc, counter}}
          end
        end)
      end)

    # The accumulator was built in reverse; restore the original order.
    {rows, Enum.reverse(cast_params), counter}
  end
    241 
    242   defp autogenerate_id(nil, fields, header, _adapter) do
    243     {fields, header}
    244   end
    245 
    246   defp autogenerate_id({key, source, type}, fields, header, adapter) do
    247     case :lists.keyfind(key, 1, fields) do
    248       {^key, _, _} ->
    249         {fields, header}
    250 
    251       false ->
    252         if dump_value = Ecto.Type.adapter_autogenerate(adapter, type) do
    253           {:ok, cast_value} = Ecto.Type.adapter_load(adapter, type, dump_value)
    254           {[{source, cast_value, dump_value} | fields], Map.put(header, source, true)}
    255         else
    256           {fields, header}
    257         end
    258     end
    259   end
    260 
    261   @doc """
    262   Implementation for `Ecto.Repo.insert!/2`.
    263   """
    264   def insert!(repo, name, struct_or_changeset, tuplet) do
    265     case insert(repo, name, struct_or_changeset, tuplet) do
    266       {:ok, struct} ->
    267         struct
    268 
    269       {:error, %Ecto.Changeset{} = changeset} ->
    270         raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset
    271     end
    272   end
    273 
    274   @doc """
    275   Implementation for `Ecto.Repo.update!/2`.
    276   """
    277   def update!(repo, name, struct_or_changeset, tuplet) do
    278     case update(repo, name, struct_or_changeset, tuplet) do
    279       {:ok, struct} ->
    280         struct
    281 
    282       {:error, %Ecto.Changeset{} = changeset} ->
    283         raise Ecto.InvalidChangesetError, action: :update, changeset: changeset
    284     end
    285   end
    286 
    287   @doc """
    288   Implementation for `Ecto.Repo.delete!/2`.
    289   """
    290   def delete!(repo, name, struct_or_changeset, tuplet) do
    291     case delete(repo, name, struct_or_changeset, tuplet) do
    292       {:ok, struct} ->
    293         struct
    294 
    295       {:error, %Ecto.Changeset{} = changeset} ->
    296         raise Ecto.InvalidChangesetError, action: :delete, changeset: changeset
    297     end
    298   end
    299 
    300   @doc """
    301   Implementation for `Ecto.Repo.insert/2`.
    302   """
    303   def insert(repo, name, %Changeset{} = changeset, tuplet) do
    304     do_insert(repo, name, changeset, tuplet)
    305   end
    306 
    307   def insert(repo, name, %{__struct__: _} = struct, tuplet) do
    308     do_insert(repo, name, Ecto.Changeset.change(struct), tuplet)
    309   end
    310 
  # Inserts a valid changeset.
  #
  # The work inside the function passed to `wrap_in_transaction/8` is, in
  # order: run the changeset's prepare functions, process parent
  # associations, prepare embeds, autogenerate values, dump the changes,
  # resolve `:on_conflict`, call the adapter, load the returned values and
  # finally process child associations.
  # Returns `{:error, changeset}` if the changeset became invalid while
  # processing parents, or the adapter's `{:error, _}` tuple unchanged.
  defp do_insert(repo, _name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at the call site take precedence over the changeset's
    # `repo_opts` (the second argument of `Keyword.merge/2` wins).
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:insert, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    embeds = schema.__schema__(:embeds)

    # Fields to read back: the `:returning` option plus the schema's
    # `:read_after_writes` fields.
    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    on_conflict = Keyword.get(opts, :on_conflict, :raise)
    conflict_target = Keyword.get(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)

    # On insert, we always merge the whole struct into the
    # changeset as changes, except the primary key if it is nil.
    changeset = put_repo_and_action(changeset, :insert, repo, tuplet)
    changeset = Relation.surface_changes(changeset, struct, fields ++ assocs)

    wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fn ->
      assoc_opts = assoc_opts(assocs, opts)
      # `user_changeset` is the changeset after the prepare functions ran; it
      # is kept alongside the association-stripped `changeset` below.
      user_changeset = run_prepare(changeset, prepare)

      {changeset, parents, children} = pop_assocs(user_changeset, assocs)
      changeset = process_parents(changeset, user_changeset, parents, adapter, assoc_opts)

      if changeset.valid? do
        embeds = Ecto.Embedded.prepare(changeset, embeds, adapter, :insert)

        autogen_id = schema.__schema__(:autogenerate_id)
        schema_meta = metadata(struct, autogen_id, opts)
        changes = Map.merge(changeset.changes, embeds)

        {changes, cast_extra, dump_extra, return_types, return_sources} =
          autogenerate_id(autogen_id, changes, return_types, return_sources, adapter)

        # Only schema fields are sent to the adapter.
        changes = Map.take(changes, fields)
        autogen = autogenerate_changes(schema, :insert, changes)

        dump_changes =
          dump_changes!(:insert, changes, autogen, schema, dump_extra, dumper, adapter)

        # The counter function reports how many dumped changes precede any
        # on_conflict query parameters.
        {on_conflict, conflict_cast_params} =
          on_conflict(on_conflict, conflict_target, schema_meta, fn -> length(dump_changes) end, adapter)

        change_values = Enum.map(changes, &elem(&1, 1))
        autogen_values = Enum.map(autogen, &elem(&1, 1))
        opts = Keyword.put(opts, :cast_params, change_values ++ autogen_values ++ cast_extra ++ conflict_cast_params)
        args = [adapter_meta, schema_meta, dump_changes, on_conflict, return_sources, opts]

        case apply(user_changeset, adapter, :insert, args) do
          {:ok, values} ->
            # Values produced locally (`dump_extra`) are loaded together with
            # the values returned by the adapter.
            values = dump_extra ++ values

            changeset
            |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, schema_meta)
            |> process_children(user_changeset, children, adapter, assoc_opts)

          {:error, _} = error ->
            error
        end
      else
        {:error, changeset}
      end
    end)
  end

  # Invalid changesets are never sent to the adapter; the changeset is
  # returned tagged with the repo and the `:insert` action.
  defp do_insert(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :insert, repo, tuplet)}
  end
    389 
    390   @doc """
    391   Implementation for `Ecto.Repo.update/2`.
    392   """
    393   def update(repo, name, %Changeset{} = changeset, tuplet) do
    394     do_update(repo, name, changeset, tuplet)
    395   end
    396 
    397   def update(_repo, _name, %{__struct__: _}, _tuplet) do
    398     raise ArgumentError, "giving a struct to Ecto.Repo.update/2 is not supported. " <>
    399                          "Ecto is unable to properly track changes when a struct is given, " <>
    400                          "an Ecto.Changeset must be given instead"
    401   end
    402 
  # Updates the row behind a valid changeset.
  #
  # When the changeset has no changes and the `:force` option is not set, the
  # adapter and the transaction wrapper are skipped entirely and
  # `{:ok, changeset.data}` is returned. Otherwise the flow mirrors insert:
  # prepare functions, parent associations, embeds, autogenerated values,
  # adapter call, loading of returned values and child associations.
  defp do_update(repo, _name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at the call site take precedence over the changeset's
    # `repo_opts` (the second argument of `Keyword.merge/2` wins).
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:update, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    embeds = schema.__schema__(:embeds)

    # `!!` normalizes any truthy/falsy `:force` value to a boolean.
    force? = !!opts[:force]
    filters = add_pk_filter!(changeset.filters, struct)

    # Fields to read back: the `:returning` option plus the schema's
    # `:read_after_writes` fields.
    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    # Differently from insert, update does not copy the struct
    # fields into the changeset. All changes must be in the
    # changeset before hand.
    changeset = put_repo_and_action(changeset, :update, repo, tuplet)

    if changeset.changes != %{} or force? do
      wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fn ->
        assoc_opts = assoc_opts(assocs, opts)
        # `user_changeset` is the changeset after the prepare functions ran;
        # it is kept alongside the association-stripped `changeset` below.
        user_changeset = run_prepare(changeset, prepare)

        {changeset, parents, children} = pop_assocs(user_changeset, assocs)
        changeset = process_parents(changeset, user_changeset, parents, adapter, assoc_opts)

        if changeset.valid? do
          embeds = Ecto.Embedded.prepare(changeset, embeds, adapter, :update)

          # Only schema fields are sent to the adapter.
          changes = changeset.changes |> Map.merge(embeds) |> Map.take(fields)
          autogen = autogenerate_changes(schema, :update, changes)
          dump_changes = dump_changes!(:update, changes, autogen, schema, [], dumper, adapter)

          schema_meta = metadata(struct, schema.__schema__(:autogenerate_id), opts)
          dump_filters = dump_fields!(:update, schema, filters, dumper, adapter)

          change_values = Enum.map(changes, &elem(&1, 1))
          autogen_values = Enum.map(autogen, &elem(&1, 1))
          filter_values = Enum.map(filters, &elem(&1, 1))
          opts = Keyword.put(opts, :cast_params, change_values ++ autogen_values ++ filter_values)
          args = [adapter_meta, schema_meta, dump_changes, dump_filters, return_sources, opts]

          # If there are no changes or all the changes were autogenerated but not forced, we skip
          {action, autogen} =
            if changes != %{} or (autogen != [] and force?),
               do: {:update, autogen},
               else: {:noop, []}

          case apply(user_changeset, adapter, action, args) do
            {:ok, values} ->
              changeset
              |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, schema_meta)
              |> process_children(user_changeset, children, adapter, assoc_opts)

            {:error, _} = error ->
              error
          end
        else
          {:error, changeset}
        end
      end)
    else
      {:ok, changeset.data}
    end
  end

  # Invalid changesets are never sent to the adapter; the changeset is
  # returned tagged with the repo and the `:update` action.
  defp do_update(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :update, repo, tuplet)}
  end
    480 
    481   @doc """
    482   Implementation for `Ecto.Repo.insert_or_update/2`.
    483   """
    484   def insert_or_update(repo, name, changeset, tuplet) do
    485     case get_state(changeset) do
    486       :built  -> insert(repo, name, changeset, tuplet)
    487       :loaded -> update(repo, name, changeset, tuplet)
    488       state   -> raise ArgumentError, "the changeset has an invalid state " <>
    489                                       "for Repo.insert_or_update/2: #{state}"
    490     end
    491   end
    492 
    493   @doc """
    494   Implementation for `Ecto.Repo.insert_or_update!/2`.
    495   """
    496   def insert_or_update!(repo, name, changeset, tuplet) do
    497     case get_state(changeset) do
    498       :built  -> insert!(repo, name, changeset, tuplet)
    499       :loaded -> update!(repo, name, changeset, tuplet)
    500       state   -> raise ArgumentError, "the changeset has an invalid state " <>
    501                                       "for Repo.insert_or_update!/2: #{state}"
    502     end
    503   end
    504 
    505   defp get_state(%Changeset{data: %{__meta__: %{state: state}}}), do: state
    506   defp get_state(%{__struct__: _}) do
    507     raise ArgumentError, "giving a struct to Repo.insert_or_update/2 or " <>
    508                          "Repo.insert_or_update!/2 is not supported. " <>
    509                          "Please use an Ecto.Changeset"
    510   end
    511 
    512   @doc """
    513   Implementation for `Ecto.Repo.delete/2`.
    514   """
    515   def delete(repo, name, %Changeset{} = changeset, tuplet) do
    516     do_delete(repo, name, changeset, tuplet)
    517   end
    518 
    519   def delete(repo, name, %{__struct__: _} = struct, tuplet) do
    520     changeset = Ecto.Changeset.change(struct)
    521     do_delete(repo, name, changeset, tuplet)
    522   end
    523 
  # Deletes the row behind a valid changeset.
  #
  # Inside the function passed to `wrap_in_transaction/6` the prepare
  # functions run first; if the changeset is still valid, the `on_delete`
  # callback of every association returned by `to_delete_assocs/1` is invoked
  # and then the adapter's delete is called. Returns `{:ok, struct}`,
  # `{:error, changeset}` or the adapter's `{:error, _}` tuple unchanged.
  defp do_delete(repo, name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at the call site take precedence over the changeset's
    # `repo_opts` (the second argument of `Keyword.merge/2` wins).
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:delete, changeset)
    schema = struct.__struct__
    assocs = to_delete_assocs(schema)
    dumper = schema.__schema__(:dump)
    changeset = put_repo_and_action(changeset, :delete, repo, tuplet)

    # The fourth argument tells the wrapper whether there are associations
    # to delete.
    wrap_in_transaction(adapter, adapter_meta, opts, assocs != [], prepare, fn ->
      changeset = run_prepare(changeset, prepare)

      if changeset.valid? do
        filters = add_pk_filter!(changeset.filters, struct)
        dump_filters = dump_fields!(:delete, schema, filters, dumper, adapter)

        # Delete related associations
        for %{__struct__: mod, on_delete: on_delete} = reflection <- assocs do
          # `Kernel.apply/3`: calls the function named by `on_delete` on the
          # reflection's module.
          apply(mod, on_delete, [reflection, changeset.data, name, tuplet])
        end

        schema_meta = metadata(struct, schema.__schema__(:autogenerate_id), opts)
        filter_values = Enum.map(filters, &elem(&1, 1))
        opts = Keyword.put(opts, :cast_params, filter_values)
        args = [adapter_meta, schema_meta, dump_filters, opts]

        # Four arguments: this is the module's private `apply/4`.
        case apply(changeset, adapter, :delete, args) do
          {:ok, values} ->
            changeset = load_changes(changeset, :deleted, [], values, %{}, [], adapter, schema_meta)
            {:ok, changeset.data}

          {:error, _} = error ->
            error
        end
      else
        {:error, changeset}
      end
    end)
  end

  # Invalid changesets are never sent to the adapter; the changeset is
  # returned tagged with the repo and the `:delete` action.
  defp do_delete(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :delete, repo, tuplet)}
  end
    569 
    570   def load(adapter, schema_or_types, data) do
    571     do_load(schema_or_types, data, &Ecto.Type.adapter_load(adapter, &1, &2))
    572   end
    573 
    574   defp do_load(schema, data, loader) when is_list(data),
    575     do: do_load(schema, Map.new(data), loader)
    576   defp do_load(schema, {fields, values}, loader) when is_list(fields) and is_list(values),
    577     do: do_load(schema, Enum.zip(fields, values), loader)
    578   defp do_load(schema, data, loader) when is_atom(schema),
    579     do: Ecto.Schema.Loader.unsafe_load(schema, data, loader)
    580   defp do_load(types, data, loader) when is_map(types),
    581     do: Ecto.Schema.Loader.unsafe_load(%{}, types, data, loader)
    582 
    583   ## Helpers
    584 
    585   defp returning(schema, opts) do
    586     case Keyword.get(opts, :returning, false) do
    587       [_ | _] = fields ->
    588         fields
    589       [] ->
    590         raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
    591       true when is_nil(schema) ->
    592         raise ArgumentError, ":returning option can only be set to true if a schema is given"
    593       true ->
    594         schema.__schema__(:fields)
    595       false ->
    596         []
    597     end
    598   end
    599 
    600   defp add_read_after_writes([], schema),
    601     do: schema.__schema__(:read_after_writes)
    602 
    603   defp add_read_after_writes(return, schema),
    604     do: Enum.uniq(return ++ schema.__schema__(:read_after_writes))
    605 
    606   defp fields_to_sources(fields, nil) do
    607     {fields, fields}
    608   end
    609   defp fields_to_sources(fields, dumper) do
    610     Enum.reduce(fields, {[], []}, fn field, {types, sources} ->
    611       {source, type} = Map.fetch!(dumper, field)
    612       {[{field, type} | types], [source | sources]}
    613     end)
    614   end
    615 
    616   defp struct_from_changeset!(action, %{data: nil}),
    617     do: raise(ArgumentError, "cannot #{action} a changeset without :data")
    618   defp struct_from_changeset!(_action, %{data: struct}),
    619     do: struct
    620 
    621   defp put_repo_and_action(%{action: :ignore, valid?: valid?} = changeset, action, repo, {_adapter_meta, opts}) do
    622     if valid? do
    623       raise ArgumentError, "a valid changeset with action :ignore was given to " <>
    624                            "#{inspect repo}.#{action}/2. Changesets can only be ignored " <>
    625                            "in a repository action if they are also invalid"
    626     else
    627       %{changeset | action: action, repo: repo, repo_opts: opts}
    628     end
    629   end
    630   defp put_repo_and_action(%{action: given}, action, repo, _tuplet) when given != nil and given != action,
    631     do: raise(ArgumentError, "a changeset with action #{inspect given} was given to #{inspect repo}.#{action}/2")
    632   defp put_repo_and_action(changeset, action, repo, {_adapter_meta, opts}),
    633     do: %{changeset | action: action, repo: repo, repo_opts: opts}
    634 
    635   defp run_prepare(changeset, prepare) do
    636     Enum.reduce(Enum.reverse(prepare), changeset, fn fun, acc ->
    637       case fun.(acc) do
    638         %Ecto.Changeset{} = acc ->
    639           acc
    640 
    641         other ->
    642           raise "expected function #{inspect fun} given to Ecto.Changeset.prepare_changes/2 " <>
    643                 "to return an Ecto.Changeset, got: `#{inspect other}`"
    644       end
    645     end)
    646   end
    647 
    648   defp metadata(schema, prefix, source, autogen_id, context, opts) do
    649     %{
    650       autogenerate_id: autogen_id,
    651       context: context,
    652       schema: schema,
    653       source: source,
    654       prefix: Keyword.get(opts, :prefix, prefix)
    655     }
    656   end
    657   defp metadata(%{__struct__: schema, __meta__: %{context: context, source: source, prefix: prefix}},
    658                 autogen_id, opts) do
    659     metadata(schema, prefix, source, autogen_id, context, opts)
    660   end
    661   defp metadata(%{__struct__: schema}, _, _) do
    662     raise ArgumentError, "#{inspect(schema)} needs to be a schema with source"
    663   end
    664 
    665   defp conflict_target({:unsafe_fragment, fragment}, _dumper) when is_binary(fragment) do
    666     {:unsafe_fragment, fragment}
    667   end
    668   defp conflict_target(conflict_target, dumper) do
    669     for target <- List.wrap(conflict_target) do
    670       case dumper do
    671         %{^target => {alias, _}} ->
    672           alias
    673         %{} when is_atom(target) ->
    674           raise ArgumentError, "unknown field `#{inspect(target)}` in conflict_target"
    675         _ ->
    676           target
    677       end
    678     end
    679   end
    680 
  # Normalizes the `:on_conflict` option.
  #
  # Returns `{{action_or_fields_or_query, dump_params, conflict_target},
  # cast_params}`. `counter_fun` is a zero-arity function that is only passed
  # on to `on_conflict_query/6`, i.e. for the keyword-list and query forms.
  defp on_conflict(on_conflict, conflict_target, schema_meta, counter_fun, adapter) do
    %{source: source, schema: schema, prefix: prefix} = schema_meta

    case on_conflict do
      # `:raise` is only valid without a conflict target.
      :raise when conflict_target == [] ->
        {{:raise, [], []}, []}

      :raise ->
        raise ArgumentError, ":conflict_target option is forbidden when :on_conflict is :raise"

      :nothing ->
        {{:nothing, [], conflict_target}, []}

      # Explicit list of fields to replace, translated to their sources.
      {:replace, keys} when is_list(keys) ->
        fields = Enum.map(keys, &field_source!(schema, &1))
        {{fields, [], conflict_target}, []}

      :replace_all ->
        {{replace_all_fields!(:replace_all, schema, []), [], conflict_target}, []}

      {:replace_all_except, fields} ->
        {{replace_all_fields!(:replace_all_except, schema, fields), [], conflict_target}, []}

      # Non-empty list: used as the `update:` clause of a query built on the
      # source (paired with the schema when there is one).
      [_ | _] = on_conflict ->
        from = if schema, do: {source, schema}, else: source
        query = Ecto.Query.from from, update: ^on_conflict
        on_conflict_query(query, {source, schema}, prefix, counter_fun, adapter, conflict_target)

      %Ecto.Query{} = query ->
        on_conflict_query(query, {source, schema}, prefix, counter_fun, adapter, conflict_target)

      other ->
        raise ArgumentError, "unknown value for :on_conflict, got: #{inspect other}"
    end
  end
    716 
    717   defp replace_all_fields!(kind, nil, _to_remove) do
    718     raise ArgumentError, "cannot use #{inspect(kind)} on operations without a schema"
    719   end
    720 
    721   defp replace_all_fields!(_kind, schema, to_remove) do
    722     Enum.map(schema.__schema__(:fields) -- to_remove, &field_source!(schema, &1))
    723   end
    724 
    725   defp field_source!(nil, field) do
    726     field
    727   end
    728 
    729   defp field_source!(schema, field) do
    730     schema.__schema__(:field_source, field) ||
    731       raise ArgumentError, "unknown field for :on_conflict, got: #{inspect(field)}"
    732   end
    733 
    734   defp on_conflict_query(query, from, prefix, counter_fun, adapter, conflict_target) do
    735     {query, params, _} =
    736       Ecto.Query.Planner.plan(%{query | prefix: prefix}, :update_all, adapter)
    737 
    738     {cast_params, dump_params} = Enum.unzip(params)
    739 
    740     unless query.from.source == from do
    741       raise ArgumentError, "cannot run on_conflict: query because the query " <>
    742                            "has a different {source, schema} pair than the " <>
    743                            "original struct/changeset/query. Got #{inspect query.from} " <>
    744                            "and #{inspect from} respectively"
    745     end
    746 
    747     {query, _} = Ecto.Query.Planner.normalize(query, :update_all, adapter, counter_fun.())
    748     {{query, dump_params, conflict_target}, cast_params}
    749   end
    750 
    751   defp apply(_user_changeset, _adapter, :noop, _args) do
    752     {:ok, []}
    753   end
    754 
    755   defp apply(user_changeset, adapter, action, args) do
    756     case apply(adapter, action, args) do
    757       {:ok, values} ->
    758         {:ok, values}
    759 
    760       {:invalid, constraints} ->
    761         {:error, constraints_to_errors(user_changeset, action, constraints)}
    762 
    763       {:error, :stale} ->
    764         opts = List.last(args)
    765 
    766         case Keyword.fetch(opts, :stale_error_field) do
    767           {:ok, stale_error_field} when is_atom(stale_error_field) ->
    768             stale_message = Keyword.get(opts, :stale_error_message, "is stale")
    769             user_changeset = Changeset.add_error(user_changeset, stale_error_field, stale_message, [stale: true])
    770             {:error, user_changeset}
    771 
    772           _other ->
    773             raise Ecto.StaleEntryError, changeset: user_changeset, action: action
    774         end
    775     end
    776   end
    777 
    778   defp constraints_to_errors(%{constraints: user_constraints, errors: errors} = changeset, action, constraints) do
    779     constraint_errors =
    780       Enum.map constraints, fn {type, constraint} ->
    781         user_constraint =
    782           Enum.find(user_constraints, fn c ->
    783             case {c.type, c.constraint,  c.match} do
    784               {^type, ^constraint, :exact} -> true
    785               {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
    786               {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
    787               _ -> false
    788             end
    789           end)
    790 
    791         case user_constraint do
    792           %{field: field, error_message: error_message, error_type: error_type} ->
    793             {field, {error_message, [constraint: error_type, constraint_name: constraint]}}
    794           nil ->
    795             raise Ecto.ConstraintError, action: action, type: type,
    796                                         constraint: constraint, changeset: changeset
    797         end
    798       end
    799 
    800     %{changeset | errors: constraint_errors ++ errors, valid?: false}
    801   end
    802 
    803   defp load_changes(changeset, state, types, values, embeds, autogen, adapter, schema_meta) do
    804     %{data: data, changes: changes} = changeset
    805 
    806     data =
    807       data
    808       |> merge_changes(changes)
    809       |> Map.merge(embeds)
    810       |> merge_autogen(autogen)
    811       |> apply_metadata(state, schema_meta)
    812       |> load_each(values, types, adapter)
    813 
    814     Map.put(changeset, :data, data)
    815   end
    816 
    817   defp merge_changes(data, changes) do
    818     changes =
    819       Enum.reduce(changes, changes, fn {key, _value}, changes ->
    820         if Map.has_key?(data, key), do: changes, else: Map.delete(changes, key)
    821       end)
    822 
    823     Map.merge(data, changes)
    824   end
    825 
    826   defp merge_autogen(data, autogen) do
    827     Enum.reduce(autogen, data, fn {k, v}, acc -> %{acc | k => v} end)
    828   end
    829 
    830   defp apply_metadata(%{__meta__: meta} = data, state, %{source: source, prefix: prefix}) do
    831     %{data | __meta__: %{meta | state: state, source: source, prefix: prefix}}
    832   end
    833 
    834   defp load_each(struct, [{_, value} | kv], [{key, type} | types], adapter) do
    835     case Ecto.Type.adapter_load(adapter, type, value) do
    836       {:ok, value} ->
    837         load_each(%{struct | key => value}, kv, types, adapter)
    838       :error ->
    839         raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type} " <>
    840                              "for field `#{key}` in schema #{inspect struct.__struct__}"
    841     end
    842   end
    843   defp load_each(struct, [], _types, _adapter) do
    844     struct
    845   end
    846 
    847   defp pop_assocs(changeset, []) do
    848     {changeset, [], []}
    849   end
    850 
    851   defp pop_assocs(%{changes: changes, types: types} = changeset, assocs) do
    852     {changes, parent, child} =
    853       Enum.reduce assocs, {changes, [], []}, fn assoc, {changes, parent, child} ->
    854         case changes do
    855           %{^assoc => value} ->
    856             changes = Map.delete(changes, assoc)
    857 
    858             case types do
    859               %{^assoc => {:assoc, %{relationship: :parent} = refl}} ->
    860                 {changes, [{refl, value} | parent], child}
    861               %{^assoc => {:assoc, %{relationship: :child} = refl}} ->
    862                 {changes, parent, [{refl, value} | child]}
    863             end
    864 
    865           %{} ->
    866             {changes, parent, child}
    867         end
    868       end
    869 
    870     {%{changeset | changes: changes}, parent, child}
    871   end
    872 
    873   # Don't mind computing options if there are no assocs
    874   defp assoc_opts([], _opts), do: []
    875 
    876   defp assoc_opts(_assocs, opts) do
    877     Keyword.take(opts, [:timeout, :log, :telemetry_event, :prefix])
    878   end
    879 
    880   defp process_parents(changeset, user_changeset, assocs, adapter, opts) do
    881     %{changes: changes, valid?: valid?} = changeset
    882 
    883     # Even if the changeset is invalid, we want to run parent callbacks
    884     # to collect feedback. But if all is ok, still return the user changeset.
    885     case Ecto.Association.on_repo_change(changeset, assocs, adapter, opts) do
    886       {:ok, struct} when valid? ->
    887         changes = change_parents(changes, struct, assocs)
    888         %{changeset | changes: changes, data: struct}
    889 
    890       {:ok, _} ->
    891         user_changeset
    892 
    893       {:error, changes} ->
    894         %{user_changeset | changes: Map.merge(user_changeset.changes, changes), valid?: false}
    895     end
    896   end
    897 
    898   defp change_parents(changes, struct, assocs) do
    899     Enum.reduce assocs, changes, fn {refl, _}, acc ->
    900       %{field: field, owner_key: owner_key, related_key: related_key} = refl
    901       related = Map.get(struct, field)
    902       value = related && Map.fetch!(related, related_key)
    903 
    904       case Map.fetch(changes, owner_key) do
    905         {:ok, current} when current != value ->
    906           raise ArgumentError,
    907             "cannot change belongs_to association `#{field}` because there is " <>
    908             "already a change setting its foreign key `#{owner_key}` to `#{inspect current}`"
    909 
    910         _ ->
    911           Map.put(acc, owner_key, value)
    912       end
    913     end
    914   end
    915 
    916   defp process_children(changeset, user_changeset, assocs, adapter, opts) do
    917     case Ecto.Association.on_repo_change(changeset, assocs, adapter, opts) do
    918       {:ok, struct} ->
    919         {:ok, struct}
    920 
    921       {:error, changes} ->
    922         changes = Map.merge(user_changeset.changes, changes)
    923         {:error, %{user_changeset | changes: changes, valid?: false}}
    924     end
    925   end
    926 
    927   defp to_delete_assocs(schema) do
    928     for assoc <- schema.__schema__(:associations),
    929         reflection = schema.__schema__(:association, assoc),
    930         match?(%{on_delete: on_delete} when on_delete != :nothing, reflection),
    931         do: reflection
    932   end
    933 
    934   defp autogenerate_id(nil, changes, return_types, return_sources, _adapter) do
    935     {changes, [], [], return_types, return_sources}
    936   end
    937 
    938   defp autogenerate_id({key, source, type}, changes, return_types, return_sources, adapter) do
    939     cond do
    940       Map.has_key?(changes, key) -> # Set by user
    941         {changes, [], [], return_types, return_sources}
    942       dump_value = Ecto.Type.adapter_autogenerate(adapter, type) -> # Autogenerated now
    943         {:ok, cast_value} = Ecto.Type.adapter_load(adapter, type, dump_value)
    944         {changes, [cast_value], [{source, dump_value}] , [{key, type} | return_types], return_sources}
    945       true -> # Autogenerated in storage
    946         {changes, [], [], [{key, type} | return_types], [source | List.delete(return_sources, source)]}
    947     end
    948   end
    949 
    950   defp dump_changes!(action, changes, autogen, schema, extra, dumper, adapter) do
    951     dump_fields!(action, schema, changes, dumper, adapter) ++
    952     dump_fields!(action, schema, autogen, dumper, adapter) ++
    953     extra
    954   end
    955 
    956   defp autogenerate_changes(schema, action, changes) do
    957     autogen_fields = action |> action_to_auto() |> schema.__schema__()
    958 
    959     Enum.flat_map(autogen_fields, fn {fields, {mod, fun, args}} ->
    960       case Enum.reject(fields, &Map.has_key?(changes, &1)) do
    961         [] ->
    962           []
    963 
    964         fields ->
    965           generated = apply(mod, fun, args)
    966           Enum.map(fields, &{&1, generated})
    967       end
    968     end)
    969   end
    970 
    971   defp action_to_auto(:insert), do: :autogenerate
    972   defp action_to_auto(:update), do: :autoupdate
    973 
    974   defp add_pk_filter!(filters, struct) do
    975     Enum.reduce Ecto.primary_key!(struct), filters, fn
    976       {_k, nil}, _acc ->
    977         raise Ecto.NoPrimaryKeyValueError, struct: struct
    978       {k, v}, acc ->
    979         Map.put(acc, k, v)
    980     end
    981   end
    982 
    983   defp wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fun) do
    984     %{changes: changes} = changeset
    985     changed = &Map.has_key?(changes, &1)
    986     relations_changed? = Enum.any?(assocs, changed) or Enum.any?(embeds, changed)
    987     wrap_in_transaction(adapter, adapter_meta, opts, relations_changed?, prepare, fun)
    988   end
    989 
    990   defp wrap_in_transaction(adapter, adapter_meta, opts, relations_changed?, prepare, fun) do
    991     if (relations_changed? or prepare != []) and
    992        function_exported?(adapter, :transaction, 3) and
    993        not adapter.in_transaction?(adapter_meta) do
    994       adapter.transaction(adapter_meta, opts, fn ->
    995         case fun.() do
    996           {:ok, struct} -> struct
    997           {:error, changeset} -> adapter.rollback(adapter_meta, changeset)
    998         end
    999       end)
   1000     else
   1001       fun.()
   1002     end
   1003   end
   1004 
   1005   defp dump_field!(action, schema, field, type, value, adapter) do
   1006     case Ecto.Type.adapter_dump(adapter, type, value) do
   1007       {:ok, value} ->
   1008         value
   1009       :error ->
   1010         raise Ecto.ChangeError,
   1011               "value `#{inspect(value)}` for `#{inspect(schema)}.#{field}` " <>
   1012               "in `#{action}` does not match type #{inspect type}"
   1013     end
   1014   end
   1015 
   1016   defp dump_fields!(action, schema, kw, dumper, adapter) do
   1017     for {field, value} <- kw do
   1018       {alias, type} = Map.fetch!(dumper, field)
   1019       {alias, dump_field!(action, schema, field, type, value, adapter)}
   1020     end
   1021   end
   1022 end