zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

supervisor.ex (6021B)


      1 defmodule Ecto.Repo.Supervisor do
      2   @moduledoc false
      3   use Supervisor
      4 
      5   @defaults [timeout: 15000, pool_size: 10]
      6   @integer_url_query_params ["timeout", "pool_size", "idle_interval"]
      7 
      8   @doc """
      9   Starts the repo supervisor.
     10   """
     11   def start_link(repo, otp_app, adapter, opts) do
     12     name = Keyword.get(opts, :name, repo)
     13     sup_opts = if name, do: [name: name], else: []
     14     Supervisor.start_link(__MODULE__, {name, repo, otp_app, adapter, opts}, sup_opts)
     15   end
     16 
     17   @doc """
     18   Retrieves the runtime configuration.
     19   """
     20   def runtime_config(type, repo, otp_app, opts) do
     21     config = Application.get_env(otp_app, repo, [])
     22     config = [otp_app: otp_app] ++ (@defaults |> Keyword.merge(config) |> Keyword.merge(opts))
     23     config = Keyword.put_new_lazy(config, :telemetry_prefix, fn -> telemetry_prefix(repo) end)
     24 
     25     case repo_init(type, repo, config) do
     26       {:ok, config} ->
     27         {url, config} = Keyword.pop(config, :url)
     28         {:ok, Keyword.merge(config, parse_url(url || ""))}
     29 
     30       :ignore ->
     31         :ignore
     32     end
     33   end
     34 
     35   defp telemetry_prefix(repo) do
     36     repo
     37     |> Module.split()
     38     |> Enum.map(& &1 |> Macro.underscore() |> String.to_atom())
     39   end
     40 
     41   defp repo_init(type, repo, config) do
     42     if Code.ensure_loaded?(repo) and function_exported?(repo, :init, 2) do
     43       repo.init(type, config)
     44     else
     45       {:ok, config}
     46     end
     47   end
     48 
     49   @doc """
     50   Retrieves the compile time configuration.
     51   """
     52   def compile_config(_repo, opts) do
     53     otp_app = Keyword.fetch!(opts, :otp_app)
     54     adapter = opts[:adapter]
     55 
     56     unless adapter do
     57       raise ArgumentError, "missing :adapter option on use Ecto.Repo"
     58     end
     59 
     60     if Code.ensure_compiled(adapter) != {:module, adapter} do
     61       raise ArgumentError, "adapter #{inspect adapter} was not compiled, " <>
     62                            "ensure it is correct and it is included as a project dependency"
     63     end
     64 
     65     behaviours =
     66       for {:behaviour, behaviours} <- adapter.__info__(:attributes),
     67           behaviour <- behaviours,
     68           do: behaviour
     69 
     70     unless Ecto.Adapter in behaviours do
     71       raise ArgumentError,
     72             "expected :adapter option given to Ecto.Repo to list Ecto.Adapter as a behaviour"
     73     end
     74 
     75     {otp_app, adapter, behaviours}
     76   end
     77 
     78   @doc """
     79   Parses an Ecto URL allowed in configuration.
     80 
     81   The format must be:
     82 
     83       "ecto://username:password@hostname:port/database?ssl=true&timeout=1000"
     84 
     85   """
     86   def parse_url(""), do: []
     87 
     88   def parse_url(url) when is_binary(url) do
     89     info = URI.parse(url)
     90 
     91     if is_nil(info.host) do
     92       raise Ecto.InvalidURLError, url: url, message: "host is not present"
     93     end
     94 
     95     if is_nil(info.path) or not (info.path =~ ~r"^/([^/])+$") do
     96       raise Ecto.InvalidURLError, url: url, message: "path should be a database name"
     97     end
     98 
     99     destructure [username, password], info.userinfo && String.split(info.userinfo, ":")
    100     "/" <> database = info.path
    101 
    102     url_opts = [
    103       username: username,
    104       password: password,
    105       database: database,
    106       port: info.port
    107     ]
    108 
    109     url_opts = put_hostname_if_present(url_opts, info.host)
    110     query_opts = parse_uri_query(info)
    111 
    112     for {k, v} <- url_opts ++ query_opts,
    113         not is_nil(v),
    114         do: {k, if(is_binary(v), do: URI.decode(v), else: v)}
    115   end
    116 
    117   defp put_hostname_if_present(keyword, "") do
    118     keyword
    119   end
    120 
    121   defp put_hostname_if_present(keyword, hostname) when is_binary(hostname) do
    122     Keyword.put(keyword, :hostname, hostname)
    123   end
    124 
    125   defp parse_uri_query(%URI{query: nil}),
    126     do: []
    127   defp parse_uri_query(%URI{query: query} = url) do
    128     query
    129     |> URI.query_decoder()
    130     |> Enum.reduce([], fn
    131       {"ssl", "true"}, acc ->
    132         [{:ssl, true}] ++ acc
    133 
    134       {"ssl", "false"}, acc ->
    135         [{:ssl, false}] ++ acc
    136 
    137       {key, value}, acc when key in @integer_url_query_params ->
    138         [{String.to_atom(key), parse_integer!(key, value, url)}] ++ acc
    139 
    140       {key, value}, acc ->
    141         [{String.to_atom(key), value}] ++ acc
    142     end)
    143   end
    144 
    145   defp parse_integer!(key, value, url) do
    146     case Integer.parse(value) do
    147       {int, ""} ->
    148         int
    149 
    150       _ ->
    151         raise Ecto.InvalidURLError,
    152               url: url,
    153               message: "can not parse value `#{value}` for parameter `#{key}` as an integer"
    154     end
    155   end
    156 
    157   @doc false
    158   def tuplet(name, opts) do
    159     adapter_meta = Ecto.Repo.Registry.lookup(name)
    160 
    161     if opts[:stacktrace] || Map.get(adapter_meta, :stacktrace) do
    162       {:current_stacktrace, stacktrace} = :erlang.process_info(self(), :current_stacktrace)
    163       {adapter_meta, Keyword.put(opts, :stacktrace, stacktrace)}
    164     else
    165       {adapter_meta, opts}
    166     end
    167   end
    168 
    169   ## Callbacks
    170 
    171   @doc false
    172   def init({name, repo, otp_app, adapter, opts}) do
    173     # Normalize name to atom, ignore via/global names
    174     name = if is_atom(name), do: name, else: nil
    175 
    176     case runtime_config(:supervisor, repo, otp_app, opts) do
    177       {:ok, opts} ->
    178         :telemetry.execute(
    179           [:ecto, :repo, :init],
    180           %{system_time: System.system_time()},
    181           %{repo: repo, opts: opts}
    182         )
    183 
    184         {:ok, child, meta} = adapter.init([repo: repo] ++ opts)
    185         cache = Ecto.Query.Planner.new_query_cache(name)
    186         meta = Map.merge(meta, %{repo: repo, cache: cache})
    187         child_spec = wrap_child_spec(child, [name, adapter, meta])
    188         Supervisor.init([child_spec], strategy: :one_for_one, max_restarts: 0)
    189 
    190       :ignore ->
    191         :ignore
    192     end
    193   end
    194 
    195   def start_child({mod, fun, args}, name, adapter, meta) do
    196     case apply(mod, fun, args) do
    197       {:ok, pid} ->
    198         meta = Map.merge(meta, %{pid: pid, adapter: adapter})
    199         Ecto.Repo.Registry.associate(self(), name, meta)
    200         {:ok, pid}
    201 
    202       other ->
    203         other
    204     end
    205   end
    206 
    207   defp wrap_child_spec({id, start, restart, shutdown, type, mods}, args) do
    208     {id, {__MODULE__, :start_child, [start | args]}, restart, shutdown, type, mods}
    209   end
    210 
    211   defp wrap_child_spec(%{start: start} = spec, args) do
    212     %{spec | start: {__MODULE__, :start_child, [start | args]}}
    213   end
    214 end