zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

drainer.ex (3254B)


      defmodule Plug.Cowboy.Drainer do
        @moduledoc """
        Drains cowboy connections when the system shuts down.

        A `Plug.Cowboy` child started in a supervision tree creates a listener that
        receives requests and spawns a connection process for each one. On shutdown the
        `Plug.Cowboy` process exits immediately, which closes the listener together with
        every connection that is still being served. In most cases it is preferable to
        let those connections finish first.

        This module provides a process that, during shutdown, suspends the listeners and
        waits for their connections to complete. Place it after the other supervised
        processes that handle cowboy connections.

        ## Options

        The child spec accepts the following options:

          * `:refs` - A list of refs to drain. `:all` is also supported and drains every
            cowboy listener, including those started by means other than `Plug.Cowboy`.

          * `:id` - The ID for the process.
            Defaults to `Plug.Cowboy.Drainer`.

          * `:shutdown` - How long to wait for connections to drain.
            Defaults to 5000ms.

          * `:drain_check_interval` - How frequently to check if a listener's
            connections have been drained.
            Defaults to 1000ms.

        ## Examples

            # In your application
            def start(_type, _args) do
              children = [
                {Plug.Cowboy, scheme: :http, plug: MyApp, options: [port: 4040]},
                {Plug.Cowboy, scheme: :https, plug: MyApp, options: [port: 4041]},
                {Plug.Cowboy.Drainer, refs: [MyApp.HTTP, MyApp.HTTPS]}
              ]

              opts = [strategy: :one_for_one, name: MyApp.Supervisor]
              Supervisor.start_link(children, opts)
            end
        """
        use GenServer

        # Polling interval (in milliseconds) used when `:drain_check_interval` is not given.
        @default_drain_check_interval 1_000

        @doc false
        @spec child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()
        def child_spec(opts) when is_list(opts) do
          # `:id` and `:shutdown` belong to the child spec itself; every other option is
          # forwarded untouched to `start_link/1`.
          {overrides, start_opts} = Keyword.split(opts, [:id, :shutdown])

          base_spec = %{
            id: __MODULE__,
            start: {__MODULE__, :start_link, [start_opts]},
            type: :worker
          }

          Supervisor.child_spec(base_spec, overrides)
        end

        @doc false
        def start_link(opts) do
          # Validate in the caller, before any process is spawned: a missing `:refs`
          # raises `KeyError`, a malformed one raises `ArgumentError`.
          refs = Keyword.fetch!(opts, :refs)
          validate_refs!(refs)

          GenServer.start_link(__MODULE__, opts)
        end

        @doc false
        @impl true
        def init(opts) do
          # Trap exits so that `terminate/2` gets a chance to run when the supervisor
          # shuts this process down.
          Process.flag(:trap_exit, true)

          # The options are the whole state; they are only read again in `terminate/2`.
          {:ok, opts}
        end

        @doc false
        @impl true
        def terminate(_reason, opts) do
          refs = Keyword.fetch!(opts, :refs)
          interval = Keyword.get(opts, :drain_check_interval, @default_drain_check_interval)

          drain(refs, interval)
        end

        # `:all` expands to the ref of every listener reported by ranch; the ref is the
        # first element of each entry.
        defp drain(:all, interval) do
          listener_refs = for listener <- :ranch.info(), do: elem(listener, 0)

          drain(listener_refs, interval)
        end

        # Suspends each listener, then waits on the ones that were suspended
        # successfully until they report zero connections.
        defp drain(refs, interval) do
          # `Enum.filter/2` is eager, so every listener is suspended before the first
          # wait begins.
          suspended = Enum.filter(refs, fn ref -> :ranch.suspend_listener(ref) == :ok end)

          Enum.each(suspended, fn ref ->
            :ranch.wait_for_connections(ref, :==, 0, interval)
          end)
        end

        # Accepts `:all` or any list; anything else is rejected with an `ArgumentError`.
        defp validate_refs!(refs) when refs == :all or is_list(refs), do: :ok

        defp validate_refs!(refs) do
          raise ArgumentError,
                ":refs should be :all or a list of references, got: #{inspect(refs)}"
        end
      end