myxql.ex (14650B)
defmodule Ecto.Adapters.MyXQL do
  @moduledoc """
  Adapter module for MySQL.

  It uses `MyXQL` for communicating to the database.

  ## Options

  MySQL options are split into different categories described
  below. All options can be given via the repository
  configuration:

  ### Connection options

    * `:protocol` - Set to `:socket` for using UNIX domain socket, or `:tcp` for TCP
      (default: `:socket`)
    * `:socket` - Connect to MySQL via UNIX sockets in the given path.
    * `:hostname` - Server hostname
    * `:port` - Server port (default: 3306)
    * `:username` - Username
    * `:password` - User password
    * `:database` - the database to connect to
    * `:pool` - The connection pool module, may be set to `Ecto.Adapters.SQL.Sandbox`
    * `:ssl` - Set to true if ssl should be used (default: false)
    * `:ssl_opts` - A list of ssl options, see Erlang's `ssl` docs
    * `:connect_timeout` - The timeout for establishing new connections (default: 5000)
    * `:cli_protocol` - The protocol used for the mysql client connection (default: `"tcp"`).
      This option is only used for `mix ecto.load` and `mix ecto.dump`,
      via the `mysql` command. For more information, please check
      [MySQL docs](https://dev.mysql.com/doc/en/connecting.html)
    * `:socket_options` - Specifies socket configuration
    * `:show_sensitive_data_on_connection_error` - show connection data and
      configuration whenever there is an error attempting to connect to the
      database

  The `:socket_options` are particularly useful when configuring the size
  of both send and receive buffers. For example, when Ecto starts with a
  pool of 20 connections, the memory usage may quickly grow from 20MB to
  50MB based on the operating system default values for TCP buffers. It is
  advised to stick with the operating system defaults but they can be
  tweaked if desired:

      socket_options: [recbuf: 8192, sndbuf: 8192]

  We also recommend developers to consult the `MyXQL.start_link/1` documentation
  for a complete listing of all supported options.

  ### Storage options

    * `:charset` - the database encoding (default: "utf8mb4")
    * `:collation` - the collation order
    * `:dump_path` - where to place dumped structures

  ### After connect callback

  If you want to execute a callback as soon as connection is established
  to the database, you can use the `:after_connect` configuration. For
  example, in your repository configuration you can add:

      after_connect: {MyXQL, :query!, ["SET variable = value", []]}

  You can also specify your own module that will receive the MyXQL
  connection as argument.

  ## Limitations

  There are some limitations when using Ecto with MySQL that one
  needs to be aware of.

  ### Engine

  Tables created by Ecto are guaranteed to use InnoDB, regardless
  of the MySQL version.

  ### UUIDs

  MySQL does not support UUID types. Ecto emulates them by using
  `binary(16)`.

  ### Read after writes

  Because MySQL does not support RETURNING clauses in INSERT and
  UPDATE, it does not support the `:read_after_writes` option of
  `Ecto.Schema.field/3`.

  ### DDL Transaction

  MySQL does not support migrations inside transactions as it
  automatically commits after some commands like CREATE TABLE.
  Therefore MySQL migrations do not run inside transactions.

  ## Old MySQL versions

  ### JSON support

  MySQL introduced a native JSON type in v5.7.8, if your server is
  using this version or higher, you may use `:map` type for your
  column in migration:

      add :some_field, :map

  If you're using older server versions, use a `TEXT` field instead:

      add :some_field, :text

  in either case, the adapter will automatically encode/decode the
  value from JSON.

  ### usec in datetime

  Old MySQL versions did not support usec in datetime while
  more recent versions would round or truncate the usec value.

  Therefore, in case the user decides to use microseconds in
  datetimes and timestamps with MySQL, be aware of such
  differences and consult the documentation for your MySQL
  version.

  If your version of MySQL supports microsecond precision, you
  will be able to utilize Ecto's usec types.

  ## Multiple Result Support

  MyXQL supports the execution of queries that return multiple
  results, such as text queries with multiple statements separated
  by semicolons or stored procedures. These can be executed with
  `Ecto.Adapters.SQL.query_many/4` or the `YourRepo.query_many/3`
  shortcut.

  By default, these queries will be executed with the `:query_type`
  option set to `:text`. To take advantage of prepared statements
  when executing a stored procedure, set the `:query_type` option
  to `:binary`.
  """

  # Inherit all behaviour from Ecto.Adapters.SQL
  use Ecto.Adapters.SQL, driver: :myxql

  # And provide a custom storage implementation
  @behaviour Ecto.Adapter.Storage
  @behaviour Ecto.Adapter.Structure

  ## Custom MySQL types

  # Each clause returns the list of loaders applied, in order, to a raw value
  # read from the database before it is handed to the Ecto type itself.
  @impl true
  def loaders({:map, _}, type), do: [&json_decode/1, &Ecto.Type.embedded_load(type, &1, :json)]
  def loaders(:map, type), do: [&json_decode/1, type]
  def loaders(:float, type), do: [&float_decode/1, type]
  def loaders(:boolean, type), do: [&bool_decode/1, type]
  def loaders(:binary_id, type), do: [Ecto.UUID, type]
  def loaders(_, type), do: [type]

  # Booleans are accepted as a single byte, a single bit or an integer.
  # Anything else is passed through untouched for the Ecto type to judge.
  defp bool_decode(<<0>>), do: {:ok, false}
  defp bool_decode(<<1>>), do: {:ok, true}
  defp bool_decode(<<0::size(1)>>), do: {:ok, false}
  defp bool_decode(<<1::size(1)>>), do: {:ok, true}
  defp bool_decode(0), do: {:ok, false}
  defp bool_decode(1), do: {:ok, true}
  defp bool_decode(x), do: {:ok, x}

  # Decimal values are converted to floats; other values pass through.
  defp float_decode(%Decimal{} = decimal), do: {:ok, Decimal.to_float(decimal)}
  defp float_decode(x), do: {:ok, x}

  # Binaries are decoded with the JSON library configured in MyXQL;
  # non-binary values pass through unchanged.
  defp json_decode(x) when is_binary(x), do: {:ok, MyXQL.json_library().decode!(x)}
  defp json_decode(x), do: {:ok, x}

  ## Storage API

  # Creates the configured database.
  #
  # Returns `:ok`, `{:error, :already_up}` when the schema already exists, or
  # `{:error, reason}` for any other failure. Raises if `:database` is missing
  # or nil in `opts`.
  @impl true
  def storage_up(opts) do
    database =
      Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"

    # The connection must not select the database that is about to be created.
    opts = Keyword.delete(opts, :database)
    charset = opts[:charset] || "utf8mb4"

    check_existence_command =
      "SELECT TRUE FROM information_schema.schemata WHERE schema_name = '#{database}'"

    case run_query(check_existence_command, opts) do
      {:ok, %{num_rows: 1}} ->
        {:error, :already_up}

      # Any other outcome (including a failed check) falls through to an
      # actual CREATE DATABASE attempt, whose result is what gets reported.
      _ ->
        create_command =
          ~s(CREATE DATABASE `#{database}` DEFAULT CHARACTER SET = #{charset})
          |> concat_if(opts[:collation], &"DEFAULT COLLATE = #{&1}")

        case run_query(create_command, opts) do
          {:ok, _} ->
            :ok

          {:error, %{mysql: %{name: :ER_DB_CREATE_EXISTS}}} ->
            {:error, :already_up}

          {:error, error} ->
            {:error, Exception.message(error)}

          {:exit, exit} ->
            {:error, exit_to_exception(exit)}
        end
    end
  end

  # Appends `fun.(value)` to `content`, separated by a space, unless `value` is nil.
  defp concat_if(content, nil, _fun), do: content
  defp concat_if(content, value, fun), do: content <> " " <> fun.(value)

  # Drops the configured database.
  #
  # Returns `:ok`, `{:error, :already_down}` when there is nothing to drop, or
  # `{:error, reason}` for any other failure. Raises if `:database` is missing
  # or nil in `opts`.
  @impl true
  def storage_down(opts) do
    database =
      Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"

    # The connection must not select the database that is about to be dropped.
    opts = Keyword.delete(opts, :database)
    command = "DROP DATABASE `#{database}`"

    case run_query(command, opts) do
      {:ok, _} ->
        :ok

      {:error, %{mysql: %{name: :ER_DB_DROP_EXISTS}}} ->
        {:error, :already_down}

      {:error, %{mysql: %{name: :ER_BAD_DB_ERROR}}} ->
        {:error, :already_down}

      # Any other error (for instance the "command timed out" error built by
      # run_query/2) is reported to the caller, mirroring storage_up/1, instead
      # of crashing with a CaseClauseError.
      {:error, error} ->
        {:error, Exception.message(error)}

      {:exit, :killed} ->
        {:error, :already_down}

      {:exit, exit} ->
        {:error, exit_to_exception(exit)}
    end
  end

  # Reports whether the configured database exists: `:up`, `:down`, or
  # `{:error, term}` when the lookup itself did not succeed.
  @impl Ecto.Adapter.Storage
  def storage_status(opts) do
    database =
      Keyword.fetch!(opts, :database) || raise ":database is nil in repository configuration"

    opts = Keyword.delete(opts, :database)

    check_database_query =
      "SELECT schema_name FROM information_schema.schemata WHERE schema_name = '#{database}'"

    case run_query(check_database_query, opts) do
      {:ok, %{num_rows: 0}} -> :down
      {:ok, %{num_rows: _num_rows}} -> :up
      other -> {:error, other}
    end
  end

  @impl true
  def supports_ddl_transaction? do
    false
  end

  # Runs `fun` inside a transaction while holding a named lock
  # (`GET_LOCK`/`RELEASE_LOCK`) derived from the repository name.
  @impl true
  def lock_for_migrations(meta, opts, fun) do
    %{opts: adapter_opts, repo: repo} = meta

    if Keyword.fetch(adapter_opts, :pool_size) == {:ok, 1} do
      Ecto.Adapters.SQL.raise_migration_pool_size_error()
    end

    opts = Keyword.merge(opts, timeout: :infinity, telemetry_options: [schema_migration: true])

    {:ok, result} =
      transaction(meta, opts, fn ->
        lock_name = "\"ecto_#{inspect(repo)}\""

        try do
          {:ok, _} = Ecto.Adapters.SQL.query(meta, "SELECT GET_LOCK(#{lock_name}, -1)", [], opts)
          fun.()
        after
          # Always release the lock, even when `fun` raises.
          {:ok, _} = Ecto.Adapters.SQL.query(meta, "SELECT RELEASE_LOCK(#{lock_name})", [], opts)
        end
      end)

    result
  end

  # Inserts a single row and returns `{:ok, returning_fields}` or
  # `{:invalid, constraints}`. Errors that do not map to a constraint are raised.
  @impl true
  def insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) do
    %{source: source, prefix: prefix} = schema_meta
    {_, query_params, _} = on_conflict

    key = primary_key!(schema_meta, returning)
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [])

    # Use a statement cache key based on source and field count unless the
    # caller supplied a non-nil :cache_statement.
    opts =
      if is_nil(Keyword.get(opts, :cache_statement)) do
        [{:cache_statement, "ecto_insert_#{source}_#{length(fields)}"} | opts]
      else
        opts
      end

    case Ecto.Adapters.SQL.query(adapter_meta, sql, values ++ query_params, opts) do
      # MySQL reports 1 affected row for a plain insert and 2 when
      # ON DUPLICATE KEY UPDATE modified an existing row.
      {:ok, %{num_rows: num_rows, last_insert_id: last_insert_id}} when num_rows in [1, 2] ->
        {:ok, last_insert_id(key, last_insert_id)}

      {:error, err} ->
        case @conn.to_constraints(err, source: source) do
          [] -> raise err
          constraints -> {:invalid, constraints}
        end
    end
  end

  # Only the autogenerated primary key can be returned from an insert;
  # any other `returning` request is rejected.
  defp primary_key!(%{autogenerate_id: {_, key, _type}}, [key]), do: key
  defp primary_key!(_, []), do: nil

  defp primary_key!(%{schema: schema}, returning) do
    raise ArgumentError,
          "MySQL does not support :read_after_writes in schemas for non-primary keys. " <>
            "The following fields in #{inspect schema} are tagged as such: #{inspect returning}"
  end

  # Builds the returned fields; empty when no key was requested or the
  # server reported a last insert id of 0.
  defp last_insert_id(nil, _last_insert_id), do: []
  defp last_insert_id(_key, 0), do: []
  defp last_insert_id(key, last_insert_id), do: [{key, last_insert_id}]

  # Dumps the database structure (plus the migrated versions) to
  # `config[:dump_path]` or `<default>/structure.sql`.
  # Returns `{:ok, path}` or the first `{:error, term}` from any step.
  @impl true
  def structure_dump(default, config) do
    table = config[:migration_source] || "schema_migrations"
    path = config[:dump_path] || Path.join(default, "structure.sql")

    with {:ok, versions} <- select_versions(table, config),
         {:ok, contents} <- mysql_dump(config),
         {:ok, contents} <- append_versions(table, versions, contents) do
      File.mkdir_p!(Path.dirname(path))
      File.write!(path, contents)
      {:ok, path}
    end
  end

  # Reads the migrated versions; a missing migrations table yields an empty list.
  defp select_versions(table, config) do
    case run_query(~s[SELECT version FROM `#{table}` ORDER BY version], config) do
      {:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
      {:error, %{mysql: %{name: :ER_NO_SUCH_TABLE}}} -> {:ok, []}
      {:error, _} = error -> error
      {:exit, exit} -> {:error, exit_to_exception(exit)}
    end
  end

  # Runs `mysqldump` for the schema only; a non-zero exit status is an error.
  defp mysql_dump(config) do
    case run_with_cmd("mysqldump", config, ["--no-data", "--routines", config[:database]]) do
      {output, 0} -> {:ok, output}
      {output, _} -> {:error, output}
    end
  end

  # Appends one INSERT statement per migrated version to the dump contents.
  defp append_versions(_table, [], contents) do
    {:ok, contents}
  end

  defp append_versions(table, versions, contents) do
    {:ok,
     contents <>
       Enum.map_join(versions, &~s[INSERT INTO `#{table}` (version) VALUES (#{&1});\n])}
  end

  # Loads a structure file through the `mysql` client with foreign key
  # checks disabled while the file is sourced.
  @impl true
  def structure_load(default, config) do
    path = config[:dump_path] || Path.join(default, "structure.sql")

    args = ["--execute", "SET FOREIGN_KEY_CHECKS = 0; SOURCE #{path}; SET FOREIGN_KEY_CHECKS = 1"]

    case run_with_cmd("mysql", config, args) do
      {_output, 0} -> {:ok, path}
      {output, _} -> {:error, output}
    end
  end

  @impl true
  def dump_cmd(args, opts \\ [], config) when is_list(config) and is_list(args),
    do: run_with_cmd("mysqldump", config, args, opts)

  ## Helpers

  # Runs a single statement on a short-lived connection started under the
  # storage task supervisor.
  #
  # Returns `{:ok, result}`, `{:error, exception}` (including a RuntimeError
  # on timeout) or `{:exit, reason}` when the task exited.
  defp run_query(sql, opts) do
    {:ok, _} = Application.ensure_all_started(:ecto_sql)
    {:ok, _} = Application.ensure_all_started(:myxql)

    opts =
      opts
      |> Keyword.drop([:name, :log, :pool, :pool_size])
      |> Keyword.put(:backoff_type, :stop)
      |> Keyword.put(:max_restarts, 0)

    task =
      Task.Supervisor.async_nolink(Ecto.Adapters.SQL.StorageSupervisor, fn ->
        {:ok, conn} = MyXQL.start_link(opts)

        value = MyXQL.query(conn, sql, [], opts)
        GenServer.stop(conn)
        value
      end)

    timeout = Keyword.get(opts, :timeout, 15_000)

    # Task.yield/2 returns nil on timeout, in which case the task is shut down.
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, {:ok, result}} ->
        {:ok, result}

      {:ok, {:error, error}} ->
        {:error, error}

      {:exit, exit} ->
        {:exit, exit}

      nil ->
        {:error, RuntimeError.exception("command timed out")}
    end
  end

  # Unwraps a database/connection exception carried in an exit reason;
  # any other reason is formatted into a RuntimeError.
  defp exit_to_exception({%{__struct__: struct} = error, _})
       when struct in [MyXQL.Error, DBConnection.Error],
       do: error

  defp exit_to_exception(reason), do: RuntimeError.exception(Exception.format_exit(reason))

  # Invokes a MySQL command line tool (`mysql`, `mysqldump`) with connection
  # arguments derived from `opts`, followed by `opt_args`.
  # Returns the `{output, exit_status}` tuple from `System.cmd/3` and raises
  # when the executable cannot be found.
  defp run_with_cmd(cmd, opts, opt_args, cmd_opts \\ []) do
    if is_nil(System.find_executable(cmd)) do
      raise "could not find executable `#{cmd}` in path, " <>
              "please guarantee it is available before running ecto commands"
    end

    # The password is handed over via the environment instead of the argument list.
    env =
      if password = opts[:password] do
        [{"MYSQL_PWD", password}]
      else
        []
      end

    host = opts[:hostname] || System.get_env("MYSQL_HOST") || "localhost"
    port = opts[:port] || System.get_env("MYSQL_TCP_PORT") || "3306"
    protocol = opts[:cli_protocol] || System.get_env("MYSQL_CLI_PROTOCOL") || "tcp"

    user_args =
      if username = opts[:username] do
        ["--user", username]
      else
        []
      end

    database_args =
      if database = opts[:database] do
        ["--database", database]
      else
        []
      end

    args =
      [
        "--host", host,
        "--port", to_string(port),
        "--protocol", protocol
      ] ++ user_args ++ database_args ++ opt_args

    # Caller-provided :env entries are appended after the password entry.
    cmd_opts =
      cmd_opts
      |> Keyword.put_new(:stderr_to_stdout, true)
      |> Keyword.update(:env, env, &Enum.concat(env, &1))

    System.cmd(cmd, args, cmd_opts)
  end
end