drainer.ex (3254B)
defmodule Plug.Cowboy.Drainer do
  @moduledoc """
  Process to drain cowboy connections at shutdown.

  When starting `Plug.Cowboy` in a supervision tree, it will create a listener that receives
  requests and creates a connection process to handle that request. During shutdown, a
  `Plug.Cowboy` process will immediately exit, closing the listener and any open connections
  that are still being served. However, in most cases, it is desirable to allow connections
  to complete before shutting down.

  This module provides a process that during shutdown will close listeners and wait
  for connections to complete. It should be placed after other supervised processes that
  handle cowboy connections.

  ## Options

  The following options can be given to the child spec:

    * `:refs` - A list of refs to drain. `:all` is also supported and will drain all cowboy
      listeners, including those started by means other than `Plug.Cowboy`.

    * `:id` - The ID for the process.
      Defaults to `Plug.Cowboy.Drainer`.

    * `:shutdown` - How long to wait for connections to drain.
      Defaults to 5000ms.

    * `:drain_check_interval` - How frequently to check if a listener's
      connections have been drained.
      Defaults to 1000ms.

  ## Examples

      # In your application
      def start(_type, _args) do
        children = [
          {Plug.Cowboy, scheme: :http, plug: MyApp, options: [port: 4040]},
          {Plug.Cowboy, scheme: :https, plug: MyApp, options: [port: 4041]},
          {Plug.Cowboy.Drainer, refs: [MyApp.HTTP, MyApp.HTTPS]}
        ]

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end
  """
  use GenServer

  @doc false
  @spec child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()
  def child_spec(opts) when is_list(opts) do
    # :id and :shutdown override the child spec; everything else goes to start_link/1.
    {spec_opts, opts} = Keyword.split(opts, [:id, :shutdown])

    Supervisor.child_spec(
      %{
        id: __MODULE__,
        start: {__MODULE__, :start_link, [opts]},
        type: :worker
      },
      spec_opts
    )
  end

  @doc false
  def start_link(opts) do
    # Fail fast if :refs is missing or has an unsupported shape.
    opts
    |> Keyword.fetch!(:refs)
    |> validate_refs!()

    GenServer.start_link(__MODULE__, opts)
  end

  @doc false
  @impl true
  def init(opts) do
    # Trap exits so that terminate/2 runs when the supervisor shuts this process down.
    Process.flag(:trap_exit, true)
    {:ok, opts}
  end

  @doc false
  @impl true
  def terminate(_reason, opts) do
    # All draining happens here, bounded by the child spec's :shutdown timeout.
    opts
    |> Keyword.fetch!(:refs)
    |> drain(Keyword.get(opts, :drain_check_interval, 1_000))
  end

  # Resolve :all to the refs of every listener currently known to ranch.
  defp drain(:all, drain_check_interval) do
    :ranch.info()
    |> Enum.map(&elem(&1, 0))
    |> drain(drain_check_interval)
  end

  # Suspend each listener, then wait on the ones that were suspended successfully.
  defp drain(refs, drain_check_interval) do
    refs
    |> Enum.filter(&suspend_listener/1)
    |> Enum.each(&wait_for_connections(&1, drain_check_interval))
  end

  defp suspend_listener(ref) do
    :ranch.suspend_listener(ref) == :ok
  end

  # Blocks until the listener has exactly 0 connections, checking every interval.
  defp wait_for_connections(ref, drain_check_interval) do
    :ranch.wait_for_connections(ref, :==, 0, drain_check_interval)
  end

  defp validate_refs!(:all), do: :ok
  defp validate_refs!(refs) when is_list(refs), do: :ok

  defp validate_refs!(refs) do
    raise ArgumentError,
          ":refs should be :all or a list of references, got: #{inspect(refs)}"
  end
end
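
# A minimal usage sketch, not part of the module above. It shows how the
# documented options might be combined. It assumes the `MyApp` plug from the
# moduledoc example; the module name `MyApp.DrainingApplication` and the
# timeout values are hypothetical. `:shutdown` is split off into the child
# spec, while `:refs` and `:drain_check_interval` are passed to `start_link/1`.
defmodule MyApp.DrainingApplication do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Plug.Cowboy, scheme: :http, plug: MyApp, options: [port: 4040]},
      # Listed last: supervisors stop children in reverse start order, so the
      # drainer terminates first and can drain while the listener still exists.
      {Plug.Cowboy.Drainer, refs: :all, shutdown: 10_000, drain_check_interval: 500}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end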