schema.ex (37328B)
defmodule Ecto.Repo.Schema do
  # The module invoked by user defined repos
  # for schema related functionality.
  @moduledoc false

  alias Ecto.Changeset
  alias Ecto.Changeset.Relation
  require Ecto.Query

  import Ecto.Query.Planner, only: [attach_prefix: 2]

  @doc """
  Implementation for `Ecto.Repo.insert_all/3`.
  """
  def insert_all(repo, name, schema, rows, tuplet) when is_atom(schema) do
    do_insert_all(repo, name, schema, schema.__schema__(:prefix),
                  schema.__schema__(:source), rows, tuplet)
  end

  def insert_all(repo, name, table, rows, tuplet) when is_binary(table) do
    do_insert_all(repo, name, nil, nil, table, rows, tuplet)
  end

  def insert_all(repo, name, {source, schema}, rows, tuplet) when is_atom(schema) do
    do_insert_all(repo, name, schema, schema.__schema__(:prefix), source, rows, tuplet)
  end

  # Nothing to insert: the adapter is never called. The second element is an
  # empty list when :returning was requested and nil otherwise.
  defp do_insert_all(_repo, _name, _schema, _prefix, _source, [], {_adapter_meta, opts}) do
    if opts[:returning] do
      {0, []}
    else
      {0, nil}
    end
  end

  # Builds the header, the dumped rows (or planned query), the conflict
  # handling and the cast params, then delegates to `adapter.insert_all/8`
  # and post-processes whatever the adapter returned.
  defp do_insert_all(repo, _name, schema, prefix, source, rows_or_query, {adapter_meta, opts}) do
    %{adapter: adapter} = adapter_meta
    autogen_id = schema && schema.__schema__(:autogenerate_id)
    dumper = schema && schema.__schema__(:dump)
    placeholder_map = Keyword.get(opts, :placeholders, %{})

    {return_fields_or_types, return_sources} =
      schema
      |> returning(opts)
      |> fields_to_sources(dumper)

    {rows_or_query, header, row_cast_params, placeholder_cast_params, placeholder_dump_params, counter} =
      extract_header_and_fields(repo, rows_or_query, schema, dumper, autogen_id, placeholder_map, adapter, opts)

    schema_meta = metadata(schema, prefix, source, autogen_id, nil, opts)

    on_conflict = Keyword.get(opts, :on_conflict, :raise)
    conflict_target = Keyword.get(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)
    {on_conflict, conflict_cast_params} = on_conflict(on_conflict, conflict_target, schema_meta, counter, adapter)
    opts = Keyword.put(opts, :cast_params, placeholder_cast_params ++ row_cast_params ++ conflict_cast_params)

    {count, rows_or_query} =
      adapter.insert_all(adapter_meta, schema_meta, header, rows_or_query, on_conflict, return_sources, placeholder_dump_params, opts)

    {count, postprocess(rows_or_query, return_fields_or_types, adapter, schema, schema_meta)}
  end

  # Converts the rows returned by the adapter: nil when nothing was requested,
  # plain maps when there is no schema, loaded structs otherwise.
  defp postprocess(nil, [], _adapter, _schema, _schema_meta) do
    nil
  end

  defp postprocess(rows, fields, _adapter, nil, _schema_meta) do
    for row <- rows, do: Map.new(Enum.zip(fields, row))
  end

  defp postprocess(rows, types, adapter, schema, %{prefix: prefix, source: source}) do
    struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)

    for row <- rows do
      {loaded, _} = Ecto.Repo.Queryable.struct_load!(types, row, [], false, struct, adapter)
      loaded
    end
  end

  # Returns a six-element tuple:
  #
  #   {rows_or_query, header, row_cast_params, placeholder_cast_params,
  #    placeholder_dump_params, counter_fun}
  #
  # where `counter_fun` is a zero-arity function returning the parameter
  # offset that the on_conflict query must be normalized with.
  defp extract_header_and_fields(_repo, rows, schema, dumper, autogen_id, placeholder_map, adapter, _opts)
       when is_list(rows) do
    mapper = init_mapper(schema, dumper, adapter, placeholder_map)

    # The accumulator is {header, placeholder_dump, placeholder_counter}.
    {rows, {header, placeholder_dump, _}} =
      Enum.map_reduce(rows, {%{}, %{}, 1}, fn fields, acc ->
        {fields, {header, placeholder_dump, counter}} = Enum.map_reduce(fields, acc, mapper)
        {fields, header} = autogenerate_id(autogen_id, fields, header, adapter)
        {fields, {header, placeholder_dump, counter}}
      end)

    header = Map.keys(header)

    placeholder_size = map_size(placeholder_dump)

    # Order placeholders by the index they were assigned so that the params
    # line up with the {:placeholder, idx} references left in the rows.
    {placeholder_cast_params, placeholder_dump_params} =
      placeholder_dump
      |> Enum.map(fn {_, {idx, _, cast_value, dump_value}} -> {idx, cast_value, dump_value} end)
      |> Enum.sort()
      |> Enum.map(&{elem(&1, 1), elem(&1, 2)})
      |> Enum.unzip()

    {rows, row_cast_params, counter} = plan_query_in_rows(rows, header, adapter, placeholder_size)
    {rows, header, row_cast_params, placeholder_cast_params, placeholder_dump_params, fn -> counter end}
  end

  defp extract_header_and_fields(repo, %Ecto.Query{} = query, _schema, _dumper, _autogen_id, _placeholder_map, adapter, opts) do
    {query, opts} = repo.prepare_query(:insert_all, query, opts)
    query = attach_prefix(query, opts)

    {query, cast_params, dump_params} = Ecto.Adapter.Queryable.plan_query(:insert_all, adapter, query)

    # The header is taken from the keys of the map literal in the select.
    header =
      case query.select do
        %Ecto.Query.SelectExpr{expr: {:%{}, _ctx, args}} ->
          Enum.map(args, &elem(&1, 0))

        _ ->
          raise ArgumentError, """
          cannot generate a fields list for insert_all from the given source query
          because it does not have a select clause that uses a map:

            #{inspect query}

          Please add a select clause that selects into a map, like this:

            from x in Source,
              ...,
              select: %{
                field_a: x.bar,
                field_b: x.foo
              }

          The keys must exist in the schema that is being inserted into
          """
      end

    counter = fn -> length(dump_params) end

    {{query, dump_params}, header, cast_params, [], [], counter}
  end

  defp extract_header_and_fields(_repo, rows_or_query, _schema, _dumper, _autogen_id, _placeholder_map, _adapter, _opts) do
    raise ArgumentError, "expected a list of rows or a query, but got #{inspect rows_or_query} as rows_or_query argument in insert_all"
  end

  # Builds the reducer applied to every {field, value} pair of a row.
  # Without a schema, values are passed through untouched with type :any.
  defp init_mapper(nil, _dumper, _adapter, placeholder_map) do
    fn {field, value}, acc ->
      extract_value(field, value, :any, placeholder_map, acc, & &1)
    end
  end

  # With a schema, the field is translated to its source and the value is
  # dumped with the field type; unknown fields raise.
  defp init_mapper(schema, dumper, adapter, placeholder_map) do
    fn {field, value}, acc ->
      case dumper do
        %{^field => {source, type}} ->
          extract_value(source, value, type, placeholder_map, acc, fn val ->
            dump_field!(:insert_all, schema, field, type, val, adapter)
          end)

        %{} ->
          raise ArgumentError,
                "unknown field `#{inspect(field)}` in schema #{inspect(schema)} given to " <>
                  "insert_all. Note virtual fields and associations are not supported"
      end
    end
  end

  # Maps a single value to one of three row entry shapes while recording the
  # source in the header:
  #
  #   * {source, %Ecto.Query{}}          - subquery, planned later
  #   * {source, {:placeholder, idx}}    - reference to a shared placeholder
  #   * {source, cast_value, dump_value} - regular value
  defp extract_value(source, value, type, placeholder_map, acc, dumper) do
    {header, placeholder_dump, counter} = acc

    case value do
      %Ecto.Query{} = query ->
        {{source, query}, {Map.put(header, source, true), placeholder_dump, counter}}

      {:placeholder, key} ->
        {value, placeholder_dump, counter} =
          extract_placeholder(key, type, placeholder_map, placeholder_dump, counter, dumper)

        {{source, value},
         {Map.put(header, source, true), placeholder_dump, counter}}

      cast_value ->
        {{source, cast_value, dumper.(value)},
         {Map.put(header, source, true), placeholder_dump, counter}}
    end
  end

  # Resolves a placeholder key to its positional index, dumping its value the
  # first time the key is seen.
  #
  # Entries in `placeholder_dump` are `{idx, type, cast_value, dump_value}`.
  # A key may only be reused with the same type; reusing it with another type
  # raises ArgumentError. A key missing from `placeholder_map` raises KeyError.
  defp extract_placeholder(key, type, placeholder_map, placeholder_dump, counter, dumper) do
    case placeholder_dump do
      %{^key => {idx, ^type, _, _}} ->
        {{:placeholder, idx}, placeholder_dump, counter}

      # Must match the 4-tuple stored below. A 3-tuple pattern here can never
      # match, which would let a type mismatch fall through to the catch-all
      # clause and silently re-register the key under a new index.
      %{^key => {_, dumped_type, _, _}} ->
        raise ArgumentError,
              "a placeholder key can only be used with columns of the same type. " <>
                "The key #{inspect(key)} has already been dumped as a #{inspect(dumped_type)}"

      %{} ->
        {cast_value, dump_value} =
          case placeholder_map do
            %{^key => cast_value} ->
              {cast_value, dumper.(cast_value)}

            _ ->
              raise KeyError,
                    "placeholder key #{inspect(key)} not found in #{inspect(placeholder_map)}"
          end

        placeholder_dump = Map.put(placeholder_dump, key, {counter, type, cast_value, dump_value})
        {{:placeholder, counter}, placeholder_dump, counter + 1}
    end
  end

  # Walks every row in header order, planning and normalizing embedded queries
  # and collecting cast params. `counter` is the parameter offset to start
  # from; the final offset is returned as the third element.
  defp plan_query_in_rows(rows, header, adapter, counter) do
    {rows, {cast_params, counter}} =
      Enum.map_reduce(rows, {[], counter}, fn fields, {cast_param_acc, counter} ->
        Enum.flat_map_reduce(header, {cast_param_acc, counter}, fn key, {cast_param_acc, counter} ->
          case :lists.keyfind(key, 1, fields) do
            {^key, %Ecto.Query{} = query} ->
              {query, params, _} = Ecto.Query.Planner.plan(query, :all, adapter)
              {cast_params, dump_params} = Enum.unzip(params)
              {query, _} = Ecto.Query.Planner.normalize(query, :all, adapter, counter)
              num_params = length(dump_params)

              # cast params are accumulated in reverse and flipped once at the end
              {[{key, {query, dump_params}}], {Enum.reverse(cast_params, cast_param_acc), counter + num_params}}

            {^key, {:placeholder, _} = value} ->
              {[{key, value}], {cast_param_acc, counter}}

            {^key, cast_value, dump_value} ->
              {[{key, dump_value}], {[cast_value | cast_param_acc], counter + 1}}

            # the row does not set this column
            false ->
              {[], {cast_param_acc, counter}}
          end
        end)
      end)

    {rows, Enum.reverse(cast_params), counter}
  end

  # insert_all variant: adds an adapter-autogenerated id to a row that does
  # not already carry one.
  defp autogenerate_id(nil, fields, header, _adapter) do
    {fields, header}
  end

  # NOTE(review): rows built by init_mapper/4 are keyed by source, while this
  # lookup uses the field name `key`, and only 3-tuples are matched. Confirm
  # the behaviour when the id's source differs from its field name, or when
  # the id is given as a query or placeholder (a 2-tuple).
  defp autogenerate_id({key, source, type}, fields, header, adapter) do
    case :lists.keyfind(key, 1, fields) do
      {^key, _, _} ->
        {fields, header}

      false ->
        if dump_value = Ecto.Type.adapter_autogenerate(adapter, type) do
          {:ok, cast_value} = Ecto.Type.adapter_load(adapter, type, dump_value)
          {[{source, cast_value, dump_value} | fields], Map.put(header, source, true)}
        else
          {fields, header}
        end
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert!/2`.
  """
  def insert!(repo, name, struct_or_changeset, tuplet) do
    case insert(repo, name, struct_or_changeset, tuplet) do
      {:ok, struct} ->
        struct

      {:error, %Ecto.Changeset{} = changeset} ->
        raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.update!/2`.
  """
  def update!(repo, name, struct_or_changeset, tuplet) do
    case update(repo, name, struct_or_changeset, tuplet) do
      {:ok, struct} ->
        struct

      {:error, %Ecto.Changeset{} = changeset} ->
        raise Ecto.InvalidChangesetError, action: :update, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.delete!/2`.
  """
  def delete!(repo, name, struct_or_changeset, tuplet) do
    case delete(repo, name, struct_or_changeset, tuplet) do
      {:ok, struct} ->
        struct

      {:error, %Ecto.Changeset{} = changeset} ->
        raise Ecto.InvalidChangesetError, action: :delete, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert/2`.
  """
  def insert(repo, name, %Changeset{} = changeset, tuplet) do
    do_insert(repo, name, changeset, tuplet)
  end

  def insert(repo, name, %{__struct__: _} = struct, tuplet) do
    do_insert(repo, name, Ecto.Changeset.change(struct), tuplet)
  end

  defp do_insert(repo, _name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call take precedence over changeset repo_opts.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:insert, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    embeds = schema.__schema__(:embeds)

    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    on_conflict = Keyword.get(opts, :on_conflict, :raise)
    conflict_target = Keyword.get(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)

    # On insert, we always merge the whole struct into the
    # changeset as changes, except the primary key if it is nil.
    changeset = put_repo_and_action(changeset, :insert, repo, tuplet)
    changeset = Relation.surface_changes(changeset, struct, fields ++ assocs)

    wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fn ->
      assoc_opts = assoc_opts(assocs, opts)
      user_changeset = run_prepare(changeset, prepare)

      {changeset, parents, children} = pop_assocs(user_changeset, assocs)
      changeset = process_parents(changeset, user_changeset, parents, adapter, assoc_opts)

      if changeset.valid? do
        embeds = Ecto.Embedded.prepare(changeset, embeds, adapter, :insert)

        autogen_id = schema.__schema__(:autogenerate_id)
        schema_meta = metadata(struct, autogen_id, opts)
        changes = Map.merge(changeset.changes, embeds)

        {changes, cast_extra, dump_extra, return_types, return_sources} =
          autogenerate_id(autogen_id, changes, return_types, return_sources, adapter)

        changes = Map.take(changes, fields)
        autogen = autogenerate_changes(schema, :insert, changes)

        dump_changes =
          dump_changes!(:insert, changes, autogen, schema, dump_extra, dumper, adapter)

        {on_conflict, conflict_cast_params} =
          on_conflict(on_conflict, conflict_target, schema_meta, fn -> length(dump_changes) end, adapter)

        change_values = Enum.map(changes, &elem(&1, 1))
        autogen_values = Enum.map(autogen, &elem(&1, 1))
        opts = Keyword.put(opts, :cast_params, change_values ++ autogen_values ++ cast_extra ++ conflict_cast_params)
        args = [adapter_meta, schema_meta, dump_changes, on_conflict, return_sources, opts]

        case apply(user_changeset, adapter, :insert, args) do
          {:ok, values} ->
            # An id autogenerated on the client is loaded like a returned value.
            values = dump_extra ++ values

            changeset
            |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, schema_meta)
            |> process_children(user_changeset, children, adapter, assoc_opts)

          {:error, _} = error ->
            error
        end
      else
        {:error, changeset}
      end
    end)
  end

  defp do_insert(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :insert, repo, tuplet)}
  end

  @doc """
  Implementation for `Ecto.Repo.update/2`.
  """
  def update(repo, name, %Changeset{} = changeset, tuplet) do
    do_update(repo, name, changeset, tuplet)
  end

  def update(_repo, _name, %{__struct__: _}, _tuplet) do
    raise ArgumentError, "giving a struct to Ecto.Repo.update/2 is not supported. " <>
                         "Ecto is unable to properly track changes when a struct is given, " <>
                         "an Ecto.Changeset must be given instead"
  end

  defp do_update(repo, _name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call take precedence over changeset repo_opts.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:update, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    embeds = schema.__schema__(:embeds)

    force? = !!opts[:force]
    filters = add_pk_filter!(changeset.filters, struct)

    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    # Differently from insert, update does not copy the struct
    # fields into the changeset. All changes must be in the
    # changeset before hand.
    changeset = put_repo_and_action(changeset, :update, repo, tuplet)

    if changeset.changes != %{} or force? do
      wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fn ->
        assoc_opts = assoc_opts(assocs, opts)
        user_changeset = run_prepare(changeset, prepare)

        {changeset, parents, children} = pop_assocs(user_changeset, assocs)
        changeset = process_parents(changeset, user_changeset, parents, adapter, assoc_opts)

        if changeset.valid? do
          embeds = Ecto.Embedded.prepare(changeset, embeds, adapter, :update)

          changes = changeset.changes |> Map.merge(embeds) |> Map.take(fields)
          autogen = autogenerate_changes(schema, :update, changes)
          dump_changes = dump_changes!(:update, changes, autogen, schema, [], dumper, adapter)

          schema_meta = metadata(struct, schema.__schema__(:autogenerate_id), opts)
          dump_filters = dump_fields!(:update, schema, filters, dumper, adapter)

          change_values = Enum.map(changes, &elem(&1, 1))
          autogen_values = Enum.map(autogen, &elem(&1, 1))
          filter_values = Enum.map(filters, &elem(&1, 1))
          opts = Keyword.put(opts, :cast_params, change_values ++ autogen_values ++ filter_values)
          args = [adapter_meta, schema_meta, dump_changes, dump_filters, return_sources, opts]

          # If there are no changes or all the changes were autogenerated but not forced, we skip
          {action, autogen} =
            if changes != %{} or (autogen != [] and force?),
              do: {:update, autogen},
              else: {:noop, []}

          case apply(user_changeset, adapter, action, args) do
            {:ok, values} ->
              changeset
              |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, schema_meta)
              |> process_children(user_changeset, children, adapter, assoc_opts)

            {:error, _} = error ->
              error
          end
        else
          {:error, changeset}
        end
      end)
    else
      {:ok, changeset.data}
    end
  end

  defp do_update(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :update, repo, tuplet)}
  end

  @doc """
  Implementation for `Ecto.Repo.insert_or_update/2`.
  """
  def insert_or_update(repo, name, changeset, tuplet) do
    case get_state(changeset) do
      :built -> insert(repo, name, changeset, tuplet)
      :loaded -> update(repo, name, changeset, tuplet)
      state -> raise ArgumentError, "the changeset has an invalid state " <>
                                    "for Repo.insert_or_update/2: #{state}"
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert_or_update!/2`.
  """
  def insert_or_update!(repo, name, changeset, tuplet) do
    case get_state(changeset) do
      :built -> insert!(repo, name, changeset, tuplet)
      :loaded -> update!(repo, name, changeset, tuplet)
      state -> raise ArgumentError, "the changeset has an invalid state " <>
                                    "for Repo.insert_or_update!/2: #{state}"
    end
  end

  # Reads the state stored in the __meta__ of the changeset data.
  # Bare structs are rejected.
  defp get_state(%Changeset{data: %{__meta__: %{state: state}}}), do: state
  defp get_state(%{__struct__: _}) do
    raise ArgumentError, "giving a struct to Repo.insert_or_update/2 or " <>
                         "Repo.insert_or_update!/2 is not supported. " <>
                         "Please use an Ecto.Changeset"
  end

  @doc """
  Implementation for `Ecto.Repo.delete/2`.
  """
  def delete(repo, name, %Changeset{} = changeset, tuplet) do
    do_delete(repo, name, changeset, tuplet)
  end

  def delete(repo, name, %{__struct__: _} = struct, tuplet) do
    changeset = Ecto.Changeset.change(struct)
    do_delete(repo, name, changeset, tuplet)
  end

  defp do_delete(repo, name, %Changeset{valid?: true} = changeset, {adapter_meta, opts} = tuplet) do
    %{adapter: adapter} = adapter_meta
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call take precedence over changeset repo_opts.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:delete, changeset)
    schema = struct.__struct__
    assocs = to_delete_assocs(schema)
    dumper = schema.__schema__(:dump)
    changeset = put_repo_and_action(changeset, :delete, repo, tuplet)

    wrap_in_transaction(adapter, adapter_meta, opts, assocs != [], prepare, fn ->
      changeset = run_prepare(changeset, prepare)

      if changeset.valid? do
        filters = add_pk_filter!(changeset.filters, struct)
        dump_filters = dump_fields!(:delete, schema, filters, dumper, adapter)

        # Delete related associations
        for %{__struct__: mod, on_delete: on_delete} = reflection <- assocs do
          apply(mod, on_delete, [reflection, changeset.data, name, tuplet])
        end

        schema_meta = metadata(struct, schema.__schema__(:autogenerate_id), opts)
        filter_values = Enum.map(filters, &elem(&1, 1))
        opts = Keyword.put(opts, :cast_params, filter_values)
        args = [adapter_meta, schema_meta, dump_filters, opts]

        case apply(changeset, adapter, :delete, args) do
          {:ok, values} ->
            changeset = load_changes(changeset, :deleted, [], values, %{}, [], adapter, schema_meta)
            {:ok, changeset.data}

          {:error, _} = error ->
            error
        end
      else
        {:error, changeset}
      end
    end)
  end

  defp do_delete(repo, _name, %Changeset{valid?: false} = changeset, tuplet) do
    {:error, put_repo_and_action(changeset, :delete, repo, tuplet)}
  end

  @doc """
  Loads `data` into a schema struct or a map of types, using the
  adapter to load each value.

  `data` may be a map, a keyword list, or a `{fields, values}` tuple.
  """
  def load(adapter, schema_or_types, data) do
    do_load(schema_or_types, data, &Ecto.Type.adapter_load(adapter, &1, &2))
  end

  # Lists and {fields, values} tuples are normalized to a map before loading.
  defp do_load(schema, data, loader) when is_list(data),
    do: do_load(schema, Map.new(data), loader)
  defp do_load(schema, {fields, values}, loader) when is_list(fields) and is_list(values),
    do: do_load(schema, Enum.zip(fields, values), loader)
  defp do_load(schema, data, loader) when is_atom(schema),
    do: Ecto.Schema.Loader.unsafe_load(schema, data, loader)
  defp do_load(types, data, loader) when is_map(types),
    do: Ecto.Schema.Loader.unsafe_load(%{}, types, data, loader)

  ## Helpers

  # Resolves the :returning option into a list of fields. `true` expands to
  # all schema fields and is only allowed when a schema is given.
  defp returning(schema, opts) do
    case Keyword.get(opts, :returning, false) do
      [_ | _] = fields ->
        fields
      [] ->
        raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
      true when is_nil(schema) ->
        raise ArgumentError, ":returning option can only be set to true if a schema is given"
      true ->
        schema.__schema__(:fields)
      false ->
        []
    end
  end

  # Appends the schema's read_after_writes fields to the returning list.
  defp add_read_after_writes([], schema),
    do: schema.__schema__(:read_after_writes)

  defp add_read_after_writes(return, schema),
    do: Enum.uniq(return ++ schema.__schema__(:read_after_writes))

  # Splits fields into {[{field, type}], [source]}. Without a dumper (no
  # schema) the fields are returned as is on both sides. Note the reduce
  # builds both lists in reverse order of `fields`.
  defp fields_to_sources(fields, nil) do
    {fields, fields}
  end
  defp fields_to_sources(fields, dumper) do
    Enum.reduce(fields, {[], []}, fn field, {types, sources} ->
      {source, type} = Map.fetch!(dumper, field)
      {[{field, type} | types], [source | sources]}
    end)
  end

  defp struct_from_changeset!(action, %{data: nil}),
    do: raise(ArgumentError, "cannot #{action} a changeset without :data")
  defp struct_from_changeset!(_action, %{data: struct}),
    do: struct

  # Stamps the changeset with the action, repo and repo opts. A changeset
  # whose action is :ignore is only accepted when invalid, and any other
  # pre-set action must match the one being performed.
  defp put_repo_and_action(%{action: :ignore, valid?: valid?} = changeset, action, repo, {_adapter_meta, opts}) do
    if valid? do
      raise ArgumentError, "a valid changeset with action :ignore was given to " <>
                           "#{inspect repo}.#{action}/2. Changesets can only be ignored " <>
                           "in a repository action if they are also invalid"
    else
      %{changeset | action: action, repo: repo, repo_opts: opts}
    end
  end
  defp put_repo_and_action(%{action: given}, action, repo, _tuplet) when given != nil and given != action,
    do: raise(ArgumentError, "a changeset with action #{inspect given} was given to #{inspect repo}.#{action}/2")
  defp put_repo_and_action(changeset, action, repo, {_adapter_meta, opts}),
    do: %{changeset | action: action, repo: repo, repo_opts: opts}

  # Runs the prepare functions in reverse of the order they are stored in the
  # changeset; each one must return a changeset.
  defp run_prepare(changeset, prepare) do
    Enum.reduce(Enum.reverse(prepare), changeset, fn fun, acc ->
      case fun.(acc) do
        %Ecto.Changeset{} = acc ->
          acc

        other ->
          raise "expected function #{inspect fun} given to Ecto.Changeset.prepare_changes/2 " <>
                "to return an Ecto.Changeset, got: `#{inspect other}`"
      end
    end)
  end

  # Builds the schema metadata map handed to the adapter. A :prefix given in
  # `opts` overrides the schema/struct prefix.
  defp metadata(schema, prefix, source, autogen_id, context, opts) do
    %{
      autogenerate_id: autogen_id,
      context: context,
      schema: schema,
      source: source,
      prefix: Keyword.get(opts, :prefix, prefix)
    }
  end
  defp metadata(%{__struct__: schema, __meta__: %{context: context, source: source, prefix: prefix}},
                autogen_id, opts) do
    metadata(schema, prefix, source, autogen_id, context, opts)
  end
  defp metadata(%{__struct__: schema}, _, _) do
    raise ArgumentError, "#{inspect(schema)} needs to be a schema with source"
  end

  # Translates conflict target fields into their sources. Unsafe fragments
  # are passed through; without a dumper, targets are kept as given.
  defp conflict_target({:unsafe_fragment, fragment}, _dumper) when is_binary(fragment) do
    {:unsafe_fragment, fragment}
  end
  defp conflict_target(conflict_target, dumper) do
    for target <- List.wrap(conflict_target) do
      case dumper do
        %{^target => {alias, _}} ->
          alias
        %{} when is_atom(target) ->
          raise ArgumentError, "unknown field `#{inspect(target)}` in conflict_target"
        _ ->
          target
      end
    end
  end

  # Normalizes the :on_conflict option into
  # `{{kind_or_fields_or_query, dump_params, conflict_target}, cast_params}`.
  # `counter_fun` is only invoked when a query has to be normalized.
  defp on_conflict(on_conflict, conflict_target, schema_meta, counter_fun, adapter) do
    %{source: source, schema: schema, prefix: prefix} = schema_meta

    case on_conflict do
      :raise when conflict_target == [] ->
        {{:raise, [], []}, []}

      :raise ->
        raise ArgumentError, ":conflict_target option is forbidden when :on_conflict is :raise"

      :nothing ->
        {{:nothing, [], conflict_target}, []}

      {:replace, keys} when is_list(keys) ->
        fields = Enum.map(keys, &field_source!(schema, &1))
        {{fields, [], conflict_target}, []}

      :replace_all ->
        {{replace_all_fields!(:replace_all, schema, []), [], conflict_target}, []}

      {:replace_all_except, fields} ->
        {{replace_all_fields!(:replace_all_except, schema, fields), [], conflict_target}, []}

      [_ | _] = on_conflict ->
        from = if schema, do: {source, schema}, else: source
        query = Ecto.Query.from from, update: ^on_conflict
        on_conflict_query(query, {source, schema}, prefix, counter_fun, adapter, conflict_target)

      %Ecto.Query{} = query ->
        on_conflict_query(query, {source, schema}, prefix, counter_fun, adapter, conflict_target)

      other ->
        raise ArgumentError, "unknown value for :on_conflict, got: #{inspect other}"
    end
  end

  defp replace_all_fields!(kind, nil, _to_remove) do
    raise ArgumentError, "cannot use #{inspect(kind)} on operations without a schema"
  end

  defp replace_all_fields!(_kind, schema, to_remove) do
    Enum.map(schema.__schema__(:fields) -- to_remove, &field_source!(schema, &1))
  end

  # Without a schema, the field is used as the source as is.
  defp field_source!(nil, field) do
    field
  end

  defp field_source!(schema, field) do
    schema.__schema__(:field_source, field) ||
      raise ArgumentError, "unknown field for :on_conflict, got: #{inspect(field)}"
  end

  # Plans and normalizes an on_conflict update query, making sure it targets
  # the same {source, schema} pair as the operation itself.
  defp on_conflict_query(query, from, prefix, counter_fun, adapter, conflict_target) do
    {query, params, _} =
      Ecto.Query.Planner.plan(%{query | prefix: prefix}, :update_all, adapter)

    {cast_params, dump_params} = Enum.unzip(params)

    unless query.from.source == from do
      raise ArgumentError, "cannot run on_conflict: query because the query " <>
                           "has a different {source, schema} pair than the " <>
                           "original struct/changeset/query. Got #{inspect query.from} " <>
                           "and #{inspect from} respectively"
    end

    {query, _} = Ecto.Query.Planner.normalize(query, :update_all, adapter, counter_fun.())
    {{query, dump_params, conflict_target}, cast_params}
  end

  # Invokes the adapter callback and converts its result: constraint
  # violations become changeset errors, and stale entries either become a
  # changeset error (when :stale_error_field is an atom) or raise.
  # The :noop action skips the adapter altogether.
  defp apply(_user_changeset, _adapter, :noop, _args) do
    {:ok, []}
  end

  defp apply(user_changeset, adapter, action, args) do
    case apply(adapter, action, args) do
      {:ok, values} ->
        {:ok, values}

      {:invalid, constraints} ->
        {:error, constraints_to_errors(user_changeset, action, constraints)}

      {:error, :stale} ->
        # opts is always the last element of the args list
        opts = List.last(args)

        case Keyword.fetch(opts, :stale_error_field) do
          {:ok, stale_error_field} when is_atom(stale_error_field) ->
            stale_message = Keyword.get(opts, :stale_error_message, "is stale")
            user_changeset = Changeset.add_error(user_changeset, stale_error_field, stale_message, [stale: true])
            {:error, user_changeset}

          _other ->
            raise Ecto.StaleEntryError, changeset: user_changeset, action: action
        end
    end
  end

  # Maps each violated constraint to the error declared for it in the
  # changeset; a violation with no matching declaration raises
  # Ecto.ConstraintError.
  defp constraints_to_errors(%{constraints: user_constraints, errors: errors} = changeset, action, constraints) do
    constraint_errors =
      Enum.map constraints, fn {type, constraint} ->
        user_constraint =
          Enum.find(user_constraints, fn c ->
            case {c.type, c.constraint, c.match} do
              {^type, ^constraint, :exact} -> true
              {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
              {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
              _ -> false
            end
          end)

        case user_constraint do
          %{field: field, error_message: error_message, error_type: error_type} ->
            {field, {error_message, [constraint: error_type, constraint_name: constraint]}}
          nil ->
            raise Ecto.ConstraintError, action: action, type: type,
                                        constraint: constraint, changeset: changeset
        end
      end

    %{changeset | errors: constraint_errors ++ errors, valid?: false}
  end

  # Folds changes, embeds, autogenerated values, metadata and the values
  # returned by the adapter into the changeset data.
  defp load_changes(changeset, state, types, values, embeds, autogen, adapter, schema_meta) do
    %{data: data, changes: changes} = changeset

    data =
      data
      |> merge_changes(changes)
      |> Map.merge(embeds)
      |> merge_autogen(autogen)
      |> apply_metadata(state, schema_meta)
      |> load_each(values, types, adapter)

    Map.put(changeset, :data, data)
  end

  # Only changes whose key already exists in the data are merged.
  defp merge_changes(data, changes) do
    changes =
      Enum.reduce(changes, changes, fn {key, _value}, changes ->
        if Map.has_key?(data, key), do: changes, else: Map.delete(changes, key)
      end)

    Map.merge(data, changes)
  end

  defp merge_autogen(data, autogen) do
    Enum.reduce(autogen, data, fn {k, v}, acc -> %{acc | k => v} end)
  end

  defp apply_metadata(%{__meta__: meta} = data, state, %{source: source, prefix: prefix}) do
    %{data | __meta__: %{meta | state: state, source: source, prefix: prefix}}
  end

  # Loads returned values pairwise with their {key, type} and stores them in
  # the struct; stops when the values run out.
  defp load_each(struct, [{_, value} | kv], [{key, type} | types], adapter) do
    case Ecto.Type.adapter_load(adapter, type, value) do
      {:ok, value} ->
        load_each(%{struct | key => value}, kv, types, adapter)
      :error ->
        raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type} " <>
                             "for field `#{key}` in schema #{inspect struct.__struct__}"
    end
  end
  defp load_each(struct, [], _types, _adapter) do
    struct
  end

  # Removes association changes from the changeset and returns them split by
  # relationship as {changeset, parents, children}.
  defp pop_assocs(changeset, []) do
    {changeset, [], []}
  end

  defp pop_assocs(%{changes: changes, types: types} = changeset, assocs) do
    {changes, parent, child} =
      Enum.reduce assocs, {changes, [], []}, fn assoc, {changes, parent, child} ->
        case changes do
          %{^assoc => value} ->
            changes = Map.delete(changes, assoc)

            case types do
              %{^assoc => {:assoc, %{relationship: :parent} = refl}} ->
                {changes, [{refl, value} | parent], child}
              %{^assoc => {:assoc, %{relationship: :child} = refl}} ->
                {changes, parent, [{refl, value} | child]}
            end

          %{} ->
            {changes, parent, child}
        end
      end

    {%{changeset | changes: changes}, parent, child}
  end

  # Don't mind computing options if there are no assocs
  defp assoc_opts([], _opts), do: []

  defp assoc_opts(_assocs, opts) do
    Keyword.take(opts, [:timeout, :log, :telemetry_event, :prefix])
  end

  defp process_parents(changeset, user_changeset, assocs, adapter, opts) do
    %{changes: changes, valid?: valid?} = changeset

    # Even if the changeset is invalid, we want to run parent callbacks
    # to collect feedback. But if all is ok, still return the user changeset.
    case Ecto.Association.on_repo_change(changeset, assocs, adapter, opts) do
      {:ok, struct} when valid? ->
        changes = change_parents(changes, struct, assocs)
        %{changeset | changes: changes, data: struct}

      {:ok, _} ->
        user_changeset

      {:error, changes} ->
        %{user_changeset | changes: Map.merge(user_changeset.changes, changes), valid?: false}
    end
  end

  # Copies each parent's related key into the owner key change, raising when
  # the changeset already sets that key to a different value.
  defp change_parents(changes, struct, assocs) do
    Enum.reduce assocs, changes, fn {refl, _}, acc ->
      %{field: field, owner_key: owner_key, related_key: related_key} = refl
      related = Map.get(struct, field)
      value = related && Map.fetch!(related, related_key)

      case Map.fetch(changes, owner_key) do
        {:ok, current} when current != value ->
          raise ArgumentError,
                "cannot change belongs_to association `#{field}` because there is " <>
                  "already a change setting its foreign key `#{owner_key}` to `#{inspect current}`"

        _ ->
          Map.put(acc, owner_key, value)
      end
    end
  end

  defp process_children(changeset, user_changeset, assocs, adapter, opts) do
    case Ecto.Association.on_repo_change(changeset, assocs, adapter, opts) do
      {:ok, struct} ->
        {:ok, struct}

      {:error, changes} ->
        changes = Map.merge(user_changeset.changes, changes)
        {:error, %{user_changeset | changes: changes, valid?: false}}
    end
  end

  # Reflections of the associations whose on_delete is not :nothing.
  defp to_delete_assocs(schema) do
    for assoc <- schema.__schema__(:associations),
        reflection = schema.__schema__(:association, assoc),
        match?(%{on_delete: on_delete} when on_delete != :nothing, reflection),
        do: reflection
  end

  # insert variant: returns
  # {changes, cast_extra, dump_extra, return_types, return_sources}.
  defp autogenerate_id(nil, changes, return_types, return_sources, _adapter) do
    {changes, [], [], return_types, return_sources}
  end

  defp autogenerate_id({key, source, type}, changes, return_types, return_sources, adapter) do
    cond do
      Map.has_key?(changes, key) -> # Set by user
        {changes, [], [], return_types, return_sources}
      dump_value = Ecto.Type.adapter_autogenerate(adapter, type) -> # Autogenerated now
        {:ok, cast_value} = Ecto.Type.adapter_load(adapter, type, dump_value)
        {changes, [cast_value], [{source, dump_value}], [{key, type} | return_types], return_sources}
      true -> # Autogenerated in storage
        {changes, [], [], [{key, type} | return_types], [source | List.delete(return_sources, source)]}
    end
  end

  # Dumped changes come first, then dumped autogenerated values, then extra.
  # The cast_params built by the callers follow this same order.
  defp dump_changes!(action, changes, autogen, schema, extra, dumper, adapter) do
    dump_fields!(action, schema, changes, dumper, adapter) ++
      dump_fields!(action, schema, autogen, dumper, adapter) ++
      extra
  end

  # Generates values for the autogenerate/autoupdate fields not present in
  # the changes. Fields grouped under the same generator share one value.
  defp autogenerate_changes(schema, action, changes) do
    autogen_fields = action |> action_to_auto() |> schema.__schema__()

    Enum.flat_map(autogen_fields, fn {fields, {mod, fun, args}} ->
      case Enum.reject(fields, &Map.has_key?(changes, &1)) do
        [] ->
          []

        fields ->
          generated = apply(mod, fun, args)
          Enum.map(fields, &{&1, generated})
      end
    end)
  end

  defp action_to_auto(:insert), do: :autogenerate
  defp action_to_auto(:update), do: :autoupdate

  # Adds the primary key values to the filters, raising when any is nil.
  defp add_pk_filter!(filters, struct) do
    Enum.reduce Ecto.primary_key!(struct), filters, fn
      {_k, nil}, _acc ->
        raise Ecto.NoPrimaryKeyValueError, struct: struct
      {k, v}, acc ->
        Map.put(acc, k, v)
    end
  end

  # Computes whether any association or embed is being changed and defers to
  # wrap_in_transaction/6.
  defp wrap_in_transaction(adapter, adapter_meta, opts, changeset, assocs, embeds, prepare, fun) do
    %{changes: changes} = changeset
    changed = &Map.has_key?(changes, &1)
    relations_changed? = Enum.any?(assocs, changed) or Enum.any?(embeds, changed)
    wrap_in_transaction(adapter, adapter_meta, opts, relations_changed?, prepare, fun)
  end

  # Runs `fun` inside a transaction only when relations changed or there are
  # prepare functions, the adapter exports transaction/3, and no transaction
  # is already open. Otherwise `fun` is invoked directly.
  defp wrap_in_transaction(adapter, adapter_meta, opts, relations_changed?, prepare, fun) do
    if (relations_changed? or prepare != []) and
       function_exported?(adapter, :transaction, 3) and
       not adapter.in_transaction?(adapter_meta) do
      adapter.transaction(adapter_meta, opts, fn ->
        case fun.() do
          {:ok, struct} -> struct
          {:error, changeset} -> adapter.rollback(adapter_meta, changeset)
        end
      end)
    else
      fun.()
    end
  end

  # Dumps a single value with the adapter, raising Ecto.ChangeError when the
  # value does not match the type.
  defp dump_field!(action, schema, field, type, value, adapter) do
    case Ecto.Type.adapter_dump(adapter, type, value) do
      {:ok, value} ->
        value
      :error ->
        raise Ecto.ChangeError,
              "value `#{inspect(value)}` for `#{inspect(schema)}.#{field}` " <>
              "in `#{action}` does not match type #{inspect type}"
    end
  end

  # Dumps every {field, value} pair into a {source, dumped_value} pair.
  defp dump_fields!(action, schema, kw, dumper, adapter) do
    for {field, value} <- kw do
      {alias, type} = Map.fetch!(dumper, field)
      {alias, dump_field!(action, schema, field, type, value, adapter)}
    end
  end
end