type_server.ex (4382B)
1 defmodule Postgrex.TypeServer do 2 @moduledoc false 3 4 use GenServer, restart: :temporary 5 6 defstruct [:types, :connections, :lock, :waiting] 7 8 @timeout 60_000 9 10 @doc """ 11 Starts a type server. 12 """ 13 @spec start_link({module, pid, keyword}) :: GenServer.on_start() 14 def start_link({module, starter, opts}) do 15 GenServer.start_link(__MODULE__, {module, starter}, opts) 16 end 17 18 @doc """ 19 Fetches a lock for the given type server. 20 21 We attempt to achieve a lock on the type server for updating the entries. 22 If another process got the lock we wait for it to finish. 23 """ 24 @spec fetch(pid) :: 25 {:lock, reference, Postgrex.Types.state()} | :noproc | :error 26 def fetch(server) do 27 try do 28 GenServer.call(server, :fetch, @timeout) 29 catch 30 # module timed out, pretend it did not exist. 31 :exit, {:normal, _} -> :noproc 32 :exit, {:noproc, _} -> :noproc 33 end 34 end 35 36 @doc """ 37 Update the type server using the given reference and configuration. 38 """ 39 @spec update(pid, reference, [Postgrex.TypeInfo.t()]) :: :ok 40 def update(server, ref, [_ | _] = type_infos) do 41 GenServer.call(server, {:update, ref, type_infos}, @timeout) 42 end 43 44 def update(server, ref, []) do 45 done(server, ref) 46 end 47 48 @doc """ 49 Unlocks the given reference for a given module if no update. 50 """ 51 @spec done(pid, reference) :: :ok 52 def done(server, ref) do 53 GenServer.cast(server, {:done, ref}) 54 end 55 56 ## Callbacks 57 58 def init({module, starter}) do 59 _ = Process.flag(:trap_exit, true) 60 Process.link(starter) 61 62 state = %__MODULE__{ 63 types: Postgrex.Types.new(module), 64 connections: MapSet.new([starter]), 65 waiting: :queue.new() 66 } 67 68 {:ok, state} 69 end 70 71 def handle_call(:fetch, from, %{lock: nil} = state) do 72 lock(state, from) 73 end 74 75 def handle_call(:fetch, from, %{lock: ref} = state) when is_reference(ref) do 76 wait(state, from) 77 end 78 79 def handle_call({:update, ref, type_infos}, from, %{lock: ref} = state) 80 when is_reference(ref) do 81 associate(state, type_infos, from) 82 end 83 84 def handle_cast({:done, ref}, %{lock: ref} = state) when is_reference(ref) do 85 Process.demonitor(ref, [:flush]) 86 next(state) 87 end 88 89 def handle_info({:DOWN, ref, _, _, _}, %{lock: ref} = state) 90 when is_reference(ref) do 91 next(state) 92 end 93 94 def handle_info({:DOWN, ref, _, _, _}, state) do 95 down(state, ref) 96 end 97 98 def handle_info({:EXIT, pid, _}, state) do 99 exit(state, pid) 100 end 101 102 def handle_info(:timeout, state) do 103 {:stop, :normal, state} 104 end 105 106 ## Helpers 107 108 defp lock(%{connections: connections, types: types} = state, {pid, _}) do 109 Process.link(pid) 110 mref = Process.monitor(pid) 111 state = %{state | lock: mref, connections: MapSet.put(connections, pid)} 112 {:reply, {:lock, mref, types}, state} 113 end 114 115 defp wait(state, {pid, _} = from) do 116 %{connections: connections, waiting: waiting} = state 117 Process.link(pid) 118 mref = Process.monitor(pid) 119 120 state = %{ 121 state 122 | connections: MapSet.put(connections, pid), 123 waiting: :queue.in({mref, from}, waiting) 124 } 125 126 {:noreply, state} 127 end 128 129 defp associate(%{types: types, lock: ref} = state, type_infos, from) do 130 Postgrex.Types.associate_type_infos(type_infos, types) 131 Process.demonitor(ref, [:flush]) 132 GenServer.reply(from, :go) 133 next(state) 134 end 135 136 defp next(%{types: types, waiting: waiting} = state) do 137 case :queue.out(waiting) do 138 {{:value, {mref, from}}, waiting} -> 139 GenServer.reply(from, {:lock, mref, types}) 140 {:noreply, %{state | lock: mref, waiting: waiting}} 141 142 {:empty, waiting} -> 143 check_processes(%{state | lock: nil, waiting: waiting}) 144 end 145 end 146 147 defp down(%{waiting: waiting} = state, ref) do 148 check_processes(%{state | waiting: :queue.filter(fn {mref, _} -> mref != ref end, waiting)}) 149 end 150 151 defp exit(%{connections: connections} = state, pid) do 152 check_processes(%{state | connections: MapSet.delete(connections, pid)}) 153 end 154 155 defp check_processes(%{lock: ref} = state) when is_reference(ref) do 156 {:noreply, state} 157 end 158 159 defp check_processes(%{connections: connections} = state) do 160 case MapSet.size(connections) do 161 0 -> 162 timeout = Application.fetch_env!(:postgrex, :type_server_reap_after) 163 {:noreply, state, timeout} 164 165 _ -> 166 {:noreply, state} 167 end 168 end 169 end