supervisor.ex (6021B)
defmodule Ecto.Repo.Supervisor do
  @moduledoc false
  use Supervisor

  # Lowest-precedence configuration: overridden by the application
  # environment, which is in turn overridden by explicitly given options.
  @defaults [timeout: 15000, pool_size: 10]

  # URL query parameters whose values must parse as integers.
  @integer_url_query_params ["timeout", "pool_size", "idle_interval"]

  @doc """
  Starts the repo supervisor.

  The supervisor is registered under the `:name` option, falling back to
  `repo` when the option is absent. A falsy `:name` (such as `nil`) starts
  the supervisor without registering a name.
  """
  def start_link(repo, otp_app, adapter, opts) do
    name = Keyword.get(opts, :name, repo)
    sup_opts = if name, do: [name: name], else: []
    Supervisor.start_link(__MODULE__, {name, repo, otp_app, adapter, opts}, sup_opts)
  end

  @doc """
  Retrieves the runtime configuration.

  The configuration is assembled from, in increasing precedence, the built-in
  defaults, the `otp_app` application environment stored under `repo`, and
  `opts`. `:otp_app` is prepended and `:telemetry_prefix` is derived from the
  repo module name unless already set.

  The result is passed to `repo.init(type, config)` when the repo exports
  `init/2`. On `{:ok, config}` the `:url` entry is removed and the options
  parsed from it by `parse_url/1` are merged over the rest of the
  configuration. Returns `{:ok, config}` or `:ignore`.
  """
  def runtime_config(type, repo, otp_app, opts) do
    app_config = Application.get_env(otp_app, repo, [])

    config =
      @defaults
      |> Keyword.merge(app_config)
      |> Keyword.merge(opts)

    config =
      Keyword.put_new_lazy([otp_app: otp_app] ++ config, :telemetry_prefix, fn ->
        telemetry_prefix(repo)
      end)

    case repo_init(type, repo, config) do
      {:ok, config} ->
        {url, config} = Keyword.pop(config, :url)
        # Options coming from the URL take precedence over the other entries.
        {:ok, Keyword.merge(config, parse_url(url || ""))}

      :ignore ->
        :ignore
    end
  end

  # Builds the default telemetry prefix: one underscored atom per segment of
  # the repo module name.
  defp telemetry_prefix(repo) do
    repo
    |> Module.split()
    |> Enum.map(fn segment ->
      segment |> Macro.underscore() |> String.to_atom()
    end)
  end

  # Invokes the optional `init/2` function of the repo; the configuration is
  # returned untouched when the repo does not export it.
  defp repo_init(type, repo, config) do
    if Code.ensure_loaded?(repo) and function_exported?(repo, :init, 2) do
      repo.init(type, config)
    else
      {:ok, config}
    end
  end

  @doc """
  Retrieves the compile time configuration.

  Returns `{otp_app, adapter, behaviours}` where `behaviours` lists every
  behaviour declared by the adapter module.

  Raises `KeyError` when `:otp_app` is missing and `ArgumentError` when
  `:adapter` is missing, cannot be compiled, or does not declare
  `Ecto.Adapter` as a behaviour.
  """
  def compile_config(_repo, opts) do
    otp_app = Keyword.fetch!(opts, :otp_app)

    adapter =
      opts[:adapter] || raise ArgumentError, "missing :adapter option on use Ecto.Repo"

    if Code.ensure_compiled(adapter) != {:module, adapter} do
      raise ArgumentError,
            "adapter #{inspect(adapter)} was not compiled, " <>
              "ensure it is correct and it is included as a project dependency"
    end

    behaviours =
      for {:behaviour, behaviours} <- adapter.__info__(:attributes),
          behaviour <- behaviours,
          do: behaviour

    if Ecto.Adapter not in behaviours do
      raise ArgumentError,
            "expected :adapter option given to Ecto.Repo to list Ecto.Adapter as a behaviour"
    end

    {otp_app, adapter, behaviours}
  end

  @doc """
  Parses an Ecto URL allowed in configuration.

  The format must be:

      "ecto://username:password@hostname:port/database?ssl=true&timeout=1000"

  Returns a keyword list with the non-nil entries among `:hostname`,
  `:username`, `:password`, `:database` and `:port`, followed by one entry
  per query parameter. An empty string yields an empty list.

  The user info is split on its first `:` only, so a password may itself
  contain colons. Components taken from the URL are percent-decoded exactly
  once.

  Raises `Ecto.InvalidURLError` when the host is missing, when the path is
  not a single database name, or when an integer query parameter cannot be
  parsed.
  """
  def parse_url(""), do: []

  def parse_url(url) when is_binary(url) do
    info = URI.parse(url)

    if is_nil(info.host) do
      raise Ecto.InvalidURLError, url: url, message: "host is not present"
    end

    if is_nil(info.path) or not (info.path =~ ~r"^/([^/])+$") do
      raise Ecto.InvalidURLError, url: url, message: "path should be a database name"
    end

    # Only the first colon separates the username from the password; any
    # further colon belongs to the password and must not truncate it.
    destructure [username, password],
                info.userinfo && String.split(info.userinfo, ":", parts: 2)

    "/" <> database = info.path

    url_opts =
      put_hostname_if_present(
        [username: username, password: password, database: database, port: info.port],
        info.host
      )

    # URI.parse/1 leaves its components percent-encoded, so they are decoded
    # here. Query values are already decoded by URI.query_decoder/1 and are
    # deliberately not decoded a second time.
    decoded_url_opts =
      for {key, value} <- url_opts,
          not is_nil(value),
          do: {key, if(is_binary(value), do: URI.decode(value), else: value)}

    decoded_url_opts ++ parse_uri_query(info)
  end

  # An empty host adds no :hostname entry.
  defp put_hostname_if_present(keyword, "") do
    keyword
  end

  defp put_hostname_if_present(keyword, hostname) when is_binary(hostname) do
    Keyword.put(keyword, :hostname, hostname)
  end

  # Converts the query string into options: "ssl" becomes a boolean for the
  # literal values "true"/"false", the parameters listed in
  # @integer_url_query_params become integers and anything else is kept as a
  # string. Entries are accumulated by prepending, so the result is in
  # reverse order of appearance in the query.
  defp parse_uri_query(%URI{query: nil}),
    do: []

  defp parse_uri_query(%URI{query: query} = url) do
    query
    |> URI.query_decoder()
    |> Enum.reduce([], fn
      {"ssl", "true"}, acc ->
        [{:ssl, true} | acc]

      {"ssl", "false"}, acc ->
        [{:ssl, false} | acc]

      {key, value}, acc when key in @integer_url_query_params ->
        [{String.to_atom(key), parse_integer!(key, value, url)} | acc]

      {key, value}, acc ->
        [{String.to_atom(key), value} | acc]
    end)
  end

  # Parses `value` as an integer with no trailing characters, raising
  # Ecto.InvalidURLError otherwise.
  defp parse_integer!(key, value, url) do
    case Integer.parse(value) do
      {int, ""} ->
        int

      _ ->
        raise Ecto.InvalidURLError,
          url: url,
          message: "can not parse value `#{value}` for parameter `#{key}` as an integer"
    end
  end

  # Returns `{adapter_meta, opts}` for the repo registered under `name`. When
  # stacktraces are requested, either through `opts` or through the adapter
  # meta, the stacktrace of the calling process is stored under :stacktrace.
  @doc false
  def tuplet(name, opts) do
    adapter_meta = Ecto.Repo.Registry.lookup(name)

    if opts[:stacktrace] || Map.get(adapter_meta, :stacktrace) do
      {:current_stacktrace, stacktrace} = :erlang.process_info(self(), :current_stacktrace)
      {adapter_meta, Keyword.put(opts, :stacktrace, stacktrace)}
    else
      {adapter_meta, opts}
    end
  end

  ## Callbacks

  # Resolves the runtime configuration, emits the [:ecto, :repo, :init]
  # telemetry event, initializes the adapter and supervises its child through
  # start_child/4. Returns :ignore when the runtime configuration does.
  @doc false
  @impl true
  def init({name, repo, otp_app, adapter, opts}) do
    # Normalize name to atom, ignore via/global names
    name = if is_atom(name), do: name, else: nil

    case runtime_config(:supervisor, repo, otp_app, opts) do
      {:ok, opts} ->
        :telemetry.execute(
          [:ecto, :repo, :init],
          %{system_time: System.system_time()},
          %{repo: repo, opts: opts}
        )

        {:ok, child, meta} = adapter.init([repo: repo] ++ opts)
        cache = Ecto.Query.Planner.new_query_cache(name)
        meta = Map.merge(meta, %{repo: repo, cache: cache})
        child_spec = wrap_child_spec(child, [name, adapter, meta])
        Supervisor.init([child_spec], strategy: :one_for_one, max_restarts: 0)

      :ignore ->
        :ignore
    end
  end

  # Start function installed by wrap_child_spec/2. It runs the child's
  # original start MFA and, on success, associates the calling process with
  # `name` and the adapter meta (extended with :pid and :adapter) in
  # Ecto.Repo.Registry. Any other result is returned unchanged.
  @doc false
  def start_child({mod, fun, args}, name, adapter, meta) do
    case apply(mod, fun, args) do
      {:ok, pid} ->
        meta = Map.merge(meta, %{pid: pid, adapter: adapter})
        Ecto.Repo.Registry.associate(self(), name, meta)
        {:ok, pid}

      other ->
        other
    end
  end

  # Replaces the start MFA of a child spec (old tuple format or map format)
  # with a call to start_child/4 that receives the original MFA first.
  defp wrap_child_spec({id, start, restart, shutdown, type, mods}, args) do
    {id, {__MODULE__, :start_child, [start | args]}, restart, shutdown, type, mods}
  end

  defp wrap_child_spec(%{start: start} = spec, args) do
    %{spec | start: {__MODULE__, :start_child, [start | args]}}
  end
end