zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

migrator.ex (24894B)


      1 defmodule Ecto.Migrator do
      2   @moduledoc """
      3   Lower level API for managing migrations.
      4 
      5   EctoSQL provides three mix tasks for running and managing migrations:
      6 
      7     * `mix ecto.migrate` - migrates a repository
      8     * `mix ecto.rollback` - rolls back a particular migration
      9     * `mix ecto.migrations` - shows all migrations and their status
     10 
     11   Those tasks are built on top of the functions in this module.
     12   While the tasks above cover most use cases, it may be necessary
     13   from time to time to jump into the lower level API. For example,
     14   if you are assembling an Elixir release, Mix is not available,
     15   so this module provides a nice complement to still migrate your
     16   system.
     17 
     18   To learn more about migrations in general, see `Ecto.Migration`.
     19 
     20   ## Example: Running an individual migration
     21 
     22   Imagine you have this migration:
     23 
     24       defmodule MyApp.MigrationExample do
     25         use Ecto.Migration
     26 
     27         def up do
     28           execute "CREATE TABLE users(id serial PRIMARY KEY, username text)"
     29         end
     30 
     31         def down do
     32           execute "DROP TABLE users"
     33         end
     34       end
     35 
     36   You can execute it manually with:
     37 
     38       Ecto.Migrator.up(Repo, 20080906120000, MyApp.MigrationExample)
     39 
     40   ## Example: Running migrations in a release
     41 
     42   Elixir v1.9 introduces `mix release`, which generates a self-contained
     43   directory that consists of your application code, all of its dependencies,
     44   plus the whole Erlang Virtual Machine (VM) and runtime.
     45 
     46   When a release is assembled, Mix is no longer available inside a release
     47   and therefore none of the Mix tasks. Users may still need a mechanism to
     48   migrate their databases. This can be achieved by using the `Ecto.Migrator`
     49   module:
     50 
     51       defmodule MyApp.Release do
     52         @app :my_app
     53 
     54         def migrate do
     55           for repo <- repos() do
     56             {:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
     57           end
     58         end
     59 
     60         def rollback(repo, version) do
     61           {:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :down, to: version))
     62         end
     63 
     64         defp repos do
     65           Application.load(@app)
     66           Application.fetch_env!(@app, :ecto_repos)
     67         end
     68       end
     69 
     70   The example above uses `with_repo/3` to make sure the repository is
     71   started and then runs all migrations up or a given migration down.
     72   Note you will have to replace `MyApp` and `:my_app` on the first two
     73   lines by your actual application name. Once the file above is added
     74   to your application, you can assemble a new release and invoke the
     75   commands above in the release root like this:
     76 
     77       $ bin/my_app eval "MyApp.Release.migrate"
     78       $ bin/my_app eval "MyApp.Release.rollback(MyApp.Repo, 20190417140000)"
     79 
     80   """
     81 
     82   require Logger
     83   require Ecto.Query
     84 
     85   alias Ecto.Migration.Runner
     86   alias Ecto.Migration.SchemaMigration
     87 
     88   @doc """
     89   Ensures the repo is started to perform migration operations.
     90 
     91   All of the application required to run the repo will be started
     92   before hand with chosen mode. If the repo has not yet been started,
     93   it is manually started, with a `:pool_size` of 2, before the given
     94   function is executed, and the repo is then terminated. If the repo
     95   was already started, then the function is directly executed, without
     96   terminating the repo afterwards.
     97 
     98   Although this function was designed to start repositories for running
     99   migrations, it can be used by any code, Mix task, or release tooling
    100   that needs to briefly start a repository to perform a certain operation
    101   and then terminate.
    102 
    103   The repo may also configure a `:start_apps_before_migration` option
    104   which is a list of applications to be started before the migration
    105   runs.
    106 
    107   It returns `{:ok, fun_return, apps}`, with all apps that have been
    108   started, or `{:error, term}`.
    109 
    110   ## Options
    111 
    112     * `:pool_size` - The pool size to start the repo for migrations.
    113       Defaults to 2.
    114     * `:mode` - The mode to start all applications.
    115       Defaults to `:permanent`.
    116 
    117   ## Examples
    118 
    119       {:ok, _, _} =
    120         Ecto.Migrator.with_repo(repo, fn repo ->
    121           Ecto.Migrator.run(repo, :up, all: true)
    122         end)
    123 
    124   """
    125   def with_repo(repo, fun, opts \\ []) do
    126     config = repo.config()
    127     mode = Keyword.get(opts, :mode, :permanent)
    128     apps = [:ecto_sql | config[:start_apps_before_migration] || []]
    129 
    130     extra_started =
    131       Enum.flat_map(apps, fn app ->
    132         {:ok, started} = Application.ensure_all_started(app, mode)
    133         started
    134       end)
    135 
    136     {:ok, repo_started} = repo.__adapter__().ensure_all_started(config, mode)
    137     started = extra_started ++ repo_started
    138     pool_size = Keyword.get(opts, :pool_size, 2)
    139     migration_repo = config[:migration_repo] || repo
    140 
    141     case ensure_repo_started(repo, pool_size) do
    142       {:ok, repo_after} ->
    143         case ensure_migration_repo_started(migration_repo, repo) do
    144           {:ok, migration_repo_after} ->
    145             try do
    146               {:ok, fun.(repo), started}
    147             after
    148               after_action(repo, repo_after)
    149               after_action(migration_repo, migration_repo_after)
    150             end
    151 
    152           {:error, _} = error ->
    153             after_action(repo, repo_after)
    154             error
    155         end
    156 
    157       {:error, _} = error ->
    158         error
    159     end
    160   end
    161 
    162   @doc """
    163   Gets the migrations path from a repository.
    164 
    165   This function accepts an optional second parameter to customize the
    166   migrations directory. This can be used to specify a custom migrations
    167   path.
    168   """
    169   @spec migrations_path(Ecto.Repo.t, String.t) :: String.t
    170   def migrations_path(repo, directory \\ "migrations") do
    171     config = repo.config()
    172     priv = config[:priv] || "priv/#{repo |> Module.split |> List.last |> Macro.underscore}"
    173     app = Keyword.fetch!(config, :otp_app)
    174     Application.app_dir(app, Path.join(priv, directory))
    175   end
    176 
    177   @doc """
    178   Gets all migrated versions.
    179 
    180   This function ensures the migration table exists
    181   if no table has been defined yet.
    182 
    183   ## Options
    184 
    185     * `:prefix` - the prefix to run the migrations on
    186     * `:dynamic_repo` - the name of the Repo supervisor process.
    187       See `c:Ecto.Repo.put_dynamic_repo/1`.
    188     * `:skip_table_creation` - skips any attempt to create the migration table
    189       Useful for situations where user needs to check migrations but has
    190       insufficient permissions to create the table.  Note that migrations
    191       commands may fail if this is set to true. Defaults to `false`.  Accepts a
    192       boolean.
    193   """
    194   @spec migrated_versions(Ecto.Repo.t, Keyword.t) :: [integer]
    195   def migrated_versions(repo, opts \\ []) do
    196     lock_for_migrations true, repo, opts, fn _config, versions -> versions end
    197   end
    198 
    199   @doc """
    200   Runs an up migration on the given repository.
    201 
    202   ## Options
    203 
    204     * `:log` - the level to use for logging of migration instructions.
    205       Defaults to `:info`. Can be any of `Logger.level/0` values or a boolean.
    206     * `:log_migrations_sql` - the level to use for logging of SQL commands
    207       generated by migrations. Defaults to `false`. Can be any of `Logger.level/0` values
    208       or a boolean.
    209     * `:log_migrator_sql` - the level to use for logging of SQL commands emitted
    210       by the migrator, such as transactions, locks, etc. Defaults to `false`.
    211     * `:prefix` - the prefix to run the migrations on
    212     * `:dynamic_repo` - the name of the Repo supervisor process.
    213       See `c:Ecto.Repo.put_dynamic_repo/1`.
    214     * `:strict_version_order` - abort when applying a migration with old timestamp
    215       (otherwise it emits a warning)
    216   """
    217   @spec up(Ecto.Repo.t, integer, module, Keyword.t) :: :ok | :already_up
    218   def up(repo, version, module, opts \\ []) do
    219     conditional_lock_for_migrations module, version, repo, opts, fn config, versions ->
    220       if version in versions do
    221         :already_up
    222       else
    223         result = do_up(repo, config, version, module, opts)
    224 
    225         if version != Enum.max([version | versions]) do
    226           latest = Enum.max(versions)
    227 
    228           message = """
    229           You are running migration #{version} but an older \
    230           migration with version #{latest} has already run.
    231 
    232           This can be an issue if you have already ran #{latest} in production \
    233           because a new deployment may migrate #{version} but a rollback command \
    234           would revert #{latest} instead of #{version}.
    235 
    236           If this can be an issue, we recommend to rollback #{version} and change \
    237           it to a version later than #{latest}.
    238           """
    239 
    240           if opts[:strict_version_order] do
    241             raise Ecto.MigrationError, message
    242           else
    243             Logger.warn message
    244           end
    245         end
    246 
    247         result
    248       end
    249     end
    250   end
    251 
    252   defp do_up(repo, config, version, module, opts) do
    253     async_migrate_maybe_in_transaction(repo, config, version, module, :up, opts, fn ->
    254       attempt(repo, config, version, module, :forward, :up, :up, opts)
    255         || attempt(repo, config, version, module, :forward, :change, :up, opts)
    256         || {:error, Ecto.MigrationError.exception(
    257             "#{inspect module} does not implement a `up/0` or `change/0` function")}
    258     end)
    259   end
    260 
    261   @doc """
    262   Runs a down migration on the given repository.
    263 
    264   ## Options
    265 
    266     * `:log` - the level to use for logging of migration commands. Defaults to `:info`.
    267       Can be any of `Logger.level/0` values or a boolean.
    268     * `:log_migrations_sql` - the level to use for logging of SQL commands
    269       generated by migrations. Defaults to `false`. Can be any of `Logger.level/0`
    270       values or a boolean.
    271     * `:log_migrator_sql` - the level to use for logging of SQL commands emitted
    272       by the migrator, such as transactions, locks, etc. Defaults to `false`.
    273       Can be any of `Logger.level/0` values or a boolean.
    274     * `:prefix` - the prefix to run the migrations on
    275     * `:dynamic_repo` - the name of the Repo supervisor process.
    276       See `c:Ecto.Repo.put_dynamic_repo/1`.
    277 
    278   """
    279   @spec down(Ecto.Repo.t, integer, module) :: :ok | :already_down
    280   def down(repo, version, module, opts \\ []) do
    281     conditional_lock_for_migrations module, version, repo, opts, fn config, versions ->
    282       if version in versions do
    283         do_down(repo, config, version, module, opts)
    284       else
    285         :already_down
    286       end
    287     end
    288   end
    289 
    290   defp do_down(repo, config, version, module, opts) do
    291     async_migrate_maybe_in_transaction(repo, config, version, module, :down, opts, fn ->
    292       attempt(repo, config, version, module, :forward, :down, :down, opts)
    293         || attempt(repo, config, version, module, :backward, :change, :down, opts)
    294         || {:error, Ecto.MigrationError.exception(
    295             "#{inspect module} does not implement a `down/0` or `change/0` function")}
    296     end)
    297   end
    298 
    299   defp async_migrate_maybe_in_transaction(repo, config, version, module, direction, opts, fun) do
    300     dynamic_repo = repo.get_dynamic_repo()
    301 
    302     fun_with_status = fn ->
    303       result = fun.()
    304       apply(SchemaMigration, direction, [repo, config, version, opts])
    305       result
    306     end
    307 
    308     fn -> run_maybe_in_transaction(repo, dynamic_repo, module, fun_with_status, opts) end
    309     |> Task.async()
    310     |> Task.await(:infinity)
    311   end
    312 
    313   defp run_maybe_in_transaction(repo, dynamic_repo, module, fun, opts) do
    314     repo.put_dynamic_repo(dynamic_repo)
    315 
    316     if module.__migration__[:disable_ddl_transaction] ||
    317          not repo.__adapter__().supports_ddl_transaction? do
    318       fun.()
    319     else
    320       {:ok, result} =
    321         repo.transaction(fun, log: migrator_log(opts), timeout: :infinity)
    322 
    323       result
    324     end
    325   catch kind, reason ->
    326     {kind, reason, __STACKTRACE__}
    327   end
    328 
    329   defp attempt(repo, config, version, module, direction, operation, reference, opts) do
    330     if Code.ensure_loaded?(module) and
    331        function_exported?(module, operation, 0) do
    332       Runner.run(repo, config, version, module, direction, operation, reference, opts)
    333       :ok
    334     end
    335   end
    336 
    337   @doc """
    338   Runs migrations for the given repository.
    339 
    340   Equivalent to:
    341 
    342       Ecto.Migrator.run(repo, [Ecto.Migrator.migrations_path(repo)], direction, opts)
    343 
    344   See `run/4` for more information.
    345   """
    346   @spec run(Ecto.Repo.t, atom, Keyword.t) :: [integer]
    347   def run(repo, direction, opts) do
    348     run(repo, [migrations_path(repo)], direction, opts)
    349   end
    350 
    351   @doc ~S"""
    352   Apply migrations to a repository with a given strategy.
    353 
    354   The second argument identifies where the migrations are sourced from.
    355   A binary representing directory (or a list of binaries representing
    356   directories) may be passed, in which case we will load all files
    357   following the "#{VERSION}_#{NAME}.exs" schema. The `migration_source`
    358   may also be a list of tuples that identify the version number and
    359   migration modules to be run, for example:
    360 
    361       Ecto.Migrator.run(Repo, [{0, MyApp.Migration1}, {1, MyApp.Migration2}, ...], :up, opts)
    362 
    363   A strategy (which is one of `:all`, `:step`, `:to`, or `:to_exclusive`) must be given as
    364   an option.
    365 
    366   ## Execution model
    367 
    368   In order to run migrations, at least two database connections are
    369   necessary. One is used to lock the "schema_migrations" table and
    370   the other one to effectively run the migrations. This allows multiple
    371   nodes to run migrations at the same time, but guarantee that only one
    372   of them will effectively migrate the database.
    373 
    374   A downside of this approach is that migrations cannot run dynamically
    375   during test under the `Ecto.Adapters.SQL.Sandbox`, as the sandbox has
    376   to share a single connection across processes to guarantee the changes
    377   can be reverted.
    378 
    379   ## Options
    380 
    381     * `:all` - runs all available if `true`
    382 
    383     * `:step` - runs the specific number of migrations
    384 
    385     * `:to` - runs all until the supplied version is reached
    386       (including the version given in `:to`)
    387 
    388     * `:to_exclusive` - runs all until the supplied version is reached
    389       (excluding the version given in `:to_exclusive`)
    390 
    391   Plus all other options described in `up/4`.
    392   """
    393   @spec run(Ecto.Repo.t, String.t | [String.t] | [{integer, module}], atom, Keyword.t) :: [integer]
    394   def run(repo, migration_source, direction, opts) do
    395     migration_source = List.wrap(migration_source)
    396 
    397     pending =
    398       lock_for_migrations true, repo, opts, fn _config, versions ->
    399         cond do
    400           opts[:all] ->
    401             pending_all(versions, migration_source, direction)
    402           to = opts[:to] ->
    403             pending_to(versions, migration_source, direction, to)
    404           to_exclusive = opts[:to_exclusive] ->
    405             pending_to_exclusive(versions, migration_source, direction, to_exclusive)
    406           step = opts[:step] ->
    407             pending_step(versions, migration_source, direction, step)
    408           true ->
    409             {:error, ArgumentError.exception("expected one of :all, :to, :to_exclusive, or :step strategies")}
    410         end
    411       end
    412 
    413     # The lock above already created the table, so we can now skip it.
    414     opts = Keyword.put(opts, :skip_table_creation, true)
    415 
    416     ensure_no_duplication!(pending)
    417     migrate(Enum.map(pending, &load_migration!/1), direction, repo, opts)
    418   end
    419 
    420   @doc """
    421   Returns an array of tuples as the migration status of the given repo,
    422   without actually running any migrations.
    423 
    424   Equivalent to:
    425 
    426       Ecto.Migrator.migrations(repo, [Ecto.Migrator.migrations_path(repo)])
    427 
    428   """
    429   @spec migrations(Ecto.Repo.t) :: [{:up | :down, id :: integer(), name :: String.t}]
    430   def migrations(repo) do
    431     migrations(repo, [migrations_path(repo)])
    432   end
    433 
    434   @doc """
    435   Returns an array of tuples as the migration status of the given repo,
    436   without actually running any migrations.
    437   """
    438   @spec migrations(Ecto.Repo.t, String.t | [String.t], Keyword.t) ::
    439           [{:up | :down, id :: integer(), name :: String.t}]
    440   def migrations(repo, directories, opts \\ []) do
    441     directories = List.wrap(directories)
    442 
    443     repo
    444     |> migrated_versions(opts)
    445     |> collect_migrations(directories)
    446     |> Enum.sort_by(fn {_, version, _} -> version end)
    447   end
    448 
    449   defp collect_migrations(versions, migration_source) do
    450     ups_with_file =
    451       versions
    452       |> pending_in_direction(migration_source, :down)
    453       |> Enum.map(fn {version, name, _} -> {:up, version, name} end)
    454 
    455     ups_without_file =
    456       versions
    457       |> versions_without_file(migration_source)
    458       |> Enum.map(fn version -> {:up, version, "** FILE NOT FOUND **"} end)
    459 
    460     downs =
    461       versions
    462       |> pending_in_direction(migration_source, :up)
    463       |> Enum.map(fn {version, name, _} -> {:down, version, name} end)
    464 
    465     ups_with_file ++ ups_without_file ++ downs
    466   end
    467 
    468   defp versions_without_file(versions, migration_source) do
    469     versions_with_file =
    470       migration_source
    471       |> migrations_for()
    472       |> Enum.map(fn {version, _, _} -> version end)
    473 
    474     versions -- versions_with_file
    475   end
    476 
    477   defp lock_for_migrations(lock_or_migration_number, repo, opts, fun) do
    478     dynamic_repo = Keyword.get(opts, :dynamic_repo, repo)
    479     skip_table_creation = Keyword.get(opts, :skip_table_creation, false)
    480     previous_dynamic_repo = repo.put_dynamic_repo(dynamic_repo)
    481 
    482     try do
    483       config = repo.config()
    484 
    485       unless skip_table_creation do
    486         verbose_schema_migration repo, "create schema migrations table", fn ->
    487           SchemaMigration.ensure_schema_migrations_table!(repo, config, opts)
    488         end
    489       end
    490 
    491       {migration_repo, query, all_opts} = SchemaMigration.versions(repo, config, opts[:prefix])
    492 
    493       migration_lock? =
    494         Keyword.get(opts, :migration_lock, Keyword.get(config, :migration_lock, true))
    495 
    496       opts =
    497         opts
    498         |> Keyword.put(:migration_source, config[:migration_source] || "schema_migrations")
    499         |> Keyword.put(:log, migrator_log(opts))
    500 
    501       result =
    502         if lock_or_migration_number && migration_lock? do
    503           # If there is a migration_repo, it wins over dynamic_repo,
    504           # otherwise the dynamic_repo is the one locked in migrations.
    505           meta_repo = if migration_repo != repo, do: migration_repo, else: dynamic_repo
    506           meta = Ecto.Adapter.lookup_meta(meta_repo)
    507 
    508           migration_repo.__adapter__().lock_for_migrations(meta, opts, fn ->
    509             fun.(config, migration_repo.all(query, all_opts))
    510           end)
    511         else
    512           fun.(config, migration_repo.all(query, all_opts))
    513         end
    514 
    515       case result do
    516         {kind, reason, stacktrace} ->
    517           :erlang.raise(kind, reason, stacktrace)
    518 
    519         {:error, error} ->
    520           raise error
    521 
    522         result ->
    523           result
    524       end
    525     after
    526       repo.put_dynamic_repo(previous_dynamic_repo)
    527     end
    528   end
    529 
    530   defp conditional_lock_for_migrations(module, version, repo, opts, fun) do
    531     lock = if module.__migration__[:disable_migration_lock], do: false, else: version
    532     lock_for_migrations(lock, repo, opts, fun)
    533   end
    534 
    535   defp pending_to(versions, migration_source, direction, target) do
    536     within_target_version? = fn
    537       {version, _, _}, target, :up ->
    538         version <= target
    539       {version, _, _}, target, :down ->
    540         version >= target
    541     end
    542 
    543     pending_in_direction(versions, migration_source, direction)
    544     |> Enum.take_while(&(within_target_version?.(&1, target, direction)))
    545   end
    546 
    547   defp pending_to_exclusive(versions, migration_source, direction, target) do
    548     within_target_version? = fn
    549       {version, _, _}, target, :up ->
    550         version < target
    551       {version, _, _}, target, :down ->
    552         version > target
    553     end
    554 
    555     pending_in_direction(versions, migration_source, direction)
    556     |> Enum.take_while(&(within_target_version?.(&1, target, direction)))
    557   end
    558 
    559   defp pending_step(versions, migration_source, direction, count) do
    560     pending_in_direction(versions, migration_source, direction)
    561     |> Enum.take(count)
    562   end
    563 
    564   defp pending_all(versions, migration_source, direction) do
    565     pending_in_direction(versions, migration_source, direction)
    566   end
    567 
    568   defp pending_in_direction(versions, migration_source, :up) do
    569     migration_source
    570     |> migrations_for()
    571     |> Enum.filter(fn {version, _name, _file} -> not (version in versions) end)
    572   end
    573 
    574   defp pending_in_direction(versions, migration_source, :down) do
    575     migration_source
    576     |> migrations_for()
    577     |> Enum.filter(fn {version, _name, _file} -> version in versions end)
    578     |> Enum.reverse
    579   end
    580 
    581   defp migrations_for(migration_source) when is_list(migration_source) do
    582     migration_source
    583     |> Enum.flat_map(fn
    584       directory when is_binary(directory) ->
    585         Path.join([directory, "**", "*.exs"])
    586         |> Path.wildcard()
    587         |> Enum.map(&extract_migration_info/1)
    588         |> Enum.filter(& &1)
    589 
    590       {version, module} ->
    591         [{version, module, module}]
    592     end)
    593     |> Enum.sort()
    594   end
    595 
    596   defp extract_migration_info(file) do
    597     base = Path.basename(file)
    598 
    599     case Integer.parse(Path.rootname(base)) do
    600       {integer, "_" <> name} -> {integer, name, file}
    601       _ -> nil
    602     end
    603   end
    604 
    605   defp ensure_no_duplication!([{version, name, _} | t]) do
    606     cond do
    607       List.keyfind(t, version, 0) ->
    608         raise Ecto.MigrationError, "migrations can't be executed, migration version #{version} is duplicated"
    609 
    610       List.keyfind(t, name, 1) ->
    611         raise Ecto.MigrationError, "migrations can't be executed, migration name #{name} is duplicated"
    612 
    613       true ->
    614         ensure_no_duplication!(t)
    615     end
    616   end
    617 
    618   defp ensure_no_duplication!([]), do: :ok
    619 
    620   defp load_migration!({version, _, mod}) when is_atom(mod) do
    621     if migration?(mod) do
    622       {version, mod}
    623     else
    624       raise Ecto.MigrationError, "module #{inspect(mod)} is not an Ecto.Migration"
    625     end
    626   end
    627 
    628   defp load_migration!({version, _, file}) when is_binary(file) do
    629     loaded_modules = file |> Code.compile_file() |> Enum.map(&elem(&1, 0))
    630 
    631     if mod = Enum.find(loaded_modules, &migration?/1) do
    632       {version, mod}
    633     else
    634       raise Ecto.MigrationError, "file #{Path.relative_to_cwd(file)} does not define an Ecto.Migration"
    635     end
    636   end
    637 
    638   defp migration?(mod) do
    639     function_exported?(mod, :__migration__, 0)
    640   end
    641 
    642   defp migrate([], direction, _repo, opts) do
    643     level = Keyword.get(opts, :log, :info)
    644     log(level, "Migrations already #{direction}")
    645     []
    646   end
    647 
    648   defp migrate(migrations, direction, repo, opts) do
    649     for {version, mod} <- migrations,
    650         do_direction(direction, repo, version, mod, opts),
    651         do: version
    652   end
    653 
    654   defp do_direction(:up, repo, version, mod, opts) do
    655     conditional_lock_for_migrations mod, version, repo, opts, fn config, versions ->
    656       unless version in versions do
    657         do_up(repo, config, version, mod, opts)
    658       end
    659     end
    660   end
    661 
    662   defp do_direction(:down, repo, version, mod, opts) do
    663     conditional_lock_for_migrations mod, version, repo, opts, fn config, versions ->
    664       if version in versions do
    665         do_down(repo, config, version, mod, opts)
    666       end
    667     end
    668   end
    669 
    670   defp verbose_schema_migration(repo, reason, fun) do
    671     try do
    672       fun.()
    673     rescue
    674       error ->
    675         Logger.error """
    676         Could not #{reason}. This error usually happens due to the following:
    677 
    678           * The database does not exist
    679           * The "schema_migrations" table, which Ecto uses for managing
    680             migrations, was defined by another library
    681           * There is a deadlock while migrating (such as using concurrent
    682             indexes with a migration_lock)
    683 
    684         To fix the first issue, run "mix ecto.create".
    685 
    686         To address the second, you can run "mix ecto.drop" followed by
    687         "mix ecto.create". Alternatively you may configure Ecto to use
    688         another table and/or repository for managing migrations:
    689 
    690             config #{inspect repo.config[:otp_app]}, #{inspect repo},
    691               migration_source: "some_other_table_for_schema_migrations",
    692               migration_repo: AnotherRepoForSchemaMigrations
    693 
    694         The full error report is shown below.
    695         """
    696         reraise error, __STACKTRACE__
    697     end
    698   end
    699 
    700   defp log(false, _msg), do: :ok
    701   defp log(true, msg), do: Logger.info(msg)
    702   defp log(level, msg),  do: Logger.log(level, msg)
    703 
    704   defp migrator_log(opts) do
    705     Keyword.get(opts, :log_migrator_sql, false)
    706   end
    707 
    708   defp ensure_repo_started(repo, pool_size) do
    709     case repo.start_link(pool_size: pool_size) do
    710       {:ok, _} ->
    711         {:ok, :stop}
    712 
    713       {:error, {:already_started, _pid}} ->
    714         {:ok, :restart}
    715 
    716       {:error, _} = error ->
    717         error
    718     end
    719   end
    720 
    721   defp ensure_migration_repo_started(repo, repo) do
    722     {:ok, :noop}
    723   end
    724 
    725   defp ensure_migration_repo_started(migration_repo, _repo) do
    726     case migration_repo.start_link() do
    727       {:ok, _} ->
    728         {:ok, :stop}
    729 
    730       {:error, {:already_started, _pid}} ->
    731         {:ok, :noop}
    732 
    733       {:error, _} = error ->
    734         error
    735     end
    736   end
    737 
    738   defp after_action(repo, :restart) do
    739     if Process.whereis(repo) do
    740       %{pid: pid} = Ecto.Adapter.lookup_meta(repo)
    741       Supervisor.restart_child(repo, pid)
    742     end
    743   end
    744 
    745   defp after_action(repo, :stop) do
    746     repo.stop()
    747   end
    748 
    749   defp after_action(_repo, :noop) do
    750     :noop
    751   end
    752 end