zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

association.ex (53574B)


      1 import Ecto.Query, only: [from: 1, from: 2, join: 4, join: 5, distinct: 3, where: 3]
      2 
      3 defmodule Ecto.Association.NotLoaded do
      4   @moduledoc """
      5   Struct returned by associations when they are not loaded.
      6 
      7   The fields are:
      8 
      9     * `__field__` - the association field in `owner`
     10     * `__owner__` - the schema that owns the association
     11     * `__cardinality__` - the cardinality of the association
     12   """
     13 
     14   @type t :: %__MODULE__{
     15     __field__: atom(),
     16     __owner__: any(),
     17     __cardinality__: atom()
     18   }
     19 
     20   defstruct [:__field__, :__owner__, :__cardinality__]
     21 
     22   defimpl Inspect do
     23     def inspect(not_loaded, _opts) do
     24       msg = "association #{inspect not_loaded.__field__} is not loaded"
     25       ~s(#Ecto.Association.NotLoaded<#{msg}>)
     26     end
     27   end
     28 end
     29 
     30 defmodule Ecto.Association do
     31   @moduledoc false
     32 
     33   @type t :: %{required(:__struct__) => atom,
     34                required(:on_cast) => nil | fun,
     35                required(:cardinality) => :one | :many,
     36                required(:relationship) => :parent | :child,
     37                required(:owner) => atom,
     38                required(:owner_key) => atom,
     39                required(:field) => atom,
     40                required(:unique) => boolean,
     41                optional(atom) => any}
     42 
     43   alias Ecto.Query.Builder.OrderBy
     44 
     45   @doc """
     46   Helper to check if a queryable is compiled.
     47   """
     48   def ensure_compiled(queryable, env) do
     49     if not is_atom(queryable) or queryable in env.context_modules do
     50       :skip
     51     else
     52       case Code.ensure_compiled(queryable) do
     53         {:module, _} -> :compiled
     54         {:error, :unavailable} -> :skip
     55         {:error, _} -> :not_found
     56       end
     57     end
     58   end
     59 
  @doc """
  Builds the association struct.

  Receives the owner module, the association field name and a keyword
  list of options.

  The struct must be defined in the module that implements the
  callback and it must contain at least the following keys:

    * `:cardinality` - tells if the association is one to one
      or one/many to many

    * `:field` - tells the field in the owner struct where the
      association should be stored

    * `:owner` - the owner module of the association

    * `:owner_key` - the key in the owner with the association value

    * `:relationship` - if the relationship to the specified schema is
      of a `:child` or a `:parent`

  """
  @callback struct(module, field :: atom, opts :: Keyword.t) :: t

  @doc """
  Invoked after the schema is compiled to validate associations.

  Useful for checking if associated modules exist without running
  into deadlocks.

  Must return `:ok` or `{:error, message}`.
  """
  @callback after_compile_validation(t, Macro.Env.t) :: :ok | {:error, String.t}

  @doc """
  Builds a struct for the given association.

  The struct to build from is given as argument in case default values
  should be set in the struct. The last argument carries the attributes
  for the new struct.

  Invoked by `Ecto.build_assoc/3`.
  """
  @callback build(t, owner :: Ecto.Schema.t, %{atom => term} | [Keyword.t]) :: Ecto.Schema.t

  @doc """
  Returns an association join query.

  This callback receives the association struct and it must return
  a query that retrieves all associated entries using joins up to
  the owner association.

  For example, a `has_many :comments` inside a `Post` module would
  return:

      from c in Comment, join: p in Post, on: c.post_id == p.id

  Note all the logic must be expressed inside joins, as fields like
  `where` and `order_by` won't be used by the caller.

  This callback is invoked when `join: assoc(p, :comments)` is used
  inside queries.
  """
  @callback joins_query(t) :: Ecto.Query.t

  @doc """
  Returns the association query on top of the given query.

  If the query is `nil`, the association target must be used.

  This callback receives the association struct and it must return
  a query that retrieves all associated entries with the given
  values for the owner key.

  This callback is used by `Ecto.assoc/2` and when preloading.
  """
  @callback assoc_query(t, Ecto.Query.t | nil, values :: [term]) :: Ecto.Query.t

  @doc """
  Returns information used by the preloader.

  The result is either an `{:assoc, ...}` tuple or a `{:through, ...}`
  tuple, as described by the callback's return type.
  """
  @callback preload_info(t) ::
              {:assoc, t, {integer, atom} | {integer, atom, Ecto.Type.t()}} | {:through, t, [atom]}

  @doc """
  Performs the repository change on the association.

  Receives the association struct, the parent changeset, the
  association changeset, the adapter and the repository action
  options. Must return the persisted struct (or nil) or the
  changeset error.
  """
  @callback on_repo_change(t, parent :: Ecto.Changeset.t, changeset :: Ecto.Changeset.t, Ecto.Adapter.t, Keyword.t) ::
            {:ok, Ecto.Schema.t | nil} | {:error, Ecto.Changeset.t}
    148 
    149   @doc """
    150   Retrieves the association from the given schema.
    151   """
    152   def association_from_schema!(schema, assoc) do
    153     schema.__schema__(:association, assoc) ||
    154       raise ArgumentError, "schema #{inspect schema} does not have association #{inspect assoc}"
    155   end
    156 
    157   @doc """
    158   Returns the association key for the given module with the given suffix.
    159 
    160   ## Examples
    161 
    162       iex> Ecto.Association.association_key(Hello.World, :id)
    163       :world_id
    164 
    165       iex> Ecto.Association.association_key(Hello.HTTP, :id)
    166       :http_id
    167 
    168       iex> Ecto.Association.association_key(Hello.HTTPServer, :id)
    169       :http_server_id
    170 
    171   """
    172   def association_key(module, suffix) do
    173     prefix = module |> Module.split |> List.last |> Macro.underscore
    174     :"#{prefix}_#{suffix}"
    175   end
    176 
    177   @doc """
    178   Build an association query through the given associations from the specified owner table
    179   and through the given associations. Finally filter by the provided values of the owner_key of
    180   the first relationship in the chain. Used in Ecto.assoc/2.
    181   """
    182   def filter_through_chain(owner, through, values) do
    183     chain_through(owner, through, nil, values)
    184     |> distinct([x], true)
    185   end
    186 
  @doc """
  Join the target table given a list of associations to go through starting from the owner table.

  The joins are added on top of the given `query`; no filtering by values is applied.
  """
  def join_through_chain(owner, through, query) do
    # A non-nil query puts chain_through/4 in its forward (joining) mode.
    chain_through(owner, through, query, nil)
  end
    193 
    194   # This function is used by both join_through_chain/3 and filter_through_chain/3 since the algorithm for both
    195   # is nearly identical barring a few differences.
    196   defp chain_through(owner, through, join_to, values) do
    197     # Flatten the chain of throughs. If any of the associations is a HasThrough this allows us to expand it so we have
    198     # a list of atomic associations to join through.
    199     {_, through} = flatten_through_chain(owner, through, [])
    200 
    201     # If we're joining then we're going forward from the owner table to the destination table.
    202     # Otherwise we're going backward from the destination table then filtering by values.
    203     chain_direction = if(join_to != nil, do: :forward, else: :backward)
    204 
    205     # This stage produces a list of joins represented as a keyword list with the following structure:
    206     # [
    207     #   [schema: (The Schema), in_key: (The key used to join into the table), out_key: (The key used to join with the next), where: (The condition KW list)]
    208     # ]
    209     relation_list = resolve_through_tables(owner, through, chain_direction)
    210 
    211     # Filter out the joins which are redundant
    212     filtered_list = Enum.with_index(relation_list)
    213     |> Enum.filter(fn
    214       # We always keep the first table in the chain since it's our source table for the query
    215       {_, 0} -> true
    216 
    217       {rel, _} ->
    218         # If the condition is not empty we need to join to the table. Otherwise if the in_key and out_key is the same
    219         # then this join is redundant since we can just join to the next table in the chain.
    220         rel.in_key != rel.out_key or rel.where != []
    221     end)
    222     |> Enum.map(&elem(&1, 0))
    223 
    224     # If we're preloading we don't need the last table since it is the owner table.
    225     filtered_list = if(join_to == nil, do: Enum.slice(filtered_list, 0..-2), else: filtered_list)
    226 
    227     [source | joins] = filtered_list
    228 
    229     source_schema = source.schema
    230     query = join_to || from(s in source_schema)
    231 
    232     counter = Ecto.Query.Builder.count_binds(query) - 1
    233 
    234     # We need to create the query by joining all the tables, and also we need the out_key of the final table to use
    235     # for the final WHERE clause with values.
    236     {_, query, _, dest_out_key} = Enum.reduce(joins, {source, query, counter, source.out_key}, fn curr_rel, {prev_rel, query, counter, _} ->
    237       related_queryable = curr_rel.schema
    238 
    239       next = join(query, :inner, [{src, counter}], dest in ^related_queryable, on: field(src, ^prev_rel.out_key) == field(dest, ^curr_rel.in_key))
    240         |> combine_joins_query(curr_rel.where, counter + 1)
    241 
    242       {curr_rel, next, counter + 1, curr_rel.out_key}
    243     end)
    244 
    245     final_bind = Ecto.Query.Builder.count_binds(query) - 1
    246 
    247     values = List.wrap(values)
    248     query = case {join_to, values} do
    249       {nil, [single_value]} ->
    250         query
    251         |> where([{dest, final_bind}], field(dest, ^dest_out_key) == ^single_value)
    252 
    253       {nil, values} ->
    254         query
    255         |> where([{dest, final_bind}], field(dest, ^dest_out_key) in ^values)
    256 
    257       {_, _} ->
    258         query
    259     end
    260 
    261     combine_assoc_query(query, source.where || [])
    262   end
    263 
    264   defp flatten_through_chain(owner, [], acc), do: {owner, acc}
    265   defp flatten_through_chain(owner, [assoc | tl], acc) do
    266     refl = association_from_schema!(owner, assoc)
    267     case refl do
    268       %{through: nested_throughs} ->
    269         {owner, acc} = flatten_through_chain(owner, nested_throughs, acc)
    270         flatten_through_chain(owner, tl, acc)
    271 
    272       _ ->
    273         flatten_through_chain(refl.related, tl, acc ++ [assoc])
    274     end
    275   end
    276 
    277   defp resolve_through_tables(owner, through, :backward) do
    278     # This step generates a list of maps with the following keys:
    279     # [
    280     #   %{schema: ..., out_key: ..., in_key: ..., where: ...}
    281     # ]
    282     # This is a list of all tables that we will need to join to follow the chain of throughs and which key is used
    283     # to join in and out of the table, along with the where condition for that table. The final table of the chain will
    284     # be "owner", and the first table of the chain will be the final destination table of all the throughs.
    285     initial_owner_map = %{schema: owner, out_key: nil, in_key: nil, where: nil}
    286 
    287     Enum.reduce(through, {owner, [initial_owner_map]}, fn assoc, {owner, table_list} ->
    288       refl = association_from_schema!(owner, assoc)
    289       [owner_map | table_list] = table_list
    290 
    291       table_list = case refl do
    292         %{join_through: join_through, join_keys: join_keys, join_where: join_where, where: where} ->
    293           [{owner_join_key, owner_key}, {related_join_key, related_key}] = join_keys
    294 
    295           owner_map = %{owner_map | in_key: owner_key}
    296           join_map = %{schema: join_through, out_key: owner_join_key, in_key: related_join_key, where: join_where}
    297           related_map = %{schema: refl.related, out_key: related_key, in_key: nil, where: where}
    298 
    299           [related_map, join_map, owner_map | table_list]
    300 
    301         _ ->
    302           owner_map = %{owner_map | in_key: refl.owner_key}
    303           related_map = %{schema: refl.related, out_key: refl.related_key, in_key: nil, where: refl.where}
    304 
    305           [related_map, owner_map | table_list]
    306       end
    307 
    308       {refl.related, table_list}
    309     end)
    310     |> elem(1)
    311   end
    312 
    313   defp resolve_through_tables(owner, through, :forward) do
    314     # In the forward case (joining) we need to reverse the list and swap the in_key for the out_key
    315     # since we've changed directions.
    316     resolve_through_tables(owner, through, :backward)
    317     |> Enum.reverse()
    318     |> Enum.map(fn %{out_key: out_key, in_key: in_key} = join ->
    319       %{join | out_key: in_key, in_key: out_key}
    320     end)
    321   end
    322 
    323   @doc """
    324   Add the default assoc query where clauses to a join.
    325 
    326   This handles only `where` and converts it to a `join`,
    327   as that is the only information propagate in join queries.
    328   """
    329   def combine_joins_query(query, [], _binding), do: query
    330 
    331   def combine_joins_query(%{joins: joins} = query, [_ | _] = conditions, binding) do
    332     {joins, [join_expr]} = Enum.split(joins, -1)
    333     %{on: %{params: params, expr: expr} = join_on} = join_expr
    334     {expr, params} = expand_where(conditions, expr, Enum.reverse(params), length(params), binding)
    335     %{query | joins: joins ++ [%{join_expr | on: %{join_on | expr: expr, params: params}}]}
    336   end
    337 
    338   @doc """
    339   Add the default assoc query where clauses a provided query.
    340   """
    341   def combine_assoc_query(query, []), do: query
    342   def combine_assoc_query(%{wheres: []} = query, conditions) do
    343     {expr, params} = expand_where(conditions, true, [], 0, 0)
    344     %{query | wheres: [%Ecto.Query.BooleanExpr{op: :and, expr: expr, params: params, line: __ENV__.line, file: __ENV__.file}]}
    345   end
    346   def combine_assoc_query(%{wheres: wheres} = query, conditions) do
    347     {wheres, [where_expr]} = Enum.split(wheres, -1)
    348     %{params: params, expr: expr} = where_expr
    349     {expr, params} = expand_where(conditions, expr, Enum.reverse(params), length(params), 0)
    350     %{query | wheres: wheres ++ [%{where_expr | expr: expr, params: params}]}
    351   end
    352 
  # Folds a list of `{key, value}` conditions into `expr`, and-ing one
  # comparison per condition onto the expression.
  #
  #   * `conditions` - the `{key, value}` pairs to add
  #   * `expr`       - the starting expression (`true` means "no expression yet")
  #   * `params`     - the existing params in REVERSED order; new params are
  #                    prepended and the list is reversed back before returning
  #   * `counter`    - index used for the next `^` parameter placeholder
  #   * `binding`    - index of the query binding the fields belong to
  #
  # Returns `{expr, params}` with params in their final order.
  defp expand_where(conditions, expr, params, counter, binding) do
    # `true` on the left is dropped so the first condition replaces the
    # placeholder instead of being and-ed with it.
    conjoin_exprs = fn
      true, r -> r
      l, r-> {:and, [], [l, r]}
    end

    {expr, params, _counter} =
      Enum.reduce(conditions, {expr, params, counter}, fn
        # `key: nil` becomes an is_nil check; no parameter is added.
        {key, nil}, {expr, params, counter} ->
          expr = conjoin_exprs.(expr, {:is_nil, [], [to_field(binding, key)]})
          {expr, params, counter}

        # `key: {:not, nil}` becomes a negated is_nil check; no parameter is added.
        {key, {:not, nil}}, {expr, params, counter} ->
          expr = conjoin_exprs.(expr, {:not, [], [{:is_nil, [], [to_field(binding, key)]}]})
          {expr, params, counter}

        # `key: {:fragment, "..."}` splices the field into the fragment string.
        {key, {:fragment, frag}}, {expr, params, counter} when is_binary(frag) ->
          pieces = Ecto.Query.Builder.fragment_pieces(frag, [to_field(binding, key)])
          expr = conjoin_exprs.(expr, {:fragment, [], pieces})
          {expr, params, counter}

        # `key: {:in, list}` adds an `in` comparison against a new parameter.
        {key, {:in, value}}, {expr, params, counter} when is_list(value) ->
          expr = conjoin_exprs.(expr, {:in, [], [to_field(binding, key), {:^, [], [counter]}]})
          {expr, [{value, {:in, {binding, key}}} | params], counter + 1}

        # Anything else adds an equality comparison against a new parameter.
        {key, value}, {expr, params, counter} ->
          expr = conjoin_exprs.(expr, {:==, [], [to_field(binding, key), {:^, [], [counter]}]})
          {expr, [{value, {binding, key}} | params], counter + 1}
      end)

    {expr, Enum.reverse(params)}
  end
    385 
  # Builds the expression tuple for accessing `field` on the binding at
  # index `binding` (the `{:&, [], [binding]}` node).
  defp to_field(binding, field),
    do: {{:., [], [{:&, [], [binding]}, field]}, [], []}
    388 
    389   @doc """
    390   Build a join query with the given `through` associations starting at `counter`.
    391   """
    392   def joins_query(query, through, counter) do
    393     Enum.reduce(through, {query, counter}, fn current, {acc, counter} ->
    394       query = join(acc, :inner, [{x, counter}], assoc(x, ^current))
    395       {query, counter + 1}
    396     end) |> elem(0)
    397   end
    398 
    399   @doc """
    400   Retrieves related module from queryable.
    401 
    402   ## Examples
    403 
    404       iex> Ecto.Association.related_from_query({"custom_source", Schema}, :comments_v1)
    405       Schema
    406 
    407       iex> Ecto.Association.related_from_query(Schema, :comments_v1)
    408       Schema
    409 
    410       iex> Ecto.Association.related_from_query("wrong", :comments_v1)
    411       ** (ArgumentError) association :comments_v1 queryable must be a schema or a {source, schema}. got: "wrong"
    412   """
    413   def related_from_query(atom, _name) when is_atom(atom), do: atom
    414   def related_from_query({source, schema}, _name) when is_binary(source) and is_atom(schema), do: schema
    415   def related_from_query(queryable, name) do
    416     raise ArgumentError, "association #{inspect name} queryable must be a schema or " <>
    417       "a {source, schema}. got: #{inspect queryable}"
    418   end
    419 
    420   @doc """
    421   Applies default values into the struct.
    422   """
    423   def apply_defaults(struct, defaults, _owner) when is_list(defaults) do
    424     struct(struct, defaults)
    425   end
    426 
    427   def apply_defaults(struct, {mod, fun, args}, owner) do
    428     apply(mod, fun, [struct.__struct__, owner | args])
    429   end
    430 
    431   @doc """
    432   Validates `defaults` for association named `name`.
    433   """
    434   def validate_defaults!(_module, _name, {mod, fun, args} = defaults)
    435       when is_atom(mod) and is_atom(fun) and is_list(args),
    436       do: defaults
    437 
    438   def validate_defaults!(module, _name, fun) when is_atom(fun),
    439     do: {module, fun, []}
    440 
    441   def validate_defaults!(_module, _name, defaults) when is_list(defaults),
    442     do: defaults
    443 
    444   def validate_defaults!(_module, name, defaults) do
    445     raise ArgumentError,
    446           "expected defaults for #{inspect name} to be a keyword list " <>
    447             "or a {module, fun, args} tuple, got: `#{inspect defaults}`"
    448   end
    449 
    450   @doc """
    451   Validates `preload_order` for association named `name`.
    452   """
    453   def validate_preload_order!(name, preload_order) when is_list(preload_order) do
    454     Enum.map(preload_order, fn
    455       field when is_atom(field) ->
    456         field
    457 
    458       {direction, field} when is_atom(direction) and is_atom(field) ->
    459         unless OrderBy.valid_direction?(direction) do
    460           raise ArgumentError,
    461           "expected `:preload_order` for #{inspect name} to be a keyword list or a list of atoms/fields, " <>
    462             "got: `#{inspect preload_order}`, " <>
    463             "`#{inspect direction}` is not a valid direction"
    464         end
    465 
    466         {direction, field}
    467 
    468       item ->
    469         raise ArgumentError,
    470           "expected `:preload_order` for #{inspect name} to be a keyword list or a list of atoms/fields, " <>
    471             "got: `#{inspect preload_order}`, " <>
    472             "`#{inspect item}` is not valid"
    473     end)
    474   end
    475 
    476   def validate_preload_order!(name, preload_order) do
    477     raise ArgumentError,
    478       "expected `:preload_order` for #{inspect name} to be a keyword list or a list of atoms/fields, " <>
    479         "got: `#{inspect preload_order}`"
    480   end
    481 
    482   @doc """
    483   Merges source from query into to the given schema.
    484 
    485   In case the query does not have a source, returns
    486   the schema unchanged.
    487   """
    488   def merge_source(schema, query)
    489 
    490   def merge_source(%{__meta__: %{source: source}} = struct, {source, _}) do
    491     struct
    492   end
    493 
    494   def merge_source(struct, {source, _}) do
    495     Ecto.put_meta(struct, source: source)
    496   end
    497 
    498   def merge_source(struct, _query) do
    499     struct
    500   end
    501 
    502   @doc """
    503   Updates the prefix of a changeset based on the metadata.
    504   """
    505   def update_parent_prefix(
    506         %{data: %{__meta__: %{prefix: prefix}}} = changeset,
    507         %{__meta__: %{prefix: prefix}}
    508       ),
    509       do: changeset
    510 
    511   def update_parent_prefix(
    512         %{data: %{__meta__: %{prefix: nil}}} = changeset,
    513         %{__meta__: %{prefix: prefix}}
    514       ),
    515       do: update_in(changeset.data, &Ecto.put_meta(&1, prefix: prefix))
    516 
    517 
    518   def update_parent_prefix(changeset, _),
    519     do: changeset
    520 
    521   @doc """
    522   Performs the repository action in the related changeset,
    523   returning `{:ok, data}` or `{:error, changes}`.
    524   """
    525   def on_repo_change(%{data: struct}, [], _adapter, _opts) do
    526     {:ok, struct}
    527   end
    528 
    529   def on_repo_change(changeset, assocs, adapter, opts) do
    530     %{data: struct, changes: changes, action: action} = changeset
    531 
    532     {struct, changes, _halt, valid?} =
    533       Enum.reduce(assocs, {struct, changes, false, true}, fn {refl, value}, acc ->
    534         on_repo_change(refl, value, changeset, action, adapter, opts, acc)
    535       end)
    536 
    537     case valid? do
    538       true  -> {:ok, struct}
    539       false -> {:error, changes}
    540     end
    541   end
    542 
  # Applies the change for a single association and folds the outcome into
  # the accumulator `{parent, changes, halt, valid?}`:
  #
  #   * `parent`  - the parent data with association fields filled in
  #   * `changes` - the parent changes with association changesets filled in
  #   * `halt`    - once true, no further repository calls are made
  #   * `valid?`  - false as soon as any association change failed

  # Cardinality :one set to nil: replace the previous entry (unless halted)
  # and store nil in both the parent and the changes.
  defp on_repo_change(%{cardinality: :one, field: field} = meta, nil, parent_changeset,
                      _repo_action, adapter, opts, {parent, changes, halt, valid?}) do
    if not halt, do: maybe_replace_one!(meta, nil, parent, parent_changeset, adapter, opts)
    {Map.put(parent, field, nil), Map.put(changes, field, nil), halt, valid?}
  end

  # Cardinality :one with a changeset: replace the previous entry if needed,
  # then delegate to the association module.
  defp on_repo_change(%{cardinality: :one, field: field, __struct__: mod} = meta,
                      %{action: action, data: current} = changeset, parent_changeset,
                      repo_action, adapter, opts, {parent, changes, halt, valid?}) do
    check_action!(meta, action, repo_action)
    if not halt, do: maybe_replace_one!(meta, current, parent, parent_changeset, adapter, opts)

    case on_repo_change_unless_halted(halt, mod, meta, parent_changeset, changeset, adapter, opts) do
      {:ok, struct} ->
        {Map.put(parent, field, struct), Map.put(changes, field, changeset), halt, valid?}

      {:error, error_changeset} ->
        # On error the parent is left untouched and the failing changeset is recorded.
        {parent, Map.put(changes, field, error_changeset),
         halted?(halt, changeset, error_changeset), false}
    end
  end

  # Cardinality :many: process every changeset in order, collecting the
  # resulting changesets and structs (both built in reverse and flipped
  # back at the end).
  defp on_repo_change(%{cardinality: :many, field: field, __struct__: mod} = meta,
                      changesets, parent_changeset, repo_action, adapter, opts,
                      {parent, changes, halt, all_valid?}) do
    {changesets, structs, halt, valid?} =
      Enum.reduce(changesets, {[], [], halt, true}, fn
        %{action: action} = changeset, {changesets, structs, halt, valid?} ->
          check_action!(meta, action, repo_action)

          case on_repo_change_unless_halted(halt, mod, meta, parent_changeset, changeset, adapter, opts) do
            # A nil struct is not added to the list of structs.
            {:ok, nil} ->
              {[changeset | changesets], structs, halt, valid?}

            {:ok, struct} ->
              {[changeset | changesets], [struct | structs], halt, valid?}

            {:error, error_changeset} ->
              {[error_changeset | changesets], structs, halted?(halt, changeset, error_changeset), false}
          end
      end)

    if valid? do
      {Map.put(parent, field, Enum.reverse(structs)),
       Map.put(changes, field, Enum.reverse(changesets)),
       halt, all_valid?}
    else
      # When any entry failed, the parent field is not updated.
      {parent,
       Map.put(changes, field, Enum.reverse(changesets)),
       halt, false}
    end
  end
    595 
    596   defp check_action!(%{related: schema}, :delete, :insert),
    597     do: raise(ArgumentError, "got action :delete in changeset for associated #{inspect schema} while inserting")
    598   defp check_action!(_, _, _), do: :ok
    599 
    600   defp halted?(true, _, _), do: true
    601   defp halted?(_, %{valid?: true}, %{valid?: false}), do: true
    602   defp halted?(_, _, _), do: false
    603 
  # Delegates to the association module's on_repo_change/5 unless processing
  # has been halted, in which case the changeset is returned as an error
  # without touching the repository.
  defp on_repo_change_unless_halted(true, _mod, _meta, _parent, changeset, _adapter, _opts) do
    {:error, changeset}
  end
  defp on_repo_change_unless_halted(false, mod, meta, parent, changeset, adapter, opts) do
    mod.on_repo_change(meta, parent, changeset, adapter, opts)
  end
    610 
  # For a cardinality :one association, runs a :replace action on the entry
  # previously stored in the parent when it is replaceable and its primary
  # key differs from the one of `current` (nil counts as an empty key).
  #
  # Raises Ecto.InvalidChangesetError if the replace action fails.
  defp maybe_replace_one!(%{field: field, __struct__: mod} = meta, current, parent,
                          parent_changeset, adapter, opts) do
    previous = Map.get(parent, field)
    if replaceable?(previous) and primary_key!(previous) != primary_key!(current) do
      changeset = %{Ecto.Changeset.change(previous) | action: :replace}

      case mod.on_repo_change(meta, parent_changeset, changeset, adapter, opts) do
        {:ok, _} ->
          :ok
        {:error, changeset} ->
          raise Ecto.InvalidChangesetError,
            action: changeset.action, changeset: changeset
      end
    end
  end

  # Fallback for a first argument without `:field` and `:__struct__` keys.
  defp maybe_replace_one!(_, _, _, _, _, _), do: :ok
    628 
    629   defp replaceable?(nil), do: false
    630   defp replaceable?(%Ecto.Association.NotLoaded{}), do: false
    631   defp replaceable?(%{__meta__: %{state: :built}}), do: false
    632   defp replaceable?(_), do: true
    633 
    634   defp primary_key!(nil), do: []
    635   defp primary_key!(struct), do: Ecto.primary_key!(struct)
    636 end
    637 
    638 defmodule Ecto.Association.Has do
    639   @moduledoc """
    640   The association struct for `has_one` and `has_many` associations.
    641 
    642   Its fields are:
    643 
    644     * `cardinality` - The association cardinality
    645     * `field` - The name of the association field on the schema
    646     * `owner` - The schema where the association was defined
    647     * `related` - The schema that is associated
    648     * `owner_key` - The key on the `owner` schema used for the association
    649     * `related_key` - The key on the `related` schema used for the association
    650     * `queryable` - The real query to use for querying association
    651     * `on_delete` - The action taken on associations when schema is deleted
    652     * `on_replace` - The action taken on associations when schema is replaced
    653     * `defaults` - Default fields used when building the association
    654     * `relationship` - The relationship to the specified schema, default is `:child`
    655     * `preload_order` - Default `order_by` of the association, used only by preload
    656   """
    657 
    658   @behaviour Ecto.Association
    659   @on_delete_opts [:nothing, :nilify_all, :delete_all]
    660   @on_replace_opts [:raise, :mark_as_invalid, :delete, :delete_if_exists, :nilify]
    661   @has_one_on_replace_opts @on_replace_opts ++ [:update]
    662   defstruct [:cardinality, :field, :owner, :related, :owner_key, :related_key, :on_cast,
    663              :queryable, :on_delete, :on_replace, where: [], unique: true, defaults: [],
    664              relationship: :child, ordered: false, preload_order: []]
    665 
    666   @impl true
    667   def after_compile_validation(%{queryable: queryable, related_key: related_key}, env) do
    668     compiled = Ecto.Association.ensure_compiled(queryable, env)
    669 
    670     cond do
    671       compiled == :skip ->
    672         :ok
    673       compiled == :not_found ->
    674         {:error, "associated schema #{inspect queryable} does not exist"}
    675       not function_exported?(queryable, :__schema__, 2) ->
    676         {:error, "associated module #{inspect queryable} is not an Ecto schema"}
    677       is_nil queryable.__schema__(:type, related_key) ->
    678         {:error, "associated schema #{inspect queryable} does not have field `#{related_key}`"}
    679       true ->
    680         :ok
    681     end
    682   end
    683 
    684   @impl true
    685   def struct(module, name, opts) do
    686     queryable = Keyword.fetch!(opts, :queryable)
    687     cardinality = Keyword.fetch!(opts, :cardinality)
    688     related = Ecto.Association.related_from_query(queryable, name)
    689 
    690     ref =
    691       module
    692       |> Module.get_attribute(:primary_key)
    693       |> get_ref(opts[:references], name)
    694 
    695     unless Module.get_attribute(module, :ecto_fields)[ref] do
    696       raise ArgumentError, "schema does not have the field #{inspect ref} used by " <>
    697         "association #{inspect name}, please set the :references option accordingly"
    698     end
    699 
    700     if opts[:through] do
    701       raise ArgumentError, "invalid association #{inspect name}. When using the :through " <>
    702                            "option, the schema should not be passed as second argument"
    703     end
    704 
    705     on_delete  = Keyword.get(opts, :on_delete, :nothing)
    706     unless on_delete in @on_delete_opts do
    707       raise ArgumentError, "invalid :on_delete option for #{inspect name}. " <>
    708         "The only valid options are: " <>
    709         Enum.map_join(@on_delete_opts, ", ", &"`#{inspect &1}`")
    710     end
    711 
    712     on_replace = Keyword.get(opts, :on_replace, :raise)
    713     on_replace_opts = if cardinality == :one, do: @has_one_on_replace_opts, else: @on_replace_opts
    714 
    715     unless on_replace in on_replace_opts do
    716       raise ArgumentError, "invalid `:on_replace` option for #{inspect name}. " <>
    717         "The only valid options are: " <>
    718         Enum.map_join(@on_replace_opts, ", ", &"`#{inspect &1}`")
    719     end
    720 
    721     defaults = Ecto.Association.validate_defaults!(module, name, opts[:defaults] || [])
    722     preload_order = Ecto.Association.validate_preload_order!(name, opts[:preload_order] || [])
    723     where = opts[:where] || []
    724 
    725     unless is_list(where) do
    726       raise ArgumentError, "expected `:where` for #{inspect name} to be a keyword list, got: `#{inspect where}`"
    727     end
    728 
    729     %__MODULE__{
    730       field: name,
    731       cardinality: cardinality,
    732       owner: module,
    733       related: related,
    734       owner_key: ref,
    735       related_key: opts[:foreign_key] || Ecto.Association.association_key(module, ref),
    736       queryable: queryable,
    737       on_delete: on_delete,
    738       on_replace: on_replace,
    739       defaults: defaults,
    740       where: where,
    741       preload_order: preload_order
    742     }
    743   end
    744 
    745   defp get_ref(primary_key, nil, name) when primary_key in [nil, false] do
    746     raise ArgumentError, "need to set :references option for " <>
    747       "association #{inspect name} when schema has no primary key"
    748   end
    749   defp get_ref(primary_key, nil, _name), do: elem(primary_key, 0)
    750   defp get_ref(_primary_key, references, _name), do: references
    751 
    752   @impl true
    753   def build(%{owner_key: owner_key, related_key: related_key} = refl, owner, attributes) do
    754     data = refl |> build(owner) |> struct(attributes)
    755     %{data | related_key => Map.get(owner, owner_key)}
    756   end
    757 
    758   @impl true
    759   def joins_query(%{related_key: related_key, owner: owner, owner_key: owner_key, queryable: queryable} = assoc) do
    760     from(o in owner, join: q in ^queryable, on: field(q, ^related_key) == field(o, ^owner_key))
    761     |> Ecto.Association.combine_joins_query(assoc.where, 1)
    762   end
    763 
    764   @impl true
    765   def assoc_query(%{related_key: related_key, queryable: queryable} = assoc, query, [value]) do
    766     from(x in (query || queryable), where: field(x, ^related_key) == ^value)
    767     |> Ecto.Association.combine_assoc_query(assoc.where)
    768   end
    769 
    770   @impl true
    771   def assoc_query(%{related_key: related_key, queryable: queryable} = assoc, query, values) do
    772     from(x in (query || queryable), where: field(x, ^related_key) in ^values)
    773     |> Ecto.Association.combine_assoc_query(assoc.where)
    774   end
    775 
    776   @impl true
    777   def preload_info(%{related_key: related_key} = refl) do
    778     {:assoc, refl, {0, related_key}}
    779   end
    780 
    781   @impl true
    782   def on_repo_change(%{on_replace: :delete_if_exists} = refl, parent_changeset,
    783                      %{action: :replace} = changeset, adapter, opts) do
    784     try do
    785       on_repo_change(%{refl | on_replace: :delete}, parent_changeset, changeset, adapter, opts)
    786     rescue
    787       Ecto.StaleEntryError -> {:ok, nil}
    788     end
    789   end
    790 
    791   def on_repo_change(%{on_replace: on_replace} = refl, %{data: parent} = parent_changeset,
    792                      %{action: :replace} = changeset, adapter, opts) do
    793     changeset = case on_replace do
    794       :nilify -> %{changeset | action: :update}
    795       :update -> %{changeset | action: :update}
    796       :delete -> %{changeset | action: :delete}
    797     end
    798 
    799     changeset = Ecto.Association.update_parent_prefix(changeset, parent)
    800 
    801     case on_repo_change(refl, %{parent_changeset | data: nil}, changeset, adapter, opts) do
    802       {:ok, _} -> {:ok, nil}
    803       {:error, changeset} -> {:error, changeset}
    804     end
    805   end
    806 
    807   def on_repo_change(assoc, parent_changeset, changeset, _adapter, opts) do
    808     %{data: parent, repo: repo} = parent_changeset
    809     %{action: action, changes: changes} = changeset
    810 
    811     {key, value} = parent_key(assoc, parent)
    812     changeset = update_parent_key(changeset, action, key, value)
    813     changeset = Ecto.Association.update_parent_prefix(changeset, parent)
    814 
    815     case apply(repo, action, [changeset, opts]) do
    816       {:ok, _} = ok ->
    817         if action == :delete, do: {:ok, nil}, else: ok
    818       {:error, changeset} ->
    819         original = Map.get(changes, key)
    820         {:error, put_in(changeset.changes[key], original)}
    821     end
    822   end
    823 
    824   defp update_parent_key(changeset, :delete, _key, _value),
    825     do: changeset
    826   defp update_parent_key(changeset, _action, key, value),
    827     do: Ecto.Changeset.put_change(changeset, key, value)
    828 
    829   defp parent_key(%{related_key: related_key}, nil) do
    830     {related_key, nil}
    831   end
    832   defp parent_key(%{owner_key: owner_key, related_key: related_key}, owner) do
    833     {related_key, Map.get(owner, owner_key)}
    834   end
    835 
    836   ## Relation callbacks
    837   @behaviour Ecto.Changeset.Relation
    838 
    839   @impl true
    840   def build(%{related: related, queryable: queryable, defaults: defaults}, owner) do
    841     related
    842     |> Ecto.Association.apply_defaults(defaults, owner)
    843     |> Ecto.Association.merge_source(queryable)
    844   end
    845 
    846   ## On delete callbacks
    847 
    848   @doc false
    849   def delete_all(refl, parent, repo_name, opts) do
    850     if query = on_delete_query(refl, parent) do
    851       Ecto.Repo.Queryable.delete_all repo_name, query, opts
    852     end
    853   end
    854 
    855   @doc false
    856   def nilify_all(%{related_key: related_key} = refl, parent, repo_name, opts) do
    857     if query = on_delete_query(refl, parent) do
    858       Ecto.Repo.Queryable.update_all repo_name, query, [set: [{related_key, nil}]], opts
    859     end
    860   end
    861 
    862   defp on_delete_query(%{owner_key: owner_key, related_key: related_key,
    863                          queryable: queryable}, parent) do
    864     if value = Map.get(parent, owner_key) do
    865       from x in queryable, where: field(x, ^related_key) == ^value
    866     end
    867   end
    868 end
    869 
    870 defmodule Ecto.Association.HasThrough do
    871   @moduledoc """
    872   The association struct for `has_one` and `has_many` through associations.
    873 
    874   Its fields are:
    875 
    876     * `cardinality` - The association cardinality
    877     * `field` - The name of the association field on the schema
    878     * `owner` - The schema where the association was defined
    879     * `owner_key` - The key on the `owner` schema used for the association
    880     * `through` - The through associations
    881     * `relationship` - The relationship to the specified schema, default `:child`
    882   """
    883 
    884   @behaviour Ecto.Association
    885   defstruct [:cardinality, :field, :owner, :owner_key, :through, :on_cast,
    886              relationship: :child, unique: true, ordered: false]
    887 
    888   @impl true
    889   def after_compile_validation(_, _) do
    890     :ok
    891   end
    892 
    893   @impl true
    894   def struct(module, name, opts) do
    895     through = Keyword.fetch!(opts, :through)
    896 
    897     refl =
    898       case through do
    899         [h,_|_] ->
    900           Module.get_attribute(module, :ecto_assocs)[h]
    901         _ ->
    902           raise ArgumentError, ":through expects a list with at least two entries: " <>
    903             "the association in the current module and one step through, got: #{inspect through}"
    904       end
    905 
    906     unless refl do
    907       raise ArgumentError, "schema does not have the association #{inspect hd(through)} " <>
    908         "used by association #{inspect name}, please ensure the association exists and " <>
    909         "is defined before the :through one"
    910     end
    911 
    912     %__MODULE__{
    913       field: name,
    914       cardinality: Keyword.fetch!(opts, :cardinality),
    915       through: through,
    916       owner: module,
    917       owner_key: refl.owner_key,
    918     }
    919   end
    920 
    921   @impl true
    922   def build(%{field: name}, %{__struct__: owner}, _attributes) do
    923     raise ArgumentError,
    924       "cannot build through association `#{inspect name}` for #{inspect owner}. " <>
    925       "Instead build the intermediate steps explicitly."
    926   end
    927 
    928   @impl true
    929   def preload_info(%{through: through} = refl) do
    930     {:through, refl, through}
    931   end
    932 
    933   @impl true
    934   def on_repo_change(%{field: name}, _, _, _, _) do
    935     raise ArgumentError,
    936       "cannot insert/update/delete through associations `#{inspect name}` via the repository. " <>
    937       "Instead build the intermediate steps explicitly."
    938   end
    939 
    940   @impl true
    941   def joins_query(%{owner: owner, through: through}) do
    942     Ecto.Association.join_through_chain(owner, through, from(x in owner))
    943   end
    944 
    945   @impl true
    946   def assoc_query(%{owner: owner, through: through}, _, values) do
    947     Ecto.Association.filter_through_chain(owner, through, values)
    948   end
    949 end
    950 
    951 defmodule Ecto.Association.BelongsTo do
    952   @moduledoc """
    953   The association struct for a `belongs_to` association.
    954 
    955   Its fields are:
    956 
    957     * `cardinality` - The association cardinality
    958     * `field` - The name of the association field on the schema
    959     * `owner` - The schema where the association was defined
    960     * `owner_key` - The key on the `owner` schema used for the association
    961     * `related` - The schema that is associated
    962     * `related_key` - The key on the `related` schema used for the association
    963     * `queryable` - The real query to use for querying association
    964     * `defaults` - Default fields used when building the association
    965     * `relationship` - The relationship to the specified schema, default `:parent`
    966     * `on_replace` - The action taken on associations when schema is replaced
    967   """
    968 
    969   @behaviour Ecto.Association
    970   @on_replace_opts [:raise, :mark_as_invalid, :delete, :delete_if_exists, :nilify, :update]
    971   defstruct [:field, :owner, :related, :owner_key, :related_key, :queryable, :on_cast,
    972              :on_replace, where: [], defaults: [], cardinality: :one, relationship: :parent,
    973              unique: true, ordered: false]
    974 
    975   @impl true
    976   def after_compile_validation(%{queryable: queryable, related_key: related_key}, env) do
    977     compiled = Ecto.Association.ensure_compiled(queryable, env)
    978 
    979     cond do
    980       compiled == :skip ->
    981         :ok
    982       compiled == :not_found ->
    983         {:error, "associated schema #{inspect queryable} does not exist"}
    984       not function_exported?(queryable, :__schema__, 2) ->
    985         {:error, "associated module #{inspect queryable} is not an Ecto schema"}
    986       is_nil queryable.__schema__(:type, related_key) ->
    987         {:error, "associated schema #{inspect queryable} does not have field `#{related_key}`"}
    988       true ->
    989         :ok
    990     end
    991   end
    992 
    993   @impl true
    994   def struct(module, name, opts) do
    995     ref = if ref = opts[:references], do: ref, else: :id
    996     queryable = Keyword.fetch!(opts, :queryable)
    997     related = Ecto.Association.related_from_query(queryable, name)
    998     on_replace = Keyword.get(opts, :on_replace, :raise)
    999 
   1000     unless on_replace in @on_replace_opts do
   1001       raise ArgumentError, "invalid `:on_replace` option for #{inspect name}. " <>
   1002         "The only valid options are: " <>
   1003         Enum.map_join(@on_replace_opts, ", ", &"`#{inspect &1}`")
   1004     end
   1005 
   1006     defaults = Ecto.Association.validate_defaults!(module, name, opts[:defaults] || [])
   1007     where = opts[:where] || []
   1008 
   1009     unless is_list(where) do
   1010       raise ArgumentError, "expected `:where` for #{inspect name} to be a keyword list, got: `#{inspect where}`"
   1011     end
   1012 
   1013     %__MODULE__{
   1014       field: name,
   1015       owner: module,
   1016       related: related,
   1017       owner_key: Keyword.fetch!(opts, :foreign_key),
   1018       related_key: ref,
   1019       queryable: queryable,
   1020       on_replace: on_replace,
   1021       defaults: defaults,
   1022       where: where
   1023     }
   1024   end
   1025 
   1026   @impl true
   1027   def build(refl, owner, attributes) do
   1028     refl
   1029     |> build(owner)
   1030     |> struct(attributes)
   1031   end
   1032 
   1033   @impl true
   1034   def joins_query(%{related_key: related_key, owner: owner, owner_key: owner_key, queryable: queryable} = assoc) do
   1035     from(o in owner, join: q in ^queryable, on: field(q, ^related_key) == field(o, ^owner_key))
   1036     |> Ecto.Association.combine_joins_query(assoc.where, 1)
   1037   end
   1038 
   1039   @impl true
   1040   def assoc_query(%{related_key: related_key, queryable: queryable} = assoc, query, [value]) do
   1041     from(x in (query || queryable), where: field(x, ^related_key) == ^value)
   1042     |> Ecto.Association.combine_assoc_query(assoc.where)
   1043   end
   1044 
   1045   @impl true
   1046   def assoc_query(%{related_key: related_key, queryable: queryable} = assoc, query, values) do
   1047     from(x in (query || queryable), where: field(x, ^related_key) in ^values)
   1048     |> Ecto.Association.combine_assoc_query(assoc.where)
   1049   end
   1050 
   1051   @impl true
   1052   def preload_info(%{related_key: related_key} = refl) do
   1053     {:assoc, refl, {0, related_key}}
   1054   end
   1055 
   1056   @impl true
   1057   def on_repo_change(%{on_replace: :nilify}, _, %{action: :replace}, _adapter, _opts) do
   1058     {:ok, nil}
   1059   end
   1060 
   1061   def on_repo_change(%{on_replace: :delete_if_exists} = refl, parent_changeset,
   1062                      %{action: :replace} = changeset, adapter, opts) do
   1063     try do
   1064       on_repo_change(%{refl | on_replace: :delete}, parent_changeset, changeset, adapter, opts)
   1065     rescue
   1066       Ecto.StaleEntryError -> {:ok, nil}
   1067     end
   1068   end
   1069 
   1070   def on_repo_change(%{on_replace: on_replace} = refl, parent_changeset,
   1071                      %{action: :replace} = changeset, adapter, opts) do
   1072     changeset =
   1073       case on_replace do
   1074         :delete -> %{changeset | action: :delete}
   1075         :update -> %{changeset | action: :update}
   1076       end
   1077 
   1078     on_repo_change(refl, parent_changeset, changeset, adapter, opts)
   1079   end
   1080 
   1081   def on_repo_change(_refl, %{data: parent, repo: repo}, %{action: action} = changeset, _adapter, opts) do
   1082     changeset = Ecto.Association.update_parent_prefix(changeset, parent)
   1083 
   1084     case apply(repo, action, [changeset, opts]) do
   1085       {:ok, _} = ok ->
   1086         if action == :delete, do: {:ok, nil}, else: ok
   1087       {:error, changeset} ->
   1088         {:error, changeset}
   1089     end
   1090   end
   1091 
   1092   ## Relation callbacks
   1093   @behaviour Ecto.Changeset.Relation
   1094 
   1095   @impl true
   1096   def build(%{related: related, queryable: queryable, defaults: defaults}, owner) do
   1097     related
   1098     |> Ecto.Association.apply_defaults(defaults, owner)
   1099     |> Ecto.Association.merge_source(queryable)
   1100   end
   1101 end
   1102 
   1103 defmodule Ecto.Association.ManyToMany do
   1104   @moduledoc """
   1105   The association struct for `many_to_many` associations.
   1106 
   1107   Its fields are:
   1108 
   1109     * `cardinality` - The association cardinality
   1110     * `field` - The name of the association field on the schema
   1111     * `owner` - The schema where the association was defined
   1112     * `related` - The schema that is associated
   1113     * `owner_key` - The key on the `owner` schema used for the association
   1114     * `queryable` - The real query to use for querying association
   1115     * `on_delete` - The action taken on associations when schema is deleted
   1116     * `on_replace` - The action taken on associations when schema is replaced
   1117     * `defaults` - Default fields used when building the association
   1118     * `relationship` - The relationship to the specified schema, default `:child`
   1119     * `join_keys` - The keyword list with many to many join keys
   1120     * `join_through` - Atom (representing a schema) or a string (representing a table)
   1121       for many to many associations
   1122     * `join_defaults` - A list of defaults for join associations
   1123     * `preload_order` - Default `order_by` of the association, used only by preload
   1124   """
   1125 
   1126   @behaviour Ecto.Association
   1127   @on_delete_opts [:nothing, :delete_all]
   1128   @on_replace_opts [:raise, :mark_as_invalid, :delete]
   1129   defstruct [:field, :owner, :related, :owner_key, :queryable, :on_delete,
   1130              :on_replace, :join_keys, :join_through, :on_cast, where: [],
   1131              join_where: [], defaults: [], join_defaults: [], relationship: :child,
   1132              cardinality: :many, unique: false, ordered: false, preload_order: []]
   1133 
   1134   @impl true
   1135   def after_compile_validation(%{queryable: queryable, join_through: join_through}, env) do
   1136     compiled = Ecto.Association.ensure_compiled(queryable, env)
   1137     join_compiled = Ecto.Association.ensure_compiled(join_through, env)
   1138 
   1139     cond do
   1140       compiled == :skip ->
   1141         :ok
   1142       compiled == :not_found ->
   1143         {:error, "associated schema #{inspect queryable} does not exist"}
   1144       not function_exported?(queryable, :__schema__, 2) ->
   1145         {:error, "associated module #{inspect queryable} is not an Ecto schema"}
   1146       join_compiled == :skip ->
   1147         :ok
   1148       join_compiled == :not_found ->
   1149         {:error, ":join_through schema #{inspect join_through} does not exist"}
   1150       not function_exported?(join_through, :__schema__, 2) ->
   1151         {:error, ":join_through module #{inspect join_through} is not an Ecto schema"}
   1152       true ->
   1153         :ok
   1154     end
   1155   end
   1156 
   1157   @impl true
   1158   def struct(module, name, opts) do
   1159     queryable = Keyword.fetch!(opts, :queryable)
   1160     related = Ecto.Association.related_from_query(queryable, name)
   1161 
   1162     join_keys = opts[:join_keys]
   1163     join_through = opts[:join_through]
   1164     validate_join_through(name, join_through)
   1165 
   1166     {owner_key, join_keys} =
   1167       case join_keys do
   1168         [{join_owner_key, owner_key}, {join_related_key, related_key}]
   1169             when is_atom(join_owner_key) and is_atom(owner_key) and
   1170                  is_atom(join_related_key) and is_atom(related_key) ->
   1171           {owner_key, join_keys}
   1172         nil ->
   1173           {:id, default_join_keys(module, related)}
   1174         _ ->
   1175           raise ArgumentError,
   1176             "many_to_many #{inspect name} expect :join_keys to be a keyword list " <>
   1177             "with two entries, the first being how the join table should reach " <>
   1178             "the current schema and the second how the join table should reach " <>
   1179             "the associated schema. For example: #{inspect default_join_keys(module, related)}"
   1180       end
   1181 
   1182     unless Module.get_attribute(module, :ecto_fields)[owner_key] do
   1183       raise ArgumentError, "schema does not have the field #{inspect owner_key} used by " <>
   1184         "association #{inspect name}, please set the :join_keys option accordingly"
   1185     end
   1186 
   1187     on_delete  = Keyword.get(opts, :on_delete, :nothing)
   1188     on_replace = Keyword.get(opts, :on_replace, :raise)
   1189 
   1190     unless on_delete in @on_delete_opts do
   1191       raise ArgumentError, "invalid :on_delete option for #{inspect name}. " <>
   1192         "The only valid options are: " <>
   1193         Enum.map_join(@on_delete_opts, ", ", &"`#{inspect &1}`")
   1194     end
   1195 
   1196     unless on_replace in @on_replace_opts do
   1197       raise ArgumentError, "invalid `:on_replace` option for #{inspect name}. " <>
   1198         "The only valid options are: " <>
   1199         Enum.map_join(@on_replace_opts, ", ", &"`#{inspect &1}`")
   1200     end
   1201 
   1202     where = opts[:where] || []
   1203     join_where = opts[:join_where] || []
   1204     defaults = Ecto.Association.validate_defaults!(module, name, opts[:defaults] || [])
   1205     join_defaults = Ecto.Association.validate_defaults!(module, name, opts[:join_defaults] || [])
   1206     preload_order = Ecto.Association.validate_preload_order!(name, opts[:preload_order] || [])
   1207 
   1208     unless is_list(where) do
   1209       raise ArgumentError, "expected `:where` for #{inspect name} to be a keyword list, got: `#{inspect where}`"
   1210     end
   1211 
   1212     unless is_list(join_where) do
   1213       raise ArgumentError, "expected `:join_where` for #{inspect name} to be a keyword list, got: `#{inspect join_where}`"
   1214     end
   1215 
   1216     if opts[:join_defaults] && is_binary(join_through) do
   1217       raise ArgumentError, ":join_defaults has no effect for a :join_through without a schema"
   1218     end
   1219 
   1220     %__MODULE__{
   1221       field: name,
   1222       cardinality: Keyword.fetch!(opts, :cardinality),
   1223       owner: module,
   1224       related: related,
   1225       owner_key: owner_key,
   1226       join_keys: join_keys,
   1227       join_where: join_where,
   1228       join_through: join_through,
   1229       join_defaults: join_defaults,
   1230       queryable: queryable,
   1231       on_delete: on_delete,
   1232       on_replace: on_replace,
   1233       unique: Keyword.get(opts, :unique, false),
   1234       defaults: defaults,
   1235       where: where,
   1236       preload_order: preload_order
   1237     }
   1238   end
   1239 
   1240   defp default_join_keys(module, related) do
   1241     [{Ecto.Association.association_key(module, :id), :id},
   1242      {Ecto.Association.association_key(related, :id), :id}]
   1243   end
   1244 
   1245   @impl true
   1246   def joins_query(%{owner: owner, queryable: queryable,
   1247                     join_through: join_through, join_keys: join_keys} = assoc) do
   1248     [{join_owner_key, owner_key}, {join_related_key, related_key}] = join_keys
   1249 
   1250     from(o in owner,
   1251       join: j in ^join_through, on: field(j, ^join_owner_key) == field(o, ^owner_key),
   1252       join: q in ^queryable, on: field(j, ^join_related_key) == field(q, ^related_key))
   1253     |> Ecto.Association.combine_joins_query(assoc.where, 2)
   1254     |> Ecto.Association.combine_joins_query(assoc.join_where, 1)
   1255   end
   1256 
   1257   def assoc_query(%{queryable: queryable} = refl, values) do
   1258     assoc_query(refl, queryable, values)
   1259   end
   1260 
   1261   @impl true
   1262   def assoc_query(assoc, query, values) do
   1263     %{queryable: queryable, join_through: join_through, join_keys: join_keys, owner: owner} = assoc
   1264     [{join_owner_key, owner_key}, {join_related_key, related_key}] = join_keys
   1265 
   1266     owner_key_type = owner.__schema__(:type, owner_key)
   1267 
   1268     # We only need to join in the "join table". Preload and Ecto.assoc expressions can then filter
   1269     # by &1.join_owner_key in ^... to filter down to the associated entries in the related table.
   1270     query =
   1271       from(q in (query || queryable),
   1272         join: j in ^join_through, on: field(q, ^related_key) == field(j, ^join_related_key),
   1273         where: field(j, ^join_owner_key) in type(^values, {:in, ^owner_key_type})
   1274       )
   1275       |> Ecto.Association.combine_assoc_query(assoc.where)
   1276 
   1277     Ecto.Association.combine_joins_query(query, assoc.join_where, length(query.joins))
   1278   end
   1279 
   1280   @impl true
   1281   def build(refl, owner, attributes) do
   1282     refl
   1283     |> build(owner)
   1284     |> struct(attributes)
   1285   end
   1286 
   1287   @impl true
   1288   def preload_info(%{join_keys: [{join_owner_key, owner_key}, {_, _}], owner: owner} = refl) do
   1289     owner_key_type = owner.__schema__(:type, owner_key)
   1290 
   1291     # When preloading use the last bound table (which is the join table) and the join_owner_key
   1292     # to filter out related entities to the owner structs we're preloading with.
   1293     {:assoc, refl, {-1, join_owner_key, owner_key_type}}
   1294   end
   1295 
   1296   @impl true
   1297   def on_repo_change(%{on_replace: :delete} = refl, parent_changeset,
   1298                      %{action: :replace}  = changeset, adapter, opts) do
   1299     on_repo_change(refl, parent_changeset, %{changeset | action: :delete}, adapter, opts)
   1300   end
   1301 
   1302   def on_repo_change(%{join_keys: join_keys, join_through: join_through, join_where: join_where},
   1303                      %{repo: repo, data: owner}, %{action: :delete, data: related}, adapter, opts) do
   1304     [{join_owner_key, owner_key}, {join_related_key, related_key}] = join_keys
   1305     owner_value = dump! :delete, join_through, owner, owner_key, adapter
   1306     related_value = dump! :delete, join_through, related, related_key, adapter
   1307 
   1308     query =
   1309       join_through
   1310       |> where([j], field(j, ^join_owner_key) == ^owner_value)
   1311       |> where([j], field(j, ^join_related_key) == ^related_value)
   1312       |> Ecto.Association.combine_assoc_query(join_where)
   1313 
   1314     query = %{query | prefix: owner.__meta__.prefix}
   1315     repo.delete_all(query, opts)
   1316     {:ok, nil}
   1317   end
   1318 
   1319   def on_repo_change(%{field: field, join_through: join_through, join_keys: join_keys} = refl,
   1320                      %{repo: repo, data: owner} = parent_changeset,
   1321                      %{action: action} = changeset, adapter, opts) do
   1322     changeset = Ecto.Association.update_parent_prefix(changeset, owner)
   1323 
   1324     case apply(repo, action, [changeset, opts]) do
   1325       {:ok, related} ->
   1326         [{join_owner_key, owner_key}, {join_related_key, related_key}] = join_keys
   1327 
   1328         if insert_join?(parent_changeset, changeset, field, related_key) do
   1329           owner_value = dump! :insert, join_through, owner, owner_key, adapter
   1330           related_value = dump! :insert, join_through, related, related_key, adapter
   1331           data = %{join_owner_key => owner_value, join_related_key => related_value}
   1332 
   1333           case insert_join(join_through, refl, parent_changeset, data, opts) do
   1334             {:error, join_changeset} ->
   1335               {:error, %{changeset | errors: join_changeset.errors ++ changeset.errors,
   1336                                      valid?: join_changeset.valid? and changeset.valid?}}
   1337             _ ->
   1338               {:ok, related}
   1339           end
   1340         else
   1341           {:ok, related}
   1342         end
   1343 
   1344       {:error, changeset} ->
   1345         {:error, changeset}
   1346     end
   1347   end
   1348 
   1349   defp validate_join_through(name, nil) do
   1350     raise ArgumentError, "many_to_many #{inspect name} associations require the :join_through option to be given"
   1351   end
   1352   defp validate_join_through(_, join_through) when is_atom(join_through) or is_binary(join_through) do
   1353     :ok
   1354   end
   1355   defp validate_join_through(name, _join_through) do
   1356     raise ArgumentError,
   1357       "many_to_many #{inspect name} associations require the :join_through option to be " <>
   1358       "an atom (representing a schema) or a string (representing a table)"
   1359   end
   1360 
   1361   defp insert_join?(%{action: :insert}, _, _field, _related_key), do: true
   1362   defp insert_join?(_, %{action: :insert}, _field, _related_key), do: true
   1363   defp insert_join?(%{data: owner}, %{data: related}, field, related_key) do
   1364     current_key = Map.fetch!(related, related_key)
   1365     not Enum.any? Map.fetch!(owner, field), fn child ->
   1366       Map.get(child, related_key) == current_key
   1367     end
   1368   end
   1369 
   1370   defp insert_join(join_through, _refl, %{repo: repo, data: owner}, data, opts) when is_binary(join_through) do
   1371     opts = Keyword.put_new(opts, :prefix, owner.__meta__.prefix)
   1372     repo.insert_all(join_through, [data], opts)
   1373   end
   1374 
   1375   defp insert_join(join_through, refl, parent_changeset, data, opts) when is_atom(join_through) do
   1376     %{repo: repo, constraints: constraints, data: owner} = parent_changeset
   1377 
   1378     changeset =
   1379       join_through
   1380       |> Ecto.Association.apply_defaults(refl.join_defaults, owner)
   1381       |> Map.merge(data)
   1382       |> Ecto.Changeset.change()
   1383       |> Map.put(:constraints, constraints)
   1384       |> put_new_prefix(owner.__meta__.prefix)
   1385 
   1386     repo.insert(changeset, opts)
   1387   end
   1388 
   1389   defp put_new_prefix(%{data: %{__meta__: %{prefix: prefix}}} = changeset, prefix),
   1390     do: changeset
   1391 
   1392   defp put_new_prefix(%{data: %{__meta__: %{prefix: nil}}} = changeset, prefix),
   1393     do: update_in(changeset.data, &Ecto.put_meta(&1, prefix: prefix))
   1394 
   1395   defp put_new_prefix(changeset, _),
   1396     do: changeset
   1397 
   1398   defp field!(op, struct, field) do
   1399     Map.get(struct, field) || raise "could not #{op} join entry because `#{field}` is nil in #{inspect struct}"
   1400   end
   1401 
   1402   defp dump!(action, join_through, struct, field, adapter) when is_binary(join_through) do
   1403     value = field!(action, struct, field)
   1404     type  = struct.__struct__.__schema__(:type, field)
   1405 
   1406     case Ecto.Type.adapter_dump(adapter, type, value) do
   1407       {:ok, value} ->
   1408         value
   1409       :error ->
   1410         raise Ecto.ChangeError,
   1411           "value `#{inspect value}` for `#{inspect struct.__struct__}.#{field}` " <>
   1412             "in `#{action}` does not match type #{inspect type}"
   1413     end
   1414   end
   1415 
   1416   defp dump!(action, join_through, struct, field, _) when is_atom(join_through) do
   1417     field!(action, struct, field)
   1418   end
   1419 
   1420   ## Relation callbacks
   1421   @behaviour Ecto.Changeset.Relation
   1422 
   1423   @impl true
   1424   def build(%{related: related, queryable: queryable, defaults: defaults}, owner) do
   1425     related
   1426     |> Ecto.Association.apply_defaults(defaults, owner)
   1427     |> Ecto.Association.merge_source(queryable)
   1428   end
   1429 
   1430   ## On delete callbacks
   1431 
   1432   @doc false
   1433   def delete_all(refl, parent, repo_name, opts) do
   1434     %{join_through: join_through, join_keys: join_keys, owner: owner} = refl
   1435     [{join_owner_key, owner_key}, {_, _}] = join_keys
   1436 
   1437     if value = Map.get(parent, owner_key) do
   1438       owner_type = owner.__schema__(:type, owner_key)
   1439       query = from j in join_through, where: field(j, ^join_owner_key) == type(^value, ^owner_type)
   1440       Ecto.Repo.Queryable.delete_all repo_name, query, opts
   1441     end
   1442   end
   1443 end