http2.ex (78049B)


      1 defmodule Mint.HTTP2 do
      2   @moduledoc """
      3   Processless HTTP client with support for HTTP/2.
      4 
      5   This module provides a data structure that represents an HTTP/2 connection to
      6   a given server. The connection is represented as an opaque struct `%Mint.HTTP2{}`.
      7   The connection is a data structure and is not backed by a process, and all the
      8   connection handling happens in the process that creates the struct.
      9 
     10   This module and data structure work exactly like the ones described in the `Mint.HTTP`
     11   module, with the exception that `Mint.HTTP2` specifically deals with HTTP/2 while
     12   `Mint.HTTP` deals seamlessly with HTTP/1.1 and HTTP/2. For more information on
     13   how to use the data structure and client architecture, see `Mint.HTTP`.
     14 
     15   ## HTTP/2 streams and requests
     16 
     17   HTTP/2 introduces the concept of **streams**. A stream is an isolated conversation
     18   between the client and the server. Each stream is identified by a unique
     19   **stream ID**, so data arriving on different streams carries no ordering guarantee:
     20   each piece is attributed to its stream by ID. A stream closely corresponds to a request, so
     21   in this documentation and client we will mostly refer to streams as "requests".
     22   We mentioned data on streams can come in arbitrary order, and streams are requests,
     23   so the practical effect of this is that performing request A and then request B
     24   does not mean that the response to request A will come before the response to request B.
     25   This is why we identify each request with a unique reference returned by `request/5`.
     26   See `request/5` for more information.
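
Because responses for different requests can interleave, callers usually group them by request reference. A minimal sketch, assuming response tuples shaped like the ones returned by `stream/2` (with the reference as the second element); `ResponseCollector` is a hypothetical helper, not part of Mint:

```elixir
defmodule ResponseCollector do
  # Groups response tuples by request reference, preserving
  # the arrival order within each request.
  def collect(responses, acc \\ %{}) do
    Enum.reduce(responses, acc, fn response, acc ->
      ref = elem(response, 1)
      Map.update(acc, ref, [response], fn existing -> existing ++ [response] end)
    end)
  end
end

ref_a = make_ref()
ref_b = make_ref()

# Responses for request B arrive before request A has finished.
responses = [
  {:status, ref_a, 200},
  {:status, ref_b, 404},
  {:done, ref_b},
  {:data, ref_a, "hello"},
  {:done, ref_a}
]

grouped = ResponseCollector.collect(responses)
# grouped[ref_a] holds only the three tuples tagged with ref_a, in arrival order.
```

Passing the previous result as `acc` lets the same map accumulate responses across several calls to `stream/2`.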
     27 
     28   ## Closed connection
     29 
     30   In HTTP/2, the connection can either be open, closed, or only closed for writing.
     31   When a connection is closed for writing, the client cannot send requests or stream
     32   body chunks, but it can still read data that the server might be sending. When the
     33   connection gets closed on the writing side, a `:server_closed_connection` error is
     34   returned. `{:error, request_ref, error}` is returned for requests that haven't been
     35   processed by the server, with the reason of `error` being `:unprocessed`.
     36   These requests are safe to retry.
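
A minimal sketch of picking the retryable requests out of a list of responses. It assumes the error term exposes its reason under a `:reason` key, as `Mint.HTTPError` does; `RetryPicker` is a hypothetical helper, and the plain maps below stand in for real error structs:

```elixir
defmodule RetryPicker do
  # Returns the refs of requests that the server never processed.
  def retryable_refs(responses) do
    for {:error, ref, %{reason: :unprocessed}} <- responses, do: ref
  end
end

ref_1 = make_ref()
ref_2 = make_ref()

responses = [
  {:error, ref_1, %{reason: :unprocessed}},
  {:error, ref_2, %{reason: {:server_closed_request, :cancel}}}
]

# Returns a list containing only ref_1.
RetryPicker.retryable_refs(responses)
```

Requests that failed for any other reason may have been partially processed, so they are left out here.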
     37 
     38   ## HTTP/2 settings
     39 
     40   HTTP/2 supports settings negotiation between servers and clients. The server advertises
     41   its settings to the client and the client advertises its settings to the server. A peer
     42   (server or client) has to acknowledge the settings advertised by the other peer before
     43   those settings come into action (that's why it's called a negotiation).
     44 
     45   A first settings negotiation happens right when the connection starts.
     46   Servers and clients can renegotiate settings at any time during the life of the
     47   connection.
     48 
     49   Mint users don't need to care about settings acknowledgements directly since they're
     50   handled transparently by `stream/2`.
     51 
     52   To retrieve the server settings, you can use `get_server_setting/2`. Doing so is often
     53   useful to be able to tune your requests based on the server settings.
     54 
     55   To communicate client settings to the server, use `put_settings/2` or pass them when
     56   starting up a connection with `connect/4`. Note that the server needs to acknowledge
     57   the settings sent through `put_settings/2` before those settings come into effect. The
     58   server ack is processed transparently by `stream/2`, but this means that if you change
     59   a setting through `put_settings/2` and try to retrieve the value of that setting right
     60   after with `get_client_setting/2`, you'll likely get the old value of that setting. Once
     61   the server acknowledges the new settings, the updated value will be returned by
     62   `get_client_setting/2`.
     63 
     64   ## Server push
     65 
     66   HTTP/2 supports [server push](https://en.wikipedia.org/wiki/HTTP/2_Server_Push), which
     67   is a way for a server to send a response to a client without the client needing to make
     68   the corresponding request. The server sends a `:push_promise` response to a normal request:
     69   this creates a new request reference. Then, the server sends normal responses for the newly
     70   created request reference.
     71 
     72   Let's see an example. We will ask the server for `"/index.html"` and the server will
     73   send us a push promise for `"/style.css"`.
     74 
     75       {:ok, conn} = Mint.HTTP2.connect(:https, "example.com", 443)
     76       {:ok, conn, request_ref} = Mint.HTTP2.request(conn, "GET", "/index.html", _headers = [], _body = "")
     77 
     78       next_message =
     79         receive do
     80           msg -> msg
     81         end
     82 
     83       {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)
     84 
     85       [
     86         {:push_promise, ^request_ref, promised_request_ref, promised_headers},
     87         {:status, ^request_ref, 200},
     88         {:headers, ^request_ref, []},
     89         {:data, ^request_ref, "<html>..."},
     90         {:done, ^request_ref}
     91       ] = responses
     92 
     93       promised_headers
     94       #=> [{":method", "GET"}, {":path", "/style.css"}]
     95 
     96   As you can see in the example above, when the server sends a push promise then a
     97   `:push_promise` response is returned as a response to a request. The `:push_promise`
     98   response contains a `promised_request_ref` and some `promised_headers`. The
     99   `promised_request_ref` is the new request ref that pushed responses will be tagged with.
    100   `promised_headers` are headers that tell the client *what request* the promised response
    101   will respond to. The idea is that the server tells the client a request the client will
    102   want to make and then preemptively sends a response for that request. Promised headers
    103   will always include `:method`, `:path`, and `:authority`.
    104 
    105       next_message =
    106         receive do
    107           msg -> msg
    108         end
    109 
    110       {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)
    111 
    112       [
    113         {:status, ^promised_request_ref, 200},
    114         {:headers, ^promised_request_ref, []},
    115         {:data, ^promised_request_ref, "body { ... }"},
    116         {:done, ^promised_request_ref}
    117       ] = responses
    118 
    119   The response to a promised request is like a response to any normal request.
    120 
    121   ### Disabling server pushes
    122 
    123   HTTP/2 exposes a boolean setting for enabling or disabling server pushes with `:enable_push`.
    124   You can pass this option when connecting or in `put_settings/2`. By default server push
    125   is enabled.
    126   """
    127 
    128   import Mint.Core.Util
    129   import Mint.HTTP2.Frame, except: [encode: 1, decode_next: 1]
    130 
    131   alias Mint.{HTTPError, TransportError}
    132   alias Mint.Types
    133   alias Mint.Core.Util
    134   alias Mint.HTTP2.Frame
    135 
    136   require Logger
    137   require Integer
    138 
    139   @behaviour Mint.Core.Conn
    140 
    141   ## Constants
    142 
    143   @connection_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
    144   @transport_opts [alpn_advertised_protocols: ["h2"]]
    145 
    146   @default_window_size 65_535
    147   @max_window_size 2_147_483_647
    148 
    149   @default_max_frame_size 16_384
    150   @valid_max_frame_size_range @default_max_frame_size..16_777_215
    151 
    152   @user_agent "mint/" <> Mix.Project.config()[:version]
    153 
    154   # HTTP/2 connection struct.
    155   defstruct [
    156     # Transport things.
    157     :transport,
    158     :socket,
    159     :mode,
    160 
    161     # Host things.
    162     :hostname,
    163     :port,
    164     :scheme,
    165 
    166     # Connection state (open, closed, and so on).
    167     :state,
    168 
    169     # Fields of the connection.
    170     buffer: "",
    171     window_size: @default_window_size,
    172     encode_table: HPAX.new(4096),
    173     decode_table: HPAX.new(4096),
    174 
    175     # Queue for sent PING frames.
    176     ping_queue: :queue.new(),
    177 
    178     # Queue for sent SETTINGS frames.
    179     client_settings_queue: :queue.new(),
    180 
    181     # Stream-set-related things.
    182     next_stream_id: 3,
    183     streams: %{},
    184     open_client_stream_count: 0,
    185     open_server_stream_count: 0,
    186     ref_to_stream_id: %{},
    187 
    188     # Settings that the server communicates to the client.
    189     server_settings: %{
    190       enable_push: true,
    191       max_concurrent_streams: 100,
    192       initial_window_size: @default_window_size,
    193       max_frame_size: @default_max_frame_size,
    194       max_header_list_size: :infinity,
    195       # Only supported by the server: https://www.rfc-editor.org/rfc/rfc8441.html#section-3
    196       enable_connect_protocol: false
    197     },
    198 
    199     # Settings that the client communicates to the server.
    200     client_settings: %{
    201       max_concurrent_streams: 100,
    202       max_frame_size: @default_max_frame_size,
    203       enable_push: true
    204     },
    205 
    206     # Headers being processed (when headers are split into multiple frames with CONTINUATIONS, all
    207     # the continuation frames must come one right after the other).
    208     headers_being_processed: nil,
    209 
    210     # Stores the headers returned by the proxy in the `CONNECT` method
    211     proxy_headers: [],
    212 
    213     # Private store.
    214     private: %{}
    215   ]
    216 
    217   ## Types
    218 
    219   @typedoc """
    220   HTTP/2 setting with its value.
    221 
    222   This type represents both server settings as well as client settings. To retrieve
    223   server settings use `get_server_setting/2` and to retrieve client settings use
    224   `get_client_setting/2`. To send client settings to the server, see `put_settings/2`.
    225 
    226   The supported settings are the following:
    227 
    228     * `:header_table_size` - (integer) corresponds to `SETTINGS_HEADER_TABLE_SIZE`.
    229 
    230     * `:enable_push` - (boolean) corresponds to `SETTINGS_ENABLE_PUSH`. Sets whether
    231       push promises are supported. If you don't want to support push promises,
    232       use `put_settings/2` to tell the server that your client doesn't want push promises.
    233 
    234     * `:max_concurrent_streams` - (integer) corresponds to `SETTINGS_MAX_CONCURRENT_STREAMS`.
    235       The maximum number of concurrent streams that the peer sending this (client or server)
    236       supports. As mentioned in the module documentation, HTTP/2 streams are equivalent to
    237       requests, so knowing the maximum number of streams that the server supports can be useful
    238       to know how many concurrent requests can be open at any time. Use `get_server_setting/2`
    239       to find out how many concurrent streams the server supports.
    240 
    241     * `:initial_window_size` - (integer smaller than `#{inspect(@max_window_size)}`)
    242       corresponds to `SETTINGS_INITIAL_WINDOW_SIZE`. This is the value of
    243       the initial HTTP/2 window size for the peer that sends this setting.
    244 
    245     * `:max_frame_size` - (integer in the range `#{inspect(@valid_max_frame_size_range)}`)
    246       corresponds to `SETTINGS_MAX_FRAME_SIZE`. This is the maximum size of an HTTP/2
    247       frame for the peer that sends this setting.
    248 
    249     * `:max_header_list_size` - (integer) corresponds to `SETTINGS_MAX_HEADER_LIST_SIZE`.
    250 
    251     * `:enable_connect_protocol` - (boolean) corresponds to `SETTINGS_ENABLE_CONNECT_PROTOCOL`.
    252       Sets whether the client may invoke the extended connect protocol which is used to
    253       bootstrap WebSocket connections.
    254 
    255   """
    256   @type setting() ::
    257           {:enable_push, boolean()}
    258           | {:max_concurrent_streams, pos_integer()}
    259           | {:initial_window_size, 1..2_147_483_647}
    260           | {:max_frame_size, 16_384..16_777_215}
    261           | {:max_header_list_size, :infinity | pos_integer()}
    262           | {:enable_connect_protocol, boolean()}
    263 
    264   @typedoc """
    265   HTTP/2 settings.
    266 
    267   See `t:setting/0`.
    268   """
    269   @type settings() :: [setting()]
    270 
    271   @typedoc """
    272   An HTTP/2-specific error reason.
    273 
    274   The values can be:
    275 
    276     * `:closed` - when you try to make a request or stream a body chunk but the connection
    277       is closed.
    278 
    279     * `:closed_for_writing` - when you try to make a request or stream a body chunk but
    280       the connection is closed for writing. This means you cannot issue any more requests.
    281       See the "Closed connection" section in the module documentation for more information.
    282 
    283     * `:too_many_concurrent_requests` - when the maximum number of concurrent requests
    284       allowed by the server is reached. To find out what this limit is, use `get_server_setting/2`
    285       with the `:max_concurrent_streams` setting name.
    286 
    287     * `{:max_header_list_size_exceeded, size, max_size}` - when the maximum size of
    288       the header list is reached. `size` is the actual value of the header list size,
    289       `max_size` is the maximum value allowed. See `get_server_setting/2` to retrieve the
    290       value of the max size.
    291 
    292     * `{:exceeds_window_size, what, window_size}` - when the data you're trying to send
    293       exceeds the window size of the connection (if `what` is `:connection`) or of a request
    294       (if `what` is `:request`). `window_size` is the allowed window size. See
    295       `get_window_size/2`.
    296 
    297     * `{:stream_not_found, stream_id}` - when the given request is not found.
    298 
    299     * `:unknown_request_to_stream` - when you're trying to stream data on an unknown
    300       request.
    301 
    302     * `:request_is_not_streaming` - when you try to send data (with `stream_request_body/3`)
    303       on a request that is not open for streaming.
    304 
    305     * `:unprocessed` - when a request was closed because it was not processed by the server.
    306       When this error is returned, it means that the server hasn't processed the request at all,
    307       so it's safe to retry the given request on a different or new connection.
    308 
    309     * `{:server_closed_request, error_code}` - when the server closes the request.
    310       `error_code` is the reason why the request was closed.
    311 
    312     * `{:server_closed_connection, reason, debug_data}` - when the server closes the connection
    313       gracefully or because of an error. In HTTP/2, this corresponds to a `GOAWAY` frame.
    314       `reason` is the reason why the connection was closed. `debug_data` is additional debug data.
    315 
    316     * `{:frame_size_error, frame}` - when there's an error with the size of a frame.
    317       `frame` is the frame type, such as `:settings` or `:window_update`.
    318 
    319     * `{:protocol_error, debug_data}` - when there's a protocol error.
    320       `debug_data` is a string that explains the nature of the error.
    321 
    322     * `{:compression_error, debug_data}` - when there's a header compression error.
    323       `debug_data` is a string that explains the nature of the error.
    324 
    325     * `{:flow_control_error, debug_data}` - when there's a flow control error.
    326       `debug_data` is a string that explains the nature of the error.
    327 
    328   """
    329   @type error_reason() :: term()
    330 
    331   @opaque t() :: %Mint.HTTP2{}
    332 
    333   ## Public interface
    334 
    335   @doc """
    336   Same as `Mint.HTTP.connect/4`, but forces an HTTP/2 connection.
    337   """
    338   @spec connect(Types.scheme(), Types.address(), :inet.port_number(), keyword()) ::
    339           {:ok, t()} | {:error, Types.error()}
    340   def connect(scheme, address, port, opts \\ []) do
    341     hostname = Mint.Core.Util.hostname(opts, address)
    342 
    343     transport_opts =
    344       opts
    345       |> Keyword.get(:transport_opts, [])
    346       |> Keyword.merge(@transport_opts)
    347       |> Keyword.put(:hostname, hostname)
    348 
    349     case negotiate(address, port, scheme, transport_opts) do
    350       {:ok, socket} ->
    351         initiate(scheme, socket, hostname, port, opts)
    352 
    353       {:error, reason} ->
    354         {:error, reason}
    355     end
    356   end
    357 
    358   @doc false
    359   @spec upgrade(
    360           Types.scheme(),
    361           Mint.Types.socket(),
    362           Types.scheme(),
    363           String.t(),
    364           :inet.port_number(),
    365           keyword()
    366         ) :: {:ok, t()} | {:error, Types.error()}
    367   def upgrade(old_scheme, socket, new_scheme, hostname, port, opts) do
    368     transport = scheme_to_transport(new_scheme)
    369 
    370     transport_opts =
    371       opts
    372       |> Keyword.get(:transport_opts, [])
    373       |> Keyword.merge(@transport_opts)
    374 
    375     with {:ok, socket} <- transport.upgrade(socket, old_scheme, hostname, port, transport_opts) do
    376       initiate(new_scheme, socket, hostname, port, opts)
    377     end
    378   end
    379 
    380   @doc """
    381   See `Mint.HTTP.close/1`.
    382   """
    383   @impl true
    384   @spec close(t()) :: {:ok, t()}
    385   def close(conn)
    386 
    387   def close(%__MODULE__{state: :open} = conn) do
    388     send_connection_error!(conn, :no_error, "connection peacefully closed by client")
    389   catch
    390     {:mint, conn, %HTTPError{reason: {:no_error, _}}} ->
    391       {:ok, conn}
    392 
    393     # We could have an error sending the GOAWAY frame, but we want to ignore that since
    394     # we're closing the connection anyway.
    395     {:mint, conn, %TransportError{}} ->
    396       conn = put_in(conn.state, :closed)
    397       {:ok, conn}
    398   end
    399 
    400   def close(%__MODULE__{state: {:goaway, _error_code, _debug_data}} = conn) do
    401     _ = conn.transport.close(conn.socket)
    402     {:ok, put_in(conn.state, :closed)}
    403   end
    404 
    405   def close(%__MODULE__{state: :closed} = conn) do
    406     {:ok, conn}
    407   end
    408 
    409   @doc """
    410   See `Mint.HTTP.open?/1`.
    411   """
    412   @impl true
    413   @spec open?(t(), :read | :write | :read_write) :: boolean()
    414   def open?(%Mint.HTTP2{state: state} = _conn, type \\ :read_write)
    415       when type in [:read, :write, :read_write] do
    416     case state do
    417       :open -> true
    418       {:goaway, _error_code, _debug_data} -> type == :read
    419       :closed -> false
    420     end
    421   end
    422 
    423   @doc """
    424   See `Mint.HTTP.request/5`.
    425 
    426   In HTTP/2, opening a request means opening a new HTTP/2 stream (see the
    427   module documentation). This means that a request could fail because the
    428   maximum number of concurrent streams allowed by the server has been reached.
    429   In that case, the error reason `:too_many_concurrent_requests` is returned.
    430   If you want to avoid running into this error, you can retrieve the value of
    431   the maximum number of concurrent streams supported by the server through
    432   `get_server_setting/2` (passing in the `:max_concurrent_streams` setting name).
    433 
    434   ## Header list size
    435 
    436   In HTTP/2, the server can optionally specify a maximum header list size that
    437   the client needs to respect when sending headers. The header list size is calculated
    438   by summing the length (in bytes) of each header name plus value, plus 32 bytes for
    439   each header. Note that pseudo-headers (like `:path` or `:method`) count towards
    440   this size. If the size is exceeded, an error is returned. To check what the size
    441   is, use `get_server_setting/2`.
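
The calculation described above can be sketched in plain Elixir. This is an illustration only: `HeaderListSize` is a hypothetical module, not a function exposed by Mint, and it assumes header names and values are binaries.

```elixir
defmodule HeaderListSize do
  # Fixed per-header overhead, in bytes, as described above.
  @per_header_overhead 32

  # Sums name length + value length + 32 for every header,
  # pseudo-headers included.
  def size(headers) do
    Enum.reduce(headers, 0, fn {name, value}, acc ->
      acc + byte_size(name) + byte_size(value) + @per_header_overhead
    end)
  end
end

headers = [{":method", "GET"}, {":path", "/"}, {"accept", "*/*"}]

HeaderListSize.size(headers)
#=> 121
```

Here the three headers contribute 42, 38, and 41 bytes respectively. Comparing such a value against `get_server_setting(conn, :max_header_list_size)` before sending is one way to anticipate the error.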
    442 
    443   ## Request body size
    444 
    445   If the request body size exceeds the window size of the HTTP/2 stream created by the
    446   request or the window size of the connection, Mint returns an
    447   `{:exceeds_window_size, what, window_size}` error.
    448 
    449   To avoid exceeding the window size, stream the request body: pass `:stream` as the
    450   body, send the body in chunks with `stream_request_body/3`, and use
    451   `get_window_size/2` to check the window size of the request and of the
    452   connection.
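
One way to prepare a body for chunked sending is to split it into pieces no larger than the available window. The sketch below shows only the splitting step; `BodyChunker` is a hypothetical helper, not part of Mint, and the limit of 5 bytes is chosen purely for illustration.

```elixir
defmodule BodyChunker do
  # Splits `body` into binaries of at most `max_bytes` bytes each.
  def chunks(body, max_bytes) when is_binary(body) and max_bytes > 0 do
    do_chunks(body, max_bytes, [])
  end

  defp do_chunks(<<>>, _max, acc), do: Enum.reverse(acc)

  defp do_chunks(body, max, acc) when byte_size(body) <= max,
    do: Enum.reverse([body | acc])

  defp do_chunks(body, max, acc) do
    <<chunk::binary-size(max), rest::binary>> = body
    do_chunks(rest, max, [chunk | acc])
  end
end

BodyChunker.chunks("hello world", 5)
#=> ["hello", " worl", "d"]
```

In a real client, the limit would presumably be the smaller of `get_window_size(conn, :connection)` and `get_window_size(conn, {:request, request_ref})`. Each chunk would then go through `stream_request_body/3`, followed by `:eof`, and the sender may need to wait for the server to refill the window (handled in `stream/2`) before sending the next chunk.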
    453   """
    454   @impl true
    455   @spec request(
    456           t(),
    457           method :: String.t(),
    458           path :: String.t(),
    459           Types.headers(),
    460           body :: iodata() | nil | :stream
    461         ) ::
    462           {:ok, t(), Types.request_ref()}
    463           | {:error, t(), Types.error()}
    464   def request(conn, method, path, headers, body)
    465 
    466   def request(%Mint.HTTP2{state: :closed} = conn, _method, _path, _headers, _body) do
    467     {:error, conn, wrap_error(:closed)}
    468   end
    469 
    470   def request(
    471         %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
    472         _method,
    473         _path,
    474         _headers,
    475         _body
    476       ) do
    477     {:error, conn, wrap_error(:closed_for_writing)}
    478   end
    479 
    480   def request(%Mint.HTTP2{} = conn, method, path, headers, body)
    481       when is_binary(method) and is_binary(path) and is_list(headers) do
    482     headers =
    483       headers
    484       |> downcase_header_names()
    485       |> add_pseudo_headers(conn, method, path)
    486       |> add_default_headers(body)
    487       |> sort_pseudo_headers_to_front()
    488 
    489     {conn, stream_id, ref} = open_stream(conn)
    490     {conn, payload} = encode_request_payload(conn, stream_id, headers, body)
    491     conn = send!(conn, payload)
    492     {:ok, conn, ref}
    493   catch
    494     :throw, {:mint, _conn, reason} ->
    495       # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
    496       {:error, conn, reason}
    497   end
    498 
    499   @doc """
    500   See `Mint.HTTP.stream_request_body/3`.
    501   """
    502   @impl true
    503   @spec stream_request_body(
    504           t(),
    505           Types.request_ref(),
    506           iodata() | :eof | {:eof, trailing_headers :: Types.headers()}
    507         ) :: {:ok, t()} | {:error, t(), Types.error()}
    508   def stream_request_body(conn, request_ref, chunk)
    509 
    510   def stream_request_body(%Mint.HTTP2{state: :closed} = conn, _request_ref, _chunk) do
    511     {:error, conn, wrap_error(:closed)}
    512   end
    513 
    514   def stream_request_body(
    515         %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
    516         _request_ref,
    517         _chunk
    518       ) do
    519     {:error, conn, wrap_error(:closed_for_writing)}
    520   end
    521 
    522   def stream_request_body(%Mint.HTTP2{} = conn, request_ref, chunk)
    523       when is_reference(request_ref) do
    524     case Map.fetch(conn.ref_to_stream_id, request_ref) do
    525       {:ok, stream_id} ->
    526         {conn, payload} = encode_stream_body_request_payload(conn, stream_id, chunk)
    527         conn = send!(conn, payload)
    528         {:ok, conn}
    529 
    530       :error ->
    531         {:error, conn, wrap_error(:unknown_request_to_stream)}
    532     end
    533   catch
    534     :throw, {:mint, _conn, reason} ->
    535       # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
    536       {:error, conn, reason}
    537   end
    538 
    539   @doc """
    540   Pings the server.
    541 
    542   This function is specific to HTTP/2 connections. It sends a **ping** request to
    543   the server `conn` is connected to. A `{:ok, conn, request_ref}` tuple is returned,
    544   where `conn` is the updated connection and `request_ref` is a unique reference that
    545   identifies this ping request. The response to a ping request is returned by `stream/2`
    546   as a `{:pong, request_ref}` tuple. If there's an error, this function returns
    547   `{:error, conn, reason}` where `conn` is the updated connection and `reason` is the
    548   error reason.
    549 
    550   `payload` must be an 8-byte binary with arbitrary content. When the server responds to
    551   a ping request, it will use that same payload. By default, the payload is an 8-byte
    552   binary with all bits set to `0`.
    553 
    554   Pinging can be used to measure the latency with the server and to ensure the connection
    555   is alive and well.
    556 
    557   ## Examples
    558 
    559       {:ok, conn, ref} = Mint.HTTP2.ping(conn)
    560 
    561   """
    562   @spec ping(t(), <<_::64>>) :: {:ok, t(), Types.request_ref()} | {:error, t(), Types.error()}
    563   def ping(%Mint.HTTP2{} = conn, payload \\ :binary.copy(<<0>>, 8))
    564       when byte_size(payload) == 8 do
    565     {conn, ref} = send_ping(conn, payload)
    566     {:ok, conn, ref}
    567   catch
    568     :throw, {:mint, conn, error} -> {:error, conn, error}
    569   end
    570 
    571   @doc """
    572   Communicates the given client settings to the server.
    573 
    574   This function is HTTP/2-specific.
    575 
    576   This function takes a connection and a keyword list of HTTP/2 settings and sends
    577   the values of those settings to the server. The settings won't be effective until
    578   the server acknowledges them, which will be handled transparently by `stream/2`.
    579 
    580   This function returns `{:ok, conn}` when sending the settings to the server is
    581   successful, with `conn` being the updated connection. If there's an error, this
    582   function returns `{:error, conn, reason}` with `conn` being the updated connection
    583   and `reason` being the reason of the error.
    584 
    585   ## Supported settings
    586 
    587   See `t:setting/0` for the supported settings. You can see the meaning
    588   of these settings [in the corresponding section in the HTTP/2
    589   RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).
    590 
    591   See the "HTTP/2 settings" section in the module documentation for more information.
    592 
    593   ## Examples
    594 
    595       {:ok, conn} = Mint.HTTP2.put_settings(conn, max_frame_size: 32_768)
    596 
    597   """
    598   @spec put_settings(t(), settings()) :: {:ok, t()} | {:error, t(), Types.error()}
    599   def put_settings(%Mint.HTTP2{} = conn, settings) when is_list(settings) do
    600     conn = send_settings(conn, settings)
    601     {:ok, conn}
    602   catch
    603     :throw, {:mint, conn, error} -> {:error, conn, error}
    604   end
    605 
    606   @doc """
    607   Gets the value of the given HTTP/2 server setting.
    608 
    609   This function returns the value of the given HTTP/2 setting that the server
    610   advertised to the client. This function is HTTP/2 specific.
    611   For more information on HTTP/2 settings, see [the related section in
    612   the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).
    613 
    614   See the "HTTP/2 settings" section in the module documentation for more information.
    615 
    616   ## Supported settings
    617 
    618   The possible settings that can be retrieved are described in `t:setting/0`.
    619   Any other atom passed as `name` will raise an error.
    620 
    621   ## Examples
    622 
    623       Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)
    624       #=> 500
    625 
    626   """
    627   @spec get_server_setting(t(), atom()) :: term()
    628   def get_server_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    629     get_setting(conn.server_settings, name)
    630   end
    631 
    632   @doc """
    633   Gets the value of the given HTTP/2 client setting.
    634 
    635   This function returns the value of the given HTTP/2 setting that the client
    636   advertised to the server. Client settings can be advertised through `put_settings/2`
    637   or when starting up a connection.
    638 
    639   Client settings have to be acknowledged by the server before coming into effect.
    640 
    641   This function is HTTP/2 specific. For more information on HTTP/2 settings, see
    642   [the related section in the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).
    643 
    644   See the "HTTP/2 settings" section in the module documentation for more information.
    645 
    646   ## Supported settings
    647 
    648   The possible settings that can be retrieved are described in `t:setting/0`.
    649   Any other atom passed as `name` will raise an error.
    650 
    651   ## Examples
    652 
    653       Mint.HTTP2.get_client_setting(conn, :max_concurrent_streams)
    654       #=> 500
    655 
    656   """
    657   @spec get_client_setting(t(), atom()) :: term()
    658   def get_client_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    659     get_setting(conn.client_settings, name)
    660   end
    661 
    662   defp get_setting(settings, name) do
    663     case Map.fetch(settings, name) do
    664       {:ok, value} -> value
    665       :error -> raise ArgumentError, "unknown HTTP/2 setting: #{inspect(name)}"
    666     end
    667   end
    668 
    669   @doc """
    670   Cancels an in-flight request.
    671 
    672   This function is HTTP/2 specific. It cancels an in-flight request. The server could have
    673   already sent responses for the request you want to cancel: those responses will be parsed
    674   by the connection but not returned to the user. No more responses
    675   to a request will be returned after you call `cancel_request/2` on that request.
    676 
    677   If there's no error in canceling the request, `{:ok, conn}` is returned where `conn` is
    678   the updated connection. If there's an error, `{:error, conn, reason}` is returned where
    679   `conn` is the updated connection and `reason` is the error reason.
    680 
    681   ## Examples
    682 
    683       {:ok, conn, ref} = Mint.HTTP2.request(conn, "GET", "/", _headers = [], _body = nil)
    684       {:ok, conn} = Mint.HTTP2.cancel_request(conn, ref)
    685 
    686   """
    687   @spec cancel_request(t(), Types.request_ref()) :: {:ok, t()} | {:error, t(), Types.error()}
    688   def cancel_request(%Mint.HTTP2{} = conn, request_ref) when is_reference(request_ref) do
    689     case Map.fetch(conn.ref_to_stream_id, request_ref) do
    690       {:ok, stream_id} ->
    691         conn = close_stream!(conn, stream_id, _error_code = :cancel)
    692         {:ok, conn}
    693 
    694       :error ->
    695         {:ok, conn}
    696     end
    697   catch
    698     :throw, {:mint, conn, error} -> {:error, conn, error}
    699   end
    700 
    701   @doc """
    702   Returns the window size of the connection or of a single request.
    703 
    704   This function is HTTP/2 specific. It returns the window size of
    705   either the connection if `connection_or_request` is `:connection` or of a single
    706   request if `connection_or_request` is `{:request, request_ref}`.
    707 
    708   Use this function to check the window size of the connection before sending a
    709   full request. Also use this function to check the window size of both the
    710   connection and of a request if you want to stream body chunks on that request.
    711 
    712   For more information on flow control and window sizes in HTTP/2, see the section
    713   below.
    714 
    715   ## HTTP/2 flow control
    716 
    717   In HTTP/2, flow control is implemented through a
    718   window size. When the client sends data to the server, the window size is decreased
    719   and the server needs to "refill" it on the client side. You don't need to take care of
    720   the refilling of the client window as it happens behind the scenes in `stream/2`.
    721 
    722   A window size is kept for the entire connection and all requests affect this window
    723   size. A window size is also kept per request.
    724 
    725   The only thing that affects the window size is the body of a request, regardless of
    726   whether it's a full request sent with `request/5` or body chunks sent through
    727   `stream_request_body/3`. That means that if we make a request with a body that is
    728   five bytes long, like `"hello"`, the window size of the connection and the window size
    729   of that particular request will decrease by five bytes.
    730 
    731   If a body is larger than the remaining window size of the request or of the connection,
    732   functions like `request/5` return an error with an `:exceeds_window_size` reason.
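
          As a minimal sketch of the check described above (the `WindowCheck` module and its
          `fits?/3` function are hypothetical and not part of Mint), a caller could compare the
          size of a chunk against both windows before sending it:

          ```elixir
          defmodule WindowCheck do
            # Hypothetical helper: returns true if `chunk` (any iodata) fits in both
            # the connection window and the request window, given as plain integers.
            def fits?(chunk, conn_window, request_window) do
              size = IO.iodata_length(chunk)
              size <= conn_window and size <= request_window
            end
          end

          # Assumed usage with an open `conn` and a streaming `request_ref`:
          #
          #     conn_window = Mint.HTTP2.get_window_size(conn, :connection)
          #     req_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref})
          #
          #     if WindowCheck.fits?(chunk, conn_window, req_window) do
          #       Mint.HTTP2.stream_request_body(conn, request_ref, chunk)
          #     end
          ```

          Checking both windows matters because a chunk that fits in the request window can
          still exceed the window of the connection, which is shared by all requests.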
    733 
    734   ## Examples
    735 
    736   On the connection:
    737 
    738       HTTP2.get_window_size(conn, :connection)
    739       #=> 65_535
    740 
    741   On a single streamed request:
    742 
    743       {:ok, conn, request_ref} = HTTP2.request(conn, "GET", "/", [], :stream)
    744       HTTP2.get_window_size(conn, {:request, request_ref})
    745       #=> 65_535
    746 
    747       {:ok, conn} = HTTP2.stream_request_body(conn, request_ref, "hello")
    748       HTTP2.get_window_size(conn, {:request, request_ref})
    749       #=> 65_530
    750 
    751   """
    752   @spec get_window_size(t(), :connection | {:request, Types.request_ref()}) :: non_neg_integer()
    753   def get_window_size(conn, connection_or_request)
    754 
    755   def get_window_size(%Mint.HTTP2{} = conn, :connection) do
    756     conn.window_size
    757   end
    758 
    759   def get_window_size(%Mint.HTTP2{} = conn, {:request, request_ref}) do
    760     case Map.fetch(conn.ref_to_stream_id, request_ref) do
    761       {:ok, stream_id} ->
    762         conn.streams[stream_id].window_size
    763 
    764       :error ->
    765         raise ArgumentError,
    766               "request with request reference #{inspect(request_ref)} was not found"
    767     end
    768   end
    769 
    770   @doc """
    771   See `Mint.HTTP.stream/2`.
    772   """
    773   @impl true
    774   @spec stream(t(), term()) ::
    775           {:ok, t(), [Types.response()]}
    776           | {:error, t(), Types.error(), [Types.response()]}
    777           | :unknown
    778   def stream(conn, message)
    779 
    780   def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket, reason})
    781       when tag in [:tcp_error, :ssl_error] do
    782     error = conn.transport.wrap_error(reason)
    783     {:error, %{conn | state: :closed}, error, _responses = []}
    784   end
    785 
    786   def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket})
    787       when tag in [:tcp_closed, :ssl_closed] do
    788     handle_closed(conn)
    789   end
    790 
    791   def stream(%Mint.HTTP2{transport: transport, socket: socket} = conn, {tag, socket, data})
    792       when tag in [:tcp, :ssl] do
    793     case maybe_concat_and_handle_new_data(conn, data) do
    794       {:ok, %{mode: mode, state: state} = conn, responses}
    795       when mode == :active and state != :closed ->
    796         case transport.setopts(socket, active: :once) do
    797           :ok -> {:ok, conn, responses}
    798           {:error, reason} -> {:error, put_in(conn.state, :closed), reason, responses}
    799         end
    800 
    801       other ->
    802         other
    803     end
    804   catch
    805     :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
    806   end
    807 
    808   def stream(%Mint.HTTP2{}, _message) do
    809     :unknown
    810   end
    811 
    812   @doc """
    813   See `Mint.HTTP.open_request_count/1`.
    814 
    815   In HTTP/2, the number of open requests is the number of requests **opened by the client**
    816   that have not yet received a `:done` response. Only requests opened by the client (with
    817   `request/5`) count towards this number: requests opened by the server through server
    818   pushes (see the "Server push" section in the module documentation) are not considered
    819   open requests. This matters because the server limits the number of concurrent requests
    820   the client can open, so clients might need to know how many are currently open. To know
    821   how many requests the client can open, see `get_server_setting/2` with the
    822   `:max_concurrent_streams` setting.
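
          A minimal sketch of how these two values might be combined (the `Capacity` module and
          its `can_open?/2` function are hypothetical and not part of Mint):

          ```elixir
          defmodule Capacity do
            # Hypothetical helper: returns true if the client can open one more request,
            # given the current open request count and the server's limit.
            def can_open?(open_count, max_concurrent_streams) do
              open_count < max_concurrent_streams
            end
          end

          # Assumed usage with an open `conn`:
          #
          #     open = Mint.HTTP2.open_request_count(conn)
          #     max = Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)
          #     Capacity.can_open?(open, max)
          ```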
    823   """
    824   @impl true
    825   @spec open_request_count(t()) :: non_neg_integer()
    826   def open_request_count(%Mint.HTTP2{} = conn) do
    827     conn.open_client_stream_count
    828   end
    829 
    830   @doc """
    831   See `Mint.HTTP.recv/3`.
    832   """
    833   @impl true
    834   @spec recv(t(), non_neg_integer(), timeout()) ::
    835           {:ok, t(), [Types.response()]}
    836           | {:error, t(), Types.error(), [Types.response()]}
    837   def recv(conn, byte_count, timeout)
    838 
    839   def recv(%__MODULE__{mode: :passive} = conn, byte_count, timeout) do
    840     case conn.transport.recv(conn.socket, byte_count, timeout) do
    841       {:ok, data} ->
    842         maybe_concat_and_handle_new_data(conn, data)
    843 
    844       {:error, %TransportError{reason: :closed}} ->
    845         handle_closed(conn)
    846 
    847       {:error, error} ->
    848         {:error, %{conn | state: :closed}, error, _responses = []}
    849     end
    850   catch
    851     :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
    852   end
    853 
    854   def recv(_conn, _byte_count, _timeout) do
    855     raise ArgumentError,
    856           "can't use recv/3 to synchronously receive data when the mode is :active. " <>
    857             "Use Mint.HTTP.set_mode/2 to set the connection to passive mode"
    858   end
    859 
    860   @doc """
    861   See `Mint.HTTP.set_mode/2`.
    862   """
    863   @impl true
    864   @spec set_mode(t(), :active | :passive) :: {:ok, t()} | {:error, Types.error()}
    865   def set_mode(%__MODULE__{} = conn, mode) when mode in [:active, :passive] do
    866     active =
    867       case mode do
    868         :active -> :once
    869         :passive -> false
    870       end
    871 
    872     with :ok <- conn.transport.setopts(conn.socket, active: active) do
    873       {:ok, put_in(conn.mode, mode)}
    874     end
    875   end
    876 
    877   @doc """
    878   See `Mint.HTTP.controlling_process/2`.
    879   """
    880   @impl true
    881   @spec controlling_process(t(), pid()) :: {:ok, t()} | {:error, Types.error()}
    882   def controlling_process(%__MODULE__{} = conn, new_pid) when is_pid(new_pid) do
    883     with :ok <- conn.transport.controlling_process(conn.socket, new_pid) do
    884       {:ok, conn}
    885     end
    886   end
    887 
    888   @doc """
    889   See `Mint.HTTP.put_private/3`.
    890   """
    891   @impl true
    892   @spec put_private(t(), atom(), term()) :: t()
    893   def put_private(%Mint.HTTP2{private: private} = conn, key, value) when is_atom(key) do
    894     %{conn | private: Map.put(private, key, value)}
    895   end
    896 
    897   @doc """
    898   See `Mint.HTTP.get_private/3`.
    899   """
    900   @impl true
    901   @spec get_private(t(), atom(), term()) :: term()
    902   def get_private(%Mint.HTTP2{private: private} = _conn, key, default \\ nil) when is_atom(key) do
    903     Map.get(private, key, default)
    904   end
    905 
    906   @doc """
    907   See `Mint.HTTP.delete_private/2`.
    908   """
    909   @impl true
    910   @spec delete_private(t(), atom()) :: t()
    911   def delete_private(%Mint.HTTP2{private: private} = conn, key) when is_atom(key) do
    912     %{conn | private: Map.delete(private, key)}
    913   end
    914 
    915   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.5
    916   # SETTINGS parameters are not negotiated. We keep client settings and server settings separate.
    917   @doc false
    918   @impl true
    919   @spec initiate(
    920           Types.scheme(),
    921           Types.socket(),
    922           String.t(),
    923           :inet.port_number(),
    924           keyword()
    925         ) :: {:ok, t()} | {:error, Types.error()}
    926   def initiate(scheme, socket, hostname, port, opts) do
    927     transport = scheme_to_transport(scheme)
    928     mode = Keyword.get(opts, :mode, :active)
    929     client_settings_params = Keyword.get(opts, :client_settings, [])
    930     validate_settings!(client_settings_params)
    931 
    932     unless mode in [:active, :passive] do
    933       raise ArgumentError,
    934             "the :mode option must be either :active or :passive, got: #{inspect(mode)}"
    935     end
    936 
    937     conn = %Mint.HTTP2{
    938       hostname: hostname,
    939       port: port,
    940       transport: transport,
    941       socket: socket,
    942       mode: mode,
    943       scheme: Atom.to_string(scheme),
    944       state: :open
    945     }
    946 
    947     with :ok <- inet_opts(transport, socket),
    948          client_settings = settings(stream_id: 0, params: client_settings_params),
    949          preface = [@connection_preface, Frame.encode(client_settings)],
    950          :ok <- transport.send(socket, preface),
    951          conn = update_in(conn.client_settings_queue, &:queue.in(client_settings_params, &1)),
    952          {:ok, server_settings, buffer, socket} <- receive_server_settings(transport, socket),
    953          server_settings_ack =
    954            settings(stream_id: 0, params: [], flags: set_flags(:settings, [:ack])),
    955          :ok <- transport.send(socket, Frame.encode(server_settings_ack)),
    956          conn = put_in(conn.buffer, buffer),
    957          conn = put_in(conn.socket, socket),
    958          conn = apply_server_settings(conn, settings(server_settings, :params)),
    959          :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
    960       {:ok, conn}
    961     else
    962       error ->
    963         transport.close(socket)
    964         error
    965     end
    966   catch
    967     {:mint, conn, error} ->
    968       {:ok, _conn} = close(conn)
    969       {:error, error}
    970   end
    971 
    972   @doc """
    973   See `Mint.HTTP.get_socket/1`.
    974   """
    975   @impl true
    976   @spec get_socket(t()) :: Mint.Types.socket()
    977   def get_socket(%Mint.HTTP2{socket: socket} = _conn) do
    978     socket
    979   end
    980 
    981   @doc """
    982   See `Mint.HTTP.get_proxy_headers/1`.
    983   """
    984   if Version.compare(System.version(), "1.7.0") in [:eq, :gt] do
    985     @doc since: "1.4.0"
    986   end
    987 
    988   @impl true
    989   @spec get_proxy_headers(t()) :: Mint.Types.headers()
    990   def get_proxy_headers(%__MODULE__{proxy_headers: proxy_headers}), do: proxy_headers
    991 
    992   ## Helpers
    993 
    994   defp handle_closed(conn) do
    995     conn = put_in(conn.state, :closed)
    996 
    997     if conn.open_client_stream_count > 0 or conn.open_server_stream_count > 0 do
    998       error = conn.transport.wrap_error(:closed)
    999       {:error, conn, error, _responses = []}
   1000     else
   1001       {:ok, conn, _responses = []}
   1002     end
   1003   end
   1004 
   1005   defp negotiate(address, port, :http, transport_opts) do
   1006     # We don't support protocol negotiation for TCP connections
   1007     # so currently we just assume the HTTP/2 protocol
   1008     transport = scheme_to_transport(:http)
   1009     transport.connect(address, port, transport_opts)
   1010   end
   1011 
   1012   defp negotiate(address, port, :https, transport_opts) do
   1013     transport = scheme_to_transport(:https)
   1014 
   1015     with {:ok, socket} <- transport.connect(address, port, transport_opts),
   1016          {:ok, protocol} <- transport.negotiated_protocol(socket) do
   1017       if protocol == "h2" do
   1018         {:ok, socket}
   1019       else
   1020         {:error, transport.wrap_error({:bad_alpn_protocol, protocol})}
   1021       end
   1022     end
   1023   end
   1024 
   1025   defp receive_server_settings(transport, socket) do
   1026     case recv_next_frame(transport, socket, _buffer = "") do
   1027       {:ok, settings(), _buffer, _socket} = result ->
   1028         result
   1029 
   1030       {:ok, goaway(error_code: error_code, debug_data: debug_data), _buffer, _socket} ->
   1031         error = wrap_error({:server_closed_connection, error_code, debug_data})
   1032         {:error, error}
   1033 
   1034       {:ok, frame, _buffer, _socket} ->
   1035         debug_data = "received invalid frame #{elem(frame, 0)} during handshake"
   1036         {:error, wrap_error({:protocol_error, debug_data})}
   1037 
   1038       {:error, error} ->
   1039         {:error, error}
   1040     end
   1041   end
   1042 
   1043   defp recv_next_frame(transport, socket, buffer) do
   1044     case Frame.decode_next(buffer, @default_max_frame_size) do
   1045       {:ok, frame, rest} ->
   1046         {:ok, frame, rest, socket}
   1047 
   1048       :more ->
   1049         with {:ok, data} <- transport.recv(socket, 0, _timeout = 10_000) do
   1050           data = maybe_concat(buffer, data)
   1051           recv_next_frame(transport, socket, data)
   1052         end
   1053 
   1054       {:error, {kind, _info} = reason} when kind in [:frame_size_error, :protocol_error] ->
   1055         {:error, wrap_error(reason)}
   1056     end
   1057   end
   1058 
   1059   defp open_stream(conn) do
   1060     max_concurrent_streams = conn.server_settings.max_concurrent_streams
   1061 
   1062     if conn.open_client_stream_count >= max_concurrent_streams do
   1063       throw({:mint, conn, wrap_error(:too_many_concurrent_requests)})
   1064     end
   1065 
   1066     stream = %{
   1067       id: conn.next_stream_id,
   1068       ref: make_ref(),
   1069       state: :idle,
   1070       window_size: conn.server_settings.initial_window_size,
   1071       received_first_headers?: false
   1072     }
   1073 
   1074     conn = put_in(conn.streams[stream.id], stream)
   1075     conn = put_in(conn.ref_to_stream_id[stream.ref], stream.id)
            # Client-initiated streams use odd IDs, so the next stream ID is two ahead.
   1076     conn = update_in(conn.next_stream_id, &(&1 + 2))
   1077     {conn, stream.id, stream.ref}
   1078   end
   1079 
   1080   defp encode_stream_body_request_payload(conn, stream_id, :eof) do
   1081     encode_data(conn, stream_id, "", [:end_stream])
   1082   end
   1083 
   1084   defp encode_stream_body_request_payload(conn, stream_id, {:eof, trailing_headers}) do
   1085     lowered_headers = downcase_header_names(trailing_headers)
   1086 
   1087     if unallowed_trailing_header = Util.find_unallowed_trailing_header(lowered_headers) do
   1088       error = wrap_error({:unallowed_trailing_header, unallowed_trailing_header})
   1089       throw({:mint, conn, error})
   1090     end
   1091 
   1092     encode_headers(conn, stream_id, trailing_headers, [:end_headers, :end_stream])
   1093   end
   1094 
   1095   defp encode_stream_body_request_payload(conn, stream_id, iodata) do
   1096     encode_data(conn, stream_id, iodata, [])
   1097   end
   1098 
   1099   defp encode_request_payload(conn, stream_id, headers, :stream) do
   1100     encode_headers(conn, stream_id, headers, [:end_headers])
   1101   end
   1102 
   1103   defp encode_request_payload(conn, stream_id, headers, nil) do
   1104     encode_headers(conn, stream_id, headers, [:end_stream, :end_headers])
   1105   end
   1106 
   1107   defp encode_request_payload(conn, stream_id, headers, iodata) do
   1108     {conn, headers_payload} = encode_headers(conn, stream_id, headers, [:end_headers])
   1109     {conn, data_payload} = encode_data(conn, stream_id, iodata, [:end_stream])
   1110     {conn, [headers_payload, data_payload]}
   1111   end
   1112 
   1113   defp encode_headers(conn, stream_id, headers, enabled_flags) do
   1114     assert_headers_smaller_than_max_header_list_size(conn, headers)
   1115 
   1116     headers = Enum.map(headers, fn {name, value} -> {:store_name, name, value} end)
   1117     {hbf, conn} = get_and_update_in(conn.encode_table, &HPAX.encode(headers, &1))
   1118 
   1119     payload = headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags)
   1120 
   1121     stream_state = if :end_stream in enabled_flags, do: :half_closed_local, else: :open
   1122 
   1123     conn = put_in(conn.streams[stream_id].state, stream_state)
   1124     conn = update_in(conn.open_client_stream_count, &(&1 + 1))
   1125 
   1126     {conn, payload}
   1127   end
   1128 
   1129   defp assert_headers_smaller_than_max_header_list_size(
   1130          %{server_settings: %{max_header_list_size: :infinity}},
   1131          _headers
   1132        ) do
   1133     :ok
   1134   end
   1135 
   1136   defp assert_headers_smaller_than_max_header_list_size(conn, headers) do
   1137     # The value is based on the uncompressed size of header fields, including the length
   1138     # of the name and value in octets plus an overhead of 32 octets for each header field.
   1139     total_size =
   1140       Enum.reduce(headers, 0, fn {name, value}, acc ->
   1141         acc + byte_size(name) + byte_size(value) + 32
   1142       end)
   1143 
   1144     max_header_list_size = conn.server_settings.max_header_list_size
   1145 
   1146     if total_size <= max_header_list_size do
   1147       :ok
   1148     else
   1149       error = wrap_error({:max_header_list_size_exceeded, total_size, max_header_list_size})
   1150       throw({:mint, conn, error})
   1151     end
   1152   end
   1153 
   1154   defp headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags) do
   1155     if IO.iodata_length(hbf) > conn.server_settings.max_frame_size do
   1156       hbf
   1157       |> IO.iodata_to_binary()
   1158       |> split_payload_in_chunks(conn.server_settings.max_frame_size)
   1159       |> split_hbf_to_encoded_frames(stream_id, enabled_flags)
   1160     else
   1161       Frame.encode(
   1162         headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, enabled_flags))
   1163       )
   1164     end
   1165   end
   1166 
   1167   defp split_hbf_to_encoded_frames({[first_chunk | chunks], last_chunk}, stream_id, enabled_flags) do
   1168     flags = set_flags(:headers, enabled_flags -- [:end_headers])
   1169     first_frame = Frame.encode(headers(stream_id: stream_id, hbf: first_chunk, flags: flags))
   1170 
   1171     middle_frames =
   1172       Enum.map(chunks, fn chunk ->
   1173         Frame.encode(continuation(stream_id: stream_id, hbf: chunk))
   1174       end)
   1175 
   1176     flags =
   1177       if :end_headers in enabled_flags do
   1178         set_flags(:continuation, [:end_headers])
   1179       else
   1180         set_flags(:continuation, [])
   1181       end
   1182 
   1183     last_frame = Frame.encode(continuation(stream_id: stream_id, hbf: last_chunk, flags: flags))
   1184 
   1185     [first_frame, middle_frames, last_frame]
   1186   end
   1187 
   1188   defp encode_data(conn, stream_id, data, enabled_flags) do
   1189     stream = fetch_stream!(conn, stream_id)
   1190 
   1191     if stream.state != :open do
   1192       error = wrap_error(:request_is_not_streaming)
   1193       throw({:mint, conn, error})
   1194     end
   1195 
   1196     data_size = IO.iodata_length(data)
   1197 
   1198     cond do
   1199       data_size > stream.window_size ->
   1200         throw({:mint, conn, wrap_error({:exceeds_window_size, :request, stream.window_size})})
   1201 
   1202       data_size > conn.window_size ->
   1203         throw({:mint, conn, wrap_error({:exceeds_window_size, :connection, conn.window_size})})
   1204 
   1205       # If the data size is greater than the max frame size, we chunk automatically based
   1206       # on the max frame size.
   1207       data_size > conn.server_settings.max_frame_size ->
   1208         {chunks, last_chunk} =
   1209           data
   1210           |> IO.iodata_to_binary()
   1211           |> split_payload_in_chunks(conn.server_settings.max_frame_size)
   1212 
   1213         {encoded_chunks, conn} =
   1214           Enum.map_reduce(chunks, conn, fn chunk, acc ->
   1215             {acc, encoded} = encode_data_chunk(acc, stream_id, chunk, [])
   1216             {encoded, acc}
   1217           end)
   1218 
   1219         {conn, encoded_last_chunk} = encode_data_chunk(conn, stream_id, last_chunk, enabled_flags)
   1220         {conn, [encoded_chunks, encoded_last_chunk]}
   1221 
   1222       true ->
   1223         encode_data_chunk(conn, stream_id, data, enabled_flags)
   1224     end
   1225   end
   1226 
   1227   defp encode_data_chunk(%__MODULE__{} = conn, stream_id, chunk, enabled_flags)
   1228        when is_integer(stream_id) and is_list(enabled_flags) do
   1229     chunk_size = IO.iodata_length(chunk)
   1230     frame = data(stream_id: stream_id, flags: set_flags(:data, enabled_flags), data: chunk)
   1231     conn = update_in(conn.streams[stream_id].window_size, &(&1 - chunk_size))
   1232     conn = update_in(conn.window_size, &(&1 - chunk_size))
   1233 
   1234     conn =
   1235       if :end_stream in enabled_flags do
   1236         put_in(conn.streams[stream_id].state, :half_closed_local)
   1237       else
   1238         conn
   1239       end
   1240 
   1241     {conn, Frame.encode(frame)}
   1242   end
   1243 
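          # Splits `binary` into chunks of `chunk_size` bytes. Returns `{chunks, last_chunk}`,
          # where `last_chunk` is the final chunk (at most `chunk_size` bytes) and `chunks`
          # holds all the preceding full-size chunks in order.
   1244   defp split_payload_in_chunks(binary, chunk_size),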
   1245     do: split_payload_in_chunks(binary, chunk_size, [])
   1246 
   1247   defp split_payload_in_chunks(chunk, chunk_size, acc) when byte_size(chunk) <= chunk_size do
   1248     {Enum.reverse(acc), chunk}
   1249   end
   1250 
   1251   defp split_payload_in_chunks(binary, chunk_size, acc) do
   1252     <<chunk::size(chunk_size)-binary, rest::binary>> = binary
   1253     split_payload_in_chunks(rest, chunk_size, [chunk | acc])
   1254   end
   1255 
   1256   defp send_ping(conn, payload) do
   1257     frame = Frame.ping(stream_id: 0, opaque_data: payload)
   1258     conn = send!(conn, Frame.encode(frame))
   1259     ref = make_ref()
   1260     conn = update_in(conn.ping_queue, &:queue.in({ref, payload}, &1))
   1261     {conn, ref}
   1262   end
   1263 
   1264   defp send_settings(conn, settings) do
   1265     validate_settings!(settings)
   1266     frame = settings(stream_id: 0, params: settings)
   1267     conn = send!(conn, Frame.encode(frame))
   1268     conn = update_in(conn.client_settings_queue, &:queue.in(settings, &1))
   1269     conn
   1270   end
   1271 
   1272   defp validate_settings!(settings) do
   1273     unless Keyword.keyword?(settings) do
   1274       raise ArgumentError, "settings must be a keyword list"
   1275     end
   1276 
   1277     Enum.each(settings, fn
   1278       {:header_table_size, value} ->
   1279         unless is_integer(value) do
   1280           raise ArgumentError, ":header_table_size must be an integer, got: #{inspect(value)}"
   1281         end
   1282 
   1283       {:enable_push, value} ->
   1284         unless is_boolean(value) do
   1285           raise ArgumentError, ":enable_push must be a boolean, got: #{inspect(value)}"
   1286         end
   1287 
   1288       {:max_concurrent_streams, value} ->
   1289         unless is_integer(value) do
   1290           raise ArgumentError,
   1291                 ":max_concurrent_streams must be an integer, got: #{inspect(value)}"
   1292         end
   1293 
   1294       {:initial_window_size, value} ->
   1295         unless is_integer(value) and value <= @max_window_size do
   1296           raise ArgumentError,
   1297                 ":initial_window_size must be an integer <= #{@max_window_size}, " <>
   1298                   "got: #{inspect(value)}"
   1299         end
   1300 
   1301       {:max_frame_size, value} ->
   1302         unless is_integer(value) and value in @valid_max_frame_size_range do
   1303           raise ArgumentError,
   1304                 ":max_frame_size must be an integer in #{inspect(@valid_max_frame_size_range)}, " <>
   1305                   "got: #{inspect(value)}"
   1306         end
   1307 
   1308       {:max_header_list_size, value} ->
   1309         unless is_integer(value) do
   1310           raise ArgumentError, ":max_header_list_size must be an integer, got: #{inspect(value)}"
   1311         end
   1312 
   1313       {:enable_connect_protocol, value} ->
   1314         unless is_boolean(value) do
   1315           raise ArgumentError,
   1316                 ":enable_connect_protocol must be a boolean, got: #{inspect(value)}"
   1317         end
   1318 
   1319       {name, _value} ->
   1320         raise ArgumentError, "unknown setting parameter #{inspect(name)}"
   1321     end)
   1322   end
   1323 
   1324   defp downcase_header_names(headers) do
   1325     for {name, value} <- headers, do: {Util.downcase_ascii(name), value}
   1326   end
   1327 
   1328   defp add_default_headers(headers, body) do
   1329     headers
   1330     |> Util.put_new_header("user-agent", @user_agent)
   1331     |> add_default_content_length_header(body)
   1332   end
   1333 
   1334   defp add_default_content_length_header(headers, body) when body in [nil, :stream] do
   1335     headers
   1336   end
   1337 
   1338   defp add_default_content_length_header(headers, body) do
   1339     Util.put_new_header_lazy(headers, "content-length", fn ->
   1340       body |> IO.iodata_length() |> Integer.to_string()
   1341     end)
   1342   end
   1343 
   1344   defp add_pseudo_headers(headers, conn, method, path) do
   1345     if String.upcase(method) == "CONNECT" do
   1346       [
   1347         {":method", method},
   1348         {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
   1349         | headers
   1350       ]
   1351     else
   1352       [
   1353         {":method", method},
   1354         {":path", path},
   1355         {":scheme", conn.scheme},
   1356         {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
   1357         | headers
   1358       ]
   1359     end
   1360   end
   1361 
   1362   defp sort_pseudo_headers_to_front(headers) do
   1363     Enum.sort_by(headers, fn {key, _value} ->
   1364       not String.starts_with?(key, ":")
   1365     end)
   1366   end
   1367 
   1368   ## Frame handling
   1369 
   1370   defp maybe_concat_and_handle_new_data(conn, data) do
   1371     data = maybe_concat(conn.buffer, data)
   1372     {conn, responses} = handle_new_data(conn, data, [])
   1373     {:ok, conn, Enum.reverse(responses)}
   1374   end
   1375 
   1376   defp handle_new_data(%Mint.HTTP2{} = conn, data, responses) do
   1377     case Frame.decode_next(data, conn.client_settings.max_frame_size) do
   1378       {:ok, frame, rest} ->
   1379         assert_valid_frame(conn, frame)
   1380         {conn, responses} = handle_frame(conn, frame, responses)
   1381         handle_new_data(conn, rest, responses)
   1382 
   1383       :more ->
   1384         conn = put_in(conn.buffer, data)
   1385         handle_consumed_all_frames(conn, responses)
   1386 
   1387       {:error, :payload_too_big} ->
   1388         # TODO: sometimes, this could be handled with RST_STREAM instead of a GOAWAY frame (for
   1389         # example, if the payload of a DATA frame is too big).
   1390         # http://httpwg.org/specs/rfc7540.html#rfc.section.4.2
   1391         debug_data = "frame payload exceeds connection's max frame size"
   1392         send_connection_error!(conn, :frame_size_error, debug_data)
   1393 
   1394       {:error, {:frame_size_error, frame}} ->
   1395         debug_data = "error with size of frame: #{inspect(frame)}"
   1396         send_connection_error!(conn, :frame_size_error, debug_data)
   1397 
   1398       {:error, {:protocol_error, info}} ->
   1399         debug_data = "error when decoding frame: #{inspect(info)}"
   1400         send_connection_error!(conn, :protocol_error, debug_data)
   1401     end
   1402   catch
   1403     :throw, {:mint, conn, error} -> throw({:mint, conn, error, responses})
   1404     :throw, {:mint, _conn, _error, _responses} = thrown -> throw(thrown)
   1405   end
   1406 
   1407   defp handle_consumed_all_frames(%{state: state} = conn, responses) do
   1408     case state do
   1409       # TODO: should we do something with the debug data here, like logging it?
   1410       {:goaway, :no_error, _debug_data} ->
   1411         {conn, responses}
   1412 
   1413       {:goaway, error_code, debug_data} ->
   1414         error = wrap_error({:server_closed_connection, error_code, debug_data})
   1415         throw({:mint, conn, error, responses})
   1416 
   1417       _ ->
   1418         {conn, responses}
   1419     end
   1420   end
   1421 
   1422   defp assert_valid_frame(_conn, unknown()) do
   1423     # Unknown frames MUST be ignored:
   1424     # https://datatracker.ietf.org/doc/html/rfc7540#section-4.1
   1425     :ok
   1426   end
   1427 
   1428   defp assert_valid_frame(conn, frame) do
   1429     stream_id = elem(frame, 1)
   1430 
   1431     assert_frame_on_right_level(conn, elem(frame, 0), stream_id)
   1432     assert_stream_id_is_allowed(conn, stream_id)
   1433     assert_frame_doesnt_interrupt_header_streaming(conn, frame)
   1434   end
   1435 
   1436   # http://httpwg.org/specs/rfc7540.html#HttpSequence
   1437   defp assert_frame_doesnt_interrupt_header_streaming(conn, frame) do
   1438     case {conn.headers_being_processed, frame} do
   1439       {nil, continuation()} ->
   1440         debug_data = "CONTINUATION received outside of headers streaming"
   1441         send_connection_error!(conn, :protocol_error, debug_data)
   1442 
   1443       {nil, _frame} ->
   1444         :ok
   1445 
   1446       {{stream_id, _, _}, continuation(stream_id: stream_id)} ->
   1447         :ok
   1448 
   1449       _other ->
   1450         debug_data =
   1451           "headers are streaming but got a #{inspect(elem(frame, 0))} frame instead " <>
   1452             "of a CONTINUATION frame"
   1453 
   1454         send_connection_error!(conn, :protocol_error, debug_data)
   1455     end
   1456   end
   1457 
   1458   stream_level_frames = [:data, :headers, :priority, :rst_stream, :push_promise, :continuation]
   1459   connection_level_frames = [:settings, :ping, :goaway]
   1460 
   1461   defp assert_frame_on_right_level(conn, frame, _stream_id = 0)
   1462        when frame in unquote(stream_level_frames) do
   1463     debug_data = "frame #{inspect(frame)} not allowed at the connection level (stream_id = 0)"
   1464     send_connection_error!(conn, :protocol_error, debug_data)
   1465   end
   1466 
   1467   defp assert_frame_on_right_level(conn, frame, stream_id)
   1468        when frame in unquote(connection_level_frames) and stream_id != 0 do
   1469     debug_data = "frame #{inspect(frame)} only allowed at the connection level"
   1470     send_connection_error!(conn, :protocol_error, debug_data)
   1471   end
   1472 
   1473   defp assert_frame_on_right_level(_conn, _frame, _stream_id) do
   1474     :ok
   1475   end
   1476 
   1477   defp assert_stream_id_is_allowed(conn, stream_id) do
   1478     if Integer.is_odd(stream_id) and stream_id >= conn.next_stream_id do
   1479       debug_data = "frame with stream ID #{inspect(stream_id)} has not been opened yet"
   1480       send_connection_error!(conn, :protocol_error, debug_data)
   1481     else
   1482       :ok
   1483     end
   1484   end
   1485 
   1486   for frame_name <- stream_level_frames ++ connection_level_frames ++ [:window_update, :unknown] do
   1487     function_name = :"handle_#{frame_name}"
   1488 
   1489     defp handle_frame(conn, Frame.unquote(frame_name)() = frame, responses) do
   1490       unquote(function_name)(conn, frame, responses)
   1491     end
   1492   end
   1493 
   1494   defp handle_unknown(conn, _frame, responses) do
   1495     # Implementations MUST ignore and discard any frame that has a type that is unknown.
   1496     # see: https://datatracker.ietf.org/doc/html/rfc7540#section-4.1
   1497 
   1498     {conn, responses}
   1499   end
   1500 
   1501   # DATA
   1502 
   1503   defp handle_data(conn, frame, responses) do
   1504     data(stream_id: stream_id, flags: flags, data: data, padding: padding) = frame
   1505 
   1506     # Regardless of whether we have the stream or not, we need to abide by flow
   1507     # control rules so we still refill the client window for the stream_id we got.
   1508     window_size_increment = byte_size(data) + byte_size(padding || "")
   1509 
   1510     conn =
   1511       if window_size_increment > 0 do
   1512         refill_client_windows(conn, stream_id, window_size_increment)
   1513       else
   1514         conn
   1515       end
   1516 
   1517     case Map.fetch(conn.streams, stream_id) do
   1518       {:ok, stream} ->
   1519         assert_stream_in_state(conn, stream, [:open, :half_closed_local])
   1520         responses = [{:data, stream.ref, data} | responses]
   1521 
   1522         if flag_set?(flags, :data, :end_stream) do
   1523           conn = close_stream!(conn, stream.id, :no_error)
   1524           {conn, [{:done, stream.ref} | responses]}
   1525         else
   1526           {conn, responses}
   1527         end
   1528 
   1529       :error ->
   1530         _ = Logger.debug(fn -> "Received DATA frame on closed stream ID #{stream_id}" end)
   1531         {conn, responses}
   1532     end
   1533   end
   1534 
   1535   defp refill_client_windows(conn, stream_id, data_size) do
   1536     connection_frame = window_update(stream_id: 0, window_size_increment: data_size)
   1537     stream_frame = window_update(stream_id: stream_id, window_size_increment: data_size)
   1538 
   1539     if open?(conn) do
   1540       send!(conn, [Frame.encode(connection_frame), Frame.encode(stream_frame)])
   1541     else
   1542       conn
   1543     end
   1544   end
   1545 
   1546   # HEADERS
   1547 
   1548   defp handle_headers(conn, frame, responses) do
   1549     headers(stream_id: stream_id, flags: flags, hbf: hbf) = frame
   1550 
   1551     stream = Map.get(conn.streams, stream_id)
   1552     end_stream? = flag_set?(flags, :headers, :end_stream)
   1553 
   1554     if stream do
   1555       assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
   1556     end
   1557 
   1558     if flag_set?(flags, :headers, :end_headers) do
   1559       decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?)
   1560     else
   1561       callback = &decode_hbf_and_add_responses(&1, &2, &3, &4, end_stream?)
   1562       conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
   1563       {conn, responses}
   1564     end
   1565   end
   1566 
   1567   # Here, "stream" can be nil in case the stream was closed. In that case, we
   1568   # still need to process the hbf so that the HPACK table is updated, but then
   1569   # we don't add any responses.
   1570   defp decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?) do
   1571     {conn, headers} = decode_hbf(conn, hbf)
   1572 
   1573     if stream do
   1574       handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?)
   1575     else
   1576       _ = Logger.debug(fn -> "Received HEADERS frame on closed stream" end)
   1577       {conn, responses}
   1578     end
   1579   end
   1580 
   1581   defp handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?) do
   1582     %{ref: ref, received_first_headers?: received_first_headers?} = stream
   1583 
   1584     case headers do
   1585       # Interim response (1xx), which is made of only one HEADERS plus zero or more CONTINUATIONs.
   1586       # There can be zero or more interim responses before a "proper" response.
   1587       # https://httpwg.org/specs/rfc9113.html#HttpFraming
   1588       [{":status", <<?1, _, _>> = status} | headers] ->
   1589         cond do
   1590           received_first_headers? ->
   1591             conn = close_stream!(conn, stream.id, :protocol_error)
   1592 
   1593             debug_data =
   1594               "informational response (1xx) must appear before final response, got a #{status} status"
   1595 
   1596             error = wrap_error({:protocol_error, debug_data})
   1597             responses = [{:error, stream.ref, error} | responses]
   1598             {conn, responses}
   1599 
   1600           end_stream? ->
   1601             conn = close_stream!(conn, stream.id, :protocol_error)
   1602             debug_data = "informational response (1xx) must not have the END_STREAM flag set"
   1603             error = wrap_error({:protocol_error, debug_data})
   1604             responses = [{:error, stream.ref, error} | responses]
   1605             {conn, responses}
   1606 
   1607           true ->
   1608             assert_stream_in_state(conn, stream, [:open, :half_closed_local])
   1609             status = String.to_integer(status)
   1610             headers = join_cookie_headers(headers)
   1611             new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]
   1612             {conn, new_responses}
   1613         end
   1614 
   1615       [{":status", status} | headers] when not received_first_headers? ->
   1616         conn = put_in(conn.streams[stream.id].received_first_headers?, true)
   1617         status = String.to_integer(status)
   1618         headers = join_cookie_headers(headers)
   1619         new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]
   1620 
   1621         cond do
   1622           # :reserved_remote means that this was a promised stream. As soon as headers come,
   1623           # the stream goes in the :half_closed_local state (unless it's not allowed because
   1624           # of the client's max concurrent streams limit, or END_STREAM is set).
   1625           stream.state == :reserved_remote ->
   1626             cond do
   1627               conn.open_server_stream_count >= conn.client_settings.max_concurrent_streams ->
   1628                 conn = close_stream!(conn, stream.id, :refused_stream)
   1629                 {conn, responses}
   1630 
   1631               end_stream? ->
   1632                 conn = close_stream!(conn, stream.id, :no_error)
   1633                 {conn, [{:done, ref} | new_responses]}
   1634 
   1635               true ->
   1636                 conn = update_in(conn.open_server_stream_count, &(&1 + 1))
   1637                 conn = put_in(conn.streams[stream.id].state, :half_closed_local)
   1638                 {conn, new_responses}
   1639             end
   1640 
   1641           end_stream? ->
   1642             conn = close_stream!(conn, stream.id, :no_error)
   1643             {conn, [{:done, ref} | new_responses]}
   1644 
   1645           true ->
   1646             {conn, new_responses}
   1647         end
   1648 
   1649       # Trailing headers. We don't care about the :status header here.
   1650       headers when received_first_headers? ->
   1651         if end_stream? do
   1652           conn = close_stream!(conn, stream.id, :no_error)
   1653           headers = headers |> Util.remove_unallowed_trailing_headers() |> join_cookie_headers()
   1654           {conn, [{:done, ref}, {:headers, ref, headers} | responses]}
   1655         else
   1656           # Trailing headers must set the END_STREAM flag because they're
   1657           # the last thing allowed on the stream (other than RST_STREAM and
   1658           # the usual frames).
   1659           conn = close_stream!(conn, stream.id, :protocol_error)
   1660           debug_data = "trailing headers didn't set the END_STREAM flag"
   1661           error = wrap_error({:protocol_error, debug_data})
   1662           responses = [{:error, stream.ref, error} | responses]
   1663           {conn, responses}
   1664         end
   1665 
   1666       # Non-trailing headers need to have a :status header, otherwise
   1667       # it's a protocol error.
   1668       _headers ->
   1669         conn = close_stream!(conn, stream.id, :protocol_error)
   1670         error = wrap_error(:missing_status_header)
   1671         responses = [{:error, stream.ref, error} | responses]
   1672         {conn, responses}
   1673     end
   1674   end
   1675 
   1676   defp decode_hbf(conn, hbf) do
   1677     case HPAX.decode(hbf, conn.decode_table) do
   1678       {:ok, headers, decode_table} ->
   1679         conn = put_in(conn.decode_table, decode_table)
   1680         {conn, headers}
   1681 
   1682       {:error, reason} ->
   1683         debug_data = "unable to decode headers: #{inspect(reason)}"
   1684         send_connection_error!(conn, :compression_error, debug_data)
   1685     end
   1686   end
   1687 
   1688   # If the port is the default for the scheme, don't add it to the :authority pseudo-header
   1689   defp authority_pseudo_header(scheme, port, hostname) do
   1690     if URI.default_port(scheme) == port do
   1691       hostname
   1692     else
   1693       "#{hostname}:#{port}"
   1694     end
   1695   end
   1696 
   1697   defp join_cookie_headers(headers) do
   1698     # If we have 0 or 1 Cookie headers, we just use the old list of headers.
   1699     case Enum.split_with(headers, fn {name, _value} -> Util.downcase_ascii(name) == "cookie" end) do
   1700       {[], _headers} ->
   1701         headers
   1702 
   1703       {[_], _headers} ->
   1704         headers
   1705 
   1706       {cookies, headers} ->
   1707         cookie = Enum.map_join(cookies, "; ", fn {_name, value} -> value end)
   1708         [{"cookie", cookie} | headers]
   1709     end
   1710   end
   1711 
   1712   # PRIORITY
   1713 
   1714   # For now we ignore all PRIORITY frames. This shouldn't cause practical trouble.
   1715   defp handle_priority(conn, frame, responses) do
   1716     _ = Logger.warning(fn -> "Ignoring PRIORITY frame: #{inspect(frame)}" end)
   1717     {conn, responses}
   1718   end
   1719 
   1720   # RST_STREAM
   1721 
   1722   defp handle_rst_stream(conn, frame, responses) do
   1723     rst_stream(stream_id: stream_id, error_code: error_code) = frame
   1724 
   1725     # If we receive RST_STREAM on a closed stream, we ignore it.
   1726     case Map.fetch(conn.streams, stream_id) do
   1727       {:ok, stream} ->
   1728         # If we receive RST_STREAM then the stream is definitely closed.
   1729         # We won't send anything else on the stream so we can simply delete it.
   1730         # Later DATA or HEADERS frames on it are logged and otherwise ignored.
   1731         conn = delete_stream(conn, stream)
   1732 
   1733         if error_code == :no_error do
   1734           {conn, [{:done, stream.ref} | responses]}
   1735         else
   1736           error = wrap_error({:server_closed_request, error_code})
   1737           {conn, [{:error, stream.ref, error} | responses]}
   1738         end
   1739 
   1740       :error ->
   1741         {conn, responses}
   1742     end
   1743   end
   1744 
   1745   # SETTINGS
   1746 
   1747   defp handle_settings(conn, frame, responses) do
   1748     settings(flags: flags, params: params) = frame
   1749 
   1750     if flag_set?(flags, :settings, :ack) do
   1751       {{:value, params}, conn} = get_and_update_in(conn.client_settings_queue, &:queue.out/1)
   1752       conn = apply_client_settings(conn, params)
   1753       {conn, responses}
   1754     else
   1755       conn = apply_server_settings(conn, params)
   1756       frame = settings(flags: set_flags(:settings, [:ack]), params: [])
   1757       conn = send!(conn, Frame.encode(frame))
   1758       {conn, responses}
   1759     end
   1760   end
   1761 
   1762   defp apply_server_settings(conn, server_settings) do
   1763     Enum.reduce(server_settings, conn, fn
   1764       {:header_table_size, header_table_size}, conn ->
   1765         update_in(conn.encode_table, &HPAX.resize(&1, header_table_size))
   1766 
   1767       {:enable_push, enable_push?}, conn ->
   1768         put_in(conn.server_settings.enable_push, enable_push?)
   1769 
   1770       {:max_concurrent_streams, max_concurrent_streams}, conn ->
   1771         put_in(conn.server_settings.max_concurrent_streams, max_concurrent_streams)
   1772 
   1773       {:initial_window_size, initial_window_size}, conn ->
   1774         if initial_window_size > @max_window_size do
   1775           debug_data = "INITIAL_WINDOW_SIZE setting of #{initial_window_size} is too big"
   1776           send_connection_error!(conn, :flow_control_error, debug_data)
   1777         end
   1778 
   1779         update_server_initial_window_size(conn, initial_window_size)
   1780 
   1781       {:max_frame_size, max_frame_size}, conn ->
   1782         if max_frame_size not in @valid_max_frame_size_range do
   1783           debug_data = "MAX_FRAME_SIZE setting parameter outside of allowed range"
   1784           send_connection_error!(conn, :protocol_error, debug_data)
   1785         end
   1786 
   1787         put_in(conn.server_settings.max_frame_size, max_frame_size)
   1788 
   1789       {:max_header_list_size, max_header_list_size}, conn ->
   1790         put_in(conn.server_settings.max_header_list_size, max_header_list_size)
   1791 
   1792       {:enable_connect_protocol, enable_connect_protocol?}, conn ->
   1793         put_in(conn.server_settings.enable_connect_protocol, enable_connect_protocol?)
   1794     end)
   1795   end
   1796 
   1797   defp apply_client_settings(conn, client_settings) do
   1798     Enum.reduce(client_settings, conn, fn
   1799       {:max_frame_size, value}, conn ->
   1800         put_in(conn.client_settings.max_frame_size, value)
   1801 
   1802       {:max_concurrent_streams, value}, conn ->
   1803         put_in(conn.client_settings.max_concurrent_streams, value)
   1804 
   1805       {:enable_push, value}, conn ->
   1806         put_in(conn.client_settings.enable_push, value)
   1807     end)
   1808   end
   1809 
   1810   defp update_server_initial_window_size(conn, new_iws) do
   1811     diff = new_iws - conn.server_settings.initial_window_size
   1812 
   1813     conn =
   1814       update_in(conn.streams, fn streams ->
   1815         for {stream_id, stream} <- streams,
   1816             stream.state in [:open, :half_closed_remote],
   1817             into: streams do
   1818           window_size = stream.window_size + diff
   1819 
   1820           if window_size > @max_window_size do
   1821             debug_data =
   1822               "INITIAL_WINDOW_SIZE parameter of #{window_size} makes some window sizes too big"
   1823 
   1824             send_connection_error!(conn, :flow_control_error, debug_data)
   1825           end
   1826 
   1827           {stream_id, %{stream | window_size: window_size}}
   1828         end
   1829       end)
   1830 
   1831     put_in(conn.server_settings.initial_window_size, new_iws)
   1832   end
   1833 
   1834   # PUSH_PROMISE
   1835 
   1836   defp handle_push_promise(
   1837          %Mint.HTTP2{client_settings: %{enable_push: false}} = conn,
   1838          push_promise(),
   1839          _responses
   1840        ) do
   1841     debug_data = "received PUSH_PROMISE frame when SETTINGS_ENABLE_PUSH was false"
   1842     send_connection_error!(conn, :protocol_error, debug_data)
   1843   end
   1844 
   1845   defp handle_push_promise(conn, push_promise() = frame, responses) do
   1846     push_promise(
   1847       stream_id: stream_id,
   1848       flags: flags,
   1849       promised_stream_id: promised_stream_id,
   1850       hbf: hbf
   1851     ) = frame
   1852 
   1853     assert_valid_promised_stream_id(conn, promised_stream_id)
   1854 
   1855     stream = fetch_stream!(conn, stream_id)
   1856     assert_stream_in_state(conn, stream, [:open, :half_closed_local])
   1857 
   1858     if flag_set?(flags, :push_promise, :end_headers) do
   1859       decode_push_promise_headers_and_add_response(
   1860         conn,
   1861         responses,
   1862         hbf,
   1863         stream,
   1864         promised_stream_id
   1865       )
   1866     else
   1867       callback = &decode_push_promise_headers_and_add_response(&1, &2, &3, &4, promised_stream_id)
   1868       conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
   1869       {conn, responses}
   1870     end
   1871   end
   1872 
   1873   defp decode_push_promise_headers_and_add_response(
   1874          conn,
   1875          responses,
   1876          hbf,
   1877          stream,
   1878          promised_stream_id
   1879        ) do
   1880     {conn, headers} = decode_hbf(conn, hbf)
   1881 
   1882     promised_stream = %{
   1883       id: promised_stream_id,
   1884       ref: make_ref(),
   1885       state: :reserved_remote,
   1886       window_size: conn.server_settings.initial_window_size,
   1887       received_first_headers?: false
   1888     }
   1889 
   1890     conn = put_in(conn.streams[promised_stream.id], promised_stream)
   1891     new_response = {:push_promise, stream.ref, promised_stream.ref, headers}
   1892     {conn, [new_response | responses]}
   1893   end
   1894 
   1895   defp assert_valid_promised_stream_id(conn, promised_stream_id) do
   1896     cond do
   1897       not is_integer(promised_stream_id) or Integer.is_odd(promised_stream_id) ->
   1898         debug_data = "invalid promised stream ID: #{inspect(promised_stream_id)}"
   1899         send_connection_error!(conn, :protocol_error, debug_data)
   1900 
   1901       Map.has_key?(conn.streams, promised_stream_id) ->
   1902         debug_data =
   1903           "stream with ID #{inspect(promised_stream_id)} already exists and can't be " <>
   1904             "reserved by the server"
   1905 
   1906         send_connection_error!(conn, :protocol_error, debug_data)
   1907 
   1908       true ->
   1909         :ok
   1910     end
   1911   end
   1912 
   1913   # PING
   1914 
   1915   defp handle_ping(conn, Frame.ping() = frame, responses) do
   1916     Frame.ping(flags: flags, opaque_data: opaque_data) = frame
   1917 
   1918     if flag_set?(flags, :ping, :ack) do
   1919       handle_ping_ack(conn, opaque_data, responses)
   1920     else
   1921       ack = Frame.ping(stream_id: 0, flags: set_flags(:ping, [:ack]), opaque_data: opaque_data)
   1922       conn = send!(conn, Frame.encode(ack))
   1923       {conn, responses}
   1924     end
   1925   end
   1926 
   1927   defp handle_ping_ack(conn, opaque_data, responses) do
   1928     case :queue.peek(conn.ping_queue) do
   1929       {:value, {ref, ^opaque_data}} ->
   1930         conn = update_in(conn.ping_queue, &:queue.drop/1)
   1931         {conn, [{:pong, ref} | responses]}
   1932 
   1933       {:value, _} ->
   1934         _ = Logger.warning("Received PING ack that doesn't match next PING request in the queue")
   1935         {conn, responses}
   1936 
   1937       :empty ->
   1938         _ = Logger.warning("Received PING ack but no PING requests are pending")
   1939         {conn, responses}
   1940     end
   1941   end
   1942 
   1943   # GOAWAY
   1944 
   1945   defp handle_goaway(conn, frame, responses) do
   1946     goaway(
   1947       last_stream_id: last_stream_id,
   1948       error_code: error_code,
   1949       debug_data: debug_data
   1950     ) = frame
   1951 
   1952     # We gather all the unprocessed requests and form {:error, _, _} tuples for each one.
   1953     # At the same time, we delete all the unprocessed requests from the stream set.
   1954     {unprocessed_request_responses, conn} =
   1955       Enum.flat_map_reduce(conn.streams, conn, fn
   1956         {stream_id, _stream}, conn_acc when stream_id <= last_stream_id ->
   1957           {[], conn_acc}
   1958 
   1959         {_stream_id, stream}, conn_acc ->
   1960           conn_acc = delete_stream(conn_acc, stream)
   1961           {[{:error, stream.ref, wrap_error(:unprocessed)}], conn_acc}
   1962       end)
   1963 
   1964     conn = put_in(conn.state, {:goaway, error_code, debug_data})
   1965     {conn, unprocessed_request_responses ++ responses}
   1966   end
   1967 
   1968   # WINDOW_UPDATE
   1969 
   1970   defp handle_window_update(
   1971          conn,
   1972          window_update(stream_id: 0, window_size_increment: wsi),
   1973          responses
   1974        ) do
   1975     new_window_size = conn.window_size + wsi
   1976 
   1977     if new_window_size > @max_window_size do
   1978       send_connection_error!(conn, :flow_control_error, "window size too big")
   1979     else
   1980       conn = put_in(conn.window_size, new_window_size)
   1981       {conn, responses}
   1982     end
   1983   end
   1984 
   1985   defp handle_window_update(
   1986          conn,
   1987          window_update(stream_id: stream_id, window_size_increment: wsi),
   1988          responses
   1989        ) do
   1990     stream = fetch_stream!(conn, stream_id)
   1991     new_window_size = stream.window_size + wsi
   1992 
   1993     if new_window_size > @max_window_size do
   1994       conn = close_stream!(conn, stream_id, :flow_control_error)
   1995       error = wrap_error({:flow_control_error, "window size too big"})
   1996       {conn, [{:error, stream.ref, error} | responses]}
   1997     else
   1998       conn = put_in(conn.streams[stream_id].window_size, new_window_size)
   1999       {conn, responses}
   2000     end
   2001   end
   2002 
   2003   # CONTINUATION
   2004 
   2005   defp handle_continuation(conn, frame, responses) do
   2006     continuation(stream_id: stream_id, flags: flags, hbf: hbf_chunk) = frame
   2007     stream = Map.get(conn.streams, stream_id)
   2008 
   2009     if stream do
   2010       assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
   2011     end
   2012 
   2013     {^stream_id, hbf_acc, callback} = conn.headers_being_processed
   2014 
   2015     if flag_set?(flags, :continuation, :end_headers) do
   2016       hbf = IO.iodata_to_binary([hbf_acc, hbf_chunk])
   2017       conn = put_in(conn.headers_being_processed, nil)
   2018       callback.(conn, responses, hbf, stream)
   2019     else
   2020       conn = put_in(conn.headers_being_processed, {stream_id, [hbf_acc, hbf_chunk], callback})
   2021       {conn, responses}
   2022     end
   2023   end
   2024 
   2025   ## General helpers
   2026 
   2027   defp send_connection_error!(conn, error_code, debug_data) do
   2028     frame =
   2029       goaway(stream_id: 0, last_stream_id: 2, error_code: error_code, debug_data: debug_data)
   2030 
   2031     conn = send!(conn, Frame.encode(frame))
   2032     _ = conn.transport.close(conn.socket)
   2033     conn = put_in(conn.state, :closed)
   2034     throw({:mint, conn, wrap_error({error_code, debug_data})})
   2035   end
   2036 
   2037   defp close_stream!(conn, stream_id, error_code) do
   2038     stream = Map.fetch!(conn.streams, stream_id)
   2039 
   2040     # First of all we send a RST_STREAM with the given error code so that we
   2041     # move the stream to the :closed state (that is, we remove it).
   2042     rst_stream_frame = rst_stream(stream_id: stream_id, error_code: error_code)
   2043 
   2044     conn =
   2045       if open?(conn) do
   2046         send!(conn, Frame.encode(rst_stream_frame))
   2047       else
   2048         conn
   2049       end
   2050 
   2051     delete_stream(conn, stream)
   2052   end
   2053 
   2054   defp delete_stream(conn, stream) do
   2055     conn = update_in(conn.streams, &Map.delete(&1, stream.id))
   2056     conn = update_in(conn.ref_to_stream_id, &Map.delete(&1, stream.ref))
   2057 
   2058     stream_open? = stream.state in [:open, :half_closed_local, :half_closed_remote]
   2059 
   2060     conn =
   2061       cond do
   2062         # Stream initiated by the client.
   2063         stream_open? and Integer.is_odd(stream.id) ->
   2064           update_in(conn.open_client_stream_count, &(&1 - 1))
   2065 
   2066         # Stream initiated by the server.
   2067         stream_open? and Integer.is_even(stream.id) ->
   2068           update_in(conn.open_server_stream_count, &(&1 - 1))
   2069 
   2070         true ->
   2071           conn
   2072       end
   2073 
   2074     conn
   2075   end
   2076 
   2077   defp fetch_stream!(conn, stream_id) do
   2078     case Map.fetch(conn.streams, stream_id) do
   2079       {:ok, stream} -> stream
   2080       :error -> throw({:mint, conn, wrap_error({:stream_not_found, stream_id})})
   2081     end
   2082   end
   2083 
   2084   defp assert_stream_in_state(conn, %{state: state}, expected_states) do
   2085     if state not in expected_states do
   2086       debug_data =
   2087         "stream was in state #{inspect(state)} and not in one of the expected states: " <>
   2088           Enum.map_join(expected_states, ", ", &inspect/1)
   2089 
   2090       send_connection_error!(conn, :protocol_error, debug_data)
   2091     end
   2092   end
   2093 
   2094   defp send!(%Mint.HTTP2{transport: transport, socket: socket} = conn, bytes) do
   2095     case transport.send(socket, bytes) do
   2096       :ok ->
   2097         conn
   2098 
   2099       {:error, %TransportError{reason: :closed} = error} ->
   2100         throw({:mint, %{conn | state: :closed}, error})
   2101 
   2102       {:error, reason} ->
   2103         throw({:mint, conn, reason})
   2104     end
   2105   end
   2106 
   2107   defp wrap_error(reason) do
   2108     %HTTPError{reason: reason, module: __MODULE__}
   2109   end
   2110 
   2111   @doc false
   2112   def format_error(reason)
   2113 
   2114   def format_error(:closed) do
   2115     "the connection is closed"
   2116   end
   2117 
   2118   def format_error(:closed_for_writing) do
   2119     "the connection is closed for writing, which means that you cannot issue any more " <>
   2120       "requests on the connection but you can expect responses to still be delivered for " <>
   2121       "part of the requests that are in flight. If a connection is closed for writing, " <>
   2122       "it usually means that you got a :server_closed_request error already."
   2123   end
   2124 
   2125   def format_error(:too_many_concurrent_requests) do
   2126     "the number of max concurrent HTTP/2 requests supported by the server has been reached. " <>
   2127       "Use Mint.HTTP2.get_server_setting/2 with the :max_concurrent_streams setting name " <>
   2128       "to find out the maximum number of concurrent requests supported by the server."
   2129   end
   2130 
   2131   def format_error({:max_header_list_size_exceeded, size, max_size}) do
   2132     "the given header list (of size #{size}) goes over the max header list size of " <>
   2133       "#{max_size} supported by the server. In HTTP/2, the header list size is calculated " <>
   2134       "by summing up the size in bytes of each header name, value, plus 32 for each header."
   2135   end
   2136 
   2137   def format_error({:exceeds_window_size, what, window_size}) do
   2138     what =
   2139       case what do
   2140         :request -> "request"
   2141         :connection -> "connection"
   2142       end
   2143 
   2144     "the given data exceeds the #{what} window size, which is #{window_size}. " <>
   2145       "The server will refill the window size of the #{what} when ready. This will be " <>
   2146       "handled transparently by stream/2."
   2147   end
   2148 
   2149   def format_error({:stream_not_found, stream_id}) do
   2150     "request not found (with stream_id #{inspect(stream_id)})"
   2151   end
   2152 
   2153   def format_error(:unknown_request_to_stream) do
   2154     "can't stream chunk of data because the request is unknown"
   2155   end
   2156 
   2157   def format_error(:request_is_not_streaming) do
   2158     "can't send more data on this request since it's not streaming"
   2159   end
   2160 
   2161   def format_error({:unallowed_trailing_header, {name, value}}) do
   2162     "header #{inspect(name)} (with value #{inspect(value)}) is not allowed as a trailing header"
   2163   end
   2164 
   2165   def format_error(:missing_status_header) do
   2166     "the :status pseudo-header (which is required in HTTP/2) is missing from the response"
   2167   end
   2168 
   2169   def format_error({:server_closed_request, error_code}) do
   2170     "server closed request with error code #{inspect(error_code)}"
   2171   end
   2172 
   2173   def format_error({:server_closed_connection, error, debug_data}) do
   2174     "server closed connection with error code #{inspect(error)} and debug data: " <> debug_data
   2175   end
   2176 
   2177   def format_error(:unprocessed) do
   2178     "request was not processed by the server, which means that it's safe to retry on a " <>
   2179       "different or new connection"
   2180   end
   2181 
   2182   def format_error({:frame_size_error, frame}) do
   2183     "frame size error for #{inspect(frame)} frame"
   2184   end
   2185 
   2186   def format_error({:protocol_error, debug_data}) do
   2187     "protocol error: " <> debug_data
   2188   end
   2189 
   2190   def format_error({:compression_error, debug_data}) do
   2191     "compression error: " <> debug_data
   2192   end
   2193 
   2194   def format_error({:flow_control_error, debug_data}) do
   2195     "flow control error: " <> debug_data
   2196   end
   2197 end
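
# Illustrative sketch, not part of Mint. The frame handlers above emit response
# tuples such as {:status, ref, status}, {:headers, ref, headers},
# {:data, ref, data}, {:done, ref} and {:error, ref, reason}. The module below
# shows one way a caller might group such tuples by request reference.
#
# Assumptions: the module name ResponseFoldSketch and its fold/1 function are
# hypothetical, and the input list is assumed to be in arrival order (the
# handlers above prepend to the list, so the caller is assumed to have
# reversed it before handing it over).
defmodule ResponseFoldSketch do
  # Per-request accumulator; data chunks are collected newest-first.
  @empty %{status: nil, headers: [], data: [], done?: false, error: nil}

  # Folds a list of response tuples into a map of ref => summary.
  def fold(responses) do
    responses
    |> Enum.reduce(%{}, &apply_response/2)
    |> Map.new(fn {ref, entry} -> {ref, finalize(entry)} end)
  end

  defp apply_response({:status, ref, status}, acc),
    do: update(acc, ref, &%{&1 | status: status})

  # Trailing headers arrive as a second {:headers, ...} tuple, so we append.
  defp apply_response({:headers, ref, headers}, acc),
    do: update(acc, ref, &%{&1 | headers: &1.headers ++ headers})

  defp apply_response({:data, ref, chunk}, acc),
    do: update(acc, ref, &%{&1 | data: [chunk | &1.data]})

  defp apply_response({:done, ref}, acc),
    do: update(acc, ref, &%{&1 | done?: true})

  defp apply_response({:error, ref, reason}, acc),
    do: update(acc, ref, &%{&1 | error: reason})

  # Other tuples (for example {:pong, ref} or {:push_promise, ...}) are
  # skipped in this sketch.
  defp apply_response(_other, acc), do: acc

  # Applies fun to the entry for ref, starting from @empty on first sight.
  defp update(acc, ref, fun), do: Map.update(acc, ref, fun.(@empty), fun)

  # Restores chunk order and joins the body into a single binary.
  defp finalize(entry) do
    %{entry | data: entry.data |> Enum.reverse() |> IO.iodata_to_binary()}
  end
end

# Possible usage, with hand-written tuples standing in for real responses:
#
#     ref = make_ref()
#
#     ResponseFoldSketch.fold([
#       {:status, ref, 200},
#       {:headers, ref, [{"content-type", "text/plain"}]},
#       {:data, ref, "he"},
#       {:data, ref, "llo"},
#       {:done, ref}
#     ])
#
# The result is a map keyed by ref whose entry has status 200, the joined
# body "hello" under :data, and done?: true.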