zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

connection.ex (41470B)


      1 if Code.ensure_loaded?(MyXQL) do
      2   defmodule Ecto.Adapters.MyXQL.Connection do
      3     @moduledoc false
      4     alias Ecto.Adapters.SQL
      5 
      6     @behaviour Ecto.Adapters.SQL.Connection
      7 
      8     ## Connection
      9 
     10     @impl true
     11     def child_spec(opts) do
     12       MyXQL.child_spec(opts)
     13     end
     14 
     15     ## Query
     16 
     17     @impl true
     18     def prepare_execute(conn, name, sql, params, opts) do
     19       MyXQL.prepare_execute(conn, name, sql, params, opts)
     20     end
     21 
     22     @impl true
     23     def query(conn, sql, params, opts) do
     24       opts = Keyword.put_new(opts, :query_type, :binary_then_text)
     25       MyXQL.query(conn, sql, params, opts)
     26     end
     27 
     28     @impl true
     29     def query_many(conn, sql, params, opts) do
     30       opts = Keyword.put_new(opts, :query_type, :text)
     31       MyXQL.query_many(conn, sql, params, opts)
     32     end
     33 
     34     @impl true
     35     def execute(conn, query, params, opts) do
     36       case MyXQL.execute(conn, query, params, opts) do
     37         {:ok, _, result} -> {:ok, result}
     38         {:error, _} = error -> error
     39       end
     40     end
     41 
     42     @impl true
     43     def stream(conn, sql, params, opts) do
     44       MyXQL.stream(conn, sql, params, opts)
     45     end
     46 
     47     @impl true
     48     def to_constraints(%MyXQL.Error{mysql: %{name: :ER_DUP_ENTRY}, message: message}, opts) do
     49       case :binary.split(message, " for key ") do
     50         [_, quoted] -> [unique: normalize_index_name(quoted, opts[:source])]
     51         _ -> []
     52       end
     53     end
     54     def to_constraints(%MyXQL.Error{mysql: %{name: name}, message: message}, _opts)
     55         when name in [:ER_ROW_IS_REFERENCED_2, :ER_NO_REFERENCED_ROW_2] do
     56       case :binary.split(message, [" CONSTRAINT ", " FOREIGN KEY "], [:global]) do
     57         [_, quoted, _] -> [foreign_key: strip_quotes(quoted)]
     58         _ -> []
     59       end
     60     end
     61     def to_constraints(_, _),
     62       do: []
     63 
     64     defp strip_quotes(quoted) do
     65       size = byte_size(quoted) - 2
     66       <<_, unquoted::binary-size(size), _>> = quoted
     67       unquoted
     68     end
     69 
     70     defp normalize_index_name(quoted, source) do
     71       name = strip_quotes(quoted)
     72 
     73       if source do
     74         String.trim_leading(name, "#{source}.")
     75       else
     76         name
     77       end
     78     end
     79 
     80     ## Query
     81 
     82     @parent_as __MODULE__
     83     alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr, WithExpr}
     84 
     85     @impl true
     86     def all(query, as_prefix \\ []) do
     87       sources = create_names(query, as_prefix)
     88 
     89       cte = cte(query, sources)
     90       from = from(query, sources)
     91       select = select(query, sources)
     92       join = join(query, sources)
     93       where = where(query, sources)
     94       group_by = group_by(query, sources)
     95       having = having(query, sources)
     96       window = window(query, sources)
     97       combinations = combinations(query)
     98       order_by = order_by(query, sources)
     99       limit = limit(query, sources)
    100       offset = offset(query, sources)
    101       lock = lock(query, sources)
    102 
    103       [cte, select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    104     end
    105 
    106     @impl true
    # Builds the iodata for an UPDATE statement from an Ecto query.
    #
    # When `prefix` is given it replaces the generated "UPDATE ... SET " head
    # and the assigned columns are rendered with the `:on_conflict` key style
    # instead of `:update`. Raises through `error!/2` if the query has a select.
    def update_all(query, prefix \\ nil) do
      %{from: %{source: source}, select: select} = query

      if select, do: error!(nil, ":select is not supported in update_all by MySQL")

      sources = create_names(query, [])
      cte = cte(query, sources)
      {from, name} = get_source(query, sources, 0, source)

      key_style = if prefix, do: :on_conflict, else: :update
      fields = update_fields(key_style, query, sources)

      # Join conditions are moved into the WHERE clause, ahead of the query's own.
      {join, join_wheres} = using_join(query, :update_all, sources)
      statement_head = prefix || ["UPDATE ", from, " AS ", name, join, " SET "]
      where = where(%{query | wheres: join_wheres ++ query.wheres}, sources)

      [cte, statement_head, fields | where]
    end
    130 
    131     @impl true
    # Builds the iodata for a DELETE statement from an Ecto query.
    # Raises through `error!/2` if the query has a select.
    def delete_all(query) do
      if query.select, do: error!(nil, ":select is not supported in delete_all by MySQL")

      sources = create_names(query, [])
      cte = cte(query, sources)

      # The rows deleted are those of the first source, referred to by its alias.
      {_, target, _} = elem(sources, 0)

      [
        cte,
        "DELETE ",
        target,
        ".*",
        from(query, sources),
        join(query, sources)
        | where(query, sources)
      ]
    end
    147 
    148     @impl true
    # Builds the iodata for an INSERT statement.
    #
    # Only calls with empty `returning` and `placeholders` lists are supported;
    # the other two clauses raise through `error!/2`.
    def insert(prefix, table, header, rows, on_conflict, [], []) do
      columns = quote_names(header)

      [
        "INSERT INTO ",
        quote_table(prefix, table),
        " (",
        columns,
        ") ",
        insert_all(rows)
        | on_conflict(on_conflict, header)
      ]
    end

    def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, []),
      do: error!(nil, ":returning is not supported in insert/insert_all by MySQL")

    def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders),
      do: error!(nil, ":placeholders is not supported by MySQL")
    160 
    # Renders the ON DUPLICATE KEY part of an INSERT from the on_conflict tuple.
    # The third element is the conflict target and must be empty.

    # Any non-empty conflict target is rejected.
    defp on_conflict({_, _, [_ | _]}, _header),
      do: error!(nil, ":conflict_target is not supported in insert/insert_all by MySQL")

    # `:raise` adds nothing to the statement.
    defp on_conflict({:raise, _, []}, _header), do: []

    # `:nothing` assigns the first header column to itself.
    defp on_conflict({:nothing, _, []}, [first_field | _]) do
      column = quote_name(first_field)
      [" ON DUPLICATE KEY UPDATE ", column, " = " | column]
    end

    # A list of fields assigns each of them from VALUES(field).
    defp on_conflict({fields, _, []}, _header) when is_list(fields) do
      assignments =
        intersperse_map(fields, ?,, fn field ->
          column = quote_name(field)
          [column, " = VALUES(", column, ?)]
        end)

      [" ON DUPLICATE KEY UPDATE " | assignments]
    end

    # A query without wheres is rendered through `update_all/2` with a custom prefix.
    defp on_conflict({%{wheres: []} = query, _, []}, _header),
      do: [" ON DUPLICATE KEY " | update_all(query, "UPDATE ")]

    defp on_conflict({_query, _, []}, _header) do
      error!(nil, "Using a query with :where in combination with the :on_conflict option is not supported by MySQL")
    end
    184 
    # Renders the row source of an INSERT: a VALUES list when given rows, or a
    # parenthesised SELECT when given an `Ecto.Query`.
    defp insert_all(rows) when is_list(rows) do
      tuples =
        intersperse_map(rows, ?,, fn row ->
          [?(, intersperse_map(row, ?,, &insert_all_value/1), ?)]
        end)

      ["VALUES ", tuples]
    end

    defp insert_all(%Ecto.Query{} = query), do: [?(, all(query), ?)]
    194 
    # Renders one cell of a VALUES row: `nil` becomes DEFAULT, a
    # `{query, counter}` tuple becomes a parenthesised subquery, and anything
    # else becomes the `?` placeholder (as a charlist).
    defp insert_all_value(nil), do: "DEFAULT"

    defp insert_all_value({%Ecto.Query{} = query, _params_counter}) do
      [?(, all(query), ?)]
    end

    defp insert_all_value(_), do: ~c"?"
    198 
    199     @impl true
    # Builds the iodata for a single-row UPDATE. Each field is assigned from a
    # `?` placeholder; a filter with a `nil` value is rendered as IS NULL,
    # any other filter as `= ?`.
    def update(prefix, table, fields, filters, _returning) do
      assignments = intersperse_map(fields, ", ", fn field -> [quote_name(field), " = ?"] end)

      conditions =
        intersperse_map(filters, " AND ", fn
          {column, nil} -> [quote_name(column), " IS NULL"]
          {column, _value} -> [quote_name(column), " = ?"]
        end)

      ["UPDATE ", quote_table(prefix, table), " SET ", assignments, " WHERE " | conditions]
    end
    211 
    212     @impl true
    # Builds the iodata for a single-row DELETE. A filter with a `nil` value is
    # rendered as IS NULL, any other filter as `= ?`.
    def delete(prefix, table, filters, _returning) do
      conditions =
        intersperse_map(filters, " AND ", fn
          {column, nil} -> [quote_name(column), " IS NULL"]
          {column, _value} -> [quote_name(column), " = ?"]
        end)

      ["DELETE FROM ", quote_table(prefix, table), " WHERE " | conditions]
    end
    223 
    224     @impl true
    225     # DB explain opts are deprecated, so they aren't used to build the explain query.
    226     # See Notes at https://dev.mysql.com/doc/refman/5.7/en/explain.html
    def explain_query(conn, query, params, opts) do
      explain_sql = build_explain_query(query)

      # Anything other than a successful `MyXQL.Result` is returned untouched.
      with {:ok, %MyXQL.Result{} = result} <- query(conn, explain_sql, params, opts) do
        {:ok, SQL.format_table(result)}
      end
    end
    236 
    # Prefixes the given query iodata with "EXPLAIN " and returns it as a binary.
    def build_explain_query(query), do: IO.iodata_to_binary(["EXPLAIN ", query])
    241 
    242     ## Query generation
    243 
    # Operators rendered infix, each paired with the SQL text placed between
    # its two operands.
    binary_ops = [
      ==: " = ",
      !=: " != ",
      <=: " <= ",
      >=: " >= ",
      <: " < ",
      >: " > ",
      +: " + ",
      -: " - ",
      *: " * ",
      /: " / ",
      and: " AND ",
      or: " OR ",
      like: " LIKE "
    ]

    @binary_ops Keyword.keys(binary_ops)

    # One `handle_call/2` clause per operator is generated at compile time; it
    # only matches when the operator is called with exactly two arguments.
    for {op, str} <- binary_ops do
      defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)}
    end

    # Everything else is treated as a function call named after the atom.
    defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)}
    256 
    # Renders the SELECT clause: keyword, optional DISTINCT, then the field list.
    defp select(%{select: %{fields: fields}, distinct: distinct} = query, sources) do
      distinct_sql = distinct(distinct, sources, query)
      ["SELECT ", distinct_sql | select(fields, sources, query)]
    end
    260 
    # Renders the DISTINCT modifier. Only a boolean distinct is supported; a
    # list of expressions raises through `error!/2`.
    defp distinct(nil, _sources, _query), do: []
    defp distinct(%QueryExpr{expr: true}, _sources, _query), do: "DISTINCT "
    defp distinct(%QueryExpr{expr: false}, _sources, _query), do: []

    defp distinct(%QueryExpr{expr: exprs}, _sources, query) when is_list(exprs),
      do: error!(query, "DISTINCT with multiple columns is not supported by MySQL")
    267 
    # Renders the selected field list. An empty list selects the literal TRUE.
    defp select([], _sources, _query), do: "TRUE"

    defp select(fields, sources, query) do
      intersperse_map(fields, ", ", fn
        # Selecting a whole source is only possible when it carries a schema.
        {:&, _, [idx]} ->
          case elem(sources, idx) do
            {nil, alias_name, nil} ->
              error!(query, "MySQL adapter does not support selecting all fields from fragment #{alias_name}. " <>
                            "Please specify exactly which fields you want to select")

            {table, _, nil} ->
              error!(query, "MySQL adapter does not support selecting all fields from #{table} without a schema. " <>
                            "Please specify a schema or specify exactly which fields you want to select")

            {_, alias_name, _} ->
              alias_name
          end

        {key, value} ->
          [expr(value, sources, query), " AS ", quote_name(key)]

        value ->
          expr(value, sources, query)
      end)
    end
    289 
    # Renders the FROM clause for the first source, followed by each hint
    # prefixed with a space.
    defp from(%{from: %{source: source, hints: hints}} = query, sources) do
      {table, alias_name} = get_source(query, sources, 0, source)
      spaced_hints = Enum.map(hints, fn hint -> [?\s | hint] end)
      [" FROM ", table, " AS ", alias_name | spaced_hints]
    end
    294 
    # Renders the WITH clause when the query has at least one CTE; otherwise
    # nothing is emitted.
    defp cte(%{with_ctes: %WithExpr{recursive: recursive, queries: [_ | _] = queries}} = query, sources) do
      recursive_opt = if recursive, do: "RECURSIVE ", else: ""
      rendered = intersperse_map(queries, ", ", fn entry -> cte_expr(entry, sources, query) end)

      ["WITH ", recursive_opt, rendered, " "]
    end

    defp cte(%{with_ctes: _}, _), do: []
    302 
    # Renders one `name AS <query>` entry of a WITH clause.
    defp cte_expr({name, cte}, sources, query),
      do: [quote_name(name), " AS ", cte_query(cte, sources, query)]
    306 
    # Renders the body of a CTE. An `Ecto.Query` is rendered as a parenthesised
    # SELECT after recording the parent query and its sources under the
    # `@parent_as` alias; a `QueryExpr` is rendered as a plain expression.
    defp cte_query(%Ecto.Query{} = query, sources, parent_query) do
      nested = put_in(query.aliases[@parent_as], {parent_query, sources})
      ["(", all(nested, subquery_as_prefix(sources)), ")"]
    end

    defp cte_query(%QueryExpr{expr: expr}, sources, query), do: expr(expr, sources, query)
    315 
    # Renders every `key = value` assignment found in the query's updates,
    # separated by ", ". `type` selects how the column is referenced
    # (see `update_key/4`).
    defp update_fields(type, %{updates: updates} = query, sources) do
      assignments =
        for %{expr: expr} <- updates,
            {op, kw} <- expr,
            {key, value} <- kw do
          quoted_key = update_key(type, key, query, sources)
          update_op(op, quoted_key, value, sources, query)
        end

      Enum.intersperse(assignments, ", ")
    end
    323 
    # Renders the column reference on the left-hand side of an assignment.
    #
    #   * `:update`      -> the column qualified with the first source's alias
    #   * `:on_conflict` -> the bare quoted column name
    defp update_key(:update, key, %{from: %{source: source}} = query, sources) do
      # `get_source/4` is given the source itself, as `update_all/2` and
      # `from/2` do, rather than the enclosing from expression.
      # NOTE(review): `get_source/4` is defined outside this view; confirm it
      # expects the source here.
      {_from, name} = get_source(query, sources, 0, source)

      [name, ?. | quote_name(key)]
    end

    defp update_key(:on_conflict, key, _query, _sources), do: quote_name(key)
    332 
    # Renders a single assignment. `:set` assigns the value, `:inc` adds it to
    # the current column value; any other operation raises through `error!/2`.
    defp update_op(:set, quoted_key, value, sources, query),
      do: [quoted_key, " = " | expr(value, sources, query)]

    defp update_op(:inc, quoted_key, value, sources, query),
      do: [quoted_key, " = ", quoted_key, " + " | expr(value, sources, query)]

    defp update_op(command, _quoted_key, _value, _sources, query),
      do: error!(query, "Unknown update operation #{inspect command} for MySQL")
    344 
    # Turns the query's joins into a comma-separated table list plus the join
    # conditions converted to AND-ed `BooleanExpr`s for the WHERE clause.
    # Returns `{tables_iodata, wheres}`; with no joins both are empty.
    # Only inner joins without parameterised subqueries are accepted.
    defp using_join(%{joins: []}, _kind, _sources), do: {[], []}

    defp using_join(%{joins: joins} = query, kind, sources) do
      tables =
        intersperse_map(joins, ", ", fn
          %JoinExpr{source: %Ecto.SubQuery{params: [_ | _]}} ->
            error!(query, "MySQL adapter does not support subqueries with parameters in update_all/delete_all joins")

          %JoinExpr{qual: :inner, ix: ix, source: source} ->
            {table, alias_name} = get_source(query, sources, ix, source)
            [table, " AS " | alias_name]

          %JoinExpr{qual: qual} ->
            error!(query, "MySQL adapter supports only inner joins on #{kind}, got: `#{qual}`")
        end)

      # An `on: true` condition adds nothing and is left out.
      conditions =
        for %JoinExpr{on: %QueryExpr{expr: value} = on_expr} <- joins, value != true do
          Map.merge(on_expr, %{__struct__: BooleanExpr, op: :and})
        end

      {[?,, ?\s | tables], conditions}
    end
    367 
    # Renders every JOIN of the query: qualifier, source, alias, hints and the
    # ON condition.
    defp join(%{joins: []}, _sources), do: []

    defp join(%{joins: joins} = query, sources) do
      Enum.map(joins, fn %JoinExpr{on: %QueryExpr{expr: on}, qual: qual, ix: ix, source: source, hints: hints} ->
        {table, alias_name} = get_source(query, sources, ix, source)

        [
          join_qual(qual, query),
          table,
          " AS ",
          alias_name,
          Enum.map(hints, fn hint -> [?\s | hint] end)
          | join_on(qual, on, sources, query)
        ]
      end)
    end
    376 
    # Renders the ON condition of a join. A cross join whose condition is
    # literally `true` gets no ON clause.
    defp join_on(:cross, true, _sources, _query) do
      []
    end

    defp join_on(_qual, expr, sources, query) do
      [" ON " | expr(expr, sources, query)]
    end
    379 
    # Maps a join qualifier to its SQL keyword (with surrounding spaces). One
    # clause per pair is generated at compile time; the second argument is
    # ignored, and any other qualifier has no matching clause.
    for {qual, sql} <- [
          inner: " INNER JOIN ",
          inner_lateral: " INNER JOIN LATERAL ",
          left: " LEFT OUTER JOIN ",
          left_lateral: " LEFT OUTER JOIN LATERAL ",
          right: " RIGHT OUTER JOIN ",
          full: " FULL OUTER JOIN ",
          cross: " CROSS JOIN "
        ] do
      defp join_qual(unquote(qual), _), do: unquote(sql)
    end
    387 
    # Renders the WHERE clause from the query's `wheres`.
    defp where(%{wheres: wheres} = query, sources),
      do: boolean(" WHERE ", wheres, sources, query)
    391 
    # Renders the HAVING clause from the query's `havings`.
    defp having(%{havings: havings} = query, sources),
      do: boolean(" HAVING ", havings, sources, query)
    395 
    # Renders the GROUP BY clause; nothing is emitted without group_bys.
    defp group_by(%{group_bys: []}, _sources), do: []

    defp group_by(%{group_bys: group_bys} = query, sources) do
      groups =
        intersperse_map(group_bys, ", ", fn %QueryExpr{expr: exprs} ->
          intersperse_map(exprs, ", ", fn item -> expr(item, sources, query) end)
        end)

      [" GROUP BY " | groups]
    end
    403 
    # Renders the WINDOW clause as `name AS (definition)` entries; nothing is
    # emitted without windows.
    defp window(%{windows: []}, _sources), do: []

    defp window(%{windows: windows} = query, sources) do
      definitions =
        intersperse_map(windows, ", ", fn {name, %{expr: kw}} ->
          [quote_name(name), " AS " | window_exprs(kw, sources, query)]
        end)

      [" WINDOW " | definitions]
    end
    411 
    # Renders a window definition: its parts separated by spaces, in parentheses.
    defp window_exprs(kw, sources, query) do
      parts = intersperse_map(kw, ?\s, fn part -> window_expr(part, sources, query) end)
      [?(, parts, ?)]
    end
    415 
    # Renders one part of a window definition: PARTITION BY, ORDER BY, or a
    # frame given as a fragment.
    defp window_expr({:partition_by, fields}, sources, query) do
      columns = intersperse_map(fields, ", ", fn field -> expr(field, sources, query) end)
      ["PARTITION BY " | columns]
    end

    defp window_expr({:order_by, fields}, sources, query) do
      orderings = intersperse_map(fields, ", ", fn field -> order_by_expr(field, sources, query) end)
      ["ORDER BY " | orderings]
    end

    defp window_expr({:frame, {:fragment, _, _} = fragment}, sources, query),
      do: expr(fragment, sources, query)
    427 
    # Renders the ORDER BY clause; nothing is emitted without order_bys.
    defp order_by(%{order_bys: []}, _sources), do: []

    defp order_by(%{order_bys: order_bys} = query, sources) do
      orderings =
        intersperse_map(order_bys, ", ", fn %QueryExpr{expr: exprs} ->
          intersperse_map(exprs, ", ", fn item -> order_by_expr(item, sources, query) end)
        end)

      [" ORDER BY " | orderings]
    end
    435 
    # Renders one ordering term. `:asc` is left implicit, `:desc` appends
    # " DESC", and any other direction raises through `error!/2`.
    defp order_by_expr({dir, expr}, sources, query) do
      # The expression is rendered first so it is validated before the direction.
      rendered = expr(expr, sources, query)

      cond do
        dir === :asc -> rendered
        dir === :desc -> [rendered | " DESC"]
        true -> error!(query, "#{dir} is not supported in ORDER BY in MySQL")
      end
    end
    445 
    # Renders the LIMIT clause; nothing is emitted when the query has no limit.
    defp limit(%{limit: nil}, _sources), do: []

    defp limit(%{limit: %QueryExpr{expr: expr}} = query, sources),
      do: [" LIMIT " | expr(expr, sources, query)]
    450 
    # Renders the OFFSET clause; nothing is emitted when the query has no offset.
    defp offset(%{offset: nil}, _sources), do: []

    defp offset(%{offset: %QueryExpr{expr: expr}} = query, sources),
      do: [" OFFSET " | expr(expr, sources, query)]
    455 
    # Renders each combination as its set operator followed by the combined
    # query, rendered through `all/1` and wrapped in parentheses.
    defp combinations(%{combinations: combinations}) do
      Enum.map(combinations, fn
        {:union, combined} -> [" UNION (", all(combined), ")"]
        {:union_all, combined} -> [" UNION ALL (", all(combined), ")"]
        {:except, combined} -> [" EXCEPT (", all(combined), ")"]
        {:except_all, combined} -> [" EXCEPT ALL (", all(combined), ")"]
        {:intersect, combined} -> [" INTERSECT (", all(combined), ")"]
        {:intersect_all, combined} -> [" INTERSECT ALL (", all(combined), ")"]
      end)
    end
    466 
    # Renders the lock clause, prefixed with a space. A binary lock is emitted
    # verbatim; any other non-nil lock is rendered as an expression.
    defp lock(%{lock: nil}, _sources) do
      []
    end

    defp lock(%{lock: binary}, _sources) when is_binary(binary) do
      [?\s | binary]
    end

    defp lock(%{lock: expr} = query, sources) do
      [?\s | expr(expr, sources, query)]
    end
    470 
    # Renders a WHERE/HAVING style clause: `name` followed by all expressions,
    # each in parentheses and joined by its own AND/OR operator. Nothing is
    # emitted for an empty list.
    defp boolean(_name, [], _sources, _query), do: []

    defp boolean(name, [%{expr: first_expr, op: first_op} | rest], sources, query) do
      seed = {first_op, paren_expr(first_expr, sources, query)}

      {_last_op, sql} =
        Enum.reduce(rest, seed, fn %BooleanExpr{expr: expr, op: op}, {prev_op, acc} ->
          joiner = operator_to_boolean(op)
          right = paren_expr(expr, sources, query)

          # When the operator changes, everything accumulated so far is wrapped
          # in parentheses so it is evaluated as one unit.
          if op === prev_op do
            {op, [acc, joiner | right]}
          else
            {op, [?(, acc, ?), joiner | right]}
          end
        end)

      [name, sql]
    end
    481 
    # Maps a boolean operator atom to its SQL keyword with surrounding spaces.
    defp operator_to_boolean(:and) do
      " AND "
    end

    defp operator_to_boolean(:or) do
      " OR "
    end
    484 
    # Wraps the given non-empty iodata list in parentheses when its first
    # element is a binary starting (case-insensitively, after optional
    # whitespace) with "select"; otherwise returns it unchanged.
    defp parens_for_select([first_expr | _] = expr) do
      starts_with_select? =
        is_binary(first_expr) and String.match?(first_expr, ~r/^\s*select/i)

      if starts_with_select?, do: [?(, expr, ?)], else: expr
    end
    492 
    # Renders an expression wrapped in parentheses.
    defp paren_expr(expr, sources, query) do
      rendered = expr(expr, sources, query)
      [?(, rendered, ?)]
    end
    496 
    # A query parameter is rendered as the `?` placeholder (as a charlist).
    defp expr({:^, [], [_ix]}, _sources, _query), do: ~c"?"

    # `parent_as(as).field`: the alias is looked up in the parent query's sources.
    defp expr({{:., _, [{:parent_as, _, [as]}, field]}, _, []}, _sources, query)
         when is_atom(field) do
      {ix, parent_sources} = get_parent_sources_ix(query, as)
      {_, alias_name, _} = elem(parent_sources, ix)
      [alias_name, ?. | quote_name(field)]
    end

    # `source.field`: the field qualified with the alias of source `idx`.
    defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query)
         when is_atom(field) do
      {_, alias_name, _} = elem(sources, idx)
      [alias_name, ?. | quote_name(field)]
    end

    # A bare source reference renders as its alias.
    defp expr({:&, _, [idx]}, sources, _query) do
      {_, alias_name, _} = elem(sources, idx)
      alias_name
    end
    518 
    # `left in []` can never match.
    defp expr({:in, _, [_left, []]}, _sources, _query), do: "false"

    # `left in [a, b, ...]` with a literal list.
    defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do
      items = intersperse_map(right, ?,, fn item -> expr(item, sources, query) end)
      [expr(left, sources, query), " IN (", items, ?)]
    end

    # `left in ^list` where the interpolated list is empty.
    defp expr({:in, _, [_, {:^, _, [_, 0]}]}, _sources, _query), do: "false"

    # `left in ^list`: one placeholder per element of the interpolated list.
    defp expr({:in, _, [left, {:^, _, [_, count]}]}, sources, query) do
      placeholders = ?? |> List.duplicate(count) |> Enum.intersperse(?,)
      [expr(left, sources, query), " IN (", placeholders, ?)]
    end

    # `left in subquery(...)`.
    defp expr({:in, _, [left, %Ecto.SubQuery{} = subquery]}, sources, query),
      do: [expr(left, sources, query), " IN ", expr(subquery, sources, query)]

    # Any other right-hand side is compared with `= ANY(...)`.
    defp expr({:in, _, [left, right]}, sources, query),
      do: [expr(left, sources, query), " = ANY(", expr(right, sources, query), ?)]
    544 
    defp expr({:is_nil, _, [arg]}, sources, query),
      do: [expr(arg, sources, query) | " IS NULL"]

    defp expr({:not, _, [inner]}, sources, query),
      do: ["NOT (", expr(inner, sources, query), ?)]

    defp expr({:filter, _, _}, _sources, query),
      do: error!(query, "MySQL adapter does not support aggregate filters")

    # A subquery is rendered as a parenthesised SELECT after recording the
    # parent query and its sources under the `@parent_as` alias.
    defp expr(%Ecto.SubQuery{query: query}, sources, parent_query) do
      nested = put_in(query.aliases[@parent_as], {parent_query, sources})
      [?(, all(nested, subquery_as_prefix(sources)), ?)]
    end

    # A fragment whose single argument is a list or a 3-tuple is rejected.
    defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3,
      do: error!(query, "MySQL adapter does not support keyword or interpolated fragments")

    # Raw parts are emitted verbatim and expression parts are rendered; the
    # result is passed through `parens_for_select/1`.
    defp expr({:fragment, _, parts}, sources, query) do
      parts
      |> Enum.map(fn
        {:raw, raw} -> raw
        {:expr, inner} -> expr(inner, sources, query)
      end)
      |> parens_for_select()
    end

    defp expr({:literal, _, [literal]}, _sources, _query), do: quote_name(literal)

    defp expr({:selected_as, _, [name]}, _sources, _query), do: [quote_name(name)]
    581 
    defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do
      [
        "date_add(",
        expr(datetime, sources, query),
        ", ",
        interval(count, interval, sources, query)
        | ")"
      ]
    end

    # Same as `datetime_add`, with the result cast back to a date.
    defp expr({:date_add, _, [date, count, interval]}, sources, query) do
      [
        "CAST(date_add(",
        expr(date, sources, query),
        ", ",
        interval(count, interval, sources, query)
        | ") AS date)"
      ]
    end

    defp expr({:ilike, _, [_, _]}, _sources, query),
      do: error!(query, "ilike is not supported by MySQL")

    # `over(agg, :window_name)` refers to a named window.
    defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
      rendered_agg = expr(agg, sources, query)
      [rendered_agg, " OVER " | quote_name(name)]
    end

    # `over(agg, kw)` defines the window inline.
    defp expr({:over, _, [agg, kw]}, sources, query) do
      rendered_agg = expr(agg, sources, query)
      [rendered_agg, " OVER " | window_exprs(kw, sources, query)]
    end

    # A tuple renders as a parenthesised, comma-separated list.
    defp expr({:{}, _, elems}, sources, query) do
      items = intersperse_map(elems, ?,, fn item -> expr(item, sources, query) end)
      [?(, items, ?)]
    end

    defp expr({:count, _, []}, _sources, _query), do: "count(*)"

    # Builds a `json_extract(expr, '$...')` call: binary segments become
    # `."key"` (escaped via `escape_json_key/1`), integers become `[n]`.
    defp expr({:json_extract_path, _, [expr, path]}, sources, query) do
      segments =
        Enum.map(path, fn
          key when is_binary(key) -> [?., ?", escape_json_key(key), ?"]
          index when is_integer(index) -> "[#{index}]"
        end)

      ["json_extract(", expr(expr, sources, query), ", '$", segments, "')"]
    end
    624 
    # Generic call: either a binary operator rendered infix, or a function
    # call. A trailing `:distinct` argument becomes a DISTINCT modifier.
    defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do
      {modifier, call_args} =
        case args do
          [rest, :distinct] -> {"DISTINCT ", [rest]}
          _ -> {[], args}
        end

      case handle_call(fun, length(call_args)) do
        {:binary_op, op} ->
          [left, right] = call_args
          [op_to_binary(left, sources, query), op | op_to_binary(right, sources, query)]

        {:fun, fun_name} ->
          rendered_args = intersperse_map(call_args, ", ", fn arg -> expr(arg, sources, query) end)
          [fun_name, ?(, modifier, rendered_args, ?)]
      end
    end
    640 
    defp expr(list, _sources, query) when is_list(list),
      do: error!(query, "Array type is not supported by MySQL")

    defp expr(%Decimal{} = decimal, _sources, _query),
      do: Decimal.to_string(decimal, :normal)

    # A tagged binary is written as a lowercase hex literal: x'...'.
    defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, _query)
         when is_binary(binary) do
      [?x, ?', Base.encode16(binary, case: :lower), ?']
    end

    # Decimal and float tags render as `<value> + 0` instead of a CAST.
    defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query)
         when type in [:decimal, :float] do
      [expr(other, sources, query), " + 0"]
    end

    # Every other tag is rendered as a CAST to the mapped database type.
    defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do
      ["CAST(", expr(other, sources, query), " AS ", ecto_cast_to_db(type, query), ?)]
    end

    defp expr(nil, _sources, _query), do: "NULL"
    defp expr(true, _sources, _query), do: "TRUE"
    defp expr(false, _sources, _query), do: "FALSE"

    # Binary literals are escaped and single-quoted.
    defp expr(literal, _sources, _query) when is_binary(literal),
      do: [?', escape_string(literal), ?']

    defp expr(literal, _sources, _query) when is_integer(literal),
      do: Integer.to_string(literal)

    # MySQL doesn't support float cast
    defp expr(literal, _sources, _query) when is_float(literal),
      do: ["(0 + ", Float.to_string(literal), ?)]

    # Catch-all: anything not handled above is reported as unsupported.
    defp expr(expr, _sources, query),
      do: error!(query, "unsupported expression: #{inspect(expr)}")
    684 
    # Renders an INTERVAL expression. Milliseconds are expressed as
    # `count * 1000` microseconds; any other unit is appended as given.
    defp interval(count, "millisecond", sources, query),
      do: ["INTERVAL (", expr(count, sources, query) | " * 1000) microsecond"]

    defp interval(count, interval, sources, query),
      do: ["INTERVAL ", expr(count, sources, query), ?\s | interval]
    692 
    # Renders an operand of a binary operator. Operands that are themselves
    # binary operations or `is_nil` checks are wrapped in parentheses.
    defp op_to_binary({op, _, [_, _]} = expr, sources, query) when op in @binary_ops do
      paren_expr(expr, sources, query)
    end

    defp op_to_binary({:is_nil, _, [_]} = expr, sources, query) do
      paren_expr(expr, sources, query)
    end

    defp op_to_binary(expr, sources, query) do
      expr(expr, sources, query)
    end
    701 
    # Builds the tuple of named sources for a query: one entry per source (see
    # `create_name/3`) followed by `as_prefix` as the last element.
    defp create_names(%{sources: sources}, as_prefix) do
      sources
      |> create_names(0, tuple_size(sources), as_prefix)
      |> List.to_tuple()
    end

    # Walks positions `pos..limit - 1`, naming each source in turn.
    defp create_names(sources, pos, limit, as_prefix) when pos < limit do
      named = create_name(sources, pos, as_prefix)
      [named | create_names(sources, pos + 1, limit, as_prefix)]
    end

    # Past the last source: terminate the list with the prefix itself.
    defp create_names(_sources, pos, pos, as_prefix), do: [as_prefix]
    713 
    # Prepends `?s` to the last element of the sources tuple to form the alias
    # prefix for a nested query.
    defp subquery_as_prefix(sources) do
      last = elem(sources, tuple_size(sources) - 1)
      [?s | last]
    end
    717 
    # Names the source at `pos` as `{expr, alias, schema}`. The alias is
    # `as_prefix` followed by a letter and the position: `f` for fragments,
    # `s` for subqueries, and the letter from `create_alias/1` for tables.
    # Only tables carry a quoted table expression and a schema.
    defp create_name(sources, pos, as_prefix) do
      suffix = Integer.to_string(pos)

      case elem(sources, pos) do
        {:fragment, _, _} ->
          {nil, as_prefix ++ [?f | suffix], nil}

        {table, schema, prefix} ->
          alias_name = as_prefix ++ [create_alias(table) | suffix]
          {quote_table(prefix, table), alias_name, schema}

        %Ecto.SubQuery{} ->
          {nil, as_prefix ++ [?s | suffix], nil}
      end
    end
    731 
    # Picks the alias letter for a table: its first byte when that is an ASCII
    # letter, otherwise `?t`.
    defp create_alias(<<letter, _::binary>>) when letter in ?a..?z or letter in ?A..?Z,
      do: letter

    defp create_alias(_other), do: ?t
    738 
    739     ## DDL
    740 
    741     alias Ecto.Migration.{Table, Index, Reference, Constraint}
    742 
    743     @impl true
    744     def execute_ddl({command, %Table{} = table, columns}) when command in [:create, :create_if_not_exists] do
    745       table_structure =
    746         case column_definitions(table, columns) ++ pk_definitions(columns, ", ") do
    747           [] -> []
    748           list -> [?\s, ?(, list, ?)]
    749         end
    750 
    751       [["CREATE TABLE ",
    752         if_do(command == :create_if_not_exists, "IF NOT EXISTS "),
    753         quote_table(table.prefix, table.name),
    754         table_structure,
    755         comment_expr(table.comment, true),
    756         engine_expr(table.engine), options_expr(table.options)]]
    757     end
    758 
    759 
    # Generates `DROP TABLE` (optionally `IF EXISTS`), followed by the suffix
    # that `drop_mode/1` returns for the requested mode.
    def execute_ddl({command, %Table{} = table, mode}) when command in [:drop, :drop_if_exists] do
      exists_clause = if_do(command == :drop_if_exists, "IF EXISTS ")
      table_name = quote_table(table.prefix, table.name)

      [["DROP TABLE ", exists_clause, table_name, drop_mode(mode)]]
    end
    764 
    # Generates the `ALTER TABLE` statement for a list of column changes. When
    # the table carries a comment, a second `ALTER TABLE ... COMMENT` statement
    # is appended to the returned list.
    def execute_ddl({:alter, %Table{} = table, changes}) do
      table_name = quote_table(table.prefix, table.name)

      alter_statement = [
        "ALTER TABLE ",
        table_name,
        ?\s,
        column_changes(table, changes),
        pk_definitions(changes, ", ADD ")
      ]

      comment_statements =
        if table.comment do
          [["ALTER TABLE ", table_name, comment_expr(table.comment)]]
        else
          []
        end

      [alter_statement] ++ comment_statements
    end
    773 
    # Generates `CREATE [UNIQUE] INDEX`. Raises (via `error!/2`) for a `:where`
    # clause or `nulls_distinct: false`, both of which this adapter rejects.
    def execute_ddl({:create, %Index{} = index}) do
      # Validate before emitting any SQL; the checks run in this order.
      cond do
        index.where ->
          error!(nil, "MySQL adapter does not support where in indexes")

        index.nulls_distinct == false ->
          error!(nil, "MySQL adapter does not support nulls_distinct set to false in indexes")

        true ->
          :ok
      end

      unique_clause = if_do(index.unique, " UNIQUE")

      [
        [
          "CREATE",
          unique_clause,
          " INDEX ",
          quote_name(index.name),
          " ON ",
          quote_table(index.prefix, index.table),
          ?\s,
          ?(,
          intersperse_map(index.columns, ", ", &index_expr/1),
          ?),
          if_do(index.using, [" USING ", to_string(index.using)]),
          if_do(index.concurrently, " LOCK=NONE")
        ]
      ]
    end
    791 
    # Index and constraint operations. Everything except a plain
    # `{:drop, index, :restrict}` is rejected with an ArgumentError raised
    # through `error!/2`.
    def execute_ddl({:create_if_not_exists, %Index{}}) do
      error!(nil, "MySQL adapter does not support create if not exists for index")
    end

    def execute_ddl({:create, %Constraint{check: check}}) when is_binary(check) do
      error!(nil, "MySQL adapter does not support check constraints")
    end

    def execute_ddl({:create, %Constraint{exclude: exclude}}) when is_binary(exclude) do
      error!(nil, "MySQL adapter does not support exclusion constraints")
    end

    def execute_ddl({:drop, %Index{}, :cascade}) do
      error!(nil, "MySQL adapter does not support cascade in drop index")
    end

    def execute_ddl({:drop, %Index{} = index, :restrict}) do
      index_name = quote_name(index.name)
      table_name = quote_table(index.prefix, index.table)

      [["DROP INDEX ", index_name, " ON ", table_name, if_do(index.concurrently, " LOCK=NONE")]]
    end

    def execute_ddl({:drop, %Constraint{}, _}) do
      error!(nil, "MySQL adapter does not support constraints")
    end

    def execute_ddl({:drop_if_exists, %Constraint{}, _}) do
      error!(nil, "MySQL adapter does not support constraints")
    end

    def execute_ddl({:drop_if_exists, %Index{}, _}) do
      error!(nil, "MySQL adapter does not support drop if exists for index")
    end
    818 
    # Renames a table.
    def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do
      from_name = quote_table(current_table.prefix, current_table.name)
      to_name = quote_table(new_table.prefix, new_table.name)

      [["RENAME TABLE ", from_name, " TO ", to_name]]
    end

    # Renames a column of `table`.
    def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do
      [
        [
          "ALTER TABLE ",
          quote_table(table.prefix, table.name),
          " RENAME COLUMN ",
          quote_name(current_column),
          " TO ",
          quote_name(new_column)
        ]
      ]
    end

    # A raw SQL string is passed through as a single statement.
    def execute_ddl(string) when is_binary(string) do
      [string]
    end

    # Keyword-list commands are rejected.
    def execute_ddl(keyword) when is_list(keyword) do
      error!(nil, "MySQL adapter does not support keyword lists in execute")
    end
    833 
    # This adapter produces no DDL log entries: the result is always `[]`,
    # whatever the argument.
    @impl true
    def ddl_logs(_result) do
      []
    end
    836 
    # Returns `{sql, params}` for a query that looks `table` up in
    # `information_schema.tables` within the current database.
    @impl true
    def table_exists_query(table) do
      sql =
        "SELECT true FROM information_schema.tables WHERE table_name = ? AND table_schema = DATABASE() LIMIT 1"

      {sql, [table]}
    end
    841 
    # SQL suffix for the drop mode: `:cascade` adds " CASCADE", `:restrict`
    # adds nothing.
    defp drop_mode(:cascade) do
      " CASCADE"
    end

    defp drop_mode(:restrict) do
      []
    end
    844 
    # Builds the `PRIMARY KEY (...)` clause, preceded by `prefix`, from every
    # 4-tuple column entry whose options set `:primary_key`. Returns `[]` when
    # no column qualifies.
    #
    # `:remove` entries are skipped: a reversible
    # `{:remove, name, type, opts}` change carries the column's original
    # options, and honouring `primary_key: true` there would emit
    # `ADD PRIMARY KEY` for the very column being dropped.
    defp pk_definitions(columns, prefix) do
      pks =
        for {action, name, _type, opts} <- columns,
            action != :remove,
            opts[:primary_key],
            do: name

      case pks do
        [] -> []
        _ -> [[prefix, "PRIMARY KEY (", quote_names(pks), ?)]]
      end
    end
    856 
    # Renders every column via `column_definition/2`, separated by ", ".
    defp column_definitions(table, columns) do
      intersperse_map(columns, ", ", fn column -> column_definition(table, column) end)
    end
    860 
    # Renders one `:add` column for CREATE TABLE. A reference column also gets
    # its foreign-key constraint appended through `reference_expr/3`.
    defp column_definition(table, {:add, name, %Reference{} = reference, options}) do
      [
        quote_name(name),
        ?\s,
        reference_column_type(reference.type, options),
        column_options(options),
        reference_expr(reference, table, name)
      ]
    end

    defp column_definition(_table, {:add, name, type, options}) do
      [
        quote_name(name),
        ?\s,
        column_type(type, options),
        column_options(options)
      ]
    end
    869 
    # Renders every change via `column_change/2`, separated by ", ".
    defp column_changes(table, columns) do
      intersperse_map(columns, ", ", fn change -> column_change(table, change) end)
    end
    873 
    # Renders a single ALTER TABLE change. The first clause rejects any
    # reference declared with `validate: false`, whatever the command.
    defp column_change(_table, {_command, _name, %Reference{validate: false}, _options}) do
      error!(nil, "validate: false on references is not supported in MyXQL")
    end

    defp column_change(table, {:add, name, %Reference{} = reference, options}) do
      [
        "ADD ",
        quote_name(name),
        ?\s,
        reference_column_type(reference.type, options),
        column_options(options),
        constraint_expr(reference, table, name)
      ]
    end

    defp column_change(_table, {:add, name, type, options}) do
      [
        "ADD ",
        quote_name(name),
        ?\s,
        column_type(type, options),
        column_options(options)
      ]
    end

    defp column_change(table, {:add_if_not_exists, name, %Reference{} = reference, options}) do
      [
        "ADD IF NOT EXISTS ",
        quote_name(name),
        ?\s,
        reference_column_type(reference.type, options),
        column_options(options),
        constraint_if_not_exists_expr(reference, table, name)
      ]
    end

    defp column_change(_table, {:add_if_not_exists, name, type, options}) do
      [
        "ADD IF NOT EXISTS ",
        quote_name(name),
        ?\s,
        column_type(type, options),
        column_options(options)
      ]
    end

    # For `:modify`, the `:from` option (when it is a reference) makes the
    # previous foreign key get dropped before the column is modified.
    defp column_change(table, {:modify, name, %Reference{} = reference, options}) do
      [
        drop_constraint_expr(options[:from], table, name),
        "MODIFY ",
        quote_name(name),
        ?\s,
        reference_column_type(reference.type, options),
        column_options(options),
        constraint_expr(reference, table, name)
      ]
    end

    defp column_change(table, {:modify, name, type, options}) do
      [
        drop_constraint_expr(options[:from], table, name),
        "MODIFY ",
        quote_name(name),
        ?\s,
        column_type(type, options),
        column_options(options)
      ]
    end

    defp column_change(_table, {:remove, name}) do
      ["DROP ", quote_name(name)]
    end

    defp column_change(table, {:remove, name, %Reference{} = reference, _options}) do
      [drop_constraint_expr(reference, table, name), "DROP ", quote_name(name)]
    end

    defp column_change(_table, {:remove, name, _type, _options}) do
      ["DROP ", quote_name(name)]
    end

    defp column_change(table, {:remove_if_exists, name, %Reference{} = reference}) do
      [
        drop_constraint_if_exists_expr(reference, table, name),
        "DROP IF EXISTS ",
        quote_name(name)
      ]
    end

    defp column_change(_table, {:remove_if_exists, name, _type}) do
      ["DROP IF EXISTS ", quote_name(name)]
    end
    915 
    # Renders the trailing column options in a fixed order: default, nullability,
    # AFTER position and comment. `Keyword.fetch/2` is used for `:default` so
    # that an explicit `default: nil` is distinguishable from no default at all.
    defp column_options(opts) do
      [
        opts |> Keyword.fetch(:default) |> default_expr(),
        opts |> Keyword.get(:null) |> null_expr(),
        opts |> Keyword.get(:after) |> after_expr(),
        opts |> Keyword.get(:comment) |> comment_expr()
      ]
    end
    924 
    # Renders a COMMENT clause for a binary comment; anything else yields `[]`.
    # With `create_table?` set to `true` the `COMMENT = '...'` form is used,
    # otherwise the `COMMENT '...'` form.
    defp comment_expr(comment, create_table? \\ false)

    defp comment_expr(comment, true) when is_binary(comment) do
      " COMMENT = '" <> escape_string(comment) <> "'"
    end

    defp comment_expr(comment, false) when is_binary(comment) do
      " COMMENT '" <> escape_string(comment) <> "'"
    end

    defp comment_expr(_comment, _create_table?) do
      []
    end
    929 
    # Renders the ` AFTER <column>` positioning clause, or `[]` when no column
    # (or a value that is neither an atom nor a binary) is given.
    #
    # The column goes through `quote_name/1` like every other identifier, so a
    # name containing a backtick raises instead of being interpolated into the
    # statement unchecked.
    defp after_expr(nil), do: []
    defp after_expr(column) when is_atom(column) or is_binary(column),
      do: [" AFTER ", quote_name(column)]
    defp after_expr(_), do: []
    933 
    # Nullability clause: `false` -> " NOT NULL", `true` -> " NULL",
    # anything else -> no clause.
    defp null_expr(false) do
      " NOT NULL"
    end

    defp null_expr(true) do
      " NULL"
    end

    defp null_expr(_unspecified) do
      []
    end
    937 
    # Renders the DEFAULT clause from the result of `Keyword.fetch/2`:
    # `:error` (no default given) renders nothing, `{:ok, value}` renders the
    # value according to its type.
    defp default_expr({:ok, nil}) do
      " DEFAULT NULL"
    end

    defp default_expr({:ok, text}) when is_binary(text) do
      [" DEFAULT '", escape_string(text), ?']
    end

    defp default_expr({:ok, value}) when is_number(value) or is_boolean(value) do
      [" DEFAULT ", to_string(value)]
    end

    # Fragments are emitted verbatim.
    defp default_expr({:ok, {:fragment, sql}}) do
      [" DEFAULT ", sql]
    end

    # Maps are JSON-encoded with the library configured under
    # `:myxql, :json_library` (Jason when unset) and emitted as a
    # parenthesised, escaped string literal.
    defp default_expr({:ok, %{} = value}) do
      json_library = Application.get_env(:myxql, :json_library, Jason)
      json = value |> json_library.encode_to_iodata!() |> IO.iodata_to_binary()

      [" DEFAULT ", ?(, ?', escape_string(json), ?', ?)]
    end

    defp default_expr(:error) do
      []
    end
    953 
    # Binary index columns are emitted verbatim; anything else is quoted as an
    # identifier.
    defp index_expr(expression) when is_binary(expression) do
      expression
    end

    defp index_expr(column) do
      quote_name(column)
    end
    957 
    # Renders the ENGINE clause, upper-casing the engine name and falling back
    # to "INNODB" when none is given.
    defp engine_expr(storage_engine) do
      engine =
        (storage_engine || "INNODB")
        |> to_string()
        |> String.upcase()

      [" ENGINE = ", engine]
    end
    960 
    # Renders the free-form table options: nothing for `nil`, an error for a
    # keyword list, otherwise a space followed by the options as a string.
    defp options_expr(nil) do
      []
    end

    defp options_expr(keyword) when is_list(keyword) do
      error!(nil, "MySQL adapter does not support keyword lists in :options")
    end

    defp options_expr(options) do
      [?\s, to_string(options)]
    end
    967 
    # Maps an Ecto column type plus options to its SQL type.
    #
    # Second-precision time/datetime types ignore the options.
    defp column_type(type, _opts) when type in [:time, :utc_datetime, :naive_datetime] do
      ecto_to_db(type)
    end

    # Microsecond types carry a fractional-seconds precision, 6 unless
    # `:precision` says otherwise.
    defp column_type(type, opts)
         when type in [:time_usec, :utc_datetime_usec, :naive_datetime_usec] do
      fractional_digits = Keyword.get(opts, :precision, 6)

      [ecto_to_db(type), ?(, to_string(fractional_digits), ?)]
    end

    # Everything else: `:size` wins over `:precision`/`:scale`; a bare
    # `:string` becomes "varchar(255)".
    defp column_type(type, opts) do
      size = Keyword.get(opts, :size)
      precision = Keyword.get(opts, :precision)

      cond do
        size ->
          [ecto_size_to_db(type), ?(, to_string(size), ?)]

        precision ->
          scale = Keyword.get(opts, :scale) || 0
          [ecto_to_db(type), ?(, to_string(precision), ?,, to_string(scale), ?)]

        type == :string ->
          ["varchar(255)"]

        true ->
          ecto_to_db(type)
      end
    end
    990 
    # Builds the `CONSTRAINT <name> <type> (cols) REFERENCES <table>(cols)`
    # fragment for a reference, including its ON DELETE / ON UPDATE actions.
    # `type` is the constraint keyword text supplied by the caller. The
    # referenced table uses the reference's prefix, falling back to the
    # table's own prefix.
    #
    # Raises ArgumentError (via `error!/2`) when the reference sets `:match`.
    defp reference_expr(type, ref, table, name) do
      # `ref.with` contributes additional {current, referenced} column pairs.
      {current_columns, reference_columns} = Enum.unzip([{name, ref.column} | ref.with])

      if ref.match do
        # The message previously named "tds", another adapter.
        error!(nil, ":match is not supported in references by MySQL")
      end

      ["CONSTRAINT ", reference_name(ref, table, name),
       " ", type, " (", quote_names(current_columns), ?),
       " REFERENCES ", quote_table(ref.prefix || table.prefix, ref.table),
       ?(, quote_names(reference_columns), ?),
       reference_on_delete(ref.on_delete), reference_on_update(ref.on_update)]
    end
   1004 
   1005     defp reference_expr(%Reference{} = ref, table, name),
   1006       do: [", " | reference_expr("FOREIGN KEY", ref, table, name)]
   1007 
   1008     defp constraint_expr(%Reference{} = ref, table, name),
   1009       do: [", ADD " | reference_expr("FOREIGN KEY", ref, table, name)]
   1010 
   1011     defp constraint_if_not_exists_expr(%Reference{} = ref, table, name),
   1012       do: [", ADD " | reference_expr("FOREIGN KEY IF NOT EXISTS", ref, table, name)]
   1013 
   1014     defp drop_constraint_expr(%Reference{} = ref, table, name),
   1015       do: ["DROP FOREIGN KEY ", reference_name(ref, table, name), ", "]
   1016     defp drop_constraint_expr(_, _, _),
   1017       do: []
   1018 
   1019     defp drop_constraint_if_exists_expr(%Reference{} = ref, table, name),
   1020       do: ["DROP FOREIGN KEY IF EXISTS ", reference_name(ref, table, name), ", "]
   1021     defp drop_constraint_if_exists_expr(_, _, _),
   1022       do: []
   1023 
   1024     defp reference_name(%Reference{name: nil}, table, column),
   1025       do: quote_name("#{table.name}_#{column}_fkey")
   1026     defp reference_name(%Reference{name: name}, _table, _column),
   1027       do: quote_name(name)
   1028 
   1029     defp reference_column_type(:serial, _opts), do: "BIGINT UNSIGNED"
   1030     defp reference_column_type(:bigserial, _opts), do: "BIGINT UNSIGNED"
   1031     defp reference_column_type(type, opts), do: column_type(type, opts)
   1032 
   1033     defp reference_on_delete(:nilify_all), do: " ON DELETE SET NULL"
   1034     defp reference_on_delete(:delete_all), do: " ON DELETE CASCADE"
   1035     defp reference_on_delete(:restrict), do: " ON DELETE RESTRICT"
   1036     defp reference_on_delete(_), do: []
   1037 
   1038     defp reference_on_update(:nilify_all), do: " ON UPDATE SET NULL"
   1039     defp reference_on_update(:update_all), do: " ON UPDATE CASCADE"
   1040     defp reference_on_update(:restrict), do: " ON UPDATE RESTRICT"
   1041     defp reference_on_update(_), do: []
   1042 
   1043     ## Helpers
   1044 
   1045     defp get_source(query, sources, ix, source) do
   1046       {expr, name, _schema} = elem(sources, ix)
   1047       {expr || expr(source, sources, query), name}
   1048     end
   1049 
   1050     defp get_parent_sources_ix(query, as) do
   1051       case query.aliases[@parent_as] do
   1052         {%{aliases: %{^as => ix}}, sources} -> {ix, sources}
   1053         {%{} = parent, _sources} -> get_parent_sources_ix(parent, as)
   1054       end
   1055     end
   1056 
   1057     defp quote_name(name) when is_atom(name) do
   1058       quote_name(Atom.to_string(name))
   1059     end
   1060 
   1061     defp quote_name(name) when is_binary(name) do
   1062       if String.contains?(name, "`") do
   1063         error!(nil, "bad literal/field/table name #{inspect name} (` is not permitted)")
   1064       end
   1065 
   1066       [?`, name, ?`]
   1067     end
   1068 
   1069     defp quote_names(names), do: intersperse_map(names, ?,, &quote_name/1)
   1070 
   1071     defp quote_table(nil, name),    do: quote_table(name)
   1072     defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)]
   1073 
   1074     defp quote_table(name) when is_atom(name),
   1075       do: quote_table(Atom.to_string(name))
   1076     defp quote_table(name) do
   1077       if String.contains?(name, "`") do
   1078         error!(nil, "bad table name #{inspect name}")
   1079       end
   1080       [?`, name, ?`]
   1081     end
   1082 
   1083     defp intersperse_map(list, separator, mapper, acc \\ [])
   1084     defp intersperse_map([], _separator, _mapper, acc),
   1085       do: acc
   1086     defp intersperse_map([elem], _separator, mapper, acc),
   1087       do: [acc | mapper.(elem)]
   1088     defp intersperse_map([elem | rest], separator, mapper, acc),
   1089       do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator])
   1090 
   1091     defp if_do(condition, value) do
   1092       if condition, do: value, else: []
   1093     end
   1094 
   1095     defp escape_string(value) when is_binary(value) do
   1096       value
   1097       |> :binary.replace("'", "''", [:global])
   1098       |> :binary.replace("\\", "\\\\", [:global])
   1099     end
   1100 
   1101     defp escape_json_key(value) when is_binary(value) do
   1102       value
   1103       |> escape_string()
   1104       |> :binary.replace("\"", "\\\\\"", [:global])
   1105     end
   1106 
   1107     defp ecto_cast_to_db(:id, _query), do: "unsigned"
   1108     defp ecto_cast_to_db(:integer, _query), do: "unsigned"
   1109     defp ecto_cast_to_db(:string, _query), do: "char"
   1110     defp ecto_cast_to_db(:utc_datetime_usec, _query), do: "datetime(6)"
   1111     defp ecto_cast_to_db(:naive_datetime_usec, _query), do: "datetime(6)"
   1112     defp ecto_cast_to_db(type, query), do: ecto_to_db(type, query)
   1113 
   1114     defp ecto_size_to_db(:binary), do: "varbinary"
   1115     defp ecto_size_to_db(type), do: ecto_to_db(type)
   1116 
   1117     defp ecto_to_db(type, query \\ nil)
   1118     defp ecto_to_db({:array, _}, query),           do: error!(query, "Array type is not supported by MySQL")
   1119     defp ecto_to_db(:id, _query),                  do: "integer"
   1120     defp ecto_to_db(:serial, _query),              do: "bigint unsigned not null auto_increment"
   1121     defp ecto_to_db(:bigserial, _query),           do: "bigint unsigned not null auto_increment"
   1122     defp ecto_to_db(:binary_id, _query),           do: "binary(16)"
   1123     defp ecto_to_db(:string, _query),              do: "varchar"
   1124     defp ecto_to_db(:float, _query),               do: "double"
   1125     defp ecto_to_db(:binary, _query),              do: "blob"
   1126     defp ecto_to_db(:uuid, _query),                do: "binary(16)" # MySQL does not support uuid
   1127     defp ecto_to_db(:map, _query),                 do: "json"
   1128     defp ecto_to_db({:map, _}, _query),            do: "json"
   1129     defp ecto_to_db(:time_usec, _query),           do: "time"
   1130     defp ecto_to_db(:utc_datetime, _query),        do: "datetime"
   1131     defp ecto_to_db(:utc_datetime_usec, _query),   do: "datetime"
   1132     defp ecto_to_db(:naive_datetime, _query),      do: "datetime"
   1133     defp ecto_to_db(:naive_datetime_usec, _query), do: "datetime"
   1134     defp ecto_to_db(atom, _query) when is_atom(atom),  do: Atom.to_string(atom)
   1135     defp ecto_to_db(type, _query) do
   1136       raise ArgumentError,
   1137             "unsupported type `#{inspect(type)}`. The type can either be an atom, a string " <>
   1138               "or a tuple of the form `{:map, t}` where `t` itself follows the same conditions."
   1139     end
   1140 
   1141     defp error!(nil, message) do
   1142       raise ArgumentError, message
   1143     end
   1144     defp error!(query, message) do
   1145       raise Ecto.QueryError, query: query, message: message
   1146     end
   1147   end
   1148 end