connection.ex (51604B)
if Code.ensure_loaded?(Postgrex) do
  defmodule Ecto.Adapters.Postgres.Connection do
    @moduledoc false

    @default_port 5432
    @behaviour Ecto.Adapters.SQL.Connection

    ## Module and Options

    @impl true
    def child_spec(opts) do
      opts
      |> Keyword.put_new(:port, @default_port)
      |> Postgrex.child_spec()
    end

    # Map Postgrex constraint-violation errors onto Ecto constraint keywords.
    @impl true
    def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}, _opts),
      do: [unique: constraint]
    def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}, _opts),
      do: [foreign_key: constraint]
    def to_constraints(%Postgrex.Error{postgres: %{code: :exclusion_violation, constraint: constraint}}, _opts),
      do: [exclusion: constraint]
    def to_constraints(%Postgrex.Error{postgres: %{code: :check_violation, constraint: constraint}}, _opts),
      do: [check: constraint]

    # Postgres 9.2 and earlier does not provide the constraint field,
    # so fall back to parsing the constraint name out of the message.
    @impl true
    def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, message: message}}, _opts) do
      case :binary.split(message, " unique constraint ") do
        [_, quoted] -> [unique: strip_quotes(quoted)]
        _ -> []
      end
    end

    def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, message: message}}, _opts) do
      case :binary.split(message, " foreign key constraint ") do
        [_, quoted] ->
          [quoted | _] = :binary.split(quoted, " on table ")
          [foreign_key: strip_quotes(quoted)]

        _ ->
          []
      end
    end

    def to_constraints(%Postgrex.Error{postgres: %{code: :exclusion_violation, message: message}}, _opts) do
      case :binary.split(message, " exclusion constraint ") do
        [_, quoted] -> [exclusion: strip_quotes(quoted)]
        _ -> []
      end
    end

    def to_constraints(%Postgrex.Error{postgres: %{code: :check_violation, message: message}}, _opts) do
      case :binary.split(message, " check constraint ") do
        [_, quoted] -> [check: strip_quotes(quoted)]
        _ -> []
      end
    end

    def to_constraints(_, _opts),
      do: []

    # Drops the surrounding double quotes from a quoted identifier.
    defp strip_quotes(quoted) do
      size = byte_size(quoted) - 2
      <<_, unquoted::binary-size(size), _>> = quoted
      unquoted
    end

    ## Query

    @impl true
    def prepare_execute(conn, name, sql, params, opts) do
      case Postgrex.prepare_execute(conn, name, sql, params, opts) do
        {:error, %Postgrex.Error{postgres: %{pg_code: "22P02", message: message}} = error} ->
          context = """
          . If you are trying to query a JSON field, the parameter may need to be interpolated. \
          Instead of

              p.json["field"] != "value"

          do

              p.json["field"] != ^"value"
          """

          {:error, put_in(error.postgres.message, message <> context)}

        other ->
          other
      end
    end

    @impl true
    def query(conn, sql, params, opts) do
      Postgrex.query(conn, sql, params, opts)
    end

    @impl true
    def query_many(_conn, _sql, _params, _opts) do
      raise RuntimeError, "query_many is not supported in the Postgrex adapter"
    end

    @impl true
    def execute(conn, %{ref: ref} = query, params, opts) do
      case Postgrex.execute(conn, query, params, opts) do
        {:ok, %{ref: ^ref}, result} ->
          {:ok, result}

        {:ok, _, _} = ok ->
          ok

        # Reset the connection cache when the prepared query is stale.
        {:error, %Postgrex.QueryError{} = err} ->
          {:reset, err}

        {:error, %Postgrex.Error{postgres: %{code: :feature_not_supported}} = err} ->
          {:reset, err}

        {:error, _} = error ->
          error
      end
    end

    @impl true
    def stream(conn, sql, params, opts) do
      Postgrex.stream(conn, sql, params, opts)
    end

    @parent_as __MODULE__
    alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr, WithExpr}

    @impl true
    def all(query, as_prefix \\ []) do
      sources = create_names(query, as_prefix)
      {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query)

      cte = cte(query, sources)
      from = from(query, sources)
      select = select(query, select_distinct, sources)
      join = join(query, sources)
      where = where(query, sources)
      group_by = group_by(query, sources)
      having = having(query, sources)
      window = window(query, sources)
      combinations = combinations(query)
      order_by = order_by(query, order_by_distinct, sources)
      limit = limit(query, sources)
      offset = offset(query, sources)
      lock = lock(query, sources)

      [cte, select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    end

    @impl true
    def update_all(%{from: %{source: source}} = query, prefix \\ nil) do
      sources = create_names(query, [])
      cte = cte(query, sources)
      {from, name} = get_source(query, sources, 0, source)

      prefix = prefix || ["UPDATE ", from, " AS ", name | " SET "]
      fields = update_fields(query, sources)
      {join, wheres} = using_join(query, :update_all, "FROM", sources)
      where = where(%{query | wheres: wheres ++ query.wheres}, sources)

      [cte, prefix, fields, join, where | returning(query, sources)]
    end

    @impl true
    def delete_all(%{from: from} = query) do
      sources = create_names(query, [])
      cte = cte(query, sources)
      {from, name} = get_source(query, sources, 0, from)

      {join, wheres} = using_join(query, :delete_all, "USING", sources)
      where = where(%{query | wheres: wheres ++ query.wheres}, sources)

      [cte, "DELETE FROM ", from, " AS ", name, join, where | returning(query, sources)]
    end

    @impl true
    def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
      counter_offset = length(placeholders) + 1

      values =
        if header == [] do
          [" VALUES " | intersperse_map(rows, ?,, fn _ -> "(DEFAULT)" end)]
        else
          [" (", quote_names(header), ") " | insert_all(rows, counter_offset)]
        end

      ["INSERT INTO ", quote_table(prefix, table), insert_as(on_conflict),
       values, on_conflict(on_conflict, header) | returning(returning)]
    end

    defp insert_as({%{sources: sources}, _, _}) do
      {_expr, name, _schema} = create_name(sources, 0, [])
      [" AS " | name]
    end

    defp insert_as({_, _, _}) do
      []
    end

    defp on_conflict({:raise, _, []}, _header),
      do: []
    defp on_conflict({:nothing, _, targets}, _header),
      do: [" ON CONFLICT ", conflict_target(targets) | "DO NOTHING"]
    defp on_conflict({fields, _, targets}, _header) when is_list(fields),
      do: [" ON CONFLICT ", conflict_target!(targets), "DO " | replace(fields)]
    defp on_conflict({query, _, targets}, _header),
      do: [" ON CONFLICT ", conflict_target!(targets), "DO " | update_all(query, "UPDATE SET ")]

    defp conflict_target!([]),
      do: error!(nil, "the :conflict_target option is required on upserts by PostgreSQL")
    defp conflict_target!(target),
      do: conflict_target(target)

    defp conflict_target({:unsafe_fragment, fragment}),
      do: [fragment, ?\s]
    defp conflict_target([]),
      do: []
    defp conflict_target(targets),
      do: [?(, quote_names(targets), ?), ?\s]

    defp replace(fields) do
      ["UPDATE SET " |
       intersperse_map(fields, ?,, fn field ->
         quoted = quote_name(field)
         [quoted, " = ", "EXCLUDED." | quoted]
       end)]
    end

    defp insert_all(query = %Ecto.Query{}, _counter) do
      [?(, all(query), ?)]
    end

    defp insert_all(rows, counter) do
      ["VALUES ",
       intersperse_reduce(rows, ?,, counter, fn row, counter ->
         {row, counter} = insert_each(row, counter)
         {[?(, row, ?)], counter}
       end)
       |> elem(0)]
    end

    defp insert_each(values, counter) do
      intersperse_reduce(values, ?,, counter, fn
        nil, counter ->
          {"DEFAULT", counter}

        {%Ecto.Query{} = query, params_counter}, counter ->
          {[?(, all(query), ?)], counter + params_counter}

        {:placeholder, placeholder_index}, counter ->
          {[?$ | placeholder_index], counter}

        _, counter ->
          {[?$ | Integer.to_string(counter)], counter + 1}
      end)
    end

    @impl true
    def update(prefix, table, fields, filters, returning) do
      {fields, count} =
        intersperse_reduce(fields, ", ", 1, fn field, acc ->
          {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
        end)

      {filters, _count} =
        intersperse_reduce(filters, " AND ", count, fn
          {field, nil}, acc ->
            {[quote_name(field), " IS NULL"], acc}

          {field, _value}, acc ->
            {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
        end)

      ["UPDATE ", quote_table(prefix, table), " SET ",
       fields, " WHERE ", filters | returning(returning)]
    end

    @impl true
    def delete(prefix, table, filters, returning) do
      {filters, _} =
        intersperse_reduce(filters, " AND ", 1, fn
          {field, nil}, acc ->
            {[quote_name(field), " IS NULL"], acc}

          {field, _value}, acc ->
            {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1}
        end)

      ["DELETE FROM ", quote_table(prefix, table), " WHERE ", filters | returning(returning)]
    end

    @impl true
    def explain_query(conn, query, params, opts) do
      {explain_opts, opts} =
        Keyword.split(opts, ~w[analyze verbose costs settings buffers timing summary format]a)

      map_format? = {:format, :map} in explain_opts

      case query(conn, build_explain_query(query, explain_opts), params, opts) do
        {:ok, %Postgrex.Result{rows: rows}} when map_format? ->
          {:ok, List.flatten(rows)}

        {:ok, %Postgrex.Result{rows: rows}} ->
          {:ok, Enum.map_join(rows, "\n", & &1)}

        error ->
          error
      end
    end

    def build_explain_query(query, []) do
      ["EXPLAIN ", query]
      |> IO.iodata_to_binary()
    end

    def build_explain_query(query, opts) do
      {analyze, opts} = Keyword.pop(opts, :analyze)
      {verbose, opts} = Keyword.pop(opts, :verbose)

      # Given only ANALYZE or VERBOSE opts we assume the legacy format
      # to support all Postgres versions, otherwise assume the new
      # syntax supported since v9.0
      case opts do
        [] ->
          [
            "EXPLAIN ",
            if_do(quote_boolean(analyze) == "TRUE", "ANALYZE "),
            if_do(quote_boolean(verbose) == "TRUE", "VERBOSE "),
            query
          ]

        opts ->
          opts =
            ([analyze: analyze, verbose: verbose] ++ opts)
            |> Enum.reduce([], fn
              {_, nil}, acc ->
                acc

              {:format, value}, acc ->
                [String.upcase("#{format_to_sql(value)}") | acc]

              {opt, value}, acc ->
                [String.upcase("#{opt} #{quote_boolean(value)}") | acc]
            end)
            |> Enum.reverse()
            |> Enum.join(", ")

          ["EXPLAIN ( ", opts, " ) ", query]
      end
      |> IO.iodata_to_binary()
    end

    ## Query generation

    binary_ops =
      [==: " = ", !=: " != ", <=: " <= ", >=: " >= ", <: " < ", >: " > ",
       +: " + ", -: " - ", *: " * ", /: " / ",
       and: " AND ", or: " OR ", ilike: " ILIKE ", like: " LIKE "]

    @binary_ops Keyword.keys(binary_ops)

    Enum.map(binary_ops, fn {op, str} ->
      defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)}
    end)

    defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)}

    defp select(%{select: %{fields: fields}} = query, select_distinct, sources) do
      ["SELECT", select_distinct, ?\s | select_fields(fields, sources, query)]
    end

    defp select_fields([], _sources, _query),
      do: "TRUE"

    defp select_fields(fields, sources, query) do
      intersperse_map(fields, ", ", fn
        {:&, _, [idx]} ->
          case elem(sources, idx) do
            {nil, source, nil} ->
              error!(query, "PostgreSQL adapter does not support selecting all fields from fragment #{source}. " <>
                            "Please specify exactly which fields you want to select")

            {source, _, nil} ->
              error!(query, "PostgreSQL adapter does not support selecting all fields from #{source} without a schema. " <>
                            "Please specify a schema or specify exactly which fields you want to select")

            {_, source, _} ->
              source
          end

        {key, value} ->
          [expr(value, sources, query), " AS " | quote_name(key)]

        value ->
          expr(value, sources, query)
      end)
    end

    defp distinct(nil, _, _), do: {[], []}
    defp distinct(%QueryExpr{expr: []}, _, _), do: {[], []}
    defp distinct(%QueryExpr{expr: true}, _, _), do: {" DISTINCT", []}
    defp distinct(%QueryExpr{expr: false}, _, _), do: {[], []}
    defp distinct(%QueryExpr{expr: exprs}, sources, query) do
      {[" DISTINCT ON (",
        intersperse_map(exprs, ", ", fn {_, expr} -> expr(expr, sources, query) end), ?)],
       exprs}
    end

    defp from(%{from: %{hints: [_ | _]}} = query, _sources) do
      error!(query, "table hints are not supported by PostgreSQL")
    end

    defp from(%{from: %{source: source}} = query, sources) do
      {from, name} = get_source(query, sources, 0, source)
      [" FROM ", from, " AS " | name]
    end

    defp cte(%{with_ctes: %WithExpr{recursive: recursive, queries: [_ | _] = queries}} = query, sources) do
      recursive_opt = if recursive, do: "RECURSIVE ", else: ""
      ctes = intersperse_map(queries, ", ", &cte_expr(&1, sources, query))
      ["WITH ", recursive_opt, ctes, " "]
    end

    defp cte(%{with_ctes: _}, _), do: []

    defp cte_expr({name, cte}, sources, query) do
      [quote_name(name), " AS ", cte_query(cte, sources, query)]
    end

    defp cte_query(%Ecto.Query{} = query, sources, parent_query) do
      query = put_in(query.aliases[@parent_as], {parent_query, sources})
      ["(", all(query, subquery_as_prefix(sources)), ")"]
    end

    defp cte_query(%QueryExpr{expr: expr}, sources, query) do
      expr(expr, sources, query)
    end

    defp update_fields(%{updates: updates} = query, sources) do
      for(%{expr: expr} <- updates,
          {op, kw} <- expr,
          {key, value} <- kw,
          do: update_op(op, key, value, sources, query)) |> Enum.intersperse(", ")
    end

    defp update_op(:set, key, value, sources, query) do
      [quote_name(key), " = " | expr(value, sources, query)]
    end

    defp update_op(:inc, key, value, sources, query) do
      [quote_name(key), " = ", quote_qualified_name(key, sources, 0), " + " |
       expr(value, sources, query)]
    end

    defp update_op(:push, key, value, sources, query) do
      [quote_name(key), " = array_append(", quote_qualified_name(key, sources, 0),
       ", ", expr(value, sources, query), ?)]
    end

    defp update_op(:pull, key, value, sources, query) do
      [quote_name(key), " = array_remove(", quote_qualified_name(key, sources, 0),
       ", ", expr(value, sources, query), ?)]
    end

    defp update_op(command, _key, _value, _sources, query) do
      error!(query, "unknown update operation #{inspect command} for PostgreSQL")
    end

    defp using_join(%{joins: []}, _kind, _prefix, _sources), do: {[], []}

    defp using_join(%{joins: joins} = query, kind, prefix, sources) do
      froms =
        intersperse_map(joins, ", ", fn
          %JoinExpr{qual: :inner, ix: ix, source: source} ->
            {join, name} = get_source(query, sources, ix, source)
            [join, " AS " | name]

          %JoinExpr{qual: qual} ->
            error!(query, "PostgreSQL supports only inner joins on #{kind}, got: `#{qual}`")
        end)

      wheres =
        for %JoinExpr{on: %QueryExpr{expr: value} = expr} <- joins,
            value != true,
            do: expr |> Map.put(:__struct__, BooleanExpr) |> Map.put(:op, :and)

      {[?\s, prefix, ?\s | froms], wheres}
    end

    defp join(%{joins: []}, _sources), do: []

    defp join(%{joins: joins} = query, sources) do
      [?\s | intersperse_map(joins, ?\s, fn
        %JoinExpr{on: %QueryExpr{expr: expr}, qual: qual, ix: ix, source: source, hints: hints} ->
          if hints != [] do
            error!(query, "table hints are not supported by PostgreSQL")
          end

          {join, name} = get_source(query, sources, ix, source)
          [join_qual(qual), join, " AS ", name | join_on(qual, expr, sources, query)]
      end)]
    end

    defp join_on(:cross, true, _sources, _query), do: []
    defp join_on(_qual, expr, sources, query), do: [" ON " | expr(expr, sources, query)]

    defp join_qual(:inner), do: "INNER JOIN "
    defp join_qual(:inner_lateral), do: "INNER JOIN LATERAL "
    defp join_qual(:left), do: "LEFT OUTER JOIN "
    defp join_qual(:left_lateral), do: "LEFT OUTER JOIN LATERAL "
    defp join_qual(:right), do: "RIGHT OUTER JOIN "
    defp join_qual(:full), do: "FULL OUTER JOIN "
    defp join_qual(:cross), do: "CROSS JOIN "

    defp where(%{wheres: wheres} = query, sources) do
      boolean(" WHERE ", wheres, sources, query)
    end

    defp having(%{havings: havings} = query, sources) do
      boolean(" HAVING ", havings, sources, query)
    end

    defp group_by(%{group_bys: []}, _sources), do: []

    defp group_by(%{group_bys: group_bys} = query, sources) do
      [" GROUP BY " |
       intersperse_map(group_bys, ", ", fn
         %QueryExpr{expr: expr} ->
           intersperse_map(expr, ", ", &expr(&1, sources, query))
       end)]
    end

    defp window(%{windows: []}, _sources), do: []

    defp window(%{windows: windows} = query, sources) do
      [" WINDOW " |
       intersperse_map(windows, ", ", fn {name, %{expr: kw}} ->
         [quote_name(name), " AS " | window_exprs(kw, sources, query)]
       end)]
    end

    defp window_exprs(kw, sources, query) do
      [?(, intersperse_map(kw, ?\s, &window_expr(&1, sources, query)), ?)]
    end

    defp window_expr({:partition_by, fields}, sources, query) do
      ["PARTITION BY " | intersperse_map(fields, ", ", &expr(&1, sources, query))]
    end

    defp window_expr({:order_by, fields}, sources, query) do
      ["ORDER BY " | intersperse_map(fields, ", ", &order_by_expr(&1, sources, query))]
    end

    defp window_expr({:frame, {:fragment, _, _} = fragment}, sources, query) do
      expr(fragment, sources, query)
    end

    defp order_by(%{order_bys: []}, _distinct, _sources), do: []

    defp order_by(%{order_bys: order_bys} = query, distinct, sources) do
      order_bys = Enum.flat_map(order_bys, & &1.expr)
      order_bys = order_by_concat(distinct, order_bys)
      [" ORDER BY " | intersperse_map(order_bys, ", ", &order_by_expr(&1, sources, query))]
    end

    defp order_by_concat([head | left], [head | right]), do: [head | order_by_concat(left, right)]
    defp order_by_concat(left, right), do: left ++ right

    defp order_by_expr({dir, expr}, sources, query) do
      str = expr(expr, sources, query)

      case dir do
        :asc -> str
        :asc_nulls_last -> [str | " ASC NULLS LAST"]
        :asc_nulls_first -> [str | " ASC NULLS FIRST"]
        :desc -> [str | " DESC"]
        :desc_nulls_last -> [str | " DESC NULLS LAST"]
        :desc_nulls_first -> [str | " DESC NULLS FIRST"]
      end
    end

    defp limit(%{limit: nil}, _sources), do: []

    defp limit(%{limit: %QueryExpr{expr: expr}} = query, sources) do
      [" LIMIT " | expr(expr, sources, query)]
    end

    defp offset(%{offset: nil}, _sources), do: []

    defp offset(%{offset: %QueryExpr{expr: expr}} = query, sources) do
      [" OFFSET " | expr(expr, sources, query)]
    end

    defp combinations(%{combinations: combinations}) do
      Enum.map(combinations, fn
        {:union, query} -> [" UNION (", all(query), ")"]
        {:union_all, query} -> [" UNION ALL (", all(query), ")"]
        {:except, query} -> [" EXCEPT (", all(query), ")"]
        {:except_all, query} -> [" EXCEPT ALL (", all(query), ")"]
        {:intersect, query} -> [" INTERSECT (", all(query), ")"]
        {:intersect_all, query} -> [" INTERSECT ALL (", all(query), ")"]
      end)
    end

    defp lock(%{lock: nil}, _sources), do: []
    defp lock(%{lock: binary}, _sources) when is_binary(binary), do: [?\s | binary]
    defp lock(%{lock: expr} = query, sources), do: [?\s | expr(expr, sources, query)]

    defp boolean(_name, [], _sources, _query), do: []

    defp boolean(name, [%{expr: expr, op: op} | query_exprs], sources, query) do
      [name |
       Enum.reduce(query_exprs, {op, paren_expr(expr, sources, query)}, fn
         %BooleanExpr{expr: expr, op: op}, {op, acc} ->
           {op, [acc, operator_to_boolean(op), paren_expr(expr, sources, query)]}

         %BooleanExpr{expr: expr, op: op}, {_, acc} ->
           {op, [?(, acc, ?), operator_to_boolean(op), paren_expr(expr, sources, query)]}
       end) |> elem(1)]
    end

    defp operator_to_boolean(:and), do: " AND "
    defp operator_to_boolean(:or), do: " OR "

    defp parens_for_select([first_expr | _] = expr) do
      if is_binary(first_expr) and String.match?(first_expr, ~r/^\s*select/i) do
        [?(, expr, ?)]
      else
        expr
      end
    end

    defp paren_expr(expr, sources, query) do
      [?(, expr(expr, sources, query), ?)]
    end

    # Positional parameters are 1-based in PostgreSQL ($1, $2, ...).
    defp expr({:^, [], [ix]}, _sources, _query) do
      [?$ | Integer.to_string(ix + 1)]
    end

    defp expr({{:., _, [{:parent_as, _, [as]}, field]}, _, []}, _sources, query)
         when is_atom(field) do
      {ix, sources} = get_parent_sources_ix(query, as)
      quote_qualified_name(field, sources, ix)
    end

    defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query) when is_atom(field) do
      quote_qualified_name(field, sources, idx)
    end

    defp expr({:&, _, [idx]}, sources, _query) do
      {_, source, _} = elem(sources, idx)
      source
    end

    defp expr({:in, _, [_left, []]}, _sources, _query) do
      "false"
    end

    defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do
      args = intersperse_map(right, ?,, &expr(&1, sources, query))
      [expr(left, sources, query), " IN (", args, ?)]
    end

    defp expr({:in, _, [left, {:^, _, [ix, _]}]}, sources, query) do
      [expr(left, sources, query), " = ANY($", Integer.to_string(ix + 1), ?)]
    end

    defp expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}, sources, query) do
      [expr(left, sources, query), " IN ", expr(subquery, sources, query)]
    end

    defp expr({:in, _, [left, right]}, sources, query) do
      [expr(left, sources, query), " = ANY(", expr(right, sources, query), ?)]
    end

    defp expr({:is_nil, _, [arg]}, sources, query) do
      [expr(arg, sources, query) | " IS NULL"]
    end

    defp expr({:not, _, [expr]}, sources, query) do
      ["NOT (", expr(expr, sources, query), ?)]
    end

    defp expr(%Ecto.SubQuery{query: query}, sources, parent_query) do
      query = put_in(query.aliases[@parent_as], {parent_query, sources})
      [?(, all(query, subquery_as_prefix(sources)), ?)]
    end

    defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3 do
      error!(query, "PostgreSQL adapter does not support keyword or interpolated fragments")
    end

    defp expr({:fragment, _, parts}, sources, query) do
      Enum.map(parts, fn
        {:raw, part} -> part
        {:expr, expr} -> expr(expr, sources, query)
      end)
      |> parens_for_select
    end

    defp expr({:literal, _, [literal]}, _sources, _query) do
      quote_name(literal)
    end

    defp expr({:selected_as, _, [name]}, _sources, _query) do
      [quote_name(name)]
    end

    defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do
      [expr(datetime, sources, query), type_unless_typed(datetime, "timestamp"), " + ",
       interval(count, interval, sources, query)]
    end

    defp expr({:date_add, _, [date, count, interval]}, sources, query) do
      [?(, expr(date, sources, query), type_unless_typed(date, "date"), " + ",
       interval(count, interval, sources, query) | ")::date"]
    end

    defp expr({:json_extract_path, _, [expr, path]}, sources, query) do
      json_extract_path(expr, path, sources, query)
    end

    defp expr({:filter, _, [agg, filter]}, sources, query) do
      aggregate = expr(agg, sources, query)
      [aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)]
    end

    defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
      aggregate = expr(agg, sources, query)
      [aggregate, " OVER " | quote_name(name)]
    end

    defp expr({:over, _, [agg, kw]}, sources, query) do
      aggregate = expr(agg, sources, query)
      [aggregate, " OVER ", window_exprs(kw, sources, query)]
    end

    defp expr({:{}, _, elems}, sources, query) do
      [?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
    end

    defp expr({:count, _, []}, _sources, _query), do: "count(*)"

    defp expr({:==, _, [{:json_extract_path, _, [expr, path]} = left, right]}, sources, query)
         when is_binary(right) or is_integer(right) or is_boolean(right) do
      case Enum.split(path, -1) do
        {path, [last]} when is_binary(last) ->
          extracted = json_extract_path(expr, path, sources, query)
          [?(, extracted, "@>'{", escape_json(last), ": ", escape_json(right) | "}')"]

        _ ->
          [maybe_paren(left, sources, query), " = " | maybe_paren(right, sources, query)]
      end
    end

    defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do
      {modifier, args} =
        case args do
          [rest, :distinct] -> {"DISTINCT ", [rest]}
          _ -> {[], args}
        end

      case handle_call(fun, length(args)) do
        {:binary_op, op} ->
          [left, right] = args
          [maybe_paren(left, sources, query), op | maybe_paren(right, sources, query)]

        {:fun, fun} ->
          [fun, ?(, modifier, intersperse_map(args, ", ", &expr(&1, sources, query)), ?)]
      end
    end

    defp expr(list, sources, query) when is_list(list) do
      ["ARRAY[", intersperse_map(list, ?,, &expr(&1, sources, query)), ?]]
    end

    defp expr(%Decimal{} = decimal, _sources, _query) do
      Decimal.to_string(decimal, :normal)
    end

    defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, _query)
         when is_binary(binary) do
      ["'\\x", Base.encode16(binary, case: :lower) | "'::bytea"]
    end

    defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do
      [maybe_paren(other, sources, query), ?:, ?: | tagged_to_db(type)]
    end

    defp expr(nil, _sources, _query), do: "NULL"
    defp expr(true, _sources, _query), do: "TRUE"
    defp expr(false, _sources, _query), do: "FALSE"

    defp expr(literal, _sources, _query) when is_binary(literal) do
      [?\', escape_string(literal), ?\']
    end

    defp expr(literal, _sources, _query) when is_integer(literal) do
      Integer.to_string(literal)
    end

    defp expr(literal, _sources, _query) when is_float(literal) do
      [Float.to_string(literal) | "::float"]
    end

    defp expr(expr, _sources, query) do
      error!(query, "unsupported expression: #{inspect(expr)}")
    end

    defp json_extract_path(expr, [], sources, query) do
      expr(expr, sources, query)
    end

    defp json_extract_path(expr, path, sources, query) do
      path = intersperse_map(path, ?,, &escape_json/1)
      [?(, expr(expr, sources, query), "#>'{", path, "}')"]
    end

    defp type_unless_typed(%Ecto.Query.Tagged{}, _type), do: []
    defp type_unless_typed(_, type), do: [?:, ?: | type]

    # Always use the largest possible type for integers
    defp tagged_to_db(:id), do: "bigint"
    defp tagged_to_db(:integer), do: "bigint"
    defp tagged_to_db({:array, type}), do: [tagged_to_db(type), ?[, ?]]
    defp tagged_to_db(type), do: ecto_to_db(type)

    defp interval(count, interval, _sources, _query) when is_integer(count) do
      ["interval '", String.Chars.Integer.to_string(count), ?\s, interval, ?\']
    end

    defp interval(count, interval, _sources, _query) when is_float(count) do
      count = :erlang.float_to_binary(count, [:compact, decimals: 16])
      ["interval '", count, ?\s, interval, ?\']
    end

    defp interval(count, interval, sources, query) do
      [?(, expr(count, sources, query), "::numeric * ",
       interval(1, interval, sources, query), ?)]
    end

    defp maybe_paren({op, _, [_, _]} = expr, sources, query) when op in @binary_ops,
      do: paren_expr(expr, sources, query)

    defp maybe_paren({:is_nil, _, [_]} = expr, sources, query),
      do: paren_expr(expr, sources, query)

    defp maybe_paren(expr, sources, query),
      do: expr(expr, sources, query)

    defp returning(%{select: nil}, _sources),
      do: []
    defp returning(%{select: %{fields: fields}} = query, sources),
      do: [" RETURNING " | select_fields(fields, sources, query)]

    defp returning([]),
      do: []
    defp returning(returning),
      do: [" RETURNING " | quote_names(returning)]

    defp create_names(%{sources: sources}, as_prefix) do
      create_names(sources, 0, tuple_size(sources), as_prefix) |> List.to_tuple()
    end

    defp create_names(sources, pos, limit, as_prefix) when pos < limit do
      [create_name(sources, pos, as_prefix) | create_names(sources, pos + 1, limit, as_prefix)]
    end

    defp create_names(_sources, pos, pos, as_prefix) do
      [as_prefix]
    end

    defp subquery_as_prefix(sources) do
      [?s | :erlang.element(tuple_size(sources), sources)]
    end

    defp create_name(sources, pos, as_prefix) do
      case elem(sources, pos) do
        {:fragment, _, _} ->
          {nil, as_prefix ++ [?f | Integer.to_string(pos)], nil}

        {table, schema, prefix} ->
          name = as_prefix ++ [create_alias(table) | Integer.to_string(pos)]
          {quote_table(prefix, table), name, schema}

        %Ecto.SubQuery{} ->
          {nil, as_prefix ++ [?s | Integer.to_string(pos)], nil}
      end
    end

    defp create_alias(<<first, _rest::binary>>) when first in ?a..?z when first in ?A..?Z do
      first
    end

    defp create_alias(_) do
      ?t
    end

    # DDL

    alias Ecto.Migration.{Table, Index, Reference, Constraint}

    @creates [:create, :create_if_not_exists]
    @drops [:drop, :drop_if_exists]

    @impl true
    def execute_ddl({command, %Table{} = table, columns}) when command in @creates do
      table_name = quote_table(table.prefix, table.name)

      query = ["CREATE TABLE ",
               if_do(command == :create_if_not_exists, "IF NOT EXISTS "),
               table_name, ?\s, ?(,
               column_definitions(table, columns), pk_definition(columns, ", "), ?),
               options_expr(table.options)]

      [query] ++
        comments_on("TABLE", table_name, table.comment) ++
        comments_for_columns(table_name, columns)
    end

    def execute_ddl({command, %Table{} = table, mode}) when command in @drops do
      [["DROP TABLE ", if_do(command == :drop_if_exists, "IF EXISTS "),
        quote_table(table.prefix, table.name), drop_mode(mode)]]
    end

    def execute_ddl({:alter, %Table{} = table, changes}) do
      table_name = quote_table(table.prefix, table.name)

      query = ["ALTER TABLE ", table_name, ?\s,
               column_changes(table, changes), pk_definition(changes, ", ADD ")]

      [query] ++
        comments_on("TABLE", table_name, table.comment) ++
        comments_for_columns(table_name, changes)
    end

    def execute_ddl({command, %Index{} = index}) when command in @creates do
      fields = intersperse_map(index.columns, ", ", &index_expr/1)
      include_fields = intersperse_map(index.include, ", ", &index_expr/1)

      maybe_nulls_distinct =
        case index.nulls_distinct do
          nil -> []
          true -> " NULLS DISTINCT"
          false -> " NULLS NOT DISTINCT"
        end

      queries = [["CREATE ",
                  if_do(index.unique, "UNIQUE "),
                  "INDEX ",
                  if_do(index.concurrently, "CONCURRENTLY "),
                  if_do(command == :create_if_not_exists, "IF NOT EXISTS "),
                  quote_name(index.name),
                  " ON ",
                  quote_table(index.prefix, index.table),
                  if_do(index.using, [" USING ", to_string(index.using)]),
                  ?\s, ?(, fields, ?),
                  if_do(include_fields != [], [" INCLUDE ", ?(, include_fields, ?)]),
                  maybe_nulls_distinct,
                  if_do(index.where, [" WHERE ", to_string(index.where)])]]

      queries ++ comments_on("INDEX", quote_table(index.prefix, index.name), index.comment)
    end

    def execute_ddl({command, %Index{} = index, mode}) when command in @drops do
      [["DROP INDEX ",
        if_do(index.concurrently, "CONCURRENTLY "),
        if_do(command == :drop_if_exists, "IF EXISTS "),
        quote_table(index.prefix, index.name),
        drop_mode(mode)]]
    end

    def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do
      [["ALTER TABLE ", quote_table(current_table.prefix, current_table.name),
        " RENAME TO ", quote_table(nil, new_table.name)]]
    end

    def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do
      [["ALTER TABLE ", quote_table(table.prefix, table.name), " RENAME ",
        quote_name(current_column), " TO ", quote_name(new_column)]]
    end

    def execute_ddl({:create, %Constraint{} = constraint}) do
      table_name = quote_table(constraint.prefix, constraint.table)

      queries = [["ALTER TABLE ", table_name,
                  " ADD ", new_constraint_expr(constraint)]]

      queries ++ comments_on("CONSTRAINT", constraint.name, constraint.comment, table_name)
    end

    def execute_ddl({command, %Constraint{}, :cascade}) when command in @drops,
      do: error!(nil, "PostgreSQL does not support `CASCADE` in DROP CONSTRAINT commands")

    def execute_ddl({command, %Constraint{} = constraint, :restrict}) when command in @drops do
      [["ALTER TABLE ", quote_table(constraint.prefix, constraint.table),
        " DROP CONSTRAINT ", if_do(command == :drop_if_exists, "IF EXISTS "),
        quote_name(constraint.name)]]
    end

    def execute_ddl(string) when is_binary(string), do: [string]

    def execute_ddl(keyword) when is_list(keyword),
      do: error!(nil, "PostgreSQL adapter does not support keyword lists in execute")

    @impl true
    def ddl_logs(%Postgrex.Result{} = result) do
      %{messages: messages} = result

      for message <- messages do
        %{message: message, severity: severity} = message

        {ddl_log_level(severity), message, []}
      end
    end

    @impl true
    def table_exists_query(table) do
      {"SELECT true FROM information_schema.tables WHERE table_name = $1 AND table_schema = current_schema() LIMIT 1", [table]}
    end

    defp drop_mode(:cascade), do: " CASCADE"
    defp drop_mode(:restrict), do: []

    # From https://www.postgresql.org/docs/current/protocol-error-fields.html.
    defp ddl_log_level("DEBUG"), do: :debug
    defp ddl_log_level("LOG"), do: :info
    defp ddl_log_level("INFO"), do: :info
    defp ddl_log_level("NOTICE"), do: :info
    defp ddl_log_level("WARNING"), do: :warn
    defp ddl_log_level("ERROR"), do: :error
    defp ddl_log_level("FATAL"), do: :error
    defp ddl_log_level("PANIC"), do: :error
    defp ddl_log_level(_severity), do: :info

    defp pk_definition(columns, prefix) do
      pks =
        for {_, name, _, opts} <- columns,
            opts[:primary_key],
            do: name

      case pks do
        [] -> []
        _ -> [prefix, "PRIMARY KEY (", quote_names(pks), ")"]
      end
    end

    defp comments_on(_object, _name, nil), do: []

    defp comments_on(object, name, comment) do
      [["COMMENT ON ", object, ?\s, name, " IS ", single_quote(comment)]]
    end

    defp comments_on(_object, _name, nil, _table_name), do: []

    defp comments_on(object, name, comment, table_name) do
      [["COMMENT ON ", object, ?\s, quote_name(name), " ON ", table_name,
        " IS ", single_quote(comment)]]
    end

    defp comments_for_columns(table_name, columns) do
      Enum.flat_map(columns, fn
        {_operation, column_name, _column_type, opts} ->
          column_name = [table_name, ?. | quote_name(column_name)]
          comments_on("COLUMN", column_name, opts[:comment])

        _ -> []
      end)
    end

    defp column_definitions(table, columns) do
      intersperse_map(columns, ", ", &column_definition(table, &1))
    end

    defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do
      [quote_name(name), ?\s, reference_column_type(ref.type, opts),
       column_options(ref.type, opts), ", ", reference_expr(ref, table, name)]
    end

    defp column_definition(_table, {:add, name, type, opts}) do
      [quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)]
    end

    defp column_changes(table, columns) do
      intersperse_map(columns, ", ", &column_change(table, &1))
    end

    defp column_change(table, {:add, name, %Reference{} = ref, opts}) do
      ["ADD COLUMN ", quote_name(name), ?\s, reference_column_type(ref.type, opts),
       column_options(ref.type, opts), ", ADD ", reference_expr(ref, table, name)]
    end

    defp column_change(_table, {:add, name, type, opts}) do
      ["ADD COLUMN ", quote_name(name), ?\s, column_type(type, opts),
       column_options(type, opts)]
    end

    defp column_change(table, {:add_if_not_exists, name, %Reference{} = ref, opts}) do
      ["ADD COLUMN IF NOT EXISTS ", quote_name(name), ?\s, reference_column_type(ref.type, opts),
       column_options(ref.type, opts), ", ADD ", reference_expr(ref, table, name)]
    end

    defp column_change(_table, {:add_if_not_exists, name, type, opts}) do
      ["ADD COLUMN IF NOT EXISTS ", quote_name(name), ?\s, column_type(type, opts),
       column_options(type, opts)]
    end

    defp column_change(table, {:modify, name, %Reference{} = ref, opts}) do
      [drop_reference_expr(opts[:from], table, name), "ALTER COLUMN ", quote_name(name),
       " TYPE ",
reference_column_type(ref.type, opts), 1079 ", ADD ", reference_expr(ref, table, name), 1080 modify_null(name, opts), modify_default(name, ref.type, opts)] 1081 end 1082 1083 defp column_change(table, {:modify, name, type, opts}) do 1084 [drop_reference_expr(opts[:from], table, name), "ALTER COLUMN ", quote_name(name), " TYPE ", 1085 column_type(type, opts), modify_null(name, opts), modify_default(name, type, opts)] 1086 end 1087 1088 defp column_change(_table, {:remove, name}), do: ["DROP COLUMN ", quote_name(name)] 1089 defp column_change(table, {:remove, name, %Reference{} = ref, _opts}) do 1090 [drop_reference_expr(ref, table, name), "DROP COLUMN ", quote_name(name)] 1091 end 1092 defp column_change(_table, {:remove, name, _type, _opts}), do: ["DROP COLUMN ", quote_name(name)] 1093 1094 defp column_change(table, {:remove_if_exists, name, %Reference{} = ref}) do 1095 [drop_reference_if_exists_expr(ref, table, name), "DROP COLUMN IF EXISTS ", quote_name(name)] 1096 end 1097 defp column_change(_table, {:remove_if_exists, name, _type}), do: ["DROP COLUMN IF EXISTS ", quote_name(name)] 1098 1099 defp modify_null(name, opts) do 1100 case Keyword.get(opts, :null) do 1101 true -> [", ALTER COLUMN ", quote_name(name), " DROP NOT NULL"] 1102 false -> [", ALTER COLUMN ", quote_name(name), " SET NOT NULL"] 1103 nil -> [] 1104 end 1105 end 1106 1107 defp modify_default(name, type, opts) do 1108 case Keyword.fetch(opts, :default) do 1109 {:ok, val} -> [", ALTER COLUMN ", quote_name(name), " SET", default_expr({:ok, val}, type)] 1110 :error -> [] 1111 end 1112 end 1113 1114 defp column_options(type, opts) do 1115 default = Keyword.fetch(opts, :default) 1116 null = Keyword.get(opts, :null) 1117 1118 [default_expr(default, type), null_expr(null)] 1119 end 1120 1121 defp null_expr(false), do: " NOT NULL" 1122 defp null_expr(true), do: " NULL" 1123 defp null_expr(_), do: [] 1124 1125 defp new_constraint_expr(%Constraint{check: check} = constraint) when is_binary(check) do 1126 
["CONSTRAINT ", quote_name(constraint.name), " CHECK (", check, ")", validate(constraint.validate)] 1127 end 1128 defp new_constraint_expr(%Constraint{exclude: exclude} = constraint) when is_binary(exclude) do 1129 ["CONSTRAINT ", quote_name(constraint.name), " EXCLUDE USING ", exclude, validate(constraint.validate)] 1130 end 1131 1132 defp default_expr({:ok, nil}, _type), do: " DEFAULT NULL" 1133 defp default_expr({:ok, literal}, type), do: [" DEFAULT ", default_type(literal, type)] 1134 defp default_expr(:error, _), do: [] 1135 1136 defp default_type(list, {:array, inner} = type) when is_list(list) do 1137 ["ARRAY[", Enum.map(list, &default_type(&1, inner)) |> Enum.intersperse(?,), "]::", ecto_to_db(type)] 1138 end 1139 defp default_type(literal, _type) when is_binary(literal) do 1140 if :binary.match(literal, <<0>>) == :nomatch and String.valid?(literal) do 1141 single_quote(literal) 1142 else 1143 encoded = "\\x" <> Base.encode16(literal, case: :lower) 1144 raise ArgumentError, "default values are interpolated as UTF-8 strings and cannot contain null bytes. " <> 1145 "`#{inspect literal}` is invalid. If you want to write it as a binary, use \"#{encoded}\", " <> 1146 "otherwise refer to PostgreSQL documentation for instructions on how to escape this SQL type" 1147 end 1148 end 1149 defp default_type(literal, _type) when is_number(literal), do: to_string(literal) 1150 defp default_type(literal, _type) when is_boolean(literal), do: to_string(literal) 1151 defp default_type(%{} = map, :map) do 1152 library = Application.get_env(:postgrex, :json_library, Jason) 1153 default = IO.iodata_to_binary(library.encode_to_iodata!(map)) 1154 [single_quote(default)] 1155 end 1156 defp default_type({:fragment, expr}, _type), 1157 do: [expr] 1158 defp default_type(expr, type), 1159 do: raise(ArgumentError, "unknown default `#{inspect expr}` for type `#{inspect type}`. 
" <> 1160 ":default may be a string, number, boolean, list of strings, list of integers, map (when type is Map), or a fragment(...)") 1161 1162 defp index_expr(literal) when is_binary(literal), 1163 do: literal 1164 defp index_expr(literal), 1165 do: quote_name(literal) 1166 1167 defp options_expr(nil), 1168 do: [] 1169 defp options_expr(keyword) when is_list(keyword), 1170 do: error!(nil, "PostgreSQL adapter does not support keyword lists in :options") 1171 defp options_expr(options), 1172 do: [?\s, options] 1173 1174 defp column_type({:array, type}, opts), 1175 do: [column_type(type, opts), "[]"] 1176 1177 defp column_type(type, _opts) when type in ~w(time utc_datetime naive_datetime)a, 1178 do: [ecto_to_db(type), "(0)"] 1179 1180 defp column_type(type, opts) when type in ~w(time_usec utc_datetime_usec naive_datetime_usec)a do 1181 precision = Keyword.get(opts, :precision) 1182 type_name = ecto_to_db(type) 1183 1184 if precision do 1185 [type_name, ?(, to_string(precision), ?)] 1186 else 1187 type_name 1188 end 1189 end 1190 1191 defp column_type(:identity, opts) do 1192 start_value = [Keyword.get(opts, :start_value)] 1193 increment = [Keyword.get(opts, :increment)] 1194 type_name = ecto_to_db(:identity) 1195 1196 cleanup = fn v -> is_integer(v) and v > 0 end 1197 1198 sequence = 1199 start_value 1200 |> Enum.filter(cleanup) 1201 |> Enum.map(&"START WITH #{&1}") 1202 |> Kernel.++( 1203 increment 1204 |> Enum.filter(cleanup) 1205 |> Enum.map(&"INCREMENT BY #{&1}") 1206 ) 1207 1208 case sequence do 1209 [] -> [type_name, " GENERATED BY DEFAULT AS IDENTITY"] 1210 _ -> [type_name, " GENERATED BY DEFAULT AS IDENTITY(", Enum.join(sequence, " "), ") "] 1211 end 1212 end 1213 1214 defp column_type(type, opts) do 1215 size = Keyword.get(opts, :size) 1216 precision = Keyword.get(opts, :precision) 1217 scale = Keyword.get(opts, :scale) 1218 type_name = ecto_to_db(type) 1219 1220 cond do 1221 size -> [type_name, ?(, to_string(size), ?)] 1222 precision -> [type_name, ?(, 
to_string(precision), ?,, to_string(scale || 0), ?)] 1223 type == :string -> [type_name, "(255)"] 1224 true -> type_name 1225 end 1226 end 1227 1228 defp reference_expr(%Reference{} = ref, table, name) do 1229 {current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with]) 1230 1231 ["CONSTRAINT ", reference_name(ref, table, name), ?\s, 1232 "FOREIGN KEY (", quote_names(current_columns), ") REFERENCES ", 1233 quote_table(ref.prefix || table.prefix, ref.table), ?(, quote_names(reference_columns), ?), 1234 reference_match(ref.match), reference_on_delete(ref.on_delete), 1235 reference_on_update(ref.on_update), validate(ref.validate)] 1236 end 1237 1238 defp drop_reference_expr(%Reference{} = ref, table, name), 1239 do: ["DROP CONSTRAINT ", reference_name(ref, table, name), ", "] 1240 defp drop_reference_expr(_, _, _), 1241 do: [] 1242 1243 defp drop_reference_if_exists_expr(%Reference{} = ref, table, name), 1244 do: ["DROP CONSTRAINT IF EXISTS ", reference_name(ref, table, name), ", "] 1245 defp drop_reference_if_exists_expr(_, _, _), 1246 do: [] 1247 1248 defp reference_name(%Reference{name: nil}, table, column), 1249 do: quote_name("#{table.name}_#{column}_fkey") 1250 defp reference_name(%Reference{name: name}, _table, _column), 1251 do: quote_name(name) 1252 1253 defp reference_column_type(:serial, _opts), do: "integer" 1254 defp reference_column_type(:bigserial, _opts), do: "bigint" 1255 defp reference_column_type(:identity, _opts), do: "bigint" 1256 defp reference_column_type(type, opts), do: column_type(type, opts) 1257 1258 defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL" 1259 defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE" 1260 defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT" 1261 defp reference_on_delete(_), do: [] 1262 1263 defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL" 1264 defp reference_on_update(:update_all), do: " ON UPDATE CASCADE" 1265 defp 
reference_on_update(:restrict), do: " ON UPDATE RESTRICT" 1266 defp reference_on_update(_), do: [] 1267 1268 defp reference_match(nil), do: [] 1269 defp reference_match(:full), do: " MATCH FULL" 1270 defp reference_match(:simple), do: " MATCH SIMPLE" 1271 defp reference_match(:partial), do: " MATCH PARTIAL" 1272 1273 defp validate(false), do: " NOT VALID" 1274 defp validate(_), do: [] 1275 1276 ## Helpers 1277 1278 defp get_source(query, sources, ix, source) do 1279 {expr, name, _schema} = elem(sources, ix) 1280 {expr || expr(source, sources, query), name} 1281 end 1282 1283 defp get_parent_sources_ix(query, as) do 1284 case query.aliases[@parent_as] do 1285 {%{aliases: %{^as => ix}}, sources} -> {ix, sources} 1286 {%{} = parent, _sources} -> get_parent_sources_ix(parent, as) 1287 end 1288 end 1289 1290 defp quote_qualified_name(name, sources, ix) do 1291 {_, source, _} = elem(sources, ix) 1292 [source, ?. | quote_name(name)] 1293 end 1294 1295 defp quote_names(names) do 1296 intersperse_map(names, ?,, "e_name/1) 1297 end 1298 1299 defp quote_name(name) when is_atom(name) do 1300 quote_name(Atom.to_string(name)) 1301 end 1302 1303 defp quote_name(name) when is_binary(name) do 1304 if String.contains?(name, "\"") do 1305 error!(nil, "bad literal/field/table name #{inspect name} (\" is not permitted)") 1306 end 1307 1308 [?", name, ?"] 1309 end 1310 1311 defp quote_table(nil, name), do: quote_table(name) 1312 defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)] 1313 1314 defp quote_table(name) when is_atom(name), 1315 do: quote_table(Atom.to_string(name)) 1316 defp quote_table(name) do 1317 if String.contains?(name, "\"") do 1318 error!(nil, "bad table name #{inspect name}") 1319 end 1320 [?", name, ?"] 1321 end 1322 1323 # TRUE, ON, or 1 to enable the option, and FALSE, OFF, or 0 to disable it 1324 defp quote_boolean(nil), do: nil 1325 defp quote_boolean(true), do: "TRUE" 1326 defp quote_boolean(false), do: "FALSE" 1327 defp 
quote_boolean(value), do: error!(nil, "bad boolean value #{value}") 1328 1329 defp format_to_sql(:text), do: "FORMAT TEXT" 1330 defp format_to_sql(:map), do: "FORMAT JSON" 1331 defp format_to_sql(:yaml), do: "FORMAT YAML" 1332 1333 defp single_quote(value), do: [?', escape_string(value), ?'] 1334 1335 defp intersperse_map(list, separator, mapper, acc \\ []) 1336 defp intersperse_map([], _separator, _mapper, acc), 1337 do: acc 1338 defp intersperse_map([elem], _separator, mapper, acc), 1339 do: [acc | mapper.(elem)] 1340 defp intersperse_map([elem | rest], separator, mapper, acc), 1341 do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator]) 1342 1343 defp intersperse_reduce(list, separator, user_acc, reducer, acc \\ []) 1344 defp intersperse_reduce([], _separator, user_acc, _reducer, acc), 1345 do: {acc, user_acc} 1346 defp intersperse_reduce([elem], _separator, user_acc, reducer, acc) do 1347 {elem, user_acc} = reducer.(elem, user_acc) 1348 {[acc | elem], user_acc} 1349 end 1350 defp intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do 1351 {elem, user_acc} = reducer.(elem, user_acc) 1352 intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator]) 1353 end 1354 1355 defp if_do(condition, value) do 1356 if condition, do: value, else: [] 1357 end 1358 1359 defp escape_string(value) when is_binary(value) do 1360 :binary.replace(value, "'", "''", [:global]) 1361 end 1362 1363 defp escape_json(value) when is_binary(value) do 1364 escaped = 1365 value 1366 |> escape_string() 1367 |> :binary.replace("\"", "\\\"", [:global]) 1368 1369 [?", escaped, ?"] 1370 end 1371 1372 defp escape_json(value) when is_integer(value) do 1373 Integer.to_string(value) 1374 end 1375 1376 defp escape_json(true), do: ["true"] 1377 defp escape_json(false), do: ["false"] 1378 1379 defp ecto_to_db({:array, t}), do: [ecto_to_db(t), ?[, ?]] 1380 defp ecto_to_db(:id), do: "integer" 1381 defp ecto_to_db(:identity), do: "bigint" 1382 defp 
ecto_to_db(:serial), do: "serial" 1383 defp ecto_to_db(:bigserial), do: "bigserial" 1384 defp ecto_to_db(:binary_id), do: "uuid" 1385 defp ecto_to_db(:string), do: "varchar" 1386 defp ecto_to_db(:binary), do: "bytea" 1387 defp ecto_to_db(:map), do: Application.fetch_env!(:ecto_sql, :postgres_map_type) 1388 defp ecto_to_db({:map, _}), do: Application.fetch_env!(:ecto_sql, :postgres_map_type) 1389 defp ecto_to_db(:time_usec), do: "time" 1390 defp ecto_to_db(:utc_datetime), do: "timestamp" 1391 defp ecto_to_db(:utc_datetime_usec), do: "timestamp" 1392 defp ecto_to_db(:naive_datetime), do: "timestamp" 1393 defp ecto_to_db(:naive_datetime_usec), do: "timestamp" 1394 defp ecto_to_db(atom) when is_atom(atom), do: Atom.to_string(atom) 1395 defp ecto_to_db(type) do 1396 raise ArgumentError, 1397 "unsupported type `#{inspect(type)}`. The type can either be an atom, a string " <> 1398 "or a tuple of the form `{:map, t}` or `{:array, t}` where `t` itself follows the same conditions." 1399 end 1400 1401 defp error!(nil, message) do 1402 raise ArgumentError, message 1403 end 1404 defp error!(query, message) do 1405 raise Ecto.QueryError, query: query, message: message 1406 end 1407 end 1408 end