zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

type_server.ex (4382B)


      defmodule Postgrex.TypeServer do
        @moduledoc false

        # Serialises updates to a shared `Postgrex.Types` state. At most one
        # process holds the "lock" at a time; other callers of `fetch/1` are
        # queued and answered in FIFO order once the holder calls `update/3` or
        # `done/2`, or once the holder goes down.

        use GenServer, restart: :temporary

        # types       - value returned by `Postgrex.Types.new/1`
        # connections - MapSet of pids linked to this server
        # lock        - monitor reference of the current lock holder, or nil
        # waiting     - :queue of `{monitor_ref, from}` for callers awaiting the lock
        defstruct [:types, :connections, :lock, :waiting]

        # Timeout (in milliseconds) for the calls made by `fetch/1` and `update/3`.
        @timeout 60_000

        @doc """
        Starts a type server.

        Expects a `{module, starter, opts}` tuple: `module` is handed to
        `Postgrex.Types.new/1`, `starter` is linked to the server and recorded as
        its first connection, and `opts` are passed to `GenServer.start_link/3`.
        """
        @spec start_link({module, pid, keyword}) :: GenServer.on_start()
        def start_link({module, starter, opts}) do
          GenServer.start_link(__MODULE__, {module, starter}, opts)
        end

        @doc """
        Fetches a lock for the given type server.

        We attempt to achieve a lock on the type server for updating the entries.
        If another process got the lock we wait for it to finish.

        Returns `{:lock, ref, types}` once the lock is granted, or `:noproc` when
        the server is not alive or stops normally while the call is in flight.
        """
        @spec fetch(pid) ::
                {:lock, reference, Postgrex.Types.state()} | :noproc | :error
        def fetch(server) do
          GenServer.call(server, :fetch, @timeout)
        catch
          # The server stopped normally (it was reaped) during the call, or it was
          # already gone: pretend it did not exist.
          :exit, {:normal, _} -> :noproc
          :exit, {:noproc, _} -> :noproc
        end

        @doc """
        Update the type server using the given reference and configuration.

        With a non-empty list the type infos are sent to the server and its reply
        (`:go`) is returned. With an empty list there is nothing to associate, so
        the lock is simply released via `done/2` and `:ok` is returned.
        """
        @spec update(pid, reference, [Postgrex.TypeInfo.t()]) :: :ok | :go
        def update(server, ref, [_ | _] = type_infos) do
          GenServer.call(server, {:update, ref, type_infos}, @timeout)
        end

        def update(server, ref, []) do
          done(server, ref)
        end

        @doc """
        Unlocks the given reference for a given module if no update.
        """
        @spec done(pid, reference) :: :ok
        def done(server, ref) do
          GenServer.cast(server, {:done, ref})
        end

        ## Callbacks

        @impl true
        def init({module, starter}) do
          # Exits of linked processes arrive as `{:EXIT, pid, reason}` messages so
          # that the set of live connections can be tracked.
          _ = Process.flag(:trap_exit, true)
          Process.link(starter)

          state = %__MODULE__{
            types: Postgrex.Types.new(module),
            connections: MapSet.new([starter]),
            waiting: :queue.new()
          }

          {:ok, state}
        end

        @impl true
        # Nobody holds the lock: grant it immediately.
        def handle_call(:fetch, from, %{lock: nil} = state) do
          lock(state, from)
        end

        # Somebody holds the lock: queue the caller; it is replied to later.
        def handle_call(:fetch, from, %{lock: ref} = state) when is_reference(ref) do
          wait(state, from)
        end

        # Only accepted from the current lock holder (`ref` must equal the lock).
        def handle_call({:update, ref, type_infos}, from, %{lock: ref} = state)
            when is_reference(ref) do
          associate(state, type_infos, from)
        end

        @impl true
        # Lock holder is finished without an update: drop its monitor and move on.
        def handle_cast({:done, ref}, %{lock: ref} = state) when is_reference(ref) do
          Process.demonitor(ref, [:flush])
          next(state)
        end

        @impl true
        # The lock holder went down: hand the lock to the next waiter.
        def handle_info({:DOWN, ref, _, _, _}, %{lock: ref} = state)
            when is_reference(ref) do
          next(state)
        end

        # Some other monitored process went down: forget it if it was waiting.
        def handle_info({:DOWN, ref, _, _, _}, state) do
          down(state, ref)
        end

        # A linked process exited: remove it from the connection set.
        def handle_info({:EXIT, pid, _}, state) do
          exit(state, pid)
        end

        # The idle timeout set by `check_processes/1` elapsed with no message.
        def handle_info(:timeout, state) do
          {:stop, :normal, state}
        end

        ## Helpers

        # Grants the lock to the caller. The monitor reference doubles as the lock
        # token that the caller must present to `update/3` or `done/2`.
        defp lock(%{connections: connections, types: types} = state, {pid, _}) do
          Process.link(pid)
          mref = Process.monitor(pid)
          state = %{state | lock: mref, connections: MapSet.put(connections, pid)}
          {:reply, {:lock, mref, types}, state}
        end

        # Queues the caller. Its monitor reference is created now and becomes the
        # lock token when `next/1` eventually replies to it.
        defp wait(state, {pid, _} = from) do
          %{connections: connections, waiting: waiting} = state
          Process.link(pid)
          mref = Process.monitor(pid)

          state = %{
            state
            | connections: MapSet.put(connections, pid),
              waiting: :queue.in({mref, from}, waiting)
          }

          {:noreply, state}
        end

        # Applies the update on behalf of the lock holder, releases its monitor,
        # replies `:go` and passes the lock on.
        defp associate(%{types: types, lock: ref} = state, type_infos, from) do
          Postgrex.Types.associate_type_infos(type_infos, types)
          Process.demonitor(ref, [:flush])
          GenServer.reply(from, :go)
          next(state)
        end

        # Hands the lock to the oldest waiter, or clears it when nobody is waiting.
        defp next(%{types: types, waiting: waiting} = state) do
          case :queue.out(waiting) do
            {{:value, {mref, from}}, waiting} ->
              GenServer.reply(from, {:lock, mref, types})
              {:noreply, %{state | lock: mref, waiting: waiting}}

            {:empty, waiting} ->
              check_processes(%{state | lock: nil, waiting: waiting})
          end
        end

        # Removes the waiter identified by monitor reference `ref` from the queue.
        defp down(%{waiting: waiting} = state, ref) do
          remaining = :queue.filter(fn {mref, _} -> mref != ref end, waiting)
          check_processes(%{state | waiting: remaining})
        end

        # Removes an exited process from the connection set.
        defp exit(%{connections: connections} = state, pid) do
          check_processes(%{state | connections: MapSet.delete(connections, pid)})
        end

        # While the lock is held the server never arms the idle timeout.
        defp check_processes(%{lock: ref} = state) when is_reference(ref) do
          {:noreply, state}
        end

        # With no lock and no connections left, arm a GenServer timeout; if no
        # message arrives before it fires, `handle_info(:timeout, _)` stops the
        # server.
        defp check_processes(%{connections: connections} = state) do
          case MapSet.size(connections) do
            0 ->
              timeout = Application.fetch_env!(:postgrex, :type_server_reap_after)
              {:noreply, state, timeout}

            _ ->
              {:noreply, state}
          end
        end
      end