sql.ex (40713B)
defmodule Ecto.Adapters.SQL do
  @moduledoc ~S"""
  This application provides functionality for working with
  SQL databases in `Ecto`.

  ## Built-in adapters

  By default, we support the following adapters:

    * `Ecto.Adapters.Postgres` for Postgres
    * `Ecto.Adapters.MyXQL` for MySQL
    * `Ecto.Adapters.Tds` for SQLServer

  ## Additional functions

  If your `Ecto.Repo` is backed by any of the SQL adapters above,
  this module will inject additional functions into your repository:

    * `disconnect_all(interval, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.disconnect_all/3`

    * `explain(type, query, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.explain/4`

    * `query(sql, params, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.query/4`

    * `query!(sql, params, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.query!/4`

    * `query_many(sql, params, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.query_many/4`

    * `query_many!(sql, params, options \\ [])` -
       shortcut for `Ecto.Adapters.SQL.query_many!/4`

    * `to_sql(type, query)` -
       shortcut for `Ecto.Adapters.SQL.to_sql/3`

  Generally speaking, you must invoke those functions directly from
  your repository, for example: `MyApp.Repo.query("SELECT true")`.
  You can also invoke them directly from `Ecto.Adapters.SQL`, but
  keep in mind that in such cases features such as "dynamic repositories"
  won't be available.

  ## Migrations

  `ecto_sql` supports database migrations. You can generate a migration
  with:

      $ mix ecto.gen.migration create_posts

  This will create a new file inside `priv/repo/migrations` with the
  `change` function. Check `Ecto.Migration` for more information.

  To interface with migrations, developers typically use mix tasks:

    * `mix ecto.migrations` - lists all available migrations and their status
    * `mix ecto.migrate` - runs a migration
    * `mix ecto.rollback` - rolls back a previously run migration

  If you want to run migrations programmatically, see `Ecto.Migrator`.

  ## SQL sandbox

  `ecto_sql` provides a sandbox for testing. The sandbox wraps each
  test in a transaction, making sure the tests are isolated and can
  run concurrently. See `Ecto.Adapters.SQL.Sandbox` for more information.

  ## Structure load and dumping

  If you have an existing database, you may want to dump its existing
  structure and make it reproducible from within Ecto. This can be
  achieved with two Mix tasks:

    * `mix ecto.load` - loads an existing structure into the database
    * `mix ecto.dump` - dumps the existing database structure to the filesystem

  For creating and dropping databases, see `mix ecto.create`
  and `mix ecto.drop` that are included as part of Ecto.

  ## Custom adapters

  Developers can implement their own SQL adapters by using
  `Ecto.Adapters.SQL` and by implementing the callbacks required
  by `Ecto.Adapters.SQL.Connection` for handling connections and
  performing queries. The connection handling and pooling for SQL
  adapters should be built using the `DBConnection` library.

  When using `Ecto.Adapters.SQL`, the following options are required:

    * `:driver` (required) - the database driver library.
      For example: `:postgrex`

  """

  require Logger

  # Injects default implementations of the adapter behaviours listed below
  # into the module that calls `use Ecto.Adapters.SQL, driver: ...`.
  # Most injected callbacks simply delegate to the function of the same name
  # in `Ecto.Adapters.SQL`, adding the adapter's connection module and/or
  # driver where the delegate needs them.
  @doc false
  defmacro __using__(opts) do
    quote do
      @behaviour Ecto.Adapter
      @behaviour Ecto.Adapter.Migration
      @behaviour Ecto.Adapter.Queryable
      @behaviour Ecto.Adapter.Schema
      @behaviour Ecto.Adapter.Transaction

      # The options given to `use` are evaluated in the body of the using
      # module. `:driver` is mandatory: `Keyword.fetch!/2` raises without it.
      opts = unquote(opts)
      # The connection module is resolved by naming convention, nested under
      # the adapter module itself.
      @conn __MODULE__.Connection
      @driver Keyword.fetch!(opts, :driver)

      @impl true
      defmacro __before_compile__(env) do
        Ecto.Adapters.SQL.__before_compile__(@driver, env)
      end

      @impl true
      def ensure_all_started(config, type) do
        Ecto.Adapters.SQL.ensure_all_started(@driver, config, type)
      end

      @impl true
      def init(config) do
        Ecto.Adapters.SQL.init(@conn, @driver, config)
      end

      @impl true
      def checkout(meta, opts, fun) do
        Ecto.Adapters.SQL.checkout(meta, opts, fun)
      end

      @impl true
      def checked_out?(meta) do
        Ecto.Adapters.SQL.checked_out?(meta)
      end

      # `{:map, _}` values are loaded with `Ecto.Type.embedded_load/3` in
      # `:json` format; `:binary_id` goes through `Ecto.UUID` before the type.
      @impl true
      def loaders({:map, _}, type), do: [&Ecto.Type.embedded_load(type, &1, :json)]
      def loaders(:binary_id, type), do: [Ecto.UUID, type]
      def loaders(_, type), do: [type]

      # Mirror image of `loaders/2`: for `:binary_id` the type runs first and
      # `Ecto.UUID` last.
      @impl true
      def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
      def dumpers(:binary_id, type), do: [type, Ecto.UUID]
      def dumpers(_, type), do: [type]

      ## Query

      # Every clause returns `{:cache, {id, sql}}` where `id` is a unique
      # positive integer and `sql` is the iodata produced by the connection
      # module flattened into a binary.
      @impl true
      def prepare(:all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
      end

      def prepare(:update_all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.update_all(query))}}
      end

      def prepare(:delete_all, query) do
        {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.delete_all(query))}}
      end

      # `:named` is passed as the `prepare` argument of
      # `Ecto.Adapters.SQL.execute/6`.
      @impl true
      def execute(adapter_meta, query_meta, query, params, opts) do
        Ecto.Adapters.SQL.execute(:named, adapter_meta, query_meta, query, params, opts)
      end

      @impl true
      def stream(adapter_meta, query_meta, query, params, opts) do
        Ecto.Adapters.SQL.stream(adapter_meta, query_meta, query, params, opts)
      end

      ## Schema

      # `:id` yields `nil` (no value is generated here - presumably the
      # database assigns it; confirm against the adapter). The UUID variants
      # are generated with `Ecto.UUID`.
      @impl true
      def autogenerate(:id), do: nil
      def autogenerate(:embed_id), do: Ecto.UUID.generate()
      def autogenerate(:binary_id), do: Ecto.UUID.bingenerate()

      @impl true
      def insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) do
        Ecto.Adapters.SQL.insert_all(adapter_meta, schema_meta, @conn, header, rows, on_conflict, returning, placeholders, opts)
      end

      # The field list is passed both as the header and, wrapped in a list, as
      # the single row. Bound parameters are the row values followed by the
      # conflict parameters.
      @impl true
      def insert(adapter_meta, %{source: source, prefix: prefix}, params,
                 {kind, conflict_params, _} = on_conflict, returning, opts) do
        {fields, values} = :lists.unzip(params)
        sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning, [])
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
      end

      # Bound parameters are the updated field values followed by the filter
      # values.
      @impl true
      def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
        {fields, field_values} = :lists.unzip(fields)
        filter_values = Keyword.values(params)
        sql = @conn.update(prefix, source, fields, params, returning)
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :update, source, params, field_values ++ filter_values, :raise, returning, opts)
      end

      @impl true
      def delete(adapter_meta, %{source: source, prefix: prefix}, params, opts) do
        filter_values = Keyword.values(params)
        sql = @conn.delete(prefix, source, params, [])
        Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :delete, source, params, filter_values, :raise, [], opts)
      end

      ## Transaction

      @impl true
      def transaction(meta, opts, fun) do
        Ecto.Adapters.SQL.transaction(meta, opts, fun)
      end

      @impl true
      def in_transaction?(meta) do
        Ecto.Adapters.SQL.in_transaction?(meta)
      end

      @impl true
      def rollback(meta, value) do
        Ecto.Adapters.SQL.rollback(meta, value)
      end

      ## Migration

      @impl true
      def execute_ddl(meta, definition, opts) do
        Ecto.Adapters.SQL.execute_ddl(meta, @conn, definition, opts)
      end

      # Adapters may replace any of the defaults listed here.
      defoverridable [prepare: 2, execute: 5, insert: 6, update: 6, delete: 4, insert_all: 8,
                      execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1,
                      ensure_all_started: 2, __before_compile__: 1]
    end
  end

  @doc """
  Converts the given query to SQL according to its kind and the
  adapter in the given repository.

  Returns a `{sql, params}` tuple.

  ## Examples

  The examples below are meant for reference. Each adapter will
  return a different result:

      iex> Ecto.Adapters.SQL.to_sql(:all, Repo, Post)
      {"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}

      iex> Ecto.Adapters.SQL.to_sql(:update_all, Repo,
                                    from(p in Post, update: [set: [title: ^"hello"]]))
      {"UPDATE posts AS p SET title = $1", ["hello"]}

  This function is also available under the repository with name `to_sql`:

      iex> Repo.to_sql(:all, Post)
      {"SELECT p.id, p.title, p.inserted_at, p.created_at FROM posts as p", []}

  """
  @spec to_sql(:all | :update_all | :delete_all, Ecto.Repo.t, Ecto.Queryable.t) ::
          {String.t, [term]}
  def to_sql(kind, repo, queryable) do
    case Ecto.Adapter.Queryable.prepare_query(kind, repo, queryable) do
      # A cached entry is not necessarily a plain string, so it is converted
      # to SQL text through the `String.Chars` protocol.
      {{:cached, _update, _reset, {_id, cached}}, params} ->
        {String.Chars.to_string(cached), params}

      # For `:cache` and `:nocache` the prepared value is returned as is.
      {{:cache, _update, {_id, prepared}}, params} ->
        {prepared, params}

      {{:nocache, {_id, prepared}}, params} ->
        {prepared, params}
    end
  end

  @doc """
  Executes an EXPLAIN statement or similar for the given query according to its kind and the
  adapter in the given repository.
277 278 ## Examples 279 280 # Postgres 281 iex> Ecto.Adapters.SQL.explain(Repo, :all, Post) 282 "Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)" 283 284 # MySQL 285 iex> Ecto.Adapters.SQL.explain(Repo, :all, from(p in Post, where: p.title == "title")) |> IO.puts() 286 +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+ 287 | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | 288 +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+ 289 | 1 | SIMPLE | p0 | NULL | ALL | NULL | NULL | NULL | NULL | 1 | 100.0 | Using where | 290 +----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+-------------+ 291 292 # Shared opts 293 iex> Ecto.Adapters.SQL.explain(Repo, :all, Post, analyze: true, timeout: 20_000) 294 "Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=443) (actual time=0.013..0.013 rows=0 loops=1)\\nPlanning Time: 0.031 ms\\nExecution Time: 0.021 ms" 295 296 It's safe to execute it for updates and deletes, no data change will be committed: 297 298 iex> Ecto.Adapters.SQL.explain(Repo, :update_all, from(p in Post, update: [set: [title: "new title"]])) 299 "Update on posts p0 (cost=0.00..11.70 rows=170 width=449)\\n -> Seq Scan on posts p0 (cost=0.00..11.70 rows=170 width=449)" 300 301 This function is also available under the repository with name `explain`: 302 303 iex> Repo.explain(:all, from(p in Post, where: p.title == "title")) 304 "Seq Scan on posts p0 (cost=0.00..12.12 rows=1 width=443)\\n Filter: ((title)::text = 'title'::text)" 305 306 ### Options 307 308 Built-in adapters support passing `opts` to the EXPLAIN statement according to the following: 309 310 Adapter | Supported opts 311 ---------------- | -------------- 312 Postgrex | `analyze`, `verbose`, `costs`, `settings`, `buffers`, `timing`, 
`summary` 313 MyXQL | None 314 315 _Postgrex_: Check [PostgreSQL doc](https://www.postgresql.org/docs/current/sql-explain.html) 316 for version compatibility. 317 318 Also note that: 319 320 * Currently `:map`, `:yaml`, and `:text` format options are supported 321 for PostgreSQL. `:map` is the deserialized JSON encoding. The last two 322 options return the result as a string; 323 324 * Any other value passed to `opts` will be forwarded to the underlying 325 adapter query function, including Repo shared options such as `:timeout`; 326 327 * Non built-in adapters may have specific behavior and you should consult 328 their own documentation. 329 330 """ 331 @spec explain(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, 332 :all | :update_all | :delete_all, 333 Ecto.Queryable.t, opts :: Keyword.t) :: String.t | Exception.t 334 def explain(repo, operation, queryable, opts \\ []) 335 336 def explain(repo, operation, queryable, opts) when is_atom(repo) or is_pid(repo) do 337 explain(Ecto.Adapter.lookup_meta(repo), operation, queryable, opts) 338 end 339 340 def explain(%{repo: repo} = adapter_meta, operation, queryable, opts) do 341 Ecto.Multi.new() 342 |> Ecto.Multi.run(:explain, fn _, _ -> 343 {prepared, prepared_params} = to_sql(operation, repo, queryable) 344 sql_call(adapter_meta, :explain_query, [prepared], prepared_params, opts) 345 end) 346 |> Ecto.Multi.run(:rollback, fn _, _ -> 347 {:error, :forced_rollback} 348 end) 349 |> repo.transaction(opts) 350 |> case do 351 {:error, :rollback, :forced_rollback, %{explain: result}} -> result 352 {:error, :explain, error, _} -> raise error 353 _ -> raise "unable to execute explain" 354 end 355 end 356 357 @doc """ 358 Forces all connections in the repo pool to disconnect within the given interval. 359 360 Once this function is called, the pool will disconnect all of its connections 361 as they are checked in or as they are pinged. Checked in connections will be 362 randomly disconnected within the given time interval. 
Pinged connections are 363 immediately disconnected - as they are idle (according to `:idle_interval`). 364 365 If the connection has a backoff configured (which is the case by default), 366 disconnecting means an attempt at a new connection will be done immediately 367 after, without starting a new process for each connection. However, if backoff 368 has been disabled, the connection process will terminate. In such cases, 369 disconnecting all connections may cause the pool supervisor to restart 370 depending on the max_restarts/max_seconds configuration of the pool, 371 so you will want to set those carefully. 372 373 For convenience, this function is also available in the repository: 374 375 iex> MyRepo.disconnect_all(60_000) 376 :ok 377 """ 378 @spec disconnect_all(pid | Ecto.Repo.t | Ecto.Adapter.adapter_meta, non_neg_integer, opts :: Keyword.t()) :: :ok 379 def disconnect_all(repo, interval, opts \\ []) 380 381 def disconnect_all(repo, interval, opts) when is_atom(repo) or is_pid(repo) do 382 disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts) 383 end 384 385 def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do 386 DBConnection.disconnect_all(pid, interval, opts) 387 end 388 389 @doc """ 390 Returns a stream that runs a custom SQL query on given repo when reduced. 391 392 In case of success it is a enumerable containing maps with at least two keys: 393 394 * `:num_rows` - the number of rows affected 395 396 * `:rows` - the result set as a list. `nil` may be returned 397 instead of the list if the command does not yield any row 398 as result (but still yields the number of affected rows, 399 like a `delete` command without returning would) 400 401 In case of failure it raises an exception. 402 403 If the adapter supports a collectable stream, the stream may also be used as 404 the collectable in `Enum.into/3`. Behaviour depends on the adapter. 
405 406 ## Options 407 408 * `:log` - When false, does not log the query 409 * `:max_rows` - The number of rows to load from the database as we stream 410 411 ## Examples 412 413 iex> Ecto.Adapters.SQL.stream(MyRepo, "SELECT $1::integer + $2", [40, 2]) |> Enum.to_list() 414 [%{rows: [[42]], num_rows: 1}] 415 416 """ 417 @spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t 418 def stream(repo, sql, params \\ [], opts \\ []) do 419 repo 420 |> Ecto.Adapter.lookup_meta() 421 |> Ecto.Adapters.SQL.Stream.build(sql, params, opts) 422 end 423 424 @doc """ 425 Same as `query/4` but raises on invalid queries. 426 """ 427 @spec query!(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) :: 428 %{:rows => nil | [[term] | binary], 429 :num_rows => non_neg_integer, 430 optional(atom) => any} 431 def query!(repo, sql, params \\ [], opts \\ []) do 432 case query(repo, sql, params, opts) do 433 {:ok, result} -> result 434 {:error, err} -> raise_sql_call_error err 435 end 436 end 437 438 @doc """ 439 Runs a custom SQL query on the given repo. 440 441 In case of success, it must return an `:ok` tuple containing 442 a map with at least two keys: 443 444 * `:num_rows` - the number of rows affected 445 446 * `:rows` - the result set as a list. 
`nil` may be returned 447 instead of the list if the command does not yield any row 448 as result (but still yields the number of affected rows, 449 like a `delete` command without returning would) 450 451 ## Options 452 453 * `:log` - When false, does not log the query 454 455 ## Examples 456 457 iex> Ecto.Adapters.SQL.query(MyRepo, "SELECT $1::integer + $2", [40, 2]) 458 {:ok, %{rows: [[42]], num_rows: 1}} 459 460 For convenience, this function is also available under the repository: 461 462 iex> MyRepo.query("SELECT $1::integer + $2", [40, 2]) 463 {:ok, %{rows: [[42]], num_rows: 1}} 464 465 """ 466 @spec query(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) :: 467 {:ok, %{:rows => nil | [[term] | binary], 468 :num_rows => non_neg_integer, 469 optional(atom) => any}} 470 | {:error, Exception.t} 471 def query(repo, sql, params \\ [], opts \\ []) 472 473 def query(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do 474 query(Ecto.Adapter.lookup_meta(repo), sql, params, opts) 475 end 476 477 def query(adapter_meta, sql, params, opts) do 478 sql_call(adapter_meta, :query, [sql], params, opts) 479 end 480 481 @doc """ 482 Same as `query_many/4` but raises on invalid queries. 483 """ 484 @spec query_many!(Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) :: 485 [%{:rows => nil | [[term] | binary], 486 :num_rows => non_neg_integer, 487 optional(atom) => any}] 488 def query_many!(repo, sql, params \\ [], opts \\ []) do 489 case query_many(repo, sql, params, opts) do 490 {:ok, result} -> result 491 {:error, err} -> raise_sql_call_error err 492 end 493 end 494 495 @doc """ 496 Runs a custom SQL query that returns multiple results on the given repo. 497 498 In case of success, it must return an `:ok` tuple containing 499 a list of maps with at least two keys: 500 501 * `:num_rows` - the number of rows affected 502 503 * `:rows` - the result set as a list. 
`nil` may be returned 504 instead of the list if the command does not yield any row 505 as result (but still yields the number of affected rows, 506 like a `delete` command without returning would) 507 508 ## Options 509 510 * `:log` - When false, does not log the query 511 512 ## Examples 513 514 iex> Ecto.Adapters.SQL.query_many(MyRepo, "SELECT $1; SELECT $2;", [40, 2]) 515 {:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]} 516 517 For convenience, this function is also available under the repository: 518 519 iex> MyRepo.query_many(SELECT $1; SELECT $2;", [40, 2]) 520 {:ok, [%{rows: [[40]], num_rows: 1}, %{rows: [[2]], num_rows: 1}]} 521 522 """ 523 @spec query_many(pid() | Ecto.Repo.t | Ecto.Adapter.adapter_meta, iodata, [term], Keyword.t) :: 524 {:ok, [%{:rows => nil | [[term] | binary], 525 :num_rows => non_neg_integer, 526 optional(atom) => any}]} 527 | {:error, Exception.t} 528 def query_many(repo, sql, params \\ [], opts \\ []) 529 530 def query_many(repo, sql, params, opts) when is_atom(repo) or is_pid(repo) do 531 query_many(Ecto.Adapter.lookup_meta(repo), sql, params, opts) 532 end 533 534 def query_many(adapter_meta, sql, params, opts) do 535 sql_call(adapter_meta, :query_many, [sql], params, opts) 536 end 537 538 defp sql_call(adapter_meta, callback, args, params, opts) do 539 %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta 540 conn = get_conn_or_pool(pool) 541 opts = with_log(telemetry, params, opts ++ default_opts) 542 args = args ++ [params, opts] 543 apply(sql, callback, [conn | args]) 544 end 545 546 defp put_source(opts, %{sources: sources}) when is_binary(elem(elem(sources, 0), 0)) do 547 {source, _, _} = elem(sources, 0) 548 [source: source] ++ opts 549 end 550 551 defp put_source(opts, _) do 552 opts 553 end 554 555 @doc """ 556 Check if the given `table` exists. 557 558 Returns `true` if the `table` exists in the `repo`, otherwise `false`. 
559 The table is checked against the current database/schema in the connection. 560 """ 561 @spec table_exists?(Ecto.Repo.t, table :: String.t) :: boolean 562 def table_exists?(repo, table) when is_atom(repo) do 563 %{sql: sql} = adapter_meta = Ecto.Adapter.lookup_meta(repo) 564 {query, params} = sql.table_exists_query(table) 565 query!(adapter_meta, query, params, []).num_rows != 0 566 end 567 568 # Returns a formatted table for a given query `result`. 569 # 570 # ## Examples 571 # 572 # iex> Ecto.Adapters.SQL.format_table(query) |> IO.puts() 573 # +---------------+---------+--------+ 574 # | title | counter | public | 575 # +---------------+---------+--------+ 576 # | My Post Title | 1 | NULL | 577 # +---------------+---------+--------+ 578 @doc false 579 @spec format_table(%{:columns => [String.t] | nil, :rows => [term()] | nil, optional(atom) => any()}) :: String.t 580 def format_table(result) 581 582 def format_table(nil), do: "" 583 def format_table(%{columns: nil}), do: "" 584 def format_table(%{columns: []}), do: "" 585 def format_table(%{columns: columns, rows: nil}), do: format_table(%{columns: columns, rows: []}) 586 587 def format_table(%{columns: columns, rows: rows}) do 588 column_widths = 589 [columns | rows] 590 |> List.zip() 591 |> Enum.map(&Tuple.to_list/1) 592 |> Enum.map(fn column_with_rows -> 593 column_with_rows |> Enum.map(&binary_length/1) |> Enum.max() 594 end) 595 596 [ 597 separator(column_widths), 598 "\n", 599 cells(columns, column_widths), 600 "\n", 601 separator(column_widths), 602 "\n", 603 Enum.map(rows, &cells(&1, column_widths) ++ ["\n"]), 604 separator(column_widths) 605 ] 606 |> IO.iodata_to_binary() 607 end 608 609 defp binary_length(nil), do: 4 # NULL 610 defp binary_length(binary) when is_binary(binary), do: String.length(binary) 611 defp binary_length(other), do: other |> inspect() |> String.length() 612 613 defp separator(widths) do 614 Enum.map(widths, & [?+, ?-, String.duplicate("-", &1), ?-]) ++ [?+] 615 end 616 617 defp 
cells(items, widths) do 618 cell = 619 [items, widths] 620 |> List.zip() 621 |> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width) , " "] end) 622 623 [cell | [?|]] 624 end 625 626 defp format_item(nil, width), do: String.pad_trailing("NULL", width) 627 defp format_item(item, width) when is_binary(item), do: String.pad_trailing(item, width) 628 defp format_item(item, width) when is_number(item), do: item |> inspect() |> String.pad_leading(width) 629 defp format_item(item, width), do: item |> inspect() |> String.pad_trailing(width) 630 631 ## Callbacks 632 633 @doc false 634 def __before_compile__(_driver, _env) do 635 quote do 636 @doc """ 637 A convenience function for SQL-based repositories that executes the given query. 638 639 See `Ecto.Adapters.SQL.query/4` for more information. 640 """ 641 def query(sql, params \\ [], opts \\ []) do 642 Ecto.Adapters.SQL.query(get_dynamic_repo(), sql, params, opts) 643 end 644 645 @doc """ 646 A convenience function for SQL-based repositories that executes the given query. 647 648 See `Ecto.Adapters.SQL.query!/4` for more information. 649 """ 650 def query!(sql, params \\ [], opts \\ []) do 651 Ecto.Adapters.SQL.query!(get_dynamic_repo(), sql, params, opts) 652 end 653 654 @doc """ 655 A convenience function for SQL-based repositories that executes the given multi-result query. 656 657 See `Ecto.Adapters.SQL.query_many/4` for more information. 658 """ 659 def query_many(sql, params \\ [], opts \\ []) do 660 Ecto.Adapters.SQL.query_many(get_dynamic_repo(), sql, params, opts) 661 end 662 663 @doc """ 664 A convenience function for SQL-based repositories that executes the given multi-result query. 665 666 See `Ecto.Adapters.SQL.query_many!/4` for more information. 667 """ 668 def query_many!(sql, params \\ [], opts \\ []) do 669 Ecto.Adapters.SQL.query_many!(get_dynamic_repo(), sql, params, opts) 670 end 671 672 @doc """ 673 A convenience function for SQL-based repositories that translates the given query to SQL. 
674 675 See `Ecto.Adapters.SQL.to_sql/3` for more information. 676 """ 677 def to_sql(operation, queryable) do 678 Ecto.Adapters.SQL.to_sql(operation, get_dynamic_repo(), queryable) 679 end 680 681 @doc """ 682 A convenience function for SQL-based repositories that executes an EXPLAIN statement or similar 683 depending on the adapter to obtain statistics for the given query. 684 685 See `Ecto.Adapters.SQL.explain/4` for more information. 686 """ 687 def explain(operation, queryable, opts \\ []) do 688 Ecto.Adapters.SQL.explain(get_dynamic_repo(), operation, queryable, opts) 689 end 690 691 @doc """ 692 A convenience function for SQL-based repositories that forces all connections in the 693 pool to disconnect within the given interval. 694 695 See `Ecto.Adapters.SQL.disconnect_all/3` for more information. 696 """ 697 def disconnect_all(interval, opts \\ []) do 698 Ecto.Adapters.SQL.disconnect_all(get_dynamic_repo(), interval, opts) 699 end 700 end 701 end 702 703 @doc false 704 def ensure_all_started(driver, _config, type) do 705 Application.ensure_all_started(driver, type) 706 end 707 708 @pool_opts [:timeout, :pool, :pool_size] ++ 709 [:queue_target, :queue_interval, :ownership_timeout, :repo] 710 711 @doc false 712 def init(connection, driver, config) do 713 unless Code.ensure_loaded?(connection) do 714 raise """ 715 could not find #{inspect connection}. 
716 717 Please verify you have added #{inspect driver} as a dependency: 718 719 {#{inspect driver}, ">= 0.0.0"} 720 721 And remember to recompile Ecto afterwards by cleaning the current build: 722 723 mix deps.clean --build ecto 724 """ 725 end 726 727 log = Keyword.get(config, :log, :debug) 728 stacktrace = Keyword.get(config, :stacktrace, nil) 729 telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix) 730 telemetry = {config[:repo], log, telemetry_prefix ++ [:query]} 731 732 config = adapter_config(config) 733 opts = Keyword.take(config, @pool_opts) 734 meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts} 735 {:ok, connection.child_spec(config), meta} 736 end 737 738 defp adapter_config(config) do 739 if Keyword.has_key?(config, :pool_timeout) do 740 message = """ 741 :pool_timeout option no longer has an effect and has been replaced with an improved queuing system. 742 See \"Queue config\" in DBConnection.start_link/2 documentation for more information. 
743 """ 744 745 IO.warn(message) 746 end 747 748 config 749 |> Keyword.delete(:name) 750 |> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1) 751 end 752 753 defp normalize_pool(pool) do 754 if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do 755 DBConnection.Ownership 756 else 757 pool 758 end 759 end 760 761 @doc false 762 def checkout(adapter_meta, opts, callback) do 763 checkout_or_transaction(:run, adapter_meta, opts, callback) 764 end 765 766 @doc false 767 def checked_out?(adapter_meta) do 768 %{pid: pool} = adapter_meta 769 get_conn(pool) != nil 770 end 771 772 ## Query 773 774 @doc false 775 def insert_all(adapter_meta, schema_meta, conn, header, rows, on_conflict, returning, placeholders, opts) do 776 %{source: source, prefix: prefix} = schema_meta 777 {_, conflict_params, _} = on_conflict 778 779 {rows, params} = 780 case rows do 781 {%Ecto.Query{} = query, params} -> {query, Enum.reverse(params)} 782 rows -> unzip_inserts(header, rows) 783 end 784 785 sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders) 786 787 opts = if is_nil(Keyword.get(opts, :cache_statement)) do 788 [{:cache_statement, "ecto_insert_all_#{source}"} | opts] 789 else 790 opts 791 end 792 793 all_params = placeholders ++ Enum.reverse(params, conflict_params) 794 795 %{num_rows: num, rows: rows} = query!(adapter_meta, sql, all_params, opts) 796 {num, rows} 797 end 798 799 defp unzip_inserts(header, rows) do 800 Enum.map_reduce rows, [], fn fields, params -> 801 Enum.map_reduce header, params, fn key, acc -> 802 case :lists.keyfind(key, 1, fields) do 803 {^key, {%Ecto.Query{} = query, query_params}} -> 804 {{query, length(query_params)}, Enum.reverse(query_params, acc)} 805 806 {^key, {:placeholder, placeholder_index}} -> 807 {{:placeholder, Integer.to_string(placeholder_index)}, acc} 808 809 {^key, value} -> {key, [value | acc]} 810 811 false -> {nil, acc} 812 end 813 end 814 end 815 end 816 817 @doc false 818 
def execute(prepare, adapter_meta, query_meta, prepared, params, opts) do 819 %{num_rows: num, rows: rows} = 820 execute!(prepare, adapter_meta, prepared, params, put_source(opts, query_meta)) 821 822 {num, rows} 823 end 824 825 defp execute!(prepare, adapter_meta, {:cache, update, {id, prepared}}, params, opts) do 826 name = prepare_name(prepare, id) 827 828 case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do 829 {:ok, query, result} -> 830 maybe_update_cache(prepare, update, {id, query}) 831 result 832 {:error, err} -> 833 raise_sql_call_error err 834 end 835 end 836 837 defp execute!(:unnamed = prepare, adapter_meta, {:cached, _update, _reset, {id, cached}}, params, opts) do 838 name = prepare_name(prepare, id) 839 prepared = String.Chars.to_string(cached) 840 841 case sql_call(adapter_meta, :prepare_execute, [name, prepared], params, opts) do 842 {:ok, _query, result} -> 843 result 844 {:error, err} -> 845 raise_sql_call_error err 846 end 847 end 848 849 defp execute!(:named = _prepare, adapter_meta, {:cached, update, reset, {id, cached}}, params, opts) do 850 case sql_call(adapter_meta, :execute, [cached], params, opts) do 851 {:ok, query, result} -> 852 update.({id, query}) 853 result 854 {:ok, result} -> 855 result 856 {:error, err} -> 857 raise_sql_call_error err 858 {:reset, err} -> 859 reset.({id, String.Chars.to_string(cached)}) 860 raise_sql_call_error err 861 end 862 end 863 864 defp execute!(_prepare, adapter_meta, {:nocache, {_id, prepared}}, params, opts) do 865 case sql_call(adapter_meta, :query, [prepared], params, opts) do 866 {:ok, res} -> res 867 {:error, err} -> raise_sql_call_error err 868 end 869 end 870 871 defp prepare_name(:named, id), do: "ecto_" <> Integer.to_string(id) 872 defp prepare_name(:unnamed, _id), do: "" 873 874 defp maybe_update_cache(:named = _prepare, update, value), do: update.(value) 875 defp maybe_update_cache(:unnamed = _prepare, _update, _value), do: :noop 876 877 @doc false 878 def 
stream(adapter_meta, query_meta, prepared, params, opts) do 879 do_stream(adapter_meta, prepared, params, put_source(opts, query_meta)) 880 end 881 882 defp do_stream(adapter_meta, {:cache, _, {_, prepared}}, params, opts) do 883 prepare_stream(adapter_meta, prepared, params, opts) 884 end 885 886 defp do_stream(adapter_meta, {:cached, _, _, {_, cached}}, params, opts) do 887 prepare_stream(adapter_meta, String.Chars.to_string(cached), params, opts) 888 end 889 890 defp do_stream(adapter_meta, {:nocache, {_id, prepared}}, params, opts) do 891 prepare_stream(adapter_meta, prepared, params, opts) 892 end 893 894 defp prepare_stream(adapter_meta, prepared, params, opts) do 895 adapter_meta 896 |> Ecto.Adapters.SQL.Stream.build(prepared, params, opts) 897 |> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end) 898 end 899 900 defp raise_sql_call_error(%DBConnection.OwnershipError{} = err) do 901 message = err.message <> "\nSee Ecto.Adapters.SQL.Sandbox docs for more information." 
902 raise %{err | message: message} 903 end 904 905 defp raise_sql_call_error(err), do: raise err 906 907 @doc false 908 def reduce(adapter_meta, statement, params, opts, acc, fun) do 909 %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta 910 opts = with_log(telemetry, params, opts ++ default_opts) 911 912 case get_conn(pool) do 913 %DBConnection{conn_mode: :transaction} = conn -> 914 sql 915 |> apply(:stream, [conn, statement, params, opts]) 916 |> Enumerable.reduce(acc, fun) 917 918 _ -> 919 raise "cannot reduce stream outside of transaction" 920 end 921 end 922 923 @doc false 924 def into(adapter_meta, statement, params, opts) do 925 %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta 926 opts = with_log(telemetry, params, opts ++ default_opts) 927 928 case get_conn(pool) do 929 %DBConnection{conn_mode: :transaction} = conn -> 930 sql 931 |> apply(:stream, [conn, statement, params, opts]) 932 |> Collectable.into() 933 934 _ -> 935 raise "cannot collect into stream outside of transaction" 936 end 937 end 938 939 @doc false 940 def struct(adapter_meta, conn, sql, operation, source, params, values, on_conflict, returning, opts) do 941 opts = if is_nil(Keyword.get(opts, :cache_statement)) do 942 [{:cache_statement, "ecto_#{operation}_#{source}_#{length(params)}"} | opts] 943 else 944 opts 945 end 946 947 case query(adapter_meta, sql, values, opts) do 948 {:ok, %{rows: nil, num_rows: 1}} -> 949 {:ok, []} 950 951 {:ok, %{rows: [values], num_rows: 1}} -> 952 {:ok, Enum.zip(returning, values)} 953 954 {:ok, %{num_rows: 0}} -> 955 if on_conflict == :nothing, do: {:ok, []}, else: {:error, :stale} 956 957 {:ok, %{num_rows: num_rows}} when num_rows > 1 -> 958 raise Ecto.MultiplePrimaryKeyError, 959 source: source, params: params, count: num_rows, operation: operation 960 961 {:error, err} -> 962 case conn.to_constraints(err, source: source) do 963 [] -> raise_sql_call_error err 964 constraints -> {:invalid, 
constraints} 965 end 966 end 967 end 968 969 ## Transactions 970 971 @doc false 972 def transaction(adapter_meta, opts, callback) do 973 checkout_or_transaction(:transaction, adapter_meta, opts, callback) 974 end 975 976 @doc false 977 def in_transaction?(%{pid: pool}) do 978 match?(%DBConnection{conn_mode: :transaction}, get_conn(pool)) 979 end 980 981 @doc false 982 def rollback(%{pid: pool}, value) do 983 case get_conn(pool) do 984 %DBConnection{conn_mode: :transaction} = conn -> DBConnection.rollback(conn, value) 985 _ -> raise "cannot call rollback outside of transaction" 986 end 987 end 988 989 ## Migrations 990 991 @doc false 992 def execute_ddl(meta, conn, definition, opts) do 993 ddl_logs = 994 definition 995 |> conn.execute_ddl() 996 |> List.wrap() 997 |> Enum.map(&query!(meta, &1, [], opts)) 998 |> Enum.flat_map(&conn.ddl_logs/1) 999 1000 {:ok, ddl_logs} 1001 end 1002 1003 @doc false 1004 def raise_migration_pool_size_error do 1005 raise Ecto.MigrationError, """ 1006 Migrations failed to run because the connection pool size is less than 2. 1007 1008 Ecto requires a pool size of at least 2 to support concurrent migrators. 1009 When migrations run, Ecto uses one connection to maintain a lock and 1010 another to run migrations. 
1011 1012 If you are running migrations with Mix, you can increase the number 1013 of connections via the pool size option: 1014 1015 mix ecto.migrate --pool-size 2 1016 1017 If you are running the Ecto.Migrator programmatically, you can configure 1018 the pool size via your application config: 1019 1020 config :my_app, Repo, 1021 ..., 1022 pool_size: 2 # at least 1023 """ 1024 end 1025 1026 ## Log 1027 1028 defp with_log(telemetry, params, opts) do 1029 [log: &log(telemetry, params, &1, opts)] ++ opts 1030 end 1031 1032 defp log({repo, log, event_name}, params, entry, opts) do 1033 %{ 1034 connection_time: query_time, 1035 decode_time: decode_time, 1036 pool_time: queue_time, 1037 idle_time: idle_time, 1038 result: result, 1039 query: query 1040 } = entry 1041 1042 source = Keyword.get(opts, :source) 1043 query = String.Chars.to_string(query) 1044 result = with {:ok, _query, res} <- result, do: {:ok, res} 1045 stacktrace = Keyword.get(opts, :stacktrace) 1046 1047 acc = 1048 if idle_time, do: [idle_time: idle_time], else: [] 1049 1050 measurements = 1051 log_measurements( 1052 [query_time: query_time, decode_time: decode_time, queue_time: queue_time], 1053 0, 1054 acc 1055 ) 1056 1057 metadata = %{ 1058 type: :ecto_sql_query, 1059 repo: repo, 1060 result: result, 1061 params: params, 1062 query: query, 1063 source: source, 1064 stacktrace: stacktrace, 1065 options: Keyword.get(opts, :telemetry_options, []) 1066 } 1067 1068 if event_name = Keyword.get(opts, :telemetry_event, event_name) do 1069 :telemetry.execute(event_name, measurements, metadata) 1070 end 1071 1072 case Keyword.get(opts, :log, log) do 1073 true -> 1074 Logger.log( 1075 log, 1076 fn -> log_iodata(measurements, repo, source, query, opts[:cast_params] || params, result, stacktrace) end, 1077 ansi_color: sql_color(query) 1078 ) 1079 1080 false -> 1081 :ok 1082 1083 level -> 1084 Logger.log( 1085 level, 1086 fn -> log_iodata(measurements, repo, source, query, opts[:cast_params] || params, result, 
stacktrace) end, 1087 ansi_color: sql_color(query) 1088 ) 1089 end 1090 1091 :ok 1092 end 1093 1094 defp log_measurements([{_, nil} | rest], total, acc), 1095 do: log_measurements(rest, total, acc) 1096 1097 defp log_measurements([{key, value} | rest], total, acc), 1098 do: log_measurements(rest, total + value, [{key, value} | acc]) 1099 1100 defp log_measurements([], total, acc), 1101 do: Map.new([total_time: total] ++ acc) 1102 1103 defp log_iodata(measurements, repo, source, query, params, result, stacktrace) do 1104 [ 1105 "QUERY", 1106 ?\s, 1107 log_ok_error(result), 1108 log_ok_source(source), 1109 log_time("db", measurements, :query_time, true), 1110 log_time("decode", measurements, :decode_time, false), 1111 log_time("queue", measurements, :queue_time, false), 1112 log_time("idle", measurements, :idle_time, true), 1113 ?\n, 1114 query, 1115 ?\s, 1116 inspect(params, charlists: false), 1117 log_stacktrace(stacktrace, repo) 1118 ] 1119 end 1120 1121 defp log_ok_error({:ok, _res}), do: "OK" 1122 defp log_ok_error({:error, _err}), do: "ERROR" 1123 1124 defp log_ok_source(nil), do: "" 1125 defp log_ok_source(source), do: " source=#{inspect(source)}" 1126 1127 defp log_time(label, measurements, key, force) do 1128 case measurements do 1129 %{^key => time} -> 1130 us = System.convert_time_unit(time, :native, :microsecond) 1131 ms = div(us, 100) / 10 1132 1133 if force or ms > 0 do 1134 [?\s, label, ?=, :io_lib_format.fwrite_g(ms), ?m, ?s] 1135 else 1136 [] 1137 end 1138 1139 %{} -> 1140 [] 1141 end 1142 end 1143 1144 defp log_stacktrace(stacktrace, repo) do 1145 with [_ | _] <- stacktrace, 1146 {module, function, arity, info} <- last_non_ecto(Enum.reverse(stacktrace), repo, nil) do 1147 [ 1148 ?\n, 1149 IO.ANSI.light_black(), 1150 "↳ ", 1151 Exception.format_mfa(module, function, arity), 1152 log_stacktrace_info(info), 1153 IO.ANSI.reset(), 1154 ] 1155 else 1156 _ -> [] 1157 end 1158 end 1159 1160 defp log_stacktrace_info([file: file, line: line] ++ _) do 1161 [", 
at: ", file, ?:, Integer.to_string(line)] 1162 end 1163 1164 defp log_stacktrace_info(_) do 1165 [] 1166 end 1167 1168 @repo_modules [Ecto.Repo.Queryable, Ecto.Repo.Schema, Ecto.Repo.Transaction] 1169 1170 defp last_non_ecto([{mod, _, _, _} | _stacktrace], repo, last) 1171 when mod == repo or mod in @repo_modules, 1172 do: last 1173 1174 defp last_non_ecto([last | stacktrace], repo, _last), 1175 do: last_non_ecto(stacktrace, repo, last) 1176 1177 defp last_non_ecto([], _repo, last), 1178 do: last 1179 1180 ## Connection helpers 1181 1182 defp checkout_or_transaction(fun, adapter_meta, opts, callback) do 1183 %{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta 1184 opts = with_log(telemetry, [], opts ++ default_opts) 1185 1186 callback = fn conn -> 1187 previous_conn = put_conn(pool, conn) 1188 1189 try do 1190 callback.() 1191 after 1192 reset_conn(pool, previous_conn) 1193 end 1194 end 1195 1196 apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts]) 1197 end 1198 1199 defp get_conn_or_pool(pool) do 1200 Process.get(key(pool), pool) 1201 end 1202 1203 defp get_conn(pool) do 1204 Process.get(key(pool)) 1205 end 1206 1207 defp put_conn(pool, conn) do 1208 Process.put(key(pool), conn) 1209 end 1210 1211 defp reset_conn(pool, conn) do 1212 if conn do 1213 put_conn(pool, conn) 1214 else 1215 Process.delete(key(pool)) 1216 end 1217 end 1218 1219 defp key(pool), do: {__MODULE__, pool} 1220 1221 defp sql_color("SELECT" <> _), do: :cyan 1222 defp sql_color("ROLLBACK" <> _), do: :red 1223 defp sql_color("LOCK" <> _), do: :white 1224 defp sql_color("INSERT" <> _), do: :green 1225 defp sql_color("UPDATE" <> _), do: :yellow 1226 defp sql_color("DELETE" <> _), do: :red 1227 defp sql_color("begin" <> _), do: :magenta 1228 defp sql_color("commit" <> _), do: :magenta 1229 defp sql_color(_), do: nil 1230 end