connection.ex (41470B)
1 if Code.ensure_loaded?(MyXQL) do 2 defmodule Ecto.Adapters.MyXQL.Connection do 3 @moduledoc false 4 alias Ecto.Adapters.SQL 5 6 @behaviour Ecto.Adapters.SQL.Connection 7 8 ## Connection 9 10 @impl true 11 def child_spec(opts) do 12 MyXQL.child_spec(opts) 13 end 14 15 ## Query 16 17 @impl true 18 def prepare_execute(conn, name, sql, params, opts) do 19 MyXQL.prepare_execute(conn, name, sql, params, opts) 20 end 21 22 @impl true 23 def query(conn, sql, params, opts) do 24 opts = Keyword.put_new(opts, :query_type, :binary_then_text) 25 MyXQL.query(conn, sql, params, opts) 26 end 27 28 @impl true 29 def query_many(conn, sql, params, opts) do 30 opts = Keyword.put_new(opts, :query_type, :text) 31 MyXQL.query_many(conn, sql, params, opts) 32 end 33 34 @impl true 35 def execute(conn, query, params, opts) do 36 case MyXQL.execute(conn, query, params, opts) do 37 {:ok, _, result} -> {:ok, result} 38 {:error, _} = error -> error 39 end 40 end 41 42 @impl true 43 def stream(conn, sql, params, opts) do 44 MyXQL.stream(conn, sql, params, opts) 45 end 46 47 @impl true 48 def to_constraints(%MyXQL.Error{mysql: %{name: :ER_DUP_ENTRY}, message: message}, opts) do 49 case :binary.split(message, " for key ") do 50 [_, quoted] -> [unique: normalize_index_name(quoted, opts[:source])] 51 _ -> [] 52 end 53 end 54 def to_constraints(%MyXQL.Error{mysql: %{name: name}, message: message}, _opts) 55 when name in [:ER_ROW_IS_REFERENCED_2, :ER_NO_REFERENCED_ROW_2] do 56 case :binary.split(message, [" CONSTRAINT ", " FOREIGN KEY "], [:global]) do 57 [_, quoted, _] -> [foreign_key: strip_quotes(quoted)] 58 _ -> [] 59 end 60 end 61 def to_constraints(_, _), 62 do: [] 63 64 defp strip_quotes(quoted) do 65 size = byte_size(quoted) - 2 66 <<_, unquoted::binary-size(size), _>> = quoted 67 unquoted 68 end 69 70 defp normalize_index_name(quoted, source) do 71 name = strip_quotes(quoted) 72 73 if source do 74 String.trim_leading(name, "#{source}.") 75 else 76 name 77 end 78 end 79 80 ## Query 81 82 @parent_as __MODULE__ 83 alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr, WithExpr} 84 85 @impl true 86 def all(query, as_prefix \\ []) do 87 sources = create_names(query, as_prefix) 88 89 cte = cte(query, sources) 90 from = from(query, sources) 91 select = select(query, sources) 92 join = join(query, sources) 93 where = where(query, sources) 94 group_by = group_by(query, sources) 95 having = having(query, sources) 96 window = window(query, sources) 97 combinations = combinations(query) 98 order_by = order_by(query, sources) 99 limit = limit(query, sources) 100 offset = offset(query, sources) 101 lock = lock(query, sources) 102 103 [cte, select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock] 104 end 105 106 @impl true 107 def update_all(query, prefix \\ nil) do 108 %{from: %{source: source}, select: select} = query 109 110 if select do 111 error!(nil, ":select is not supported in update_all by MySQL") 112 end 113 114 sources = create_names(query, []) 115 cte = cte(query, sources) 116 {from, name} = get_source(query, sources, 0, source) 117 118 fields = if prefix do 119 update_fields(:on_conflict, query, sources) 120 else 121 update_fields(:update, query, sources) 122 end 123 124 {join, wheres} = using_join(query, :update_all, sources) 125 prefix = prefix || ["UPDATE ", from, " AS ", name, join, " SET "] 126 where = where(%{query | wheres: wheres ++ query.wheres}, sources) 127 128 [cte, prefix, fields | where] 129 end 130 131 @impl true 132 def delete_all(query) do 133 if query.select do 134 error!(nil, ":select is not supported in delete_all by MySQL") 135 end 136 137 sources = create_names(query, []) 138 cte = cte(query, sources) 139 {_, name, _} = elem(sources, 0) 140 141 from = from(query, sources) 142 join = join(query, sources) 143 where = where(query, sources) 144 145 [cte, "DELETE ", name, ".*", from, join | where] 146 end 147 148 @impl true 149 def insert(prefix, table, header, rows, on_conflict, [], []) do 150 fields = quote_names(header) 151 ["INSERT INTO ", quote_table(prefix, table), " (", fields, ") ", 152 insert_all(rows) | on_conflict(on_conflict, header)] 153 end 154 def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, []) do 155 error!(nil, ":returning is not supported in insert/insert_all by MySQL") 156 end 157 def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders) do 158 error!(nil, ":placeholders is not supported by MySQL") 159 end 160 161 defp on_conflict({_, _, [_ | _]}, _header) do 162 error!(nil, ":conflict_target is not supported in insert/insert_all by MySQL") 163 end 164 defp on_conflict({:raise, _, []}, _header) do 165 [] 166 end 167 defp on_conflict({:nothing, _, []}, [field | _]) do 168 quoted = quote_name(field) 169 [" ON DUPLICATE KEY UPDATE ", quoted, " = " | quoted] 170 end 171 defp on_conflict({fields, _, []}, _header) when is_list(fields) do 172 [" ON DUPLICATE KEY UPDATE " | 173 intersperse_map(fields, ?,, fn field -> 174 quoted = quote_name(field) 175 [quoted, " = VALUES(", quoted, ?)] 176 end)] 177 end 178 defp on_conflict({%{wheres: []} = query, _, []}, _header) do 179 [" ON DUPLICATE KEY " | update_all(query, "UPDATE ")] 180 end 181 defp on_conflict({_query, _, []}, _header) do 182 error!(nil, "Using a query with :where in combination with the :on_conflict option is not supported by MySQL") 183 end 184 185 defp insert_all(rows) when is_list(rows) do 186 ["VALUES ", intersperse_map(rows, ?,, fn row -> 187 [?(, intersperse_map(row, ?,, &insert_all_value/1), ?)] 188 end)] 189 end 190 191 defp insert_all(%Ecto.Query{} = query) do 192 [?(, all(query), ?)] 193 end 194 195 defp insert_all_value(nil), do: "DEFAULT" 196 defp insert_all_value({%Ecto.Query{} = query, _params_counter}), do: [?(, all(query), ?)] 197 defp insert_all_value(_), do: '?' 198 199 @impl true 200 def update(prefix, table, fields, filters, _returning) do 201 fields = intersperse_map(fields, ", ", &[quote_name(&1), " = ?"]) 202 filters = intersperse_map(filters, " AND ", fn 203 {field, nil} -> 204 [quote_name(field), " IS NULL"] 205 206 {field, _value} -> 207 [quote_name(field), " = ?"] 208 end) 209 ["UPDATE ", quote_table(prefix, table), " SET ", fields, " WHERE " | filters] 210 end 211 212 @impl true 213 def delete(prefix, table, filters, _returning) do 214 filters = intersperse_map(filters, " AND ", fn 215 {field, nil} -> 216 [quote_name(field), " IS NULL"] 217 218 {field, _value} -> 219 [quote_name(field), " = ?"] 220 end) 221 ["DELETE FROM ", quote_table(prefix, table), " WHERE " | filters] 222 end 223 224 @impl true 225 # DB explain opts are deprecated, so they aren't used to build the explain query. 226 # See Notes at https://dev.mysql.com/doc/refman/5.7/en/explain.html 227 def explain_query(conn, query, params, opts) do 228 case query(conn, build_explain_query(query), params, opts) do 229 {:ok, %MyXQL.Result{} = result} -> 230 {:ok, SQL.format_table(result)} 231 232 error -> 233 error 234 end 235 end 236 237 def build_explain_query(query) do 238 ["EXPLAIN ", query] 239 |> IO.iodata_to_binary() 240 end 241 242 ## Query generation 243 244 binary_ops = 245 [==: " = ", !=: " != ", <=: " <= ", >=: " >= ", <: " < ", >: " > ", 246 +: " + ", -: " - ", *: " * ", /: " / ", 247 and: " AND ", or: " OR ", like: " LIKE "] 248 249 @binary_ops Keyword.keys(binary_ops) 250 251 Enum.map(binary_ops, fn {op, str} -> 252 defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)} 253 end) 254 255 defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)} 256 257 defp select(%{select: %{fields: fields}, distinct: distinct} = query, sources) do 258 ["SELECT ", distinct(distinct, sources, query) | select(fields, sources, query)] 259 end 260 261 defp distinct(nil, _sources, _query), do: [] 262 defp distinct(%QueryExpr{expr: true}, _sources, _query), do: "DISTINCT " 263 defp distinct(%QueryExpr{expr: false}, _sources, _query), do: [] 264 defp distinct(%QueryExpr{expr: exprs}, _sources, query) when is_list(exprs) do 265 error!(query, "DISTINCT with multiple columns is not supported by MySQL") 266 end 267 268 defp select([], _sources, _query), 269 do: "TRUE" 270 defp select(fields, sources, query) do 271 intersperse_map(fields, ", ", fn 272 {:&, _, [idx]} -> 273 case elem(sources, idx) do 274 {nil, source, nil} -> 275 error!(query, "MySQL adapter does not support selecting all fields from fragment #{source}. " <> 276 "Please specify exactly which fields you want to select") 277 {source, _, nil} -> 278 error!(query, "MySQL adapter does not support selecting all fields from #{source} without a schema. " <> 279 "Please specify a schema or specify exactly which fields you want to select") 280 {_, source, _} -> 281 source 282 end 283 {key, value} -> 284 [expr(value, sources, query), " AS ", quote_name(key)] 285 value -> 286 expr(value, sources, query) 287 end) 288 end 289 290 defp from(%{from: %{source: source, hints: hints}} = query, sources) do 291 {from, name} = get_source(query, sources, 0, source) 292 [" FROM ", from, " AS ", name | Enum.map(hints, &[?\s | &1])] 293 end 294 295 defp cte(%{with_ctes: %WithExpr{recursive: recursive, queries: [_ | _] = queries}} = query, sources) do 296 recursive_opt = if recursive, do: "RECURSIVE ", else: "" 297 ctes = intersperse_map(queries, ", ", &cte_expr(&1, sources, query)) 298 ["WITH ", recursive_opt, ctes, " "] 299 end 300 301 defp cte(%{with_ctes: _}, _), do: [] 302 303 defp cte_expr({name, cte}, sources, query) do 304 [quote_name(name), " AS ", cte_query(cte, sources, query)] 305 end 306 307 defp cte_query(%Ecto.Query{} = query, sources, parent_query) do 308 query = put_in(query.aliases[@parent_as], {parent_query, sources}) 309 ["(", all(query, subquery_as_prefix(sources)), ")"] 310 end 311 312 defp cte_query(%QueryExpr{expr: expr}, sources, query) do 313 expr(expr, sources, query) 314 end 315 316 defp update_fields(type, %{updates: updates} = query, sources) do 317 fields = for(%{expr: expr} <- updates, 318 {op, kw} <- expr, 319 {key, value} <- kw, 320 do: update_op(op, update_key(type, key, query, sources), value, sources, query)) 321 Enum.intersperse(fields, ", ") 322 end 323 324 defp update_key(:update, key, %{from: from} = query, sources) do 325 {_from, name} = get_source(query, sources, 0, from) 326 327 [name, ?. | quote_name(key)] 328 end 329 defp update_key(:on_conflict, key, _query, _sources) do 330 quote_name(key) 331 end 332 333 defp update_op(:set, quoted_key, value, sources, query) do 334 [quoted_key, " = " | expr(value, sources, query)] 335 end 336 337 defp update_op(:inc, quoted_key, value, sources, query) do 338 [quoted_key, " = ", quoted_key, " + " | expr(value, sources, query)] 339 end 340 341 defp update_op(command, _quoted_key, _value, _sources, query) do 342 error!(query, "Unknown update operation #{inspect command} for MySQL") 343 end 344 345 defp using_join(%{joins: []}, _kind, _sources), do: {[], []} 346 defp using_join(%{joins: joins} = query, kind, sources) do 347 froms = 348 intersperse_map(joins, ", ", fn 349 %JoinExpr{source: %Ecto.SubQuery{params: [_ | _]}} -> 350 error!(query, "MySQL adapter does not support subqueries with parameters in update_all/delete_all joins") 351 352 %JoinExpr{qual: :inner, ix: ix, source: source} -> 353 {join, name} = get_source(query, sources, ix, source) 354 [join, " AS " | name] 355 356 %JoinExpr{qual: qual} -> 357 error!(query, "MySQL adapter supports only inner joins on #{kind}, got: `#{qual}`") 358 end) 359 360 wheres = 361 for %JoinExpr{on: %QueryExpr{expr: value} = expr} <- joins, 362 value != true, 363 do: expr |> Map.put(:__struct__, BooleanExpr) |> Map.put(:op, :and) 364 365 {[?,, ?\s | froms], wheres} 366 end 367 368 defp join(%{joins: []}, _sources), do: [] 369 defp join(%{joins: joins} = query, sources) do 370 Enum.map(joins, fn 371 %JoinExpr{on: %QueryExpr{expr: expr}, qual: qual, ix: ix, source: source, hints: hints} -> 372 {join, name} = get_source(query, sources, ix, source) 373 [join_qual(qual, query), join, " AS ", name, Enum.map(hints, &[?\s | &1]) | join_on(qual, expr, sources, query)] 374 end) 375 end 376 377 defp join_on(:cross, true, _sources, _query), do: [] 378 defp join_on(_qual, expr, sources, query), do: [" ON " | expr(expr, sources, query)] 379 380 defp join_qual(:inner, _), do: " INNER JOIN " 381 defp join_qual(:inner_lateral, _), do: " INNER JOIN LATERAL " 382 defp join_qual(:left, _), do: " LEFT OUTER JOIN " 383 defp join_qual(:left_lateral, _), do: " LEFT OUTER JOIN LATERAL " 384 defp join_qual(:right, _), do: " RIGHT OUTER JOIN " 385 defp join_qual(:full, _), do: " FULL OUTER JOIN " 386 defp join_qual(:cross, _), do: " CROSS JOIN " 387 388 defp where(%{wheres: wheres} = query, sources) do 389 boolean(" WHERE ", wheres, sources, query) 390 end 391 392 defp having(%{havings: havings} = query, sources) do 393 boolean(" HAVING ", havings, sources, query) 394 end 395 396 defp group_by(%{group_bys: []}, _sources), do: [] 397 defp group_by(%{group_bys: group_bys} = query, sources) do 398 [" GROUP BY " | 399 intersperse_map(group_bys, ", ", fn %QueryExpr{expr: expr} -> 400 intersperse_map(expr, ", ", &expr(&1, sources, query)) 401 end)] 402 end 403 404 defp window(%{windows: []}, _sources), do: [] 405 defp window(%{windows: windows} = query, sources) do 406 [" WINDOW " | 407 intersperse_map(windows, ", ", fn {name, %{expr: kw}} -> 408 [quote_name(name), " AS " | window_exprs(kw, sources, query)] 409 end)] 410 end 411 412 defp window_exprs(kw, sources, query) do 413 [?(, intersperse_map(kw, ?\s, &window_expr(&1, sources, query)), ?)] 414 end 415 416 defp window_expr({:partition_by, fields}, sources, query) do 417 ["PARTITION BY " | intersperse_map(fields, ", ", &expr(&1, sources, query))] 418 end 419 420 defp window_expr({:order_by, fields}, sources, query) do 421 ["ORDER BY " | intersperse_map(fields, ", ", &order_by_expr(&1, sources, query))] 422 end 423 424 defp window_expr({:frame, {:fragment, _, _} = fragment}, sources, query) do 425 expr(fragment, sources, query) 426 end 427 428 defp order_by(%{order_bys: []}, _sources), do: [] 429 defp order_by(%{order_bys: order_bys} = query, sources) do 430 [" ORDER BY " | 431 intersperse_map(order_bys, ", ", fn %QueryExpr{expr: expr} -> 432 intersperse_map(expr, ", ", &order_by_expr(&1, sources, query)) 433 end)] 434 end 435 436 defp order_by_expr({dir, expr}, sources, query) do 437 str = expr(expr, sources, query) 438 439 case dir do 440 :asc -> str 441 :desc -> [str | " DESC"] 442 _ -> error!(query, "#{dir} is not supported in ORDER BY in MySQL") 443 end 444 end 445 446 defp limit(%{limit: nil}, _sources), do: [] 447 defp limit(%{limit: %QueryExpr{expr: expr}} = query, sources) do 448 [" LIMIT " | expr(expr, sources, query)] 449 end 450 451 defp offset(%{offset: nil}, _sources), do: [] 452 defp offset(%{offset: %QueryExpr{expr: expr}} = query, sources) do 453 [" OFFSET " | expr(expr, sources, query)] 454 end 455 456 defp combinations(%{combinations: combinations}) do 457 Enum.map(combinations, fn 458 {:union, query} -> [" UNION (", all(query), ")"] 459 {:union_all, query} -> [" UNION ALL (", all(query), ")"] 460 {:except, query} -> [" EXCEPT (", all(query), ")"] 461 {:except_all, query} -> [" EXCEPT ALL (", all(query), ")"] 462 {:intersect, query} -> [" INTERSECT (", all(query), ")"] 463 {:intersect_all, query} -> [" INTERSECT ALL (", all(query), ")"] 464 end) 465 end 466 467 defp lock(%{lock: nil}, _sources), do: [] 468 defp lock(%{lock: binary}, _sources) when is_binary(binary), do: [?\s | binary] 469 defp lock(%{lock: expr} = query, sources), do: [?\s | expr(expr, sources, query)] 470 471 defp boolean(_name, [], _sources, _query), do: [] 472 defp boolean(name, [%{expr: expr, op: op} | query_exprs], sources, query) do 473 [name, 474 Enum.reduce(query_exprs, {op, paren_expr(expr, sources, query)}, fn 475 %BooleanExpr{expr: expr, op: op}, {op, acc} -> 476 {op, [acc, operator_to_boolean(op) | paren_expr(expr, sources, query)]} 477 %BooleanExpr{expr: expr, op: op}, {_, acc} -> 478 {op, [?(, acc, ?), operator_to_boolean(op) | paren_expr(expr, sources, query)]} 479 end) |> elem(1)] 480 end 481 482 defp operator_to_boolean(:and), do: " AND " 483 defp operator_to_boolean(:or), do: " OR " 484 485 defp parens_for_select([first_expr | _] = expr) do 486 if is_binary(first_expr) and String.match?(first_expr, ~r/^\s*select/i) do 487 [?(, expr, ?)] 488 else 489 expr 490 end 491 end 492 493 defp paren_expr(expr, sources, query) do 494 [?(, expr(expr, sources, query), ?)] 495 end 496 497 defp expr({:^, [], [_ix]}, _sources, _query) do 498 '?' 499 end 500 501 defp expr({{:., _, [{:parent_as, _, [as]}, field]}, _, []}, _sources, query) 502 when is_atom(field) do 503 {ix, sources} = get_parent_sources_ix(query, as) 504 {_, name, _} = elem(sources, ix) 505 [name, ?. | quote_name(field)] 506 end 507 508 defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query) 509 when is_atom(field) do 510 {_, name, _} = elem(sources, idx) 511 [name, ?. | quote_name(field)] 512 end 513 514 defp expr({:&, _, [idx]}, sources, _query) do 515 {_, source, _} = elem(sources, idx) 516 source 517 end 518 519 defp expr({:in, _, [_left, []]}, _sources, _query) do 520 "false" 521 end 522 523 defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do 524 args = intersperse_map(right, ?,, &expr(&1, sources, query)) 525 [expr(left, sources, query), " IN (", args, ?)] 526 end 527 528 defp expr({:in, _, [_, {:^, _, [_, 0]}]}, _sources, _query) do 529 "false" 530 end 531 532 defp expr({:in, _, [left, {:^, _, [_, length]}]}, sources, query) do 533 args = Enum.intersperse(List.duplicate(??, length), ?,) 534 [expr(left, sources, query), " IN (", args, ?)] 535 end 536 537 defp expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}, sources, query) do 538 [expr(left, sources, query), " IN ", expr(subquery, sources, query)] 539 end 540 541 defp expr({:in, _, [left, right]}, sources, query) do 542 [expr(left, sources, query), " = ANY(", expr(right, sources, query), ?)] 543 end 544 545 defp expr({:is_nil, _, [arg]}, sources, query) do 546 [expr(arg, sources, query) | " IS NULL"] 547 end 548 549 defp expr({:not, _, [expr]}, sources, query) do 550 ["NOT (", expr(expr, sources, query), ?)] 551 end 552 553 defp expr({:filter, _, _}, _sources, query) do 554 error!(query, "MySQL adapter does not support aggregate filters") 555 end 556 557 defp expr(%Ecto.SubQuery{query: query}, sources, parent_query) do 558 query = put_in(query.aliases[@parent_as], {parent_query, sources}) 559 [?(, all(query, subquery_as_prefix(sources)), ?)] 560 end 561 562 defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3 do 563 error!(query, "MySQL adapter does not support keyword or interpolated fragments") 564 end 565 566 defp expr({:fragment, _, parts}, sources, query) do 567 Enum.map(parts, fn 568 {:raw, part} -> part 569 {:expr, expr} -> expr(expr, sources, query) 570 end) 571 |> parens_for_select 572 end 573 574 defp expr({:literal, _, [literal]}, _sources, _query) do 575 quote_name(literal) 576 end 577 578 defp expr({:selected_as, _, [name]}, _sources, _query) do 579 [quote_name(name)] 580 end 581 582 defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do 583 ["date_add(", expr(datetime, sources, query), ", ", 584 interval(count, interval, sources, query) | ")"] 585 end 586 587 defp expr({:date_add, _, [date, count, interval]}, sources, query) do 588 ["CAST(date_add(", expr(date, sources, query), ", ", 589 interval(count, interval, sources, query) | ") AS date)"] 590 end 591 592 defp expr({:ilike, _, [_, _]}, _sources, query) do 593 error!(query, "ilike is not supported by MySQL") 594 end 595 596 defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do 597 aggregate = expr(agg, sources, query) 598 [aggregate, " OVER " | quote_name(name)] 599 end 600 601 defp expr({:over, _, [agg, kw]}, sources, query) do 602 aggregate = expr(agg, sources, query) 603 [aggregate, " OVER " | window_exprs(kw, sources, query)] 604 end 605 606 defp expr({:{}, _, elems}, sources, query) do 607 [?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)] 608 end 609 610 defp expr({:count, _, []}, _sources, _query), do: "count(*)" 611 612 defp expr({:json_extract_path, _, [expr, path]}, sources, query) do 613 path = 614 Enum.map(path, fn 615 binary when is_binary(binary) -> 616 [?., ?", escape_json_key(binary), ?"] 617 618 integer when is_integer(integer) -> 619 "[#{integer}]" 620 end) 621 622 ["json_extract(", expr(expr, sources, query), ", '$", path, "')"] 623 end 624 625 defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do 626 {modifier, args} = 627 case args do 628 [rest, :distinct] -> {"DISTINCT ", [rest]} 629 _ -> {[], args} 630 end 631 632 case handle_call(fun, length(args)) do 633 {:binary_op, op} -> 634 [left, right] = args 635 [op_to_binary(left, sources, query), op | op_to_binary(right, sources, query)] 636 {:fun, fun} -> 637 [fun, ?(, modifier, intersperse_map(args, ", ", &expr(&1, sources, query)), ?)] 638 end 639 end 640 641 defp expr(list, _sources, query) when is_list(list) do 642 error!(query, "Array type is not supported by MySQL") 643 end 644 645 defp expr(%Decimal{} = decimal, _sources, _query) do 646 Decimal.to_string(decimal, :normal) 647 end 648 649 defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, _query) 650 when is_binary(binary) do 651 hex = Base.encode16(binary, case: :lower) 652 [?x, ?', hex, ?'] 653 end 654 655 defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) 656 when type in [:decimal, :float] do 657 [expr(other, sources, query), " + 0"] 658 end 659 660 defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do 661 ["CAST(", expr(other, sources, query), " AS ", ecto_cast_to_db(type, query), ?)] 662 end 663 664 defp expr(nil, _sources, _query), do: "NULL" 665 defp expr(true, _sources, _query), do: "TRUE" 666 defp expr(false, _sources, _query), do: "FALSE" 667 668 defp expr(literal, _sources, _query) when is_binary(literal) do 669 [?', escape_string(literal), ?'] 670 end 671 672 defp expr(literal, _sources, _query) when is_integer(literal) do 673 Integer.to_string(literal) 674 end 675 676 defp expr(literal, _sources, _query) when is_float(literal) do 677 # MySQL doesn't support float cast 678 ["(0 + ", Float.to_string(literal), ?)] 679 end 680 681 defp expr(expr, _sources, query) do 682 error!(query, "unsupported expression: #{inspect(expr)}") 683 end 684 685 defp interval(count, "millisecond", sources, query) do 686 ["INTERVAL (", expr(count, sources, query) | " * 1000) microsecond"] 687 end 688 689 defp interval(count, interval, sources, query) do 690 ["INTERVAL ", expr(count, sources, query), ?\s | interval] 691 end 692 693 defp op_to_binary({op, _, [_, _]} = expr, sources, query) when op in @binary_ops, 694 do: paren_expr(expr, sources, query) 695 696 defp op_to_binary({:is_nil, _, [_]} = expr, sources, query), 697 do: paren_expr(expr, sources, query) 698 699 defp op_to_binary(expr, sources, query), 700 do: expr(expr, sources, query) 701 702 defp create_names(%{sources: sources}, as_prefix) do 703 create_names(sources, 0, tuple_size(sources), as_prefix) |> List.to_tuple() 704 end 705 706 defp create_names(sources, pos, limit, as_prefix) when pos < limit do 707 [create_name(sources, pos, as_prefix) | create_names(sources, pos + 1, limit, as_prefix)] 708 end 709 710 defp create_names(_sources, pos, pos, as_prefix) do 711 [as_prefix] 712 end 713 714 defp subquery_as_prefix(sources) do 715 [?s | :erlang.element(tuple_size(sources), sources)] 716 end 717 718 defp create_name(sources, pos, as_prefix) do 719 case elem(sources, pos) do 720 {:fragment, _, _} -> 721 {nil, as_prefix ++ [?f | Integer.to_string(pos)], nil} 722 723 {table, schema, prefix} -> 724 name = as_prefix ++ [create_alias(table) | Integer.to_string(pos)] 725 {quote_table(prefix, table), name, schema} 726 727 %Ecto.SubQuery{} -> 728 {nil, as_prefix ++ [?s | Integer.to_string(pos)], nil} 729 end 730 end 731 732 defp create_alias(<<first, _rest::binary>>) when first in ?a..?z when first in ?A..?Z do 733 first 734 end 735 defp create_alias(_) do 736 ?t 737 end 738 739 ## DDL 740 741 alias Ecto.Migration.{Table, Index, Reference, Constraint} 742 743 @impl true 744 def execute_ddl({command, %Table{} = table, columns}) when command in [:create, :create_if_not_exists] do 745 table_structure = 746 case column_definitions(table, columns) ++ pk_definitions(columns, ", ") do 747 [] -> [] 748 list -> [?\s, ?(, list, ?)] 749 end 750 751 [["CREATE TABLE ", 752 if_do(command == :create_if_not_exists, "IF NOT EXISTS "), 753 quote_table(table.prefix, table.name), 754 table_structure, 755 comment_expr(table.comment, true), 756 engine_expr(table.engine), options_expr(table.options)]] 757 end 758 759 760 def execute_ddl({command, %Table{} = table, mode}) when command in [:drop, :drop_if_exists] do 761 [["DROP TABLE ", if_do(command == :drop_if_exists, "IF EXISTS "), 762 quote_table(table.prefix, table.name), drop_mode(mode)]] 763 end 764 765 def execute_ddl({:alter, %Table{} = table, changes}) do 766 [["ALTER TABLE ", quote_table(table.prefix, table.name), ?\s, 767 column_changes(table, changes), pk_definitions(changes, ", ADD ")]] 768 ++ 769 if_do(table.comment, 770 [["ALTER TABLE ", quote_table(table.prefix, table.name), comment_expr(table.comment)]] 771 ) 772 end 773 774 def execute_ddl({:create, %Index{} = index}) do 775 if index.where do 776 error!(nil, "MySQL adapter does not support where in indexes") 777 end 778 779 if index.nulls_distinct == false do 780 error!(nil, "MySQL adapter does not support nulls_distinct set to false in indexes") 781 end 782 783 [["CREATE", if_do(index.unique, " UNIQUE"), " INDEX ", 784 quote_name(index.name), 785 " ON ", 786 quote_table(index.prefix, index.table), ?\s, 787 ?(, intersperse_map(index.columns, ", ", &index_expr/1), ?), 788 if_do(index.using, [" USING ", to_string(index.using)]), 789 if_do(index.concurrently, " LOCK=NONE")]] 790 end 791 792 def execute_ddl({:create_if_not_exists, %Index{}}), 793 do: error!(nil, "MySQL adapter does not support create if not exists for index") 794 795 def execute_ddl({:create, %Constraint{check: check}}) when is_binary(check), 796 do: error!(nil, "MySQL adapter does not support check constraints") 797 def execute_ddl({:create, %Constraint{exclude: exclude}}) when is_binary(exclude), 798 do: error!(nil, "MySQL adapter does not support exclusion constraints") 799 800 def execute_ddl({:drop, %Index{}, :cascade}), 801 do: error!(nil, "MySQL adapter does not support cascade in drop index") 802 803 def execute_ddl({:drop, %Index{} = index, :restrict}) do 804 [["DROP INDEX ", 805 quote_name(index.name), 806 " ON ", quote_table(index.prefix, index.table), 807 if_do(index.concurrently, " LOCK=NONE")]] 808 end 809 810 def execute_ddl({:drop, %Constraint{}, _}), 811 do: error!(nil, "MySQL adapter does not support constraints") 812 813 def execute_ddl({:drop_if_exists, %Constraint{}, _}), 814 do: error!(nil, "MySQL adapter does not support constraints") 815 816 def execute_ddl({:drop_if_exists, %Index{}, _}), 817 do: error!(nil, "MySQL adapter does not support drop if exists for index") 818 819 def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do 820 [["RENAME TABLE ", quote_table(current_table.prefix, current_table.name), 821 " TO ", quote_table(new_table.prefix, new_table.name)]] 822 end 823 824 def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do 825 [["ALTER TABLE ", quote_table(table.prefix, table.name), " RENAME COLUMN ", 826 quote_name(current_column), " TO ", quote_name(new_column)]] 827 end 828 829 def execute_ddl(string) when is_binary(string), do: [string] 830 831 def execute_ddl(keyword) when is_list(keyword), 832 do: error!(nil, "MySQL adapter does not support keyword lists in execute") 833 834 @impl true 835 def ddl_logs(_), do: [] 836 837 @impl true 838 def table_exists_query(table) do 839 {"SELECT true FROM information_schema.tables WHERE table_name = ? AND table_schema = DATABASE() LIMIT 1", [table]} 840 end 841 842 defp drop_mode(:cascade), do: " CASCADE" 843 defp drop_mode(:restrict), do: [] 844 845 defp pk_definitions(columns, prefix) do 846 pks = 847 for {_, name, _, opts} <- columns, 848 opts[:primary_key], 849 do: name 850 851 case pks do 852 [] -> [] 853 _ -> [[prefix, "PRIMARY KEY (", quote_names(pks), ?)]] 854 end 855 end 856 857 defp column_definitions(table, columns) do 858 intersperse_map(columns, ", ", &column_definition(table, &1)) 859 end 860 861 defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do 862 [quote_name(name), ?\s, reference_column_type(ref.type, opts), 863 column_options(opts), reference_expr(ref, table, name)] 864 end 865 866 defp column_definition(_table, {:add, name, type, opts}) do 867 [quote_name(name), ?\s, column_type(type, opts), column_options(opts)] 868 end 869 870 defp column_changes(table, columns) do 871 intersperse_map(columns, ", ", &column_change(table, &1)) 872 end 873 874 defp column_change(_table, {_command, _name, %Reference{validate: false}, _opts}) do 875 error!(nil, "validate: false on references is not supported in MyXQL") 876 end 877 878 defp column_change(table, {:add, name, %Reference{} = ref, opts}) do 879 ["ADD ", quote_name(name), ?\s, reference_column_type(ref.type, opts), 880 column_options(opts), constraint_expr(ref, table, name)] 881 end 882 883 defp column_change(_table, {:add, name, type, opts}) do 884 ["ADD ", quote_name(name), ?\s, column_type(type, opts), column_options(opts)] 885 end 886 887 defp column_change(table, {:add_if_not_exists, name, %Reference{} = ref, opts}) do 888 ["ADD IF NOT EXISTS ", quote_name(name), ?\s, reference_column_type(ref.type, opts), 889 column_options(opts), constraint_if_not_exists_expr(ref, table, name)] 890 end 891 892 defp column_change(_table, {:add_if_not_exists, name, type, opts}) do 893 ["ADD IF NOT EXISTS ", quote_name(name), ?\s, column_type(type, opts), column_options(opts)] 894 end 895 896 defp column_change(table, {:modify, name, %Reference{} = ref, opts}) do 897 [drop_constraint_expr(opts[:from], table, name), "MODIFY ", quote_name(name), ?\s, reference_column_type(ref.type, opts), 898 column_options(opts), constraint_expr(ref, table, name)] 899 end 900 901 defp column_change(table, {:modify, name, type, opts}) do 902 [drop_constraint_expr(opts[:from], table, name), "MODIFY ", quote_name(name), ?\s, column_type(type, opts), column_options(opts)] 903 end 904 905 defp column_change(_table, {:remove, name}), do: ["DROP ", quote_name(name)] 906 defp column_change(table, {:remove, name, %Reference{} = ref, _opts}) do 907 [drop_constraint_expr(ref, table, name), "DROP ", quote_name(name)] 908 end 909 defp column_change(_table, {:remove, name, _type, _opts}), do: ["DROP ", quote_name(name)] 910 911 defp column_change(table, {:remove_if_exists, name, %Reference{} = ref}) do 912 [drop_constraint_if_exists_expr(ref, table, name), "DROP IF EXISTS ", quote_name(name)] 913 end 914 defp column_change(_table, {:remove_if_exists, name, _type}), do: ["DROP IF EXISTS ", quote_name(name)] 915 916 defp column_options(opts) do 917 default = Keyword.fetch(opts, :default) 918 null = Keyword.get(opts, :null) 919 after_column = Keyword.get(opts, :after) 920 comment = Keyword.get(opts, :comment) 921 922 [default_expr(default), null_expr(null), after_expr(after_column), comment_expr(comment)] 923 end 924 925 defp comment_expr(comment, create_table? \\ false) 926 defp comment_expr(comment, true) when is_binary(comment), do: " COMMENT = '#{escape_string(comment)}'" 927 defp comment_expr(comment, false) when is_binary(comment), do: " COMMENT '#{escape_string(comment)}'" 928 defp comment_expr(_, _), do: [] 929 930 defp after_expr(nil), do: [] 931 defp after_expr(column) when is_atom(column) or is_binary(column), do: " AFTER `#{column}`" 932 defp after_expr(_), do: [] 933 934 defp null_expr(false), do: " NOT NULL" 935 defp null_expr(true), do: " NULL" 936 defp null_expr(_), do: [] 937 938 defp default_expr({:ok, nil}), 939 do: " DEFAULT NULL" 940 defp default_expr({:ok, literal}) when is_binary(literal), 941 do: [" DEFAULT '", escape_string(literal), ?'] 942 defp default_expr({:ok, literal}) when is_number(literal) or is_boolean(literal), 943 do: [" DEFAULT ", to_string(literal)] 944 defp default_expr({:ok, {:fragment, expr}}), 945 do: [" DEFAULT ", expr] 946 defp default_expr({:ok, value}) when is_map(value) do 947 library = Application.get_env(:myxql, :json_library, Jason) 948 expr = IO.iodata_to_binary(library.encode_to_iodata!(value)) 949 [" DEFAULT ", ?(, ?', escape_string(expr), ?', ?)] 950 end 951 defp default_expr(:error), 952 do: [] 953 954 defp index_expr(literal) when is_binary(literal), 955 do: literal 956 defp index_expr(literal), do: quote_name(literal) 957 958 defp engine_expr(storage_engine), 959 do: [" ENGINE = ", String.upcase(to_string(storage_engine || "INNODB"))] 960 961 defp options_expr(nil), 962 do: [] 963 defp options_expr(keyword) when is_list(keyword), 964 do: error!(nil, "MySQL adapter does not support keyword lists in :options") 965 defp options_expr(options), 966 do: [?\s, to_string(options)] 967 968 defp column_type(type, _opts) when type in ~w(time utc_datetime naive_datetime)a, 969 do: ecto_to_db(type) 970 971 defp column_type(type, opts) when type in ~w(time_usec utc_datetime_usec naive_datetime_usec)a do 972 precision = Keyword.get(opts, :precision, 6) 973 type_name = ecto_to_db(type) 974 975 [type_name, ?(, to_string(precision), ?)] 976 end 977 978 defp column_type(type, opts) do 979 size = Keyword.get(opts, :size) 980 precision = Keyword.get(opts, :precision) 981 scale = Keyword.get(opts, :scale) 982 983 cond do 984 size -> [ecto_size_to_db(type), ?(, to_string(size), ?)] 985 precision -> [ecto_to_db(type), ?(, to_string(precision), ?,, to_string(scale || 0), ?)] 986 type == :string -> ["varchar(255)"] 987 true -> ecto_to_db(type) 988 end 989 end 990 991 defp reference_expr(type, ref, table, name) do 992 {current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with]) 993 994 if ref.match do 995 error!(nil, ":match is not supported in references for tds") 996 end 997 998 ["CONSTRAINT ", reference_name(ref, table, name), 999 " ", type, " (", quote_names(current_columns), ?), 1000 " REFERENCES ", quote_table(ref.prefix || table.prefix, ref.table), 1001 ?(, quote_names(reference_columns), ?), 1002 reference_on_delete(ref.on_delete), reference_on_update(ref.on_update)] 1003 end 1004 1005 defp reference_expr(%Reference{} = ref, table, name), 1006 do: [", " | reference_expr("FOREIGN KEY", ref, table, name)] 1007 1008 defp constraint_expr(%Reference{} = ref, table, name), 1009 do: [", ADD " | reference_expr("FOREIGN KEY", ref, table, name)] 1010 1011 defp constraint_if_not_exists_expr(%Reference{} = ref, table, name), 1012 do: [", ADD " | reference_expr("FOREIGN KEY IF NOT EXISTS", ref, table, name)] 1013 1014 defp drop_constraint_expr(%Reference{} = ref, table, name), 1015 do: ["DROP FOREIGN KEY ", reference_name(ref, table, name), ", "] 1016 defp drop_constraint_expr(_, _, _), 1017 do: [] 1018 1019 defp drop_constraint_if_exists_expr(%Reference{} = ref, table, name), 1020 do: ["DROP FOREIGN KEY IF EXISTS ", reference_name(ref, table, name), ", "] 1021 defp drop_constraint_if_exists_expr(_, _, _), 1022 do: [] 1023 1024 defp reference_name(%Reference{name: nil}, table, column), 1025 do: quote_name("#{table.name}_#{column}_fkey") 1026 defp reference_name(%Reference{name: name}, _table, _column), 1027 do: quote_name(name) 1028 1029 defp reference_column_type(:serial, _opts), do: "BIGINT UNSIGNED" 1030 defp reference_column_type(:bigserial, _opts), do: "BIGINT UNSIGNED" 1031 defp reference_column_type(type, opts), do: column_type(type, opts) 1032 1033 defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL" 1034 defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE" 1035 defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT" 1036 defp reference_on_delete(_), do: [] 1037 1038 defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL" 1039 defp reference_on_update(:update_all), do: " ON UPDATE CASCADE" 1040 defp reference_on_update(:restrict), do: " ON UPDATE RESTRICT" 1041 defp reference_on_update(_), do: [] 1042 1043 ## Helpers 1044 1045 defp get_source(query, sources, ix, source) do 1046 {expr, name, _schema} = elem(sources, ix) 1047 {expr || expr(source, sources, query), name} 1048 end 1049 1050 defp get_parent_sources_ix(query, as) do 1051 case query.aliases[@parent_as] do 1052 {%{aliases: %{^as => ix}}, sources} -> {ix, sources} 1053 {%{} = parent, _sources} -> get_parent_sources_ix(parent, as) 1054 end 1055 end 1056 1057 defp quote_name(name) when is_atom(name) do 1058 quote_name(Atom.to_string(name)) 1059 end 1060 1061 defp quote_name(name) when is_binary(name) do 1062 if String.contains?(name, "`") do 1063 error!(nil, "bad literal/field/table name #{inspect name} (` is not permitted)") 1064 end 1065 1066 [?`, name, ?`] 1067 end 1068 1069 defp quote_names(names), do: intersperse_map(names, ?,, "e_name/1) 1070 1071 defp quote_table(nil, name), do: quote_table(name) 1072 defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)] 1073 1074 defp quote_table(name) when is_atom(name), 1075 do: quote_table(Atom.to_string(name)) 1076 defp quote_table(name) do 1077 if String.contains?(name, "`") do 1078 error!(nil, "bad table name #{inspect name}") 1079 end 1080 [?`, name, ?`] 1081 end 1082 1083 defp intersperse_map(list, separator, mapper, acc \\ []) 1084 defp intersperse_map([], _separator, _mapper, acc), 1085 do: acc 1086 defp intersperse_map([elem], _separator, mapper, acc), 1087 do: [acc | mapper.(elem)] 1088 defp intersperse_map([elem | rest], separator, mapper, acc), 1089 do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator]) 1090 1091 defp if_do(condition, value) do 1092 if condition, do: value, else: [] 1093 end 1094 1095 defp escape_string(value) when is_binary(value) do 1096 value 1097 |> :binary.replace("'", "''", [:global]) 1098 |> :binary.replace("\\", "\\\\", [:global]) 1099 end 1100 1101 defp escape_json_key(value) when is_binary(value) do 1102 value 1103 |> escape_string() 1104 |> :binary.replace("\"", "\\\\\"", [:global]) 1105 end 1106 1107 defp ecto_cast_to_db(:id, _query), do: "unsigned" 1108 defp ecto_cast_to_db(:integer, _query), do: "unsigned" 1109 defp ecto_cast_to_db(:string, _query), do: "char" 1110 defp ecto_cast_to_db(:utc_datetime_usec, _query), do: "datetime(6)" 1111 defp ecto_cast_to_db(:naive_datetime_usec, _query), do: "datetime(6)" 1112 defp ecto_cast_to_db(type, query), do: ecto_to_db(type, query) 1113 1114 defp ecto_size_to_db(:binary), do: "varbinary" 1115 defp ecto_size_to_db(type), do: ecto_to_db(type) 1116 1117 defp ecto_to_db(type, query \\ nil) 1118 defp ecto_to_db({:array, _}, query), do: error!(query, "Array type is not supported by MySQL") 1119 defp ecto_to_db(:id, _query), do: "integer" 1120 defp ecto_to_db(:serial, _query), do: "bigint unsigned not null auto_increment" 1121 defp ecto_to_db(:bigserial, _query), do: "bigint unsigned not null auto_increment" 1122 defp ecto_to_db(:binary_id, _query), do: "binary(16)" 1123 defp ecto_to_db(:string, _query), do: "varchar" 1124 defp ecto_to_db(:float, _query), do: "double" 1125 defp ecto_to_db(:binary, _query), do: "blob" 1126 defp ecto_to_db(:uuid, _query), do: "binary(16)" # MySQL does not support uuid 1127 defp ecto_to_db(:map, _query), do: "json" 1128 defp ecto_to_db({:map, _}, _query), do: "json" 1129 defp ecto_to_db(:time_usec, _query), do: "time" 1130 defp ecto_to_db(:utc_datetime, _query), do: "datetime" 1131 defp ecto_to_db(:utc_datetime_usec, _query), do: "datetime" 1132 defp ecto_to_db(:naive_datetime, _query), do: "datetime" 1133 defp ecto_to_db(:naive_datetime_usec, _query), do: "datetime" 1134 defp ecto_to_db(atom, _query) when is_atom(atom), do: Atom.to_string(atom) 1135 defp ecto_to_db(type, _query) do 1136 raise ArgumentError, 1137 "unsupported type `#{inspect(type)}`. The type can either be an atom, a string " <> 1138 "or a tuple of the form `{:map, t}` where `t` itself follows the same conditions." 1139 end 1140 1141 defp error!(nil, message) do 1142 raise ArgumentError, message 1143 end 1144 defp error!(query, message) do 1145 raise Ecto.QueryError, query: query, message: message 1146 end 1147 end 1148 end