zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

runner.ex (14748B)


      1 defmodule Ecto.Migration.Runner do
      2   @moduledoc false
      3   use Agent, restart: :temporary
      4 
      5   require Logger
      6 
      7   alias Ecto.Migration.Table
      8   alias Ecto.Migration.Index
      9   alias Ecto.Migration.Constraint
     10   alias Ecto.Migration.Command
     11 
     12   @doc """
     13   Runs the given migration.
     14   """
     15   def run(repo, config, version, module, direction, operation, migrator_direction, opts) do
     16     level = Keyword.get(opts, :log, :info)
     17     sql = Keyword.get(opts, :log_migrations_sql, false)
     18     log = %{level: level, sql: sql}
     19     args  = {self(), repo, config, module, direction, migrator_direction, log}
     20 
     21     {:ok, runner} = DynamicSupervisor.start_child(Ecto.MigratorSupervisor, {__MODULE__, args})
     22     metadata(runner, opts)
     23 
     24     log(level, "== Running #{version} #{inspect module}.#{operation}/0 #{direction}")
     25     {time, _} = :timer.tc(fn -> perform_operation(repo, module, operation) end)
     26     log(level, "== Migrated #{version} in #{inspect(div(time, 100_000) / 10)}s")
     27 
     28     stop()
     29   end
     30 
     31   @doc """
     32   Stores the runner metadata.
     33   """
     34   def metadata(runner, opts) do
     35     prefix = opts[:prefix]
     36     Process.put(:ecto_migration, %{runner: runner, prefix: prefix && to_string(prefix)})
     37   end
     38 
     39   @doc """
     40   Starts the runner for the specified repo.
     41   """
     42   def start_link({parent, repo, config, module, direction, migrator_direction, log}) do
     43     Agent.start_link(fn ->
     44       Process.link(parent)
     45 
     46       %{
     47         direction: direction,
     48         repo: repo,
     49         migration: module,
     50         migrator_direction: migrator_direction,
     51         command: nil,
     52         subcommands: [],
     53         log: log,
     54         commands: [],
     55         config: config
     56       }
     57     end)
     58   end
     59 
     60   @doc """
     61   Stops the runner.
     62   """
     63   def stop() do
     64     Agent.stop(runner())
     65   end
     66 
     67   @doc """
     68   Accesses the given repository configuration.
     69   """
     70   def repo_config(key, default) do
     71     Agent.get(runner(), &Keyword.get(&1.config, key, default))
     72   end
     73 
     74   @doc """
     75   Returns the migrator command (up or down).
     76 
     77     * forward + up: up
     78     * forward + down: down
     79     * forward + change: up
     80     * backward + change: down
     81 
     82   """
     83   def migrator_direction do
     84     Agent.get(runner(), & &1.migrator_direction)
     85   end
     86 
     87   @doc """
     88   Gets the repo for this migration
     89   """
     90   def repo do
     91     Agent.get(runner(), & &1.repo)
     92   end
     93 
     94   @doc """
     95   Gets the prefix for this migration
     96   """
     97   def prefix do
     98     case Process.get(:ecto_migration) do
     99       %{prefix: prefix} -> prefix
    100       _ -> raise "could not find migration runner process for #{inspect self()}"
    101     end
    102   end
    103 
    104   @doc """
    105   Executes queue migration commands.
    106 
    107   Reverses the order commands are executed when doing a rollback
    108   on a change/0 function and resets commands queue.
    109   """
    110   def flush do
    111     %{commands: commands, direction: direction, repo: repo, log: log, migration: migration} =
    112       Agent.get_and_update(runner(), fn state -> {state, %{state | commands: []}} end)
    113 
    114     commands = if direction == :backward, do: commands, else: Enum.reverse(commands)
    115 
    116     for command <- commands do
    117       execute_in_direction(repo, migration, direction, log, command)
    118     end
    119   end
    120 
    121   @doc """
    122   Queues command tuples or strings for execution.
    123 
    124   Ecto.MigrationError will be raised when the server
    125   is in `:backward` direction and `command` is irreversible.
    126   """
    127   def execute(command) do
    128     reply =
    129       Agent.get_and_update(runner(), fn
    130         %{command: nil} = state ->
    131           {:ok, %{state | subcommands: [], commands: [command|state.commands]}}
    132         %{command: _} = state ->
    133           {:error, %{state | command: nil}}
    134       end)
    135 
    136     case reply do
    137       :ok ->
    138         :ok
    139       :error ->
    140         raise Ecto.MigrationError, "cannot execute nested commands"
    141     end
    142   end
    143 
    144   @doc """
    145   Starts a command.
    146   """
    147   def start_command(command) do
    148     reply =
    149       Agent.get_and_update(runner(), fn
    150         %{command: nil} = state ->
    151           {:ok, %{state | command: command}}
    152         %{command: _} = state ->
    153           {:error, %{state | command: command}}
    154       end)
    155 
    156     case reply do
    157       :ok ->
    158         :ok
    159       :error ->
    160         raise Ecto.MigrationError, "cannot execute nested commands"
    161     end
    162   end
    163 
    164   @doc """
    165   Queues and clears current command. Must call `start_command/1` first.
    166   """
    167   def end_command do
    168     Agent.update runner(), fn state ->
    169       {operation, object} = state.command
    170       command = {operation, object, Enum.reverse(state.subcommands)}
    171       %{state | command: nil, subcommands: [], commands: [command|state.commands]}
    172     end
    173   end
    174 
    175   @doc """
    176   Adds a subcommand to the current command. Must call `start_command/1` first.
    177   """
    178   def subcommand(subcommand) do
    179     reply =
    180       Agent.get_and_update(runner(), fn
    181         %{command: nil} = state ->
    182           {:error, state}
    183         state ->
    184           {:ok, update_in(state.subcommands, &[subcommand|&1])}
    185       end)
    186 
    187     case reply do
    188       :ok ->
    189         :ok
    190       :error ->
    191         raise Ecto.MigrationError, message: "cannot execute command outside of block"
    192     end
    193   end
    194 
    195   ## Execute
    196 
    197   defp execute_in_direction(repo, migration, :forward, log, %Command{up: up}) do
    198     log_and_execute_ddl(repo, migration, log, up)
    199   end
    200 
    201   defp execute_in_direction(repo, migration, :forward, log, command) do
    202     log_and_execute_ddl(repo, migration, log, command)
    203   end
    204 
    205   defp execute_in_direction(repo, migration, :backward, log, %Command{down: down}) do
    206     log_and_execute_ddl(repo, migration, log, down)
    207   end
    208 
    209   defp execute_in_direction(repo, migration, :backward, log, command) do
    210     if reversed = reverse(command) do
    211       log_and_execute_ddl(repo, migration, log, reversed)
    212     else
    213       raise Ecto.MigrationError, message:
    214         "cannot reverse migration command: #{command command}. " <>
    215         "You will need to explicitly define up/0 and down/0 in your migration"
    216     end
    217   end
    218 
    219   defp reverse({:create, %Index{} = index}),
    220     do: {:drop, index, :restrict}
    221   defp reverse({:create_if_not_exists, %Index{} = index}),
    222     do: {:drop_if_exists, index, :restrict}
    223   defp reverse({:drop, %Index{} = index, _}),
    224     do: {:create, index}
    225   defp reverse({:drop_if_exists, %Index{} = index, _}),
    226     do: {:create_if_not_exists, index}
    227 
    228   defp reverse({:create, %Table{} = table, _columns}),
    229     do: {:drop, table, :restrict}
    230   defp reverse({:create_if_not_exists, %Table{} = table, _columns}),
    231     do: {:drop_if_exists, table, :restrict}
    232   defp reverse({:rename, %Table{} = table_current, %Table{} = table_new}),
    233     do: {:rename, table_new, table_current}
    234   defp reverse({:rename, %Table{} = table, current_column, new_column}),
    235     do: {:rename, table, new_column, current_column}
    236   defp reverse({:alter,  %Table{} = table, changes}) do
    237     if reversed = table_reverse(changes, []) do
    238       {:alter, table, reversed}
    239     end
    240   end
    241 
    242   # It is not a good idea to reverse constraints because
    243   # we can't guarantee data integrity when applying them back.
    244   defp reverse({:create_if_not_exists, %Constraint{} = constraint}),
    245     do: {:drop_if_exists, constraint, :restrict}
    246   defp reverse({:create, %Constraint{} = constraint}),
    247     do: {:drop, constraint, :restrict}
    248 
    249   defp reverse(_command), do: false
    250 
    251   defp table_reverse([{:remove, name, type, opts}| t], acc) do
    252     table_reverse(t, [{:add, name, type, opts} | acc])
    253   end
    254   defp table_reverse([{:modify, name, type, opts} | t], acc) do
    255     case opts[:from] do
    256       nil ->
    257         false
    258 
    259       {reverse_type, from_opts} when is_list(from_opts) ->
    260         reverse_from = {type, Keyword.delete(opts, :from)}
    261         reverse_opts = Keyword.put(from_opts, :from, reverse_from)
    262         table_reverse(t, [{:modify, name, reverse_type, reverse_opts} | acc])
    263 
    264       reverse_type ->
    265         reverse_opts = Keyword.put(opts, :from, type)
    266         table_reverse(t, [{:modify, name, reverse_type, reverse_opts} | acc])
    267     end
    268   end
    269   defp table_reverse([{:add, name, _type, _opts} | t], acc) do
    270     table_reverse(t, [{:remove, name} | acc])
    271   end
    272   defp table_reverse([_ | _], _acc) do
    273     false
    274   end
    275   defp table_reverse([], acc) do
    276     acc
    277   end
    278 
    279   ## Helpers
    280 
    281   defp perform_operation(repo, module, operation) do
    282     if function_exported?(repo, :in_transaction?, 0) and repo.in_transaction?() do
    283       if function_exported?(module, :after_begin, 0) do
    284         module.after_begin()
    285         flush()
    286       end
    287 
    288       apply(module, operation, [])
    289       flush()
    290 
    291       if function_exported?(module, :before_commit, 0) do
    292         module.before_commit()
    293         flush()
    294       end
    295     else
    296       apply(module, operation, [])
    297       flush()
    298     end
    299   end
    300 
    301   defp runner do
    302     case Process.get(:ecto_migration) do
    303       %{runner: runner} -> runner
    304       _ -> raise "could not find migration runner process for #{inspect self()}"
    305     end
    306   end
    307 
    308   defp log_and_execute_ddl(repo, migration, log, {instruction, %Index{} = index}) do
    309     maybe_warn_index_ddl_transaction(index, migration)
    310     maybe_warn_index_migration_lock(index, repo, migration)
    311     log_and_execute_ddl(repo, log, {instruction, index})
    312   end
    313 
    314   defp log_and_execute_ddl(repo, _migration, log, command) do
    315     log_and_execute_ddl(repo, log, command)
    316   end
    317 
    318   defp log_and_execute_ddl(_repo, _log, func) when is_function(func, 0) do
    319     func.()
    320     :ok
    321   end
    322 
    323   defp log_and_execute_ddl(repo, %{level: level, sql: sql}, command) do
    324     log(level, command(command))
    325     meta = Ecto.Adapter.lookup_meta(repo.get_dynamic_repo())
    326     {:ok, logs} = repo.__adapter__().execute_ddl(meta, command, timeout: :infinity, log: sql)
    327 
    328     Enum.each(logs, fn {level, message, metadata} ->
    329       log(level, message, metadata)
    330     end)
    331 
    332     :ok
    333   end
    334 
    335   defp log(level, msg, metadata \\ [])
    336   defp log(false, _msg, _metadata), do: :ok
    337   defp log(true, msg, metadata), do: Logger.log(:info, msg, metadata)
    338   defp log(level, msg, metadata),  do: Logger.log(level, msg, metadata)
    339 
    340   defp maybe_warn_index_ddl_transaction(%{concurrently: true} = index, migration) do
    341     migration_config = migration.__migration__()
    342 
    343     if not migration_config[:disable_ddl_transaction] do
    344       IO.warn """
    345       Migration #{inspect(migration)} has set index `#{index.name}` on table \
    346       `#{index.table}` to concurrently but did not disable ddl transaction. \
    347       Please set:
    348 
    349           use Ecto.Migration
    350           @disable_ddl_transaction true
    351       """, []
    352     end
    353   end
    354   defp maybe_warn_index_ddl_transaction(_index, _migration), do: :ok
    355 
    356   defp maybe_warn_index_migration_lock(%{concurrently: true} = index, repo, migration) do
    357     migration_lock_disabled = migration.__migration__()[:disable_migration_lock]
    358     lock_strategy = repo.config()[:migration_lock]
    359     adapter = repo.__adapter__()
    360 
    361     case {migration_lock_disabled, adapter, lock_strategy} do
    362       {false, Ecto.Adapters.Postgres, :pg_advisory_lock} ->
    363         :ok
    364 
    365       {false, Ecto.Adapters.Postgres, _} ->
    366         IO.warn """
    367         Migration #{inspect(migration)} has set index `#{index.name}` on table \
    368         `#{index.table}` to concurrently but did not disable migration lock. \
    369         Please set:
    370 
    371             use Ecto.Migration
    372             @disable_migration_lock true
    373 
    374         Alternatively, consider using advisory locks during migrations in the \
    375         repo configuration:
    376 
    377             config #{inspect(repo)}, migration_lock: :pg_advisory_lock
    378         """, []
    379 
    380       {false, _adapter, _migration_lock} ->
    381         IO.warn """
    382         Migration #{inspect(migration)} has set index `#{index.name}` on table \
    383         `#{index.table}` to concurrently but did not disable migration lock. \
    384         Please set:
    385 
    386             use Ecto.Migration
    387             @disable_migration_lock true
    388         """, []
    389 
    390       _ ->
    391         :ok
    392     end
    393   end
    394   defp maybe_warn_index_migration_lock(_index, _repo, _migration), do: :ok
    395 
    396   defp command(ddl) when is_binary(ddl) or is_list(ddl),
    397     do: "execute #{inspect ddl}"
    398 
    399   defp command({:create, %Table{} = table, _}),
    400     do: "create table #{quote_name(table.prefix, table.name)}"
    401   defp command({:create_if_not_exists, %Table{} = table, _}),
    402     do: "create table if not exists #{quote_name(table.prefix, table.name)}"
    403   defp command({:alter, %Table{} = table, _}),
    404     do: "alter table #{quote_name(table.prefix, table.name)}"
    405   defp command({:drop, %Table{} = table, mode}),
    406     do: "drop table #{quote_name(table.prefix, table.name)}#{drop_mode(mode)}"
    407   defp command({:drop_if_exists, %Table{} = table, mode}),
    408     do: "drop table if exists #{quote_name(table.prefix, table.name)}#{drop_mode(mode)}"
    409 
    410   defp command({:create, %Index{} = index}),
    411     do: "create index #{quote_name(index.prefix, index.name)}"
    412   defp command({:create_if_not_exists, %Index{} = index}),
    413     do: "create index if not exists #{quote_name(index.prefix, index.name)}"
    414   defp command({:drop, %Index{} = index, mode}),
    415     do: "drop index #{quote_name(index.prefix, index.name)}#{drop_mode(mode)}"
    416   defp command({:drop_if_exists, %Index{} = index, mode}),
    417     do: "drop index if exists #{quote_name(index.prefix, index.name)}#{drop_mode(mode)}"
    418   defp command({:rename, %Table{} = current_table, %Table{} = new_table}),
    419     do: "rename table #{quote_name(current_table.prefix, current_table.name)} to #{quote_name(new_table.prefix, new_table.name)}"
    420   defp command({:rename, %Table{} = table, current_column, new_column}),
    421     do: "rename column #{current_column} to #{new_column} on table #{quote_name(table.prefix, table.name)}"
    422 
    423   defp command({:create, %Constraint{check: nil, exclude: nil}}),
    424     do: raise(ArgumentError, "a constraint must have either a check or exclude option")
    425   defp command({:create, %Constraint{check: check, exclude: exclude}}) when is_binary(check) and is_binary(exclude),
    426     do: raise(ArgumentError, "a constraint must not have both check and exclude options")
    427   defp command({:create, %Constraint{check: check} = constraint}) when is_binary(check),
    428     do: "create check constraint #{constraint.name} on table #{quote_name(constraint.prefix, constraint.table)}"
    429   defp command({:create, %Constraint{exclude: exclude} = constraint}) when is_binary(exclude),
    430     do: "create exclude constraint #{constraint.name} on table #{quote_name(constraint.prefix, constraint.table)}"
    431   defp command({:drop, %Constraint{} = constraint, _}),
    432     do: "drop constraint #{constraint.name} from table #{quote_name(constraint.prefix, constraint.table)}"
    433   defp command({:drop_if_exists, %Constraint{} = constraint, _}),
    434     do: "drop constraint if exists #{constraint.name} from table #{quote_name(constraint.prefix, constraint.table)}"
    435 
    436   defp drop_mode(:restrict), do: ""
    437   defp drop_mode(:cascade), do: " cascade"
    438 
    439   defp quote_name(nil, name), do: quote_name(name)
    440   defp quote_name(prefix, name), do: quote_name(prefix) <> "." <> quote_name(name)
    441   defp quote_name(name) when is_atom(name), do: quote_name(Atom.to_string(name))
    442   defp quote_name(name), do: name
    443 end