zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

myxql.ex (14650B)


      1 defmodule Ecto.Adapters.MyXQL do
      2   @moduledoc """
      3   Adapter module for MySQL.
      4 
      5   It uses `MyXQL` for communicating to the database.
      6 
      7   ## Options
      8 
      9   MySQL options split in different categories described
     10   below. All options can be given via the repository
     11   configuration:
     12 
     13   ### Connection options
     14 
     15     * `:protocol` - Set to `:socket` for using UNIX domain socket, or `:tcp` for TCP
     16       (default: `:socket`)
     17     * `:socket` - Connect to MySQL via UNIX sockets in the given path.
     18     * `:hostname` - Server hostname
     19     * `:port` - Server port (default: 3306)
     20     * `:username` - Username
     21     * `:password` - User password
     22     * `:database` - the database to connect to
     23     * `:pool` - The connection pool module, may be set to `Ecto.Adapters.SQL.Sandbox`
     24     * `:ssl` - Set to true if ssl should be used (default: false)
     25     * `:ssl_opts` - A list of ssl options, see Erlang's `ssl` docs
     26     * `:connect_timeout` - The timeout for establishing new connections (default: 5000)
     27     * `:cli_protocol` - The protocol used for the mysql client connection (default: `"tcp"`).
     28       This option is only used for `mix ecto.load` and `mix ecto.dump`,
     29       via the `mysql` command. For more information, please check
     30       [MySQL docs](https://dev.mysql.com/doc/en/connecting.html)
     31     * `:socket_options` - Specifies socket configuration
     32     * `:show_sensitive_data_on_connection_error` - show connection data and
     33       configuration whenever there is an error attempting to connect to the
     34       database
     35 
     36   The `:socket_options` are particularly useful when configuring the size
     37   of both send and receive buffers. For example, when Ecto starts with a
     38   pool of 20 connections, the memory usage may quickly grow from 20MB to
     39   50MB based on the operating system default values for TCP buffers. It is
     40   advised to stick with the operating system defaults but they can be
     41   tweaked if desired:
     42 
     43       socket_options: [recbuf: 8192, sndbuf: 8192]
     44 
     45   We also recommend developers to consult the `MyXQL.start_link/1` documentation
     46   for a complete listing of all supported options.
     47 
     48   ### Storage options
     49 
     50     * `:charset` - the database encoding (default: "utf8mb4")
     51     * `:collation` - the collation order
     52     * `:dump_path` - where to place dumped structures
     53 
     54   ### After connect callback
     55 
     56   If you want to execute a callback as soon as connection is established
     57   to the database, you can use the `:after_connect` configuration. For
     58   example, in your repository configuration you can add:
     59 
     60       after_connect: {MyXQL, :query!, ["SET variable = value", []]}
     61 
     62   You can also specify your own module that will receive the MyXQL
     63   connection as argument.
     64 
     65   ## Limitations
     66 
     67   There are some limitations when using Ecto with MySQL that one
     68   needs to be aware of.
     69 
     70   ### Engine
     71 
     72   Tables created by Ecto are guaranteed to use InnoDB, regardless
     73   of the MySQL version.
     74 
     75   ### UUIDs
     76 
     77   MySQL does not support UUID types. Ecto emulates them by using
     78   `binary(16)`.
     79 
     80   ### Read after writes
     81 
     82   Because MySQL does not support RETURNING clauses in INSERT and
     83   UPDATE, it does not support the `:read_after_writes` option of
     84   `Ecto.Schema.field/3`.
     85 
     86   ### DDL Transaction
     87 
     88   MySQL does not support migrations inside transactions as it
     89   automatically commits after some commands like CREATE TABLE.
     90   Therefore MySQL migrations does not run inside transactions.
     91 
     92   ## Old MySQL versions
     93 
     94   ### JSON support
     95 
     96   MySQL introduced a native JSON type in v5.7.8, if your server is
     97   using this version or higher, you may use `:map` type for your
     98   column in migration:
     99 
    100       add :some_field, :map
    101 
    102   If you're using older server versions, use a `TEXT` field instead:
    103 
    104       add :some_field, :text
    105 
    106   in either case, the adapter will automatically encode/decode the
    107   value from JSON.
    108 
    109   ### usec in datetime
    110 
    111   Old MySQL versions did not support usec in datetime while
    112   more recent versions would round or truncate the usec value.
    113 
    114   Therefore, in case the user decides to use microseconds in
    115   datetimes and timestamps with MySQL, be aware of such
    116   differences and consult the documentation for your MySQL
    117   version.
    118 
    119   If your version of MySQL supports microsecond precision, you
    120   will be able to utilize Ecto's usec types.
    121 
    122   ## Multiple Result Support
    123 
    124   MyXQL supports the execution of queries that return multiple
    125   results, such as text queries with multiple statements separated
    126   by semicolons or stored procedures. These can be executed with
    127   `Ecto.Adapters.SQL.query_many/4` or the `YourRepo.query_many/3`
    128   shortcut.
    129 
    130   Be default, these queries will be executed with the `:query_type`
    131   option set to `:text`. To take advantage of prepared statements
    132   when executing a stored procedure, set the `:query_type` option
    133   to `:binary`.
    134   """
    135 
    136   # Inherit all behaviour from Ecto.Adapters.SQL
    137   use Ecto.Adapters.SQL, driver: :myxql
    138 
    139   # And provide a custom storage implementation
    140   @behaviour Ecto.Adapter.Storage
    141   @behaviour Ecto.Adapter.Structure
    142 
    143   ## Custom MySQL types
    144 
    145   @impl true
    146   def loaders({:map, _}, type),   do: [&json_decode/1, &Ecto.Type.embedded_load(type, &1, :json)]
    147   def loaders(:map, type),        do: [&json_decode/1, type]
    148   def loaders(:float, type),      do: [&float_decode/1, type]
    149   def loaders(:boolean, type),    do: [&bool_decode/1, type]
    150   def loaders(:binary_id, type),  do: [Ecto.UUID, type]
    151   def loaders(_, type),           do: [type]
    152 
    153   defp bool_decode(<<0>>), do: {:ok, false}
    154   defp bool_decode(<<1>>), do: {:ok, true}
    155   defp bool_decode(<<0::size(1)>>), do: {:ok, false}
    156   defp bool_decode(<<1::size(1)>>), do: {:ok, true}
    157   defp bool_decode(0), do: {:ok, false}
    158   defp bool_decode(1), do: {:ok, true}
    159   defp bool_decode(x), do: {:ok, x}
    160 
    161   defp float_decode(%Decimal{} = decimal), do: {:ok, Decimal.to_float(decimal)}
    162   defp float_decode(x), do: {:ok, x}
    163 
    164   defp json_decode(x) when is_binary(x), do: {:ok, MyXQL.json_library().decode!(x)}
    165   defp json_decode(x), do: {:ok, x}
    166 
    167   ## Storage API
    168 
    169   @impl true
    170   def storage_up(opts) do
    171     database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    172     opts = Keyword.delete(opts, :database)
    173     charset = opts[:charset] || "utf8mb4"
    174 
    175     check_existence_command = "SELECT TRUE FROM information_schema.schemata WHERE schema_name = '#{database}'"
    176     case run_query(check_existence_command, opts) do
    177       {:ok, %{num_rows: 1}} ->
    178         {:error, :already_up}
    179       _ ->
    180         create_command =
    181           ~s(CREATE DATABASE `#{database}` DEFAULT CHARACTER SET = #{charset})
    182           |> concat_if(opts[:collation], &"DEFAULT COLLATE = #{&1}")
    183 
    184         case run_query(create_command, opts) do
    185           {:ok, _} ->
    186             :ok
    187           {:error, %{mysql: %{name: :ER_DB_CREATE_EXISTS}}} ->
    188             {:error, :already_up}
    189           {:error, error} ->
    190             {:error, Exception.message(error)}
    191           {:exit, exit} ->
    192             {:error, exit_to_exception(exit)}
    193         end
    194     end
    195   end
    196 
    197   defp concat_if(content, nil, _fun),  do: content
    198   defp concat_if(content, value, fun), do: content <> " " <> fun.(value)
    199 
    200   @impl true
    201   def storage_down(opts) do
    202     database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    203     opts = Keyword.delete(opts, :database)
    204     command = "DROP DATABASE `#{database}`"
    205 
    206     case run_query(command, opts) do
    207       {:ok, _} ->
    208         :ok
    209       {:error, %{mysql: %{name: :ER_DB_DROP_EXISTS}}} ->
    210         {:error, :already_down}
    211       {:error, %{mysql: %{name: :ER_BAD_DB_ERROR}}} ->
    212         {:error, :already_down}
    213       {:exit, :killed} ->
    214         {:error, :already_down}
    215       {:exit, exit} ->
    216         {:error, exit_to_exception(exit)}
    217     end
    218   end
    219 
    220   @impl Ecto.Adapter.Storage
    221   def storage_status(opts) do
    222     database = Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"
    223     opts = Keyword.delete(opts, :database)
    224 
    225     check_database_query = "SELECT schema_name FROM information_schema.schemata WHERE schema_name = '#{database}'"
    226 
    227     case run_query(check_database_query, opts) do
    228       {:ok, %{num_rows: 0}} -> :down
    229       {:ok, %{num_rows: _num_rows}} -> :up
    230       other -> {:error, other}
    231     end
    232   end
    233 
    234   @impl true
    235   def supports_ddl_transaction? do
    236     false
    237   end
    238 
    239   @impl true
    240   def lock_for_migrations(meta, opts, fun) do
    241     %{opts: adapter_opts, repo: repo} = meta
    242 
    243     if Keyword.fetch(adapter_opts, :pool_size) == {:ok, 1} do
    244       Ecto.Adapters.SQL.raise_migration_pool_size_error()
    245     end
    246 
    247     opts = Keyword.merge(opts, [timeout: :infinity, telemetry_options: [schema_migration: true]])
    248 
    249     {:ok, result} =
    250       transaction(meta, opts, fn ->
    251         lock_name = "\"ecto_#{inspect(repo)}\""
    252 
    253         try do
    254           {:ok, _} = Ecto.Adapters.SQL.query(meta, "SELECT GET_LOCK(#{lock_name}, -1)", [], opts)
    255           fun.()
    256         after
    257           {:ok, _} = Ecto.Adapters.SQL.query(meta, "SELECT RELEASE_LOCK(#{lock_name})", [], opts)
    258         end
    259       end)
    260 
    261     result
    262   end
    263 
    264   @impl true
    265   def insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) do
    266     %{source: source, prefix: prefix} = schema_meta
    267     {_, query_params, _} = on_conflict
    268 
    269     key = primary_key!(schema_meta, returning)
    270     {fields, values} = :lists.unzip(params)
    271     sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [])
    272     opts = if is_nil(Keyword.get(opts, :cache_statement)) do
    273       [{:cache_statement, "ecto_insert_#{source}_#{length(fields)}"} | opts]
    274     else
    275       opts
    276     end
    277 
    278     case Ecto.Adapters.SQL.query(adapter_meta, sql, values ++ query_params, opts) do
    279       {:ok, %{num_rows: 1, last_insert_id: last_insert_id}} ->
    280         {:ok, last_insert_id(key, last_insert_id)}
    281 
    282       {:ok, %{num_rows: 2, last_insert_id: last_insert_id}} ->
    283         {:ok, last_insert_id(key, last_insert_id)}
    284 
    285       {:error, err} ->
    286         case @conn.to_constraints(err, source: source) do
    287           []          -> raise err
    288           constraints -> {:invalid, constraints}
    289         end
    290     end
    291   end
    292 
    293   defp primary_key!(%{autogenerate_id: {_, key, _type}}, [key]), do: key
    294   defp primary_key!(_, []), do: nil
    295   defp primary_key!(%{schema: schema}, returning) do
    296     raise ArgumentError, "MySQL does not support :read_after_writes in schemas for non-primary keys. " <>
    297                          "The following fields in #{inspect schema} are tagged as such: #{inspect returning}"
    298   end
    299 
    300   defp last_insert_id(nil, _last_insert_id), do: []
    301   defp last_insert_id(_key, 0), do: []
    302   defp last_insert_id(key, last_insert_id), do: [{key, last_insert_id}]
    303 
    304   @impl true
    305   def structure_dump(default, config) do
    306     table = config[:migration_source] || "schema_migrations"
    307     path  = config[:dump_path] || Path.join(default, "structure.sql")
    308 
    309     with {:ok, versions} <- select_versions(table, config),
    310          {:ok, contents} <- mysql_dump(config),
    311          {:ok, contents} <- append_versions(table, versions, contents) do
    312       File.mkdir_p!(Path.dirname(path))
    313       File.write!(path, contents)
    314       {:ok, path}
    315     end
    316   end
    317 
    318   defp select_versions(table, config) do
    319     case run_query(~s[SELECT version FROM `#{table}` ORDER BY version], config) do
    320       {:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
    321       {:error, %{mysql: %{name: :ER_NO_SUCH_TABLE}}} -> {:ok, []}
    322       {:error, _} = error -> error
    323       {:exit, exit} -> {:error, exit_to_exception(exit)}
    324     end
    325   end
    326 
    327   defp mysql_dump(config) do
    328     case run_with_cmd("mysqldump", config, ["--no-data", "--routines", config[:database]]) do
    329       {output, 0} -> {:ok, output}
    330       {output, _} -> {:error, output}
    331     end
    332   end
    333 
    334   defp append_versions(_table, [], contents) do
    335     {:ok, contents}
    336   end
    337   defp append_versions(table, versions, contents) do
    338     {:ok,
    339       contents <>
    340       Enum.map_join(versions, &~s[INSERT INTO `#{table}` (version) VALUES (#{&1});\n])}
    341   end
    342 
    343   @impl true
    344   def structure_load(default, config) do
    345     path = config[:dump_path] || Path.join(default, "structure.sql")
    346 
    347     args = ["--execute", "SET FOREIGN_KEY_CHECKS = 0; SOURCE #{path}; SET FOREIGN_KEY_CHECKS = 1"]
    348 
    349     case run_with_cmd("mysql", config, args) do
    350       {_output, 0} -> {:ok, path}
    351       {output, _}  -> {:error, output}
    352     end
    353   end
    354 
    355   @impl true
    356   def dump_cmd(args, opts \\ [], config) when is_list(config) and is_list(args),
    357     do: run_with_cmd("mysqldump", config, args, opts)
    358 
    359   ## Helpers
    360 
    361   defp run_query(sql, opts) do
    362     {:ok, _} = Application.ensure_all_started(:ecto_sql)
    363     {:ok, _} = Application.ensure_all_started(:myxql)
    364 
    365     opts =
    366       opts
    367       |> Keyword.drop([:name, :log, :pool, :pool_size])
    368       |> Keyword.put(:backoff_type, :stop)
    369       |> Keyword.put(:max_restarts, 0)
    370 
    371     task = Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn ->
    372       {:ok, conn} = MyXQL.start_link(opts)
    373 
    374       value = MyXQL.query(conn, sql, [], opts)
    375       GenServer.stop(conn)
    376       value
    377     end)
    378 
    379     timeout = Keyword.get(opts, :timeout, 15_000)
    380 
    381     case Task.yield(task, timeout) || Task.shutdown(task) do
    382       {:ok, {:ok, result}} ->
    383         {:ok, result}
    384       {:ok, {:error, error}} ->
    385         {:error, error}
    386       {:exit, exit} ->
    387         {:exit, exit}
    388       nil ->
    389         {:error, RuntimeError.exception("command timed out")}
    390     end
    391   end
    392 
    393   defp exit_to_exception({%{__struct__: struct} = error, _})
    394        when struct in [MyXQL.Error, DBConnection.Error],
    395        do: error
    396 
    397   defp exit_to_exception(reason), do: RuntimeError.exception(Exception.format_exit(reason))
    398 
    399   defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do
    400     unless System.find_executable(cmd) do
    401       raise "could not find executable `#{cmd}` in path, " <>
    402             "please guarantee it is available before running ecto commands"
    403     end
    404 
    405     env =
    406       if password = opts[:password] do
    407         [{"MYSQL_PWD", password}]
    408       else
    409         []
    410       end
    411 
    412     host     = opts[:hostname] || System.get_env("MYSQL_HOST") || "localhost"
    413     port     = opts[:port] || System.get_env("MYSQL_TCP_PORT") || "3306"
    414     protocol = opts[:cli_protocol] || System.get_env("MYSQL_CLI_PROTOCOL") || "tcp"
    415 
    416     user_args =
    417       if username = opts[:username] do
    418         ["--user", username]
    419       else
    420         []
    421       end
    422 
    423     database_args =
    424       if database = opts[:database] do
    425         ["--database", database]
    426       else
    427         []
    428       end
    429 
    430     args =
    431       [
    432         "--host", host,
    433         "--port", to_string(port),
    434         "--protocol", protocol
    435       ] ++ user_args ++ database_args ++ opt_args
    436 
    437     cmd_opts =
    438       cmd_opts
    439       |> Keyword.put_new(:stderr_to_stdout, true)
    440       |> Keyword.update(:env, env, &Enum.concat(env, &1))
    441 
    442 
    443     System.cmd(cmd, args, cmd_opts)
    444   end
    445 end