runner.ex (14748B)
defmodule Ecto.Migration.Runner do
  @moduledoc false
  use Agent, restart: :temporary

  require Logger

  alias Ecto.Migration.Table
  alias Ecto.Migration.Index
  alias Ecto.Migration.Constraint
  alias Ecto.Migration.Command

  @doc """
  Runs the given migration.

  Starts a runner agent under `Ecto.MigratorSupervisor`, stores it (together
  with the prefix from `opts`) in the calling process dictionary, invokes
  `operation` on `module` and finally stops the runner.

  The runner is stopped even when the migration raises, so a failed
  migration does not leave the agent behind.

  Recognized `opts`:

    * `:log` - the log level, `false` to disable logging (defaults to `:info`)
    * `:log_migrations_sql` - forwarded to the adapter as `:log` (defaults to `false`)
    * `:prefix` - the migration prefix, see `metadata/2`

  """
  def run(repo, config, version, module, direction, operation, migrator_direction, opts) do
    level = Keyword.get(opts, :log, :info)
    sql = Keyword.get(opts, :log_migrations_sql, false)
    log = %{level: level, sql: sql}
    args = {self(), repo, config, module, direction, migrator_direction, log}

    {:ok, runner} = DynamicSupervisor.start_child(Ecto.MigratorSupervisor, {__MODULE__, args})
    metadata(runner, opts)

    # The try starts only after the runner has been registered with
    # metadata/2, so stop/0 in the after clause can always look it up.
    try do
      log(level, "== Running #{version} #{inspect(module)}.#{operation}/0 #{direction}")
      {time, _} = :timer.tc(fn -> perform_operation(repo, module, operation) end)
      # :timer.tc/1 reports microseconds; this prints seconds with one decimal.
      log(level, "== Migrated #{version} in #{inspect(div(time, 100_000) / 10)}s")
    after
      stop()
    end

    :ok
  end

  @doc """
  Stores the runner metadata.

  The runner pid and the `:prefix` option (converted to a string when
  present) are kept in the process dictionary of the calling process.
  """
  def metadata(runner, opts) do
    prefix = opts[:prefix]
    Process.put(:ecto_migration, %{runner: runner, prefix: prefix && to_string(prefix)})
  end

  @doc """
  Starts the runner for the specified repo.

  The agent links itself to `parent` in addition to the process that
  started it.
  """
  def start_link({parent, repo, config, module, direction, migrator_direction, log}) do
    Agent.start_link(fn ->
      Process.link(parent)

      %{
        direction: direction,
        repo: repo,
        migration: module,
        migrator_direction: migrator_direction,
        command: nil,
        subcommands: [],
        log: log,
        commands: [],
        config: config
      }
    end)
  end

  @doc """
  Stops the runner.
  """
  def stop() do
    Agent.stop(runner())
  end

  @doc """
  Accesses the given repository configuration.
  """
  def repo_config(key, default) do
    Agent.get(runner(), &Keyword.get(&1.config, key, default))
  end

  @doc """
  Returns the migrator command (up or down).

    * forward + up: up
    * forward + down: down
    * forward + change: up
    * backward + change: down

  """
  def migrator_direction do
    Agent.get(runner(), & &1.migrator_direction)
  end

  @doc """
  Gets the repo for this migration
  """
  def repo do
    Agent.get(runner(), & &1.repo)
  end

  @doc """
  Gets the prefix for this migration
  """
  def prefix do
    case Process.get(:ecto_migration) do
      %{prefix: prefix} -> prefix
      _ -> raise "could not find migration runner process for #{inspect(self())}"
    end
  end

  @doc """
  Executes queued migration commands.

  Reverses the order commands are executed when doing a rollback
  on a change/0 function and resets commands queue.
  """
  def flush do
    %{commands: commands, direction: direction, repo: repo, log: log, migration: migration} =
      Agent.get_and_update(runner(), fn state -> {state, %{state | commands: []}} end)

    # Commands are queued by prepending, so the stored list is already in
    # reverse order: keep it as is for rollbacks, restore it otherwise.
    commands = if direction == :backward, do: commands, else: Enum.reverse(commands)

    for command <- commands do
      execute_in_direction(repo, migration, direction, log, command)
    end
  end

  @doc """
  Queues command tuples or strings for execution.

  Ecto.MigrationError will be raised when the server
  is in `:backward` direction and `command` is irreversible.
  """
  def execute(command) do
    reply =
      Agent.get_and_update(runner(), fn
        %{command: nil} = state ->
          {:ok, %{state | subcommands: [], commands: [command | state.commands]}}

        # A command block is still open: reject and discard it.
        %{command: _} = state ->
          {:error, %{state | command: nil}}
      end)

    case reply do
      :ok ->
        :ok

      :error ->
        raise Ecto.MigrationError, "cannot execute nested commands"
    end
  end

  @doc """
  Starts a command.

  Raises `Ecto.MigrationError` if another command has been started
  and not yet ended.
  """
  def start_command(command) do
    reply =
      Agent.get_and_update(runner(), fn
        %{command: nil} = state ->
          {:ok, %{state | command: command}}

        %{command: _} = state ->
          {:error, %{state | command: command}}
      end)

    case reply do
      :ok ->
        :ok

      :error ->
        raise Ecto.MigrationError, "cannot execute nested commands"
    end
  end

  @doc """
  Queues and clears current command. Must call `start_command/1` first.
  """
  def end_command do
    Agent.update(runner(), fn state ->
      {operation, object} = state.command
      # Subcommands are collected by prepending; restore declaration order.
      command = {operation, object, Enum.reverse(state.subcommands)}
      %{state | command: nil, subcommands: [], commands: [command | state.commands]}
    end)
  end

  @doc """
  Adds a subcommand to the current command. Must call `start_command/1` first.
  """
  def subcommand(subcommand) do
    reply =
      Agent.get_and_update(runner(), fn
        %{command: nil} = state ->
          {:error, state}

        state ->
          {:ok, update_in(state.subcommands, &[subcommand | &1])}
      end)

    case reply do
      :ok ->
        :ok

      :error ->
        raise Ecto.MigrationError, message: "cannot execute command outside of block"
    end
  end

  ## Execute

  # Runs a single queued command. %Command{} carries explicit up/down
  # instructions; any other command is executed as is when going forward
  # and must be reversible when going backward.
  defp execute_in_direction(repo, migration, :forward, log, %Command{up: up}) do
    log_and_execute_ddl(repo, migration, log, up)
  end

  defp execute_in_direction(repo, migration, :forward, log, command) do
    log_and_execute_ddl(repo, migration, log, command)
  end

  defp execute_in_direction(repo, migration, :backward, log, %Command{down: down}) do
    log_and_execute_ddl(repo, migration, log, down)
  end

  defp execute_in_direction(repo, migration, :backward, log, command) do
    if reversed = reverse(command) do
      log_and_execute_ddl(repo, migration, log, reversed)
    else
      raise Ecto.MigrationError,
        message:
          "cannot reverse migration command: #{command(command)}. " <>
            "You will need to explicitly define up/0 and down/0 in your migration"
    end
  end

  # Returns the command that undoes the given one,
  # or a falsy value (false or nil) when it cannot be reversed.
  defp reverse({:create, %Index{} = index}),
    do: {:drop, index, :restrict}

  defp reverse({:create_if_not_exists, %Index{} = index}),
    do: {:drop_if_exists, index, :restrict}

  defp reverse({:drop, %Index{} = index, _}),
    do: {:create, index}

  defp reverse({:drop_if_exists, %Index{} = index, _}),
    do: {:create_if_not_exists, index}

  defp reverse({:create, %Table{} = table, _columns}),
    do: {:drop, table, :restrict}

  defp reverse({:create_if_not_exists, %Table{} = table, _columns}),
    do: {:drop_if_exists, table, :restrict}

  defp reverse({:rename, %Table{} = table_current, %Table{} = table_new}),
    do: {:rename, table_new, table_current}

  defp reverse({:rename, %Table{} = table, current_column, new_column}),
    do: {:rename, table, new_column, current_column}

  defp reverse({:alter, %Table{} = table, changes}) do
    if reversed = table_reverse(changes, []) do
      {:alter, table, reversed}
    end
  end

  # It is not a good idea to reverse constraints because
  # we can't guarantee data integrity when applying them back.
  defp reverse({:create_if_not_exists, %Constraint{} = constraint}),
    do: {:drop_if_exists, constraint, :restrict}

  defp reverse({:create, %Constraint{} = constraint}),
    do: {:drop, constraint, :restrict}

  defp reverse(_command), do: false

  # Reverses the changes of an alter table command. The accumulator is
  # built by prepending, so the reversed changes come out in the opposite
  # order of the original ones. Returns false as soon as one change
  # cannot be reversed.
  defp table_reverse([{:remove, name, type, opts} | t], acc) do
    table_reverse(t, [{:add, name, type, opts} | acc])
  end

  defp table_reverse([{:modify, name, type, opts} | t], acc) do
    # A modify can only be undone when :from tells what the column was before.
    case opts[:from] do
      nil ->
        false

      {reverse_type, from_opts} when is_list(from_opts) ->
        reverse_from = {type, Keyword.delete(opts, :from)}
        reverse_opts = Keyword.put(from_opts, :from, reverse_from)
        table_reverse(t, [{:modify, name, reverse_type, reverse_opts} | acc])

      reverse_type ->
        reverse_opts = Keyword.put(opts, :from, type)
        table_reverse(t, [{:modify, name, reverse_type, reverse_opts} | acc])
    end
  end

  defp table_reverse([{:add, name, _type, _opts} | t], acc) do
    table_reverse(t, [{:remove, name} | acc])
  end

  defp table_reverse([_ | _], _acc) do
    false
  end

  defp table_reverse([], acc) do
    acc
  end

  ## Helpers

  # Invokes the migration operation and flushes the queued commands.
  # The optional after_begin/0 and before_commit/0 callbacks are only
  # invoked when the repo reports being inside a transaction.
  defp perform_operation(repo, module, operation) do
    if function_exported?(repo, :in_transaction?, 0) and repo.in_transaction?() do
      if function_exported?(module, :after_begin, 0) do
        module.after_begin()
        flush()
      end

      apply(module, operation, [])
      flush()

      if function_exported?(module, :before_commit, 0) do
        module.before_commit()
        flush()
      end
    else
      apply(module, operation, [])
      flush()
    end
  end

  # Fetches the runner pid stored by metadata/2 in the process dictionary.
  defp runner do
    case Process.get(:ecto_migration) do
      %{runner: runner} -> runner
      _ -> raise "could not find migration runner process for #{inspect(self())}"
    end
  end

  # Two-element index commands get the concurrent index checks
  # before being executed.
  defp log_and_execute_ddl(repo, migration, log, {instruction, %Index{} = index}) do
    maybe_warn_index_ddl_transaction(index, migration)
    maybe_warn_index_migration_lock(index, repo, migration)
    log_and_execute_ddl(repo, log, {instruction, index})
  end

  defp log_and_execute_ddl(repo, _migration, log, command) do
    log_and_execute_ddl(repo, log, command)
  end

  # Anonymous functions are simply invoked; nothing is logged for them.
  defp log_and_execute_ddl(_repo, _log, func) when is_function(func, 0) do
    func.()
    :ok
  end

  defp log_and_execute_ddl(repo, %{level: level, sql: sql}, command) do
    log(level, command(command))
    meta = Ecto.Adapter.lookup_meta(repo.get_dynamic_repo())
    {:ok, logs} = repo.__adapter__().execute_ddl(meta, command, timeout: :infinity, log: sql)

    # Forward the log entries returned by the adapter at their own levels.
    Enum.each(logs, fn {level, message, metadata} ->
      log(level, message, metadata)
    end)

    :ok
  end

  # Logs msg at the given level: false disables logging, true means :info.
  defp log(level, msg, metadata \\ [])
  defp log(false, _msg, _metadata), do: :ok
  defp log(true, msg, metadata), do: Logger.log(:info, msg, metadata)
  defp log(level, msg, metadata), do: Logger.log(level, msg, metadata)

  # Warns when a concurrent index is used in a migration
  # that did not disable the DDL transaction.
  defp maybe_warn_index_ddl_transaction(%{concurrently: true} = index, migration) do
    migration_config = migration.__migration__()

    if not migration_config[:disable_ddl_transaction] do
      IO.warn """
      Migration #{inspect(migration)} has set index `#{index.name}` on table \
      `#{index.table}` to concurrently but did not disable ddl transaction. \
      Please set:

          use Ecto.Migration
          @disable_ddl_transaction true
      """, []
    end
  end

  defp maybe_warn_index_ddl_transaction(_index, _migration), do: :ok

  # Warns when a concurrent index is used in a migration that did not
  # disable the migration lock. No warning is emitted for the Postgres
  # adapter when the repo is configured with the :pg_advisory_lock strategy.
  defp maybe_warn_index_migration_lock(%{concurrently: true} = index, repo, migration) do
    migration_lock_disabled = migration.__migration__()[:disable_migration_lock]
    lock_strategy = repo.config()[:migration_lock]
    adapter = repo.__adapter__()

    case {migration_lock_disabled, adapter, lock_strategy} do
      {false, Ecto.Adapters.Postgres, :pg_advisory_lock} ->
        :ok

      {false, Ecto.Adapters.Postgres, _} ->
        IO.warn """
        Migration #{inspect(migration)} has set index `#{index.name}` on table \
        `#{index.table}` to concurrently but did not disable migration lock. \
        Please set:

            use Ecto.Migration
            @disable_migration_lock true

        Alternatively, consider using advisory locks during migrations in the \
        repo configuration:

            config #{inspect(repo)}, migration_lock: :pg_advisory_lock
        """, []

      {false, _adapter, _migration_lock} ->
        IO.warn """
        Migration #{inspect(migration)} has set index `#{index.name}` on table \
        `#{index.table}` to concurrently but did not disable migration lock. \
        Please set:

            use Ecto.Migration
            @disable_migration_lock true
        """, []

      _ ->
        :ok
    end
  end

  defp maybe_warn_index_migration_lock(_index, _repo, _migration), do: :ok

  # Builds the human readable description of a command, used both for
  # logging and for the "cannot reverse migration command" error.
  defp command(ddl) when is_binary(ddl) or is_list(ddl),
    do: "execute #{inspect(ddl)}"

  # Functions are never logged when executed, so this clause is only
  # reached when describing an irreversible function in the rollback
  # error; without it that path would fail with FunctionClauseError.
  defp command(func) when is_function(func, 0),
    do: "execute #{inspect(func)}"

  defp command({:create, %Table{} = table, _}),
    do: "create table #{quote_name(table.prefix, table.name)}"

  defp command({:create_if_not_exists, %Table{} = table, _}),
    do: "create table if not exists #{quote_name(table.prefix, table.name)}"

  defp command({:alter, %Table{} = table, _}),
    do: "alter table #{quote_name(table.prefix, table.name)}"

  defp command({:drop, %Table{} = table, mode}),
    do: "drop table #{quote_name(table.prefix, table.name)}#{drop_mode(mode)}"

  defp command({:drop_if_exists, %Table{} = table, mode}),
    do: "drop table if exists #{quote_name(table.prefix, table.name)}#{drop_mode(mode)}"

  defp command({:create, %Index{} = index}),
    do: "create index #{quote_name(index.prefix, index.name)}"

  defp command({:create_if_not_exists, %Index{} = index}),
    do: "create index if not exists #{quote_name(index.prefix, index.name)}"

  defp command({:drop, %Index{} = index, mode}),
    do: "drop index #{quote_name(index.prefix, index.name)}#{drop_mode(mode)}"

  defp command({:drop_if_exists, %Index{} = index, mode}),
    do: "drop index if exists #{quote_name(index.prefix, index.name)}#{drop_mode(mode)}"

  defp command({:rename, %Table{} = current_table, %Table{} = new_table}),
    do: "rename table #{quote_name(current_table.prefix, current_table.name)} to #{quote_name(new_table.prefix, new_table.name)}"

  defp command({:rename, %Table{} = table, current_column, new_column}),
    do: "rename column #{current_column} to #{new_column} on table #{quote_name(table.prefix, table.name)}"

  defp command({:create, %Constraint{check: nil, exclude: nil}}),
    do: raise(ArgumentError, "a constraint must have either a check or exclude option")

  defp command({:create, %Constraint{check: check, exclude: exclude}})
       when is_binary(check) and is_binary(exclude),
       do: raise(ArgumentError, "a constraint must not have both check and exclude options")

  defp command({:create, %Constraint{check: check} = constraint}) when is_binary(check),
    do: "create check constraint #{constraint.name} on table #{quote_name(constraint.prefix, constraint.table)}"

  defp command({:create, %Constraint{exclude: exclude} = constraint}) when is_binary(exclude),
    do: "create exclude constraint #{constraint.name} on table #{quote_name(constraint.prefix, constraint.table)}"

  defp command({:drop, %Constraint{} = constraint, _}),
    do: "drop constraint #{constraint.name} from table #{quote_name(constraint.prefix, constraint.table)}"

  defp command({:drop_if_exists, %Constraint{} = constraint, _}),
    do: "drop constraint if exists #{constraint.name} from table #{quote_name(constraint.prefix, constraint.table)}"

  defp drop_mode(:restrict), do: ""
  defp drop_mode(:cascade), do: " cascade"

  # Formats an optionally prefixed name for log messages;
  # atoms are converted to strings.
  defp quote_name(nil, name), do: quote_name(name)
  defp quote_name(prefix, name), do: quote_name(prefix) <> "." <> quote_name(name)
  defp quote_name(name) when is_atom(name), do: quote_name(Atom.to_string(name))
  defp quote_name(name), do: name
end