http2.ex (78049B)
defmodule Mint.HTTP2 do
  @moduledoc """
  Processless HTTP client with support for HTTP/2.

  This module provides a data structure that represents an HTTP/2 connection to
  a given server. The connection is represented as an opaque struct `%Mint.HTTP2{}`.
  The connection is a data structure and is not backed by a process, and all the
  connection handling happens in the process that creates the struct.

  This module and data structure work exactly like the ones described in the `Mint.HTTP`
  module, with the exception that `Mint.HTTP2` specifically deals with HTTP/2 while
  `Mint.HTTP` deals seamlessly with HTTP/1.1 and HTTP/2. For more information on
  how to use the data structure and client architecture, see `Mint.HTTP`.

  ## HTTP/2 streams and requests

  HTTP/2 introduces the concept of **streams**. A stream is an isolated conversation
  between the client and the server. Each stream is unique and identified by a unique
  **stream ID**, which means that there's no order when data comes on different streams
  since they can be identified uniquely. A stream closely corresponds to a request, so
  in this documentation and client we will mostly refer to streams as "requests".
  We mentioned data on streams can come in arbitrary order, and streams are requests,
  so the practical effect of this is that performing request A and then request B
  does not mean that the response to request A will come before the response to request B.
  This is why we identify each request with a unique reference returned by `request/5`.
  See `request/5` for more information.

  ## Closed connection

  In HTTP/2, the connection can either be open, closed, or only closed for writing.
  When a connection is closed for writing, the client cannot send requests or stream
  body chunks, but it can still read data that the server might be sending. When the
  connection gets closed on the writing side, a `:server_closed_connection` error is
  returned.
  `{:error, request_ref, error}` is returned for requests that haven't been
  processed by the server, with the reason of `error` being `:unprocessed`.
  These requests are safe to retry.

  ## HTTP/2 settings

  HTTP/2 supports settings negotiation between servers and clients. The server advertises
  its settings to the client and the client advertises its settings to the server. A peer
  (server or client) has to acknowledge the settings advertised by the other peer before
  those settings come into action (that's why it's called a negotiation).

  A first settings negotiation happens right when the connection starts.
  Servers and clients can renegotiate settings at any time during the life of the
  connection.

  Mint users don't need to care about settings acknowledgements directly since they're
  handled transparently by `stream/2`.

  To retrieve the server settings, you can use `get_server_setting/2`. Doing so is often
  useful to be able to tune your requests based on the server settings.

  To communicate client settings to the server, use `put_settings/2` or pass them when
  starting up a connection with `connect/4`. Note that the server needs to acknowledge
  the settings sent through `put_settings/2` before those settings come into effect. The
  server ack is processed transparently by `stream/2`, but this means that if you change
  a setting through `put_settings/2` and try to retrieve the value of that setting right
  after with `get_client_setting/2`, you'll likely get the old value of that setting. Once
  the server acknowledges the new settings, the updated value will be returned by
  `get_client_setting/2`.

  ## Server push

  HTTP/2 supports [server push](https://en.wikipedia.org/wiki/HTTP/2_Server_Push), which
  is a way for a server to send a response to a client without the client needing to make
  the corresponding request.
  The server sends a `:push_promise` response to a normal request:
  this creates a new request reference. Then, the server sends normal responses for the newly
  created request reference.

  Let's see an example. We will ask the server for `"/index.html"` and the server will
  send us a push promise for `"/style.css"`.

      {:ok, conn} = Mint.HTTP2.connect(:https, "example.com", 443)
      {:ok, conn, request_ref} = Mint.HTTP2.request(conn, "GET", "/index.html", _headers = [], _body = "")

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)

      [
        {:push_promise, ^request_ref, promised_request_ref, promised_headers},
        {:status, ^request_ref, 200},
        {:headers, ^request_ref, []},
        {:data, ^request_ref, "<html>..."},
        {:done, ^request_ref}
      ] = responses

      promised_headers
      #=> [{":method", "GET"}, {":path", "/style.css"}]

  As you can see in the example above, when the server sends a push promise then a
  `:push_promise` response is returned as a response to a request. The `:push_promise`
  response contains a `promised_request_ref` and some `promised_headers`. The
  `promised_request_ref` is the new request ref that pushed responses will be tagged with.
  `promised_headers` are headers that tell the client *what request* the promised response
  will respond to. The idea is that the server tells the client a request the client will
  want to make and then preemptively sends a response for that request. Promised headers
  will always include `:method`, `:path`, and `:authority`.

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)

      [
        {:status, ^promised_request_ref, 200},
        {:headers, ^promised_request_ref, []},
        {:data, ^promised_request_ref, "body { ... }"},
        {:done, ^promised_request_ref}
      ] = responses

  The response to a promised request is like a response to any normal request.

  ### Disabling server pushes

  HTTP/2 exposes a boolean setting for enabling or disabling server pushes with `:enable_push`.
  You can pass this option when connecting or in `put_settings/2`. By default server push
  is enabled.
  """

  import Mint.Core.Util
  import Mint.HTTP2.Frame, except: [encode: 1, decode_next: 1]

  alias Mint.{HTTPError, TransportError}
  alias Mint.Types
  alias Mint.Core.Util
  alias Mint.HTTP2.Frame

  require Logger
  require Integer

  @behaviour Mint.Core.Conn

  ## Constants

  @connection_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  @transport_opts [alpn_advertised_protocols: ["h2"]]

  @default_window_size 65_535
  @max_window_size 2_147_483_647

  @default_max_frame_size 16_384
  @valid_max_frame_size_range @default_max_frame_size..16_777_215

  @user_agent "mint/" <> Mix.Project.config()[:version]

  # HTTP/2 connection struct.
  defstruct [
    # Transport things.
    :transport,
    :socket,
    :mode,

    # Host things.
    :hostname,
    :port,
    :scheme,

    # Connection state (open, closed, and so on).
    :state,

    # Fields of the connection.
    buffer: "",
    window_size: @default_window_size,
    encode_table: HPAX.new(4096),
    decode_table: HPAX.new(4096),

    # Queue for sent PING frames.
    ping_queue: :queue.new(),

    # Queue for sent SETTINGS frames.
    client_settings_queue: :queue.new(),

    # Stream-set-related things.
    next_stream_id: 3,
    streams: %{},
    open_client_stream_count: 0,
    open_server_stream_count: 0,
    ref_to_stream_id: %{},

    # Settings that the server communicates to the client.
    server_settings: %{
      enable_push: true,
      max_concurrent_streams: 100,
      initial_window_size: @default_window_size,
      max_frame_size: @default_max_frame_size,
      max_header_list_size: :infinity,
      # Only supported by the server: https://www.rfc-editor.org/rfc/rfc8441.html#section-3
      enable_connect_protocol: false
    },

    # Settings that the client communicates to the server.
    client_settings: %{
      max_concurrent_streams: 100,
      max_frame_size: @default_max_frame_size,
      enable_push: true
    },

    # Headers being processed (when headers are split into multiple frames with CONTINUATIONS, all
    # the continuation frames must come one right after the other).
    headers_being_processed: nil,

    # Stores the headers returned by the proxy in the `CONNECT` method
    proxy_headers: [],

    # Private store.
    private: %{}
  ]

  ## Types

  @typedoc """
  HTTP/2 setting with its value.

  This type represents both server settings as well as client settings. To retrieve
  server settings use `get_server_setting/2` and to retrieve client settings use
  `get_client_setting/2`. To send client settings to the server, see `put_settings/2`.

  The supported settings are the following:

    * `:header_table_size` - (integer) corresponds to `SETTINGS_HEADER_TABLE_SIZE`.

    * `:enable_push` - (boolean) corresponds to `SETTINGS_ENABLE_PUSH`. Sets whether
      push promises are supported. If you don't want to support push promises,
      use `put_settings/2` to tell the server that your client doesn't want push promises.

    * `:max_concurrent_streams` - (integer) corresponds to `SETTINGS_MAX_CONCURRENT_STREAMS`.
      Tells what is the maximum number of streams that the peer sending this (client or server)
      supports.
      As mentioned in the module documentation, HTTP/2 streams are equivalent to
      requests, so knowing the maximum number of streams that the server supports can be useful
      to know how many concurrent requests can be open at any time. Use `get_server_setting/2`
      to find out how many concurrent streams the server supports.

    * `:initial_window_size` - (integer smaller than `#{inspect(@max_window_size)}`)
      corresponds to `SETTINGS_INITIAL_WINDOW_SIZE`. Tells what is the value of
      the initial HTTP/2 window size for the peer that sends this setting.

    * `:max_frame_size` - (integer in the range `#{inspect(@valid_max_frame_size_range)}`)
      corresponds to `SETTINGS_MAX_FRAME_SIZE`. Tells what is the maximum size of an HTTP/2
      frame for the peer that sends this setting.

    * `:max_header_list_size` - (integer) corresponds to `SETTINGS_MAX_HEADER_LIST_SIZE`.

    * `:enable_connect_protocol` - (boolean) corresponds to `SETTINGS_ENABLE_CONNECT_PROTOCOL`.
      Sets whether the client may invoke the extended connect protocol which is used to
      bootstrap WebSocket connections.

  """
  @type setting() ::
          {:enable_push, boolean()}
          | {:max_concurrent_streams, pos_integer()}
          | {:initial_window_size, 1..2_147_483_647}
          | {:max_frame_size, 16_384..16_777_215}
          | {:max_header_list_size, :infinity | pos_integer()}
          | {:enable_connect_protocol, boolean()}

  @typedoc """
  HTTP/2 settings.

  See `t:setting/0`.
  """
  @type settings() :: [setting()]

  @typedoc """
  An HTTP/2-specific error reason.

  The values can be:

    * `:closed` - when you try to make a request or stream a body chunk but the connection
      is closed.

    * `:closed_for_writing` - when you try to make a request or stream a body chunk but
      the connection is closed for writing. This means you cannot issue any more requests.
      See the "Closed connection" section in the module documentation for more information.

    * `:too_many_concurrent_requests` - when the maximum number of concurrent requests
      allowed by the server is reached. To find out what this limit is, use
      `get_server_setting/2` with the `:max_concurrent_streams` setting name.

    * `{:max_header_list_size_exceeded, size, max_size}` - when the maximum size of
      the header list is reached. `size` is the actual value of the header list size,
      `max_size` is the maximum value allowed. See `get_server_setting/2` to retrieve the
      value of the max size.

    * `{:exceeds_window_size, what, window_size}` - when the data you're trying to send
      exceeds the window size of the connection (if `what` is `:connection`) or of a request
      (if `what` is `:request`). `window_size` is the allowed window size. See
      `get_window_size/2`.

    * `{:stream_not_found, stream_id}` - when the given request is not found.

    * `:unknown_request_to_stream` - when you're trying to stream data on an unknown
      request.

    * `:request_is_not_streaming` - when you try to send data (with `stream_request_body/3`)
      on a request that is not open for streaming.

    * `:unprocessed` - when a request was closed because it was not processed by the server.
      When this error is returned, it means that the server hasn't processed the request at all,
      so it's safe to retry the given request on a different or new connection.

    * `{:server_closed_request, error_code}` - when the server closes the request.
      `error_code` is the reason why the request was closed.

    * `{:server_closed_connection, reason, debug_data}` - when the server closes the connection
      gracefully or because of an error. In HTTP/2, this corresponds to a `GOAWAY` frame.
      `reason` is the reason why the connection was closed. `debug_data` is additional debug data.

    * `{:frame_size_error, frame}` - when there's an error with the size of a frame.
      `frame` is the frame type, such as `:settings` or `:window_update`.

    * `{:protocol_error, debug_data}` - when there's a protocol error.
      `debug_data` is a string that explains the nature of the error.

    * `{:compression_error, debug_data}` - when there's a header compression error.
      `debug_data` is a string that explains the nature of the error.

    * `{:flow_control_error, debug_data}` - when there's a flow control error.
      `debug_data` is a string that explains the nature of the error.

  """
  @type error_reason() :: term()

  @opaque t() :: %Mint.HTTP2{}

  ## Public interface

  @doc """
  Same as `Mint.HTTP.connect/4`, but forces an HTTP/2 connection.
  """
  @spec connect(Types.scheme(), Types.address(), :inet.port_number(), keyword()) ::
          {:ok, t()} | {:error, Types.error()}
  def connect(scheme, address, port, opts \\ []) do
    hostname = Mint.Core.Util.hostname(opts, address)

    transport_opts =
      opts
      |> Keyword.get(:transport_opts, [])
      |> Keyword.merge(@transport_opts)
      |> Keyword.put(:hostname, hostname)

    case negotiate(address, port, scheme, transport_opts) do
      {:ok, socket} ->
        initiate(scheme, socket, hostname, port, opts)

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc false
  @spec upgrade(
          Types.scheme(),
          Mint.Types.socket(),
          Types.scheme(),
          String.t(),
          :inet.port_number(),
          keyword()
        ) :: {:ok, t()} | {:error, Types.error()}
  def upgrade(old_scheme, socket, new_scheme, hostname, port, opts) do
    transport = scheme_to_transport(new_scheme)

    transport_opts =
      opts
      |> Keyword.get(:transport_opts, [])
      |> Keyword.merge(@transport_opts)

    with {:ok, socket} <- transport.upgrade(socket, old_scheme, hostname, port, transport_opts) do
      initiate(new_scheme, socket, hostname, port, opts)
    end
  end

  @doc """
  See `Mint.HTTP.close/1`.
  """
  @impl true
  @spec close(t()) :: {:ok, t()}
  def close(conn)

  def close(%__MODULE__{state: :open} = conn) do
    send_connection_error!(conn, :no_error, "connection peacefully closed by client")
  catch
    {:mint, conn, %HTTPError{reason: {:no_error, _}}} ->
      {:ok, conn}

    # We could have an error sending the GOAWAY frame, but we want to ignore that since
    # we're closing the connection anyways.
    {:mint, conn, %TransportError{}} ->
      conn = put_in(conn.state, :closed)
      {:ok, conn}
  end

  def close(%__MODULE__{state: {:goaway, _error_code, _debug_data}} = conn) do
    _ = conn.transport.close(conn.socket)
    {:ok, put_in(conn.state, :closed)}
  end

  def close(%__MODULE__{state: :closed} = conn) do
    {:ok, conn}
  end

  @doc """
  See `Mint.HTTP.open?/1`.
  """
  @impl true
  @spec open?(t(), :read | :write | :read_write) :: boolean()
  def open?(%Mint.HTTP2{state: state} = _conn, type \\ :read_write)
      when type in [:read, :write, :read_write] do
    case state do
      :open -> true
      {:goaway, _error_code, _debug_data} -> type == :read
      :closed -> false
    end
  end

  @doc """
  See `Mint.HTTP.request/5`.

  In HTTP/2, opening a request means opening a new HTTP/2 stream (see the
  module documentation). This means that a request could fail because the
  maximum number of concurrent streams allowed by the server has been reached.
  In that case, the error reason `:too_many_concurrent_requests` is returned.
  To avoid this error, you can retrieve the value of
  the maximum number of concurrent streams supported by the server through
  `get_server_setting/2` (passing in the `:max_concurrent_streams` setting name).

  ## Header list size

  In HTTP/2, the server can optionally specify a maximum header list size that
  the client needs to respect when sending headers.
  The header list size is calculated
  by summing the length (in bytes) of each header name plus value, plus 32 bytes for
  each header. Note that pseudo-headers (like `:path` or `:method`) count towards
  this size. If the size is exceeded, an error is returned. To check what the size
  is, use `get_server_setting/2`.

  ## Request body size

  If the request body size exceeds the window size of the HTTP/2 stream created by the
  request or the window size of the connection, Mint returns an
  `{:exceeds_window_size, what, window_size}` error.

  To ensure you do not exceed the window size, it is recommended to stream the request
  body by initially passing `:stream` as the body, sending the body in chunks using
  `stream_request_body/3`, and using `get_window_size/2` to get the window size of the
  request and connection.
  """
  @impl true
  @spec request(
          t(),
          method :: String.t(),
          path :: String.t(),
          Types.headers(),
          body :: iodata() | nil | :stream
        ) ::
          {:ok, t(), Types.request_ref()}
          | {:error, t(), Types.error()}
  def request(conn, method, path, headers, body)

  def request(%Mint.HTTP2{state: :closed} = conn, _method, _path, _headers, _body) do
    {:error, conn, wrap_error(:closed)}
  end

  def request(
        %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
        _method,
        _path,
        _headers,
        _body
      ) do
    {:error, conn, wrap_error(:closed_for_writing)}
  end

  def request(%Mint.HTTP2{} = conn, method, path, headers, body)
      when is_binary(method) and is_binary(path) and is_list(headers) do
    headers =
      headers
      |> downcase_header_names()
      |> add_pseudo_headers(conn, method, path)
      |> add_default_headers(body)
      |> sort_pseudo_headers_to_front()

    {conn, stream_id, ref} = open_stream(conn)
    {conn, payload} = encode_request_payload(conn, stream_id, headers, body)
    conn = send!(conn, payload)
    {:ok, conn, ref}
  catch
    :throw, {:mint, _conn, reason} ->
      # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
      {:error, conn, reason}
  end

  @doc """
  See `Mint.HTTP.stream_request_body/3`.
  """
  @impl true
  @spec stream_request_body(
          t(),
          Types.request_ref(),
          iodata() | :eof | {:eof, trailing_headers :: Types.headers()}
        ) :: {:ok, t()} | {:error, t(), Types.error()}
  def stream_request_body(conn, request_ref, chunk)

  def stream_request_body(%Mint.HTTP2{state: :closed} = conn, _request_ref, _chunk) do
    {:error, conn, wrap_error(:closed)}
  end

  def stream_request_body(
        %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
        _request_ref,
        _chunk
      ) do
    {:error, conn, wrap_error(:closed_for_writing)}
  end

  def stream_request_body(%Mint.HTTP2{} = conn, request_ref, chunk)
      when is_reference(request_ref) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        {conn, payload} = encode_stream_body_request_payload(conn, stream_id, chunk)
        conn = send!(conn, payload)
        {:ok, conn}

      :error ->
        {:error, conn, wrap_error(:unknown_request_to_stream)}
    end
  catch
    :throw, {:mint, _conn, reason} ->
      # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
      {:error, conn, reason}
  end

  @doc """
  Pings the server.

  This function is specific to HTTP/2 connections. It sends a **ping** request to
  the server `conn` is connected to. A `{:ok, conn, request_ref}` tuple is returned,
  where `conn` is the updated connection and `request_ref` is a unique reference that
  identifies this ping request. The response to a ping request is returned by `stream/2`
  as a `{:pong, request_ref}` tuple.
  If there's an error, this function returns
  `{:error, conn, reason}` where `conn` is the updated connection and `reason` is the
  error reason.

  `payload` must be an 8-byte binary with arbitrary content. When the server responds to
  a ping request, it will use that same payload. By default, the payload is an 8-byte
  binary with all bits set to `0`.

  Pinging can be used to measure the latency with the server and to ensure the connection
  is alive and well.

  ## Examples

      {:ok, conn, ref} = Mint.HTTP2.ping(conn)

  """
  @spec ping(t(), <<_::64>>) :: {:ok, t(), Types.request_ref()} | {:error, t(), Types.error()}
  def ping(%Mint.HTTP2{} = conn, payload \\ :binary.copy(<<0>>, 8))
      when byte_size(payload) == 8 do
    {conn, ref} = send_ping(conn, payload)
    {:ok, conn, ref}
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Communicates the given client settings to the server.

  This function is HTTP/2-specific.

  This function takes a connection and a keyword list of HTTP/2 settings and sends
  the values of those settings to the server. The settings won't be effective until
  the server acknowledges them, which will be handled transparently by `stream/2`.

  This function returns `{:ok, conn}` when sending the settings to the server is
  successful, with `conn` being the updated connection. If there's an error, this
  function returns `{:error, conn, reason}` with `conn` being the updated connection
  and `reason` being the reason of the error.

  ## Supported settings

  See `t:setting/0` for the supported settings. You can see the meaning
  of these settings [in the corresponding section in the HTTP/2
  RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.
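
  The "not effective until acknowledged" ordering can be sketched in isolation. The
  `PendingSettings` module below is hypothetical and is not part of Mint's API. It only
  models, under that assumption, a FIFO queue of sent-but-unacknowledged settings next to
  the settings currently in effect, which is roughly the role the connection's
  `client_settings_queue` field plays.

  ```elixir
  defmodule PendingSettings do
    # Hypothetical model, not Mint's implementation: `effective` holds the settings
    # in force, `pending` is a FIFO queue of batches that were sent but not yet acked.
    defstruct effective: %{}, pending: :queue.new()

    def new(effective) when is_map(effective), do: %__MODULE__{effective: effective}

    # Sending settings only queues them; the effective values stay unchanged.
    def put(%__MODULE__{} = state, settings) when is_list(settings) do
      %{state | pending: :queue.in(settings, state.pending)}
    end

    # An acknowledgement applies the oldest pending batch, if there is one.
    def ack(%__MODULE__{} = state) do
      case :queue.out(state.pending) do
        {{:value, settings}, rest} ->
          %{state | effective: Map.merge(state.effective, Map.new(settings)), pending: rest}

        {:empty, _queue} ->
          state
      end
    end

    # Reads a setting that is currently in effect.
    def get(%__MODULE__{effective: effective}, name), do: Map.fetch!(effective, name)
  end
  ```

  In this model, reading a setting between `put/2` and `ack/1` still yields the old
  value, which mirrors what `get_client_setting/2` is described as doing before the
  server's ack has been processed by `stream/2`.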

  ## Examples

      {:ok, conn} = Mint.HTTP2.put_settings(conn, max_frame_size: 32_768)

  """
  @spec put_settings(t(), settings()) :: {:ok, t()} | {:error, t(), Types.error()}
  def put_settings(%Mint.HTTP2{} = conn, settings) when is_list(settings) do
    conn = send_settings(conn, settings)
    {:ok, conn}
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Gets the value of the given HTTP/2 server setting.

  This function returns the value of the given HTTP/2 setting that the server
  advertised to the client. This function is HTTP/2 specific.
  For more information on HTTP/2 settings, see [the related section in
  the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.

  ## Supported settings

  The possible settings that can be retrieved are described in `t:setting/0`.
  Any other atom passed as `name` will raise an error.

  ## Examples

      Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)
      #=> 500

  """
  @spec get_server_setting(t(), atom()) :: term()
  def get_server_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    get_setting(conn.server_settings, name)
  end

  @doc """
  Gets the value of the given HTTP/2 client setting.

  This function returns the value of the given HTTP/2 setting that the client
  advertised to the server. Client settings can be advertised through `put_settings/2`
  or when starting up a connection.

  Client settings have to be acknowledged by the server before coming into effect.

  This function is HTTP/2 specific. For more information on HTTP/2 settings, see
  [the related section in the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.

  ## Supported settings

  The possible settings that can be retrieved are described in `t:setting/0`.
  Any other atom passed as `name` will raise an error.

  ## Examples

      Mint.HTTP2.get_client_setting(conn, :max_concurrent_streams)
      #=> 500

  """
  @spec get_client_setting(t(), atom()) :: term()
  def get_client_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    get_setting(conn.client_settings, name)
  end

  defp get_setting(settings, name) do
    case Map.fetch(settings, name) do
      {:ok, value} -> value
      :error -> raise ArgumentError, "unknown HTTP/2 setting: #{inspect(name)}"
    end
  end

  @doc """
  Cancels an in-flight request.

  This function is HTTP/2 specific. It cancels an in-flight request. The server could have
  already sent responses for the request you want to cancel: those responses will be parsed
  by the connection but not returned to the user. No more responses
  to a request will be returned after you call `cancel_request/2` on that request.

  If there's no error in canceling the request, `{:ok, conn}` is returned where `conn` is
  the updated connection. If there's an error, `{:error, conn, reason}` is returned where
  `conn` is the updated connection and `reason` is the error reason.

  ## Examples

      {:ok, conn, ref} = Mint.HTTP2.request(conn, "GET", "/", _headers = [], _body = nil)
      {:ok, conn} = Mint.HTTP2.cancel_request(conn, ref)

  """
  @spec cancel_request(t(), Types.request_ref()) :: {:ok, t()} | {:error, t(), Types.error()}
  def cancel_request(%Mint.HTTP2{} = conn, request_ref) when is_reference(request_ref) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        conn = close_stream!(conn, stream_id, _error_code = :cancel)
        {:ok, conn}

      :error ->
        {:ok, conn}
    end
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Returns the window size of the connection or of a single request.

  This function is HTTP/2 specific. It returns the window size of
  either the connection if `connection_or_request` is `:connection` or of a single
  request if `connection_or_request` is `{:request, request_ref}`.

  Use this function to check the window size of the connection before sending a
  full request. Also use this function to check the window size of both the
  connection and of a request if you want to stream body chunks on that request.

  For more information on flow control and window sizes in HTTP/2, see the section
  below.

  ## HTTP/2 flow control

  In HTTP/2, flow control is implemented through a
  window size. When the client sends data to the server, the window size is decreased
  and the server needs to "refill" it on the client side. You don't need to take care of
  the refilling of the client window as it happens behind the scenes in `stream/2`.

  A window size is kept for the entire connection and all requests affect this window
  size. A window size is also kept per request.

  The only thing that affects the window size is the body of a request, regardless of
  if it's a full request sent with `request/5` or body chunks sent through
  `stream_request_body/3`.
  That means that if we make a request with a body that is
  five bytes long, like `"hello"`, the window size of the connection and the window size
  of that particular request will decrease by five bytes.

  If we use all the window size before the server refills it, functions like
  `request/5` will return an error.

  ## Examples

  On the connection:

      Mint.HTTP2.get_window_size(conn, :connection)
      #=> 65_535

  On a single streamed request:

      {:ok, conn, request_ref} = Mint.HTTP2.request(conn, "GET", "/", [], :stream)
      Mint.HTTP2.get_window_size(conn, {:request, request_ref})
      #=> 65_535

      {:ok, conn} = Mint.HTTP2.stream_request_body(conn, request_ref, "hello")
      Mint.HTTP2.get_window_size(conn, {:request, request_ref})
      #=> 65_530

  """
  @spec get_window_size(t(), :connection | {:request, Types.request_ref()}) :: non_neg_integer()
  def get_window_size(conn, connection_or_request)

  def get_window_size(%Mint.HTTP2{} = conn, :connection) do
    conn.window_size
  end

  def get_window_size(%Mint.HTTP2{} = conn, {:request, request_ref}) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        conn.streams[stream_id].window_size

      :error ->
        raise ArgumentError,
              "request with request reference #{inspect(request_ref)} was not found"
    end
  end

  @doc """
  See `Mint.HTTP.stream/2`.
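
  Because responses for different requests can arrive interleaved on an HTTP/2
  connection, callers usually group the returned responses by request reference.
  The following is a minimal sketch of that grouping. `ResponseCollector` is a
  hypothetical helper, not part of Mint, and it assumes the response tuples have the
  shapes used in the module documentation (`:status`, `:headers`, `:data`, `:done`).

  ```elixir
  defmodule ResponseCollector do
    # Hypothetical helper, not part of Mint: folds a list of response tuples
    # into a map keyed by request reference.
    def collect(acc, responses) when is_map(acc) and is_list(responses) do
      Enum.reduce(responses, acc, &apply_response/2)
    end

    defp apply_response({:status, ref, status}, acc) do
      update(acc, ref, fn resp -> Map.put(resp, :status, status) end)
    end

    defp apply_response({:headers, ref, headers}, acc) do
      update(acc, ref, fn resp ->
        Map.update(resp, :headers, headers, fn old -> old ++ headers end)
      end)
    end

    defp apply_response({:data, ref, chunk}, acc) do
      update(acc, ref, fn resp ->
        Map.update(resp, :data, chunk, fn old -> old <> chunk end)
      end)
    end

    defp apply_response({:done, ref}, acc) do
      update(acc, ref, fn resp -> Map.put(resp, :done?, true) end)
    end

    # Anything else (for example `:push_promise` or `:pong`) is ignored in this sketch.
    defp apply_response(_other, acc), do: acc

    # Applies `fun` to the entry for `ref`, starting from an empty map if it is new.
    defp update(acc, ref, fun), do: Map.update(acc, ref, fun.(%{}), fun)
  end
  ```

  A caller could pass the `responses` list from each `{:ok, conn, responses}` result
  to `ResponseCollector.collect/2`, carrying the accumulator between calls until the
  entry for a given reference has `done?: true`.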
  """
  @impl true
  @spec stream(t(), term()) ::
          {:ok, t(), [Types.response()]}
          | {:error, t(), Types.error(), [Types.response()]}
          | :unknown
  def stream(conn, message)

  def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket, reason})
      when tag in [:tcp_error, :ssl_error] do
    error = conn.transport.wrap_error(reason)
    {:error, %{conn | state: :closed}, error, _responses = []}
  end

  def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket})
      when tag in [:tcp_closed, :ssl_closed] do
    handle_closed(conn)
  end

  def stream(%Mint.HTTP2{transport: transport, socket: socket} = conn, {tag, socket, data})
      when tag in [:tcp, :ssl] do
    case maybe_concat_and_handle_new_data(conn, data) do
      {:ok, %{mode: mode, state: state} = conn, responses}
      when mode == :active and state != :closed ->
        case transport.setopts(socket, active: :once) do
          :ok -> {:ok, conn, responses}
          {:error, reason} -> {:error, put_in(conn.state, :closed), reason, responses}
        end

      other ->
        other
    end
  catch
    :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
  end

  def stream(%Mint.HTTP2{}, _message) do
    :unknown
  end

  @doc """
  See `Mint.HTTP.open_request_count/1`.

  In HTTP/2, the number of open requests is the number of requests **opened by the client**
  that have not yet received a `:done` response. It's important to note that only
  requests opened by the client (with `request/5`) count towards the number of open
  requests, as requests opened from the server with server pushes (see the "Server push"
  section in the module documentation) are not considered open requests. We do this because
  clients might need to know how many open requests there are because the server limits
  the number of concurrent requests the client can open.
  To know how many requests the client
  can open, see `get_server_setting/2` with the `:max_concurrent_streams` setting.
  """
  @impl true
  @spec open_request_count(t()) :: non_neg_integer()
  def open_request_count(%Mint.HTTP2{} = conn) do
    conn.open_client_stream_count
  end

  @doc """
  See `Mint.HTTP.recv/3`.
  """
  @impl true
  @spec recv(t(), non_neg_integer(), timeout()) ::
          {:ok, t(), [Types.response()]}
          | {:error, t(), Types.error(), [Types.response()]}
  def recv(conn, byte_count, timeout)

  def recv(%__MODULE__{mode: :passive} = conn, byte_count, timeout) do
    case conn.transport.recv(conn.socket, byte_count, timeout) do
      {:ok, data} ->
        maybe_concat_and_handle_new_data(conn, data)

      {:error, %TransportError{reason: :closed}} ->
        handle_closed(conn)

      {:error, error} ->
        {:error, %{conn | state: :closed}, error, _responses = []}
    end
  catch
    :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
  end

  def recv(_conn, _byte_count, _timeout) do
    raise ArgumentError,
          "can't use recv/3 to synchronously receive data when the mode is :active. " <>
            "Use Mint.HTTP.set_mode/2 to set the connection to passive mode"
  end

  @doc """
  See `Mint.HTTP.set_mode/2`.
  """
  @impl true
  @spec set_mode(t(), :active | :passive) :: {:ok, t()} | {:error, Types.error()}
  def set_mode(%__MODULE__{} = conn, mode) when mode in [:active, :passive] do
    active =
      case mode do
        :active -> :once
        :passive -> false
      end

    with :ok <- conn.transport.setopts(conn.socket, active: active) do
      {:ok, put_in(conn.mode, mode)}
    end
  end

  @doc """
  See `Mint.HTTP.controlling_process/2`.
  """
  @impl true
  @spec controlling_process(t(), pid()) :: {:ok, t()} | {:error, Types.error()}
  def controlling_process(%__MODULE__{} = conn, new_pid) when is_pid(new_pid) do
    with :ok <- conn.transport.controlling_process(conn.socket, new_pid) do
      {:ok, conn}
    end
  end

  @doc """
  See `Mint.HTTP.put_private/3`.
  """
  @impl true
  @spec put_private(t(), atom(), term()) :: t()
  def put_private(%Mint.HTTP2{private: private} = conn, key, value) when is_atom(key) do
    %{conn | private: Map.put(private, key, value)}
  end

  @doc """
  See `Mint.HTTP.get_private/3`.
  """
  @impl true
  @spec get_private(t(), atom(), term()) :: term()
  def get_private(%Mint.HTTP2{private: private} = _conn, key, default \\ nil) when is_atom(key) do
    Map.get(private, key, default)
  end

  @doc """
  See `Mint.HTTP.delete_private/2`.
  """
  @impl true
  @spec delete_private(t(), atom()) :: t()
  def delete_private(%Mint.HTTP2{private: private} = conn, key) when is_atom(key) do
    %{conn | private: Map.delete(private, key)}
  end

  # http://httpwg.org/specs/rfc7540.html#rfc.section.6.5
  # SETTINGS parameters are not negotiated. We keep client settings and server settings separate.
  @doc false
  @impl true
  @spec initiate(
          Types.scheme(),
          Types.socket(),
          String.t(),
          :inet.port_number(),
          keyword()
        ) :: {:ok, t()} | {:error, Types.error()}
  def initiate(scheme, socket, hostname, port, opts) do
    transport = scheme_to_transport(scheme)
    mode = Keyword.get(opts, :mode, :active)
    client_settings_params = Keyword.get(opts, :client_settings, [])
    validate_settings!(client_settings_params)

    unless mode in [:active, :passive] do
      raise ArgumentError,
            "the :mode option must be either :active or :passive, got: #{inspect(mode)}"
    end

    conn = %Mint.HTTP2{
      hostname: hostname,
      port: port,
      transport: scheme_to_transport(scheme),
      socket: socket,
      mode: mode,
      scheme: Atom.to_string(scheme),
      state: :open
    }

    with :ok <- inet_opts(transport, socket),
         client_settings = settings(stream_id: 0, params: client_settings_params),
         preface = [@connection_preface, Frame.encode(client_settings)],
         :ok <- transport.send(socket, preface),
         conn = update_in(conn.client_settings_queue, &:queue.in(client_settings_params, &1)),
         {:ok, server_settings, buffer, socket} <- receive_server_settings(transport, socket),
         server_settings_ack =
           settings(stream_id: 0, params: [], flags: set_flags(:settings, [:ack])),
         :ok <- transport.send(socket, Frame.encode(server_settings_ack)),
         conn = put_in(conn.buffer, buffer),
         conn = put_in(conn.socket, socket),
         conn = apply_server_settings(conn, settings(server_settings, :params)),
         :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
      {:ok, conn}
    else
      error ->
        transport.close(socket)
        error
    end
  catch
    {:mint, conn, error} ->
      {:ok, _conn} = close(conn)
      {:error, error}
  end

  @doc """
  See `Mint.HTTP.get_socket/1`.
  """
  @impl true
  @spec get_socket(t()) :: Mint.Types.socket()
  def get_socket(%Mint.HTTP2{socket: socket} = _conn) do
    socket
  end

  @doc """
  See `Mint.HTTP.get_proxy_headers/1`.
  """
  if Version.compare(System.version(), "1.7.0") in [:eq, :gt] do
    @doc since: "1.4.0"
  end

  @impl true
  @spec get_proxy_headers(t()) :: Mint.Types.headers()
  def get_proxy_headers(%__MODULE__{proxy_headers: proxy_headers}), do: proxy_headers

  ## Helpers

  defp handle_closed(conn) do
    conn = put_in(conn.state, :closed)

    if conn.open_client_stream_count > 0 or conn.open_server_stream_count > 0 do
      error = conn.transport.wrap_error(:closed)
      {:error, conn, error, _responses = []}
    else
      {:ok, conn, _responses = []}
    end
  end

  defp negotiate(address, port, :http, transport_opts) do
    # We don't support protocol negotiation for TCP connections
    # so currently we just assume the HTTP/2 protocol
    transport = scheme_to_transport(:http)
    transport.connect(address, port, transport_opts)
  end

  defp negotiate(address, port, :https, transport_opts) do
    transport = scheme_to_transport(:https)

    with {:ok, socket} <- transport.connect(address, port, transport_opts),
         {:ok, protocol} <- transport.negotiated_protocol(socket) do
      if protocol == "h2" do
        {:ok, socket}
      else
        {:error, transport.wrap_error({:bad_alpn_protocol, protocol})}
      end
    end
  end

  defp receive_server_settings(transport, socket) do
    case recv_next_frame(transport, socket, _buffer = "") do
      {:ok, settings(), _buffer, _socket} = result ->
        result

      {:ok, goaway(error_code: error_code, debug_data: debug_data), _buffer, _socket} ->
        error = wrap_error({:server_closed_connection, error_code, debug_data})
        {:error, error}

      {:ok, frame, _buffer, _socket} ->
        debug_data = "received invalid frame #{elem(frame, 0)} during handshake"
        {:error, wrap_error({:protocol_error, debug_data})}

      {:error, error} ->
        {:error, error}
    end
  end

  defp recv_next_frame(transport, socket, buffer) do
    case Frame.decode_next(buffer, @default_max_frame_size) do
      {:ok, frame, rest} ->
        {:ok, frame, rest, socket}

      :more ->
        with {:ok, data} <- transport.recv(socket, 0, _timeout = 10_000) do
          data = maybe_concat(buffer, data)
          recv_next_frame(transport, socket, data)
        end

      {:error, {kind, _info} = reason} when kind in [:frame_size_error, :protocol_error] ->
        {:error, wrap_error(reason)}
    end
  end

  defp open_stream(conn) do
    max_concurrent_streams = conn.server_settings.max_concurrent_streams

    if conn.open_client_stream_count >= max_concurrent_streams do
      throw({:mint, conn, wrap_error(:too_many_concurrent_requests)})
    end

    stream = %{
      id: conn.next_stream_id,
      ref: make_ref(),
      state: :idle,
      window_size: conn.server_settings.initial_window_size,
      received_first_headers?: false
    }

    conn = put_in(conn.streams[stream.id], stream)
    conn = put_in(conn.ref_to_stream_id[stream.ref], stream.id)
    conn = update_in(conn.next_stream_id, &(&1 + 2))
    {conn, stream.id, stream.ref}
  end

  defp encode_stream_body_request_payload(conn, stream_id, :eof) do
    encode_data(conn, stream_id, "", [:end_stream])
  end

  defp encode_stream_body_request_payload(conn, stream_id, {:eof, trailing_headers}) do
    lowered_headers = downcase_header_names(trailing_headers)

    if unallowed_trailing_header = Util.find_unallowed_trailing_header(lowered_headers) do
      error = wrap_error({:unallowed_trailing_header, unallowed_trailing_header})
      throw({:mint, conn, error})
    end

    encode_headers(conn, stream_id, trailing_headers, [:end_headers, :end_stream])
  end

  defp encode_stream_body_request_payload(conn, stream_id, iodata) do
    encode_data(conn, stream_id, iodata, [])
  end

  defp encode_request_payload(conn, stream_id, headers, :stream) do
    encode_headers(conn, stream_id, headers, [:end_headers])
  end

  defp encode_request_payload(conn, stream_id, headers, nil) do
    encode_headers(conn, stream_id, headers, [:end_stream, :end_headers])
  end

  defp encode_request_payload(conn, stream_id, headers, iodata) do
    {conn, headers_payload} = encode_headers(conn, stream_id, headers, [:end_headers])
    {conn, data_payload} = encode_data(conn, stream_id, iodata, [:end_stream])
    {conn, [headers_payload, data_payload]}
  end

  defp encode_headers(conn, stream_id, headers, enabled_flags) do
    assert_headers_smaller_than_max_header_list_size(conn, headers)

    headers = Enum.map(headers, fn {name, value} -> {:store_name, name, value} end)
    {hbf, conn} = get_and_update_in(conn.encode_table, &HPAX.encode(headers, &1))

    payload = headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags)

    stream_state = if :end_stream in enabled_flags, do: :half_closed_local, else: :open

    conn = put_in(conn.streams[stream_id].state, stream_state)
    conn = update_in(conn.open_client_stream_count, &(&1 + 1))

    {conn, payload}
  end

  defp assert_headers_smaller_than_max_header_list_size(
         %{server_settings: %{max_header_list_size: :infinity}},
         _headers
       ) do
    :ok
  end

  defp assert_headers_smaller_than_max_header_list_size(conn, headers) do
    # The value is based on the uncompressed size of header fields, including the length
    # of the name and value in octets plus an overhead of 32 octets for each header field.
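    # Worked example (illustrative headers, assumed here and not taken from the codebase):
    # for [{"accept", "*/*"}, {"user-agent", "mint"}] the total is
    # (6 + 3 + 32) + (10 + 4 + 32) = 87 octets.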
    total_size =
      Enum.reduce(headers, 0, fn {name, value}, acc ->
        acc + byte_size(name) + byte_size(value) + 32
      end)

    max_header_list_size = conn.server_settings.max_header_list_size

    if total_size <= max_header_list_size do
      :ok
    else
      error = wrap_error({:max_header_list_size_exceeded, total_size, max_header_list_size})
      throw({:mint, conn, error})
    end
  end

  defp headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags) do
    if IO.iodata_length(hbf) > conn.server_settings.max_frame_size do
      hbf
      |> IO.iodata_to_binary()
      |> split_payload_in_chunks(conn.server_settings.max_frame_size)
      |> split_hbf_to_encoded_frames(stream_id, enabled_flags)
    else
      Frame.encode(
        headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, enabled_flags))
      )
    end
  end

  defp split_hbf_to_encoded_frames({[first_chunk | chunks], last_chunk}, stream_id, enabled_flags) do
    flags = set_flags(:headers, enabled_flags -- [:end_headers])
    first_frame = Frame.encode(headers(stream_id: stream_id, hbf: first_chunk, flags: flags))

    middle_frames =
      Enum.map(chunks, fn chunk ->
        Frame.encode(continuation(stream_id: stream_id, hbf: chunk))
      end)

    flags =
      if :end_headers in enabled_flags do
        set_flags(:continuation, [:end_headers])
      else
        set_flags(:continuation, [])
      end

    last_frame = Frame.encode(continuation(stream_id: stream_id, hbf: last_chunk, flags: flags))

    [first_frame, middle_frames, last_frame]
  end

  defp encode_data(conn, stream_id, data, enabled_flags) do
    stream = fetch_stream!(conn, stream_id)

    if stream.state != :open do
      error = wrap_error(:request_is_not_streaming)
      throw({:mint, conn, error})
    end

    data_size = IO.iodata_length(data)

    cond do
      data_size > stream.window_size ->
        throw({:mint, conn, wrap_error({:exceeds_window_size, :request, stream.window_size})})

      data_size > conn.window_size ->
        throw({:mint, conn, wrap_error({:exceeds_window_size, :connection, conn.window_size})})

      # If the data size is greater than the max frame size, we chunk automatically based
      # on the max frame size.
      data_size > conn.server_settings.max_frame_size ->
        {chunks, last_chunk} =
          data
          |> IO.iodata_to_binary()
          |> split_payload_in_chunks(conn.server_settings.max_frame_size)

        {encoded_chunks, conn} =
          Enum.map_reduce(chunks, conn, fn chunk, acc ->
            {acc, encoded} = encode_data_chunk(acc, stream_id, chunk, [])
            {encoded, acc}
          end)

        {conn, encoded_last_chunk} = encode_data_chunk(conn, stream_id, last_chunk, enabled_flags)
        {conn, [encoded_chunks, encoded_last_chunk]}

      true ->
        encode_data_chunk(conn, stream_id, data, enabled_flags)
    end
  end

  defp encode_data_chunk(%__MODULE__{} = conn, stream_id, chunk, enabled_flags)
       when is_integer(stream_id) and is_list(enabled_flags) do
    chunk_size = IO.iodata_length(chunk)
    frame = data(stream_id: stream_id, flags: set_flags(:data, enabled_flags), data: chunk)
    conn = update_in(conn.streams[stream_id].window_size, &(&1 - chunk_size))
    conn = update_in(conn.window_size, &(&1 - chunk_size))

    conn =
      if :end_stream in enabled_flags do
        put_in(conn.streams[stream_id].state, :half_closed_local)
      else
        conn
      end

    {conn, Frame.encode(frame)}
  end

  defp split_payload_in_chunks(binary, chunk_size),
    do: split_payload_in_chunks(binary, chunk_size, [])

  defp split_payload_in_chunks(chunk, chunk_size, acc) when byte_size(chunk) <= chunk_size do
    {Enum.reverse(acc), chunk}
  end

  defp split_payload_in_chunks(binary, chunk_size, acc) do
    <<chunk::size(chunk_size)-binary, rest::binary>> = binary
    split_payload_in_chunks(rest, chunk_size, [chunk | acc])
  end

  defp send_ping(conn, payload) do
    frame = Frame.ping(stream_id: 0, opaque_data: payload)
    conn = send!(conn, Frame.encode(frame))
    ref = make_ref()
    conn = update_in(conn.ping_queue, &:queue.in({ref, payload}, &1))
    {conn, ref}
  end

  defp send_settings(conn, settings) do
    validate_settings!(settings)
    frame = settings(stream_id: 0, params: settings)
    conn = send!(conn, Frame.encode(frame))
    conn = update_in(conn.client_settings_queue, &:queue.in(settings, &1))
    conn
  end

  defp validate_settings!(settings) do
    unless Keyword.keyword?(settings) do
      raise ArgumentError, "settings must be a keyword list"
    end

    Enum.each(settings, fn
      {:header_table_size, value} ->
        unless is_integer(value) do
          raise ArgumentError, ":header_table_size must be an integer, got: #{inspect(value)}"
        end

      {:enable_push, value} ->
        unless is_boolean(value) do
          raise ArgumentError, ":enable_push must be a boolean, got: #{inspect(value)}"
        end

      {:max_concurrent_streams, value} ->
        unless is_integer(value) do
          raise ArgumentError,
                ":max_concurrent_streams must be an integer, got: #{inspect(value)}"
        end

      {:initial_window_size, value} ->
        # The guard accepts values up to and including @max_window_size,
        # so the message says "<=" to match it.
        unless is_integer(value) and value <= @max_window_size do
          raise ArgumentError,
                ":initial_window_size must be an integer <= #{@max_window_size}, " <>
                  "got: #{inspect(value)}"
        end

      {:max_frame_size, value} ->
        unless is_integer(value) and value in @valid_max_frame_size_range do
          raise ArgumentError,
                ":max_frame_size must be an integer in #{inspect(@valid_max_frame_size_range)}, " <>
                  "got: #{inspect(value)}"
        end

      {:max_header_list_size, value} ->
        unless is_integer(value) do
          raise ArgumentError, ":max_header_list_size must be an integer, got: #{inspect(value)}"
        end

      {:enable_connect_protocol, value} ->
        unless is_boolean(value) do
          raise ArgumentError,
                ":enable_connect_protocol must be a boolean, got: #{inspect(value)}"
        end

      {name, _value} ->
        raise ArgumentError, "unknown setting parameter #{inspect(name)}"
    end)
  end

  defp downcase_header_names(headers) do
    for {name, value} <- headers, do: {Util.downcase_ascii(name), value}
  end

  defp add_default_headers(headers, body) do
    headers
    |> Util.put_new_header("user-agent", @user_agent)
    |> add_default_content_length_header(body)
  end

  defp add_default_content_length_header(headers, body) when body in [nil, :stream] do
    headers
  end

  defp add_default_content_length_header(headers, body) do
    Util.put_new_header_lazy(headers, "content-length", fn ->
      body |> IO.iodata_length() |> Integer.to_string()
    end)
  end

  defp add_pseudo_headers(headers, conn, method, path) do
    if String.upcase(method) == "CONNECT" do
      [
        {":method", method},
        {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
        | headers
      ]
    else
      [
        {":method", method},
        {":path", path},
        {":scheme", conn.scheme},
        {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
        | headers
      ]
    end
  end

  defp sort_pseudo_headers_to_front(headers) do
    Enum.sort_by(headers, fn {key, _value} ->
      not String.starts_with?(key, ":")
    end)
  end

  ## Frame handling

  defp maybe_concat_and_handle_new_data(conn, data) do
    data = maybe_concat(conn.buffer, data)
    {conn, responses} = handle_new_data(conn, data, [])
    {:ok, conn, Enum.reverse(responses)}
  end

  defp handle_new_data(%Mint.HTTP2{} = conn, data, responses) do
    case Frame.decode_next(data, conn.client_settings.max_frame_size) do
      {:ok, frame, rest} ->
        assert_valid_frame(conn, frame)
        {conn, responses} = handle_frame(conn, frame, responses)
        handle_new_data(conn, rest, responses)

      :more ->
        conn = put_in(conn.buffer, data)
        handle_consumed_all_frames(conn, responses)

      {:error, :payload_too_big} ->
        # TODO: sometimes, this could be handled with RST_STREAM instead of a GOAWAY frame (for
        # example, if the payload of a DATA frame is too big).
        # http://httpwg.org/specs/rfc7540.html#rfc.section.4.2
        debug_data = "frame payload exceeds connection's max frame size"
        send_connection_error!(conn, :frame_size_error, debug_data)

      {:error, {:frame_size_error, frame}} ->
        debug_data = "error with size of frame: #{inspect(frame)}"
        send_connection_error!(conn, :frame_size_error, debug_data)

      {:error, {:protocol_error, info}} ->
        debug_data = "error when decoding frame: #{inspect(info)}"
        send_connection_error!(conn, :protocol_error, debug_data)
    end
  catch
    :throw, {:mint, conn, error} -> throw({:mint, conn, error, responses})
    :throw, {:mint, _conn, _error, _responses} = thrown -> throw(thrown)
  end

  defp handle_consumed_all_frames(%{state: state} = conn, responses) do
    case state do
      # TODO: should we do something with the debug data here, like logging it?
      {:goaway, :no_error, _debug_data} ->
        {conn, responses}

      {:goaway, error_code, debug_data} ->
        error = wrap_error({:server_closed_connection, error_code, debug_data})
        throw({:mint, conn, error, responses})

      _ ->
        {conn, responses}
    end
  end

  defp assert_valid_frame(_conn, unknown()) do
    # Unknown frames MUST be ignored:
    # https://datatracker.ietf.org/doc/html/rfc7540#section-4.1
    :ok
  end

  defp assert_valid_frame(conn, frame) do
    stream_id = elem(frame, 1)

    assert_frame_on_right_level(conn, elem(frame, 0), stream_id)
    assert_stream_id_is_allowed(conn, stream_id)
    assert_frame_doesnt_interrupt_header_streaming(conn, frame)
  end

  # http://httpwg.org/specs/rfc7540.html#HttpSequence
  defp assert_frame_doesnt_interrupt_header_streaming(conn, frame) do
    case {conn.headers_being_processed, frame} do
      {nil, continuation()} ->
        debug_data = "CONTINUATION received outside of headers streaming"
        send_connection_error!(conn, :protocol_error, debug_data)

      {nil, _frame} ->
        :ok

      {{stream_id, _, _}, continuation(stream_id: stream_id)} ->
        :ok

      _other ->
        debug_data =
          "headers are streaming but got a #{inspect(elem(frame, 0))} frame instead " <>
            "of a CONTINUATION frame"

        send_connection_error!(conn, :protocol_error, debug_data)
    end
  end

  stream_level_frames = [:data, :headers, :priority, :rst_stream, :push_promise, :continuation]
  connection_level_frames = [:settings, :ping, :goaway]

  defp assert_frame_on_right_level(conn, frame, _stream_id = 0)
       when frame in unquote(stream_level_frames) do
    debug_data = "frame #{inspect(frame)} not allowed at the connection level (stream_id = 0)"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp assert_frame_on_right_level(conn, frame, stream_id)
       when frame in unquote(connection_level_frames) and stream_id != 0 do
    debug_data = "frame #{inspect(frame)} only allowed at the connection level"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp assert_frame_on_right_level(_conn, _frame, _stream_id) do
    :ok
  end

  defp assert_stream_id_is_allowed(conn, stream_id) do
    if Integer.is_odd(stream_id) and stream_id >= conn.next_stream_id do
      debug_data = "frame with stream ID #{inspect(stream_id)} has not been opened yet"
      send_connection_error!(conn, :protocol_error, debug_data)
    else
      :ok
    end
  end

  for frame_name <- stream_level_frames ++ connection_level_frames ++ [:window_update, :unknown] do
    function_name = :"handle_#{frame_name}"

    defp handle_frame(conn, Frame.unquote(frame_name)() = frame, responses) do
      unquote(function_name)(conn, frame, responses)
    end
  end

  defp handle_unknown(conn, _frame, responses) do
    # Implementations MUST ignore and discard any frame that has a type that is unknown.
    # see: https://datatracker.ietf.org/doc/html/rfc7540#section-4.1

    {conn, responses}
  end

  # DATA

  defp handle_data(conn, frame, responses) do
    data(stream_id: stream_id, flags: flags, data: data, padding: padding) = frame

    # Regardless of whether we have the stream or not, we need to abide by flow
    # control rules so we still refill the client window for the stream_id we got.
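    # Illustrative case (sizes assumed for the example): a DATA frame carrying 100 bytes
    # of data and 10 bytes of padding yields an increment of 100 + 10 = 110. While the
    # connection is open, refill_client_windows/3 sends that increment in two
    # WINDOW_UPDATE frames, one for the connection (stream 0) and one for this stream.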
    window_size_increment = byte_size(data) + byte_size(padding || "")

    conn =
      if window_size_increment > 0 do
        refill_client_windows(conn, stream_id, window_size_increment)
      else
        conn
      end

    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} ->
        assert_stream_in_state(conn, stream, [:open, :half_closed_local])
        responses = [{:data, stream.ref, data} | responses]

        if flag_set?(flags, :data, :end_stream) do
          conn = close_stream!(conn, stream.id, :no_error)
          {conn, [{:done, stream.ref} | responses]}
        else
          {conn, responses}
        end

      :error ->
        _ = Logger.debug(fn -> "Received DATA frame on closed stream ID #{stream_id}" end)
        {conn, responses}
    end
  end

  defp refill_client_windows(conn, stream_id, data_size) do
    connection_frame = window_update(stream_id: 0, window_size_increment: data_size)
    stream_frame = window_update(stream_id: stream_id, window_size_increment: data_size)

    if open?(conn) do
      send!(conn, [Frame.encode(connection_frame), Frame.encode(stream_frame)])
    else
      conn
    end
  end

  # HEADERS

  defp handle_headers(conn, frame, responses) do
    headers(stream_id: stream_id, flags: flags, hbf: hbf) = frame

    stream = Map.get(conn.streams, stream_id)
    end_stream? = flag_set?(flags, :headers, :end_stream)

    if stream do
      assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
    end

    if flag_set?(flags, :headers, :end_headers) do
      decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?)
    else
      callback = &decode_hbf_and_add_responses(&1, &2, &3, &4, end_stream?)
      conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
      {conn, responses}
    end
  end

  # Here, "stream" can be nil in case the stream was closed.
  # In that case, we
  # still need to process the hbf so that the HPACK table is updated, but then
  # we don't add any responses.
  defp decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?) do
    {conn, headers} = decode_hbf(conn, hbf)

    if stream do
      handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?)
    else
      _ = Logger.debug(fn -> "Received HEADERS frame on closed stream ID" end)
      {conn, responses}
    end
  end

  defp handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?) do
    %{ref: ref, received_first_headers?: received_first_headers?} = stream

    case headers do
      # Interim response (1xx), which is made of only one HEADERS plus zero or more CONTINUATIONs.
      # There can be zero or more interim responses before a "proper" response.
      # https://httpwg.org/specs/rfc9113.html#HttpFraming
      [{":status", <<?1, _, _>> = status} | headers] ->
        cond do
          received_first_headers? ->
            conn = close_stream!(conn, stream.id, :protocol_error)

            debug_data =
              "informational response (1xx) must appear before final response, got a #{status} status"

            error = wrap_error({:protocol_error, debug_data})
            responses = [{:error, stream.ref, error} | responses]
            {conn, responses}

          end_stream? ->
            conn = close_stream!(conn, stream.id, :protocol_error)
            debug_data = "informational response (1xx) must not have the END_STREAM flag set"
            error = wrap_error({:protocol_error, debug_data})
            responses = [{:error, stream.ref, error} | responses]
            {conn, responses}

          true ->
            assert_stream_in_state(conn, stream, [:open, :half_closed_local])
            status = String.to_integer(status)
            headers = join_cookie_headers(headers)
            new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]
            {conn, new_responses}
        end

      [{":status", status} | headers] when not received_first_headers? ->
        conn = put_in(conn.streams[stream.id].received_first_headers?, true)
        status = String.to_integer(status)
        headers = join_cookie_headers(headers)
        new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]

        cond do
          # :reserved_remote means that this was a promised stream. As soon as headers come,
          # the stream goes in the :half_closed_local state (unless it's not allowed because
          # of the client's max concurrent streams limit, or END_STREAM is set).
          stream.state == :reserved_remote ->
            cond do
              conn.open_server_stream_count >= conn.client_settings.max_concurrent_streams ->
                conn = close_stream!(conn, stream.id, :refused_stream)
                {conn, responses}

              end_stream? ->
                conn = close_stream!(conn, stream.id, :no_error)
                {conn, [{:done, ref} | new_responses]}

              true ->
                conn = update_in(conn.open_server_stream_count, &(&1 + 1))
                conn = put_in(conn.streams[stream.id].state, :half_closed_local)
                {conn, new_responses}
            end

          end_stream? ->
            conn = close_stream!(conn, stream.id, :no_error)
            {conn, [{:done, ref} | new_responses]}

          true ->
            {conn, new_responses}
        end

      # Trailing headers. We don't care about the :status header here.
      headers when received_first_headers? ->
        if end_stream? do
          conn = close_stream!(conn, stream.id, :no_error)
          headers = headers |> Util.remove_unallowed_trailing_headers() |> join_cookie_headers()
          {conn, [{:done, ref}, {:headers, ref, headers} | responses]}
        else
          # Trailing headers must set the END_STREAM flag because they're
          # the last thing allowed on the stream (other than RST_STREAM and
          # the usual frames).
          conn = close_stream!(conn, stream.id, :protocol_error)
          debug_data = "trailing headers didn't set the END_STREAM flag"
          error = wrap_error({:protocol_error, debug_data})
          responses = [{:error, stream.ref, error} | responses]
          {conn, responses}
        end

      # Non-trailing headers need to have a :status header, otherwise
      # it's a protocol error.
      _headers ->
        conn = close_stream!(conn, stream.id, :protocol_error)
        error = wrap_error(:missing_status_header)
        responses = [{:error, stream.ref, error} | responses]
        {conn, responses}
    end
  end

  defp decode_hbf(conn, hbf) do
    case HPAX.decode(hbf, conn.decode_table) do
      {:ok, headers, decode_table} ->
        conn = put_in(conn.decode_table, decode_table)
        {conn, headers}

      {:error, reason} ->
        debug_data = "unable to decode headers: #{inspect(reason)}"
        send_connection_error!(conn, :compression_error, debug_data)
    end
  end

  # If the port is the default for the scheme, don't add it to the :authority pseudo-header
  defp authority_pseudo_header(scheme, port, hostname) do
    if URI.default_port(scheme) == port do
      hostname
    else
      "#{hostname}:#{port}"
    end
  end

  defp join_cookie_headers(headers) do
    # If we have 0 or 1 Cookie headers, we just use the old list of headers.
    case Enum.split_with(headers, fn {name, _value} -> Util.downcase_ascii(name) == "cookie" end) do
      {[], _headers} ->
        headers

      {[_], _headers} ->
        headers

      {cookies, headers} ->
        cookie = Enum.map_join(cookies, "; ", fn {_name, value} -> value end)
        [{"cookie", cookie} | headers]
    end
  end

  # PRIORITY

  # For now we ignore all PRIORITY frames. This shouldn't cause practical trouble.
  defp handle_priority(conn, frame, responses) do
    _ = Logger.warn(fn -> "Ignoring PRIORITY frame: #{inspect(frame)}" end)
    {conn, responses}
  end

  # RST_STREAM

  defp handle_rst_stream(conn, frame, responses) do
    rst_stream(stream_id: stream_id, error_code: error_code) = frame

    # If we receive RST_STREAM on a closed stream, we ignore it.
    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} ->
        # If we receive RST_STREAM then the stream is definitely closed.
        # We won't send anything else on the stream so we can simply delete
        # it, so that if we get things like DATA on that stream we error out.
        conn = delete_stream(conn, stream)

        if error_code == :no_error do
          {conn, [{:done, stream.ref} | responses]}
        else
          error = wrap_error({:server_closed_request, error_code})
          {conn, [{:error, stream.ref, error} | responses]}
        end

      :error ->
        {conn, responses}
    end
  end

  # SETTINGS

  defp handle_settings(conn, frame, responses) do
    settings(flags: flags, params: params) = frame

    if flag_set?(flags, :settings, :ack) do
      {{:value, params}, conn} = get_and_update_in(conn.client_settings_queue, &:queue.out/1)
      conn = apply_client_settings(conn, params)
      {conn, responses}
    else
      conn = apply_server_settings(conn, params)
      frame = settings(flags: set_flags(:settings, [:ack]), params: [])
      conn = send!(conn, Frame.encode(frame))
      {conn, responses}
    end
  end

  defp apply_server_settings(conn, server_settings) do
    Enum.reduce(server_settings, conn, fn
      {:header_table_size, header_table_size}, conn ->
        update_in(conn.encode_table, &HPAX.resize(&1, header_table_size))

      {:enable_push, enable_push?}, conn ->
        put_in(conn.server_settings.enable_push, enable_push?)

      {:max_concurrent_streams, max_concurrent_streams}, conn ->
        put_in(conn.server_settings.max_concurrent_streams, max_concurrent_streams)

      {:initial_window_size, initial_window_size}, conn ->
        if initial_window_size > @max_window_size do
          debug_data = "INITIAL_WINDOW_SIZE setting of #{initial_window_size} is too big"
          send_connection_error!(conn, :flow_control_error, debug_data)
        end

        update_server_initial_window_size(conn, initial_window_size)

      {:max_frame_size, max_frame_size}, conn ->
        if max_frame_size not in @valid_max_frame_size_range do
          debug_data = "MAX_FRAME_SIZE setting parameter outside of allowed range"
          send_connection_error!(conn, :protocol_error, debug_data)
        end

        put_in(conn.server_settings.max_frame_size, max_frame_size)

      {:max_header_list_size, max_header_list_size}, conn ->
        put_in(conn.server_settings.max_header_list_size, max_header_list_size)

      {:enable_connect_protocol, enable_connect_protocol?}, conn ->
        put_in(conn.server_settings.enable_connect_protocol, enable_connect_protocol?)
    end)
  end

  defp apply_client_settings(conn, client_settings) do
    Enum.reduce(client_settings, conn, fn
      {:max_frame_size, value}, conn ->
        put_in(conn.client_settings.max_frame_size, value)

      {:max_concurrent_streams, value}, conn ->
        put_in(conn.client_settings.max_concurrent_streams, value)

      {:enable_push, value}, conn ->
        put_in(conn.client_settings.enable_push, value)
    end)
  end

  defp update_server_initial_window_size(conn, new_iws) do
    diff = new_iws - conn.server_settings.initial_window_size

    conn =
      update_in(conn.streams, fn streams ->
        for {stream_id, stream} <- streams,
            stream.state in [:open, :half_closed_remote],
            into: streams do
          window_size = stream.window_size + diff

          if window_size > @max_window_size do
            debug_data =
              "INITIAL_WINDOW_SIZE parameter of #{window_size} makes some window sizes too big"

            send_connection_error!(conn, :flow_control_error, debug_data)
          end

          {stream_id, %{stream | window_size: window_size}}
        end
      end)

    put_in(conn.server_settings.initial_window_size, new_iws)
  end

  # PUSH_PROMISE

  defp handle_push_promise(
         %Mint.HTTP2{client_settings: %{enable_push: false}} = conn,
         push_promise(),
         _responses
       ) do
    debug_data = "received PUSH_PROMISE frame when SETTINGS_ENABLE_PUSH was false"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp handle_push_promise(conn, push_promise() = frame, responses) do
    push_promise(
      stream_id: stream_id,
      flags: flags,
      promised_stream_id: promised_stream_id,
      hbf: hbf
    ) = frame

    assert_valid_promised_stream_id(conn, promised_stream_id)

    stream = fetch_stream!(conn, stream_id)
    assert_stream_in_state(conn, stream, [:open, :half_closed_local])

    if flag_set?(flags, :push_promise, :end_headers) do
      decode_push_promise_headers_and_add_response(
        conn,
        responses,
        hbf,
        stream,
        promised_stream_id
      )
    else
      callback = &decode_push_promise_headers_and_add_response(&1, &2, &3, &4, promised_stream_id)
      conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
      {conn, responses}
    end
  end

  defp decode_push_promise_headers_and_add_response(
         conn,
         responses,
         hbf,
         stream,
         promised_stream_id
       ) do
    {conn, headers} = decode_hbf(conn, hbf)

    promised_stream = %{
      id: promised_stream_id,
      ref: make_ref(),
      state: :reserved_remote,
      window_size: conn.server_settings.initial_window_size,
      received_first_headers?: false
    }

    conn = put_in(conn.streams[promised_stream.id], promised_stream)
    new_response = {:push_promise, stream.ref, promised_stream.ref, headers}
    {conn, [new_response | responses]}
  end

  defp assert_valid_promised_stream_id(conn, promised_stream_id) do
    cond do
      not is_integer(promised_stream_id) or Integer.is_odd(promised_stream_id) ->
        debug_data = "invalid promised stream ID: #{inspect(promised_stream_id)}"
        send_connection_error!(conn, :protocol_error, debug_data)

      Map.has_key?(conn.streams, promised_stream_id) ->
        debug_data =
          "stream with ID #{inspect(promised_stream_id)} already exists and can't be " <>
            "reserved by the server"

        send_connection_error!(conn, :protocol_error, debug_data)

      true ->
        :ok
    end
  end

  # PING

  defp handle_ping(conn, Frame.ping() = frame, responses) do
    Frame.ping(flags: flags, opaque_data: opaque_data) = frame

    if flag_set?(flags, :ping, :ack) do
      handle_ping_ack(conn, opaque_data, responses)
    else
      ack = Frame.ping(stream_id: 0, flags: set_flags(:ping, [:ack]), opaque_data: opaque_data)
      conn = send!(conn, Frame.encode(ack))
      {conn, responses}
    end
  end

  defp handle_ping_ack(conn, opaque_data, responses) do
    case :queue.peek(conn.ping_queue) do
      {:value, {ref, ^opaque_data}} ->
        conn = update_in(conn.ping_queue, &:queue.drop/1)
        {conn, [{:pong, ref} | responses]}

      {:value, _} ->
        _ = Logger.warn("Received PING ack that doesn't match next PING request in the queue")
        {conn, responses}

      :empty ->
        _ = Logger.warn("Received PING ack but no PING requests are pending")
        {conn, responses}
    end
  end

  # GOAWAY

  defp handle_goaway(conn, frame, responses) do
    goaway(
      last_stream_id: last_stream_id,
      error_code: error_code,
      debug_data: debug_data
    ) = frame

    # We gather all the unprocessed requests and form {:error, _, _} tuples for each one.
    # At the same time, we delete all the unprocessed requests from the stream set.
    {unprocessed_request_responses, conn} =
      Enum.flat_map_reduce(conn.streams, conn, fn
        {stream_id, _stream}, conn_acc when stream_id <= last_stream_id ->
          {[], conn_acc}

        {_stream_id, stream}, conn_acc ->
          conn_acc = delete_stream(conn_acc, stream)
          {[{:error, stream.ref, wrap_error(:unprocessed)}], conn_acc}
      end)

    conn = put_in(conn.state, {:goaway, error_code, debug_data})
    {conn, unprocessed_request_responses ++ responses}
  end

  # WINDOW_UPDATE

  defp handle_window_update(
         conn,
         window_update(stream_id: 0, window_size_increment: wsi),
         responses
       ) do
    new_window_size = conn.window_size + wsi

    if new_window_size > @max_window_size do
      send_connection_error!(conn, :flow_control_error, "window size too big")
    else
      conn = put_in(conn.window_size, new_window_size)
      {conn, responses}
    end
  end

  defp handle_window_update(
         conn,
         window_update(stream_id: stream_id, window_size_increment: wsi),
         responses
       ) do
    stream =
      fetch_stream!(conn, stream_id)
    new_window_size = conn.streams[stream_id].window_size + wsi

    if new_window_size > @max_window_size do
      conn = close_stream!(conn, stream_id, :flow_control_error)
      error = wrap_error({:flow_control_error, "window size too big"})
      {conn, [{:error, stream.ref, error} | responses]}
    else
      conn = put_in(conn.streams[stream_id].window_size, new_window_size)
      {conn, responses}
    end
  end

  # CONTINUATION

  defp handle_continuation(conn, frame, responses) do
    continuation(stream_id: stream_id, flags: flags, hbf: hbf_chunk) = frame
    stream = Map.get(conn.streams, stream_id)

    if stream do
      assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
    end

    {^stream_id, hbf_acc, callback} = conn.headers_being_processed

    if flag_set?(flags, :continuation, :end_headers) do
      hbf = IO.iodata_to_binary([hbf_acc, hbf_chunk])
      conn = put_in(conn.headers_being_processed, nil)
      callback.(conn, responses, hbf, stream)
    else
      conn = put_in(conn.headers_being_processed, {stream_id, [hbf_acc, hbf_chunk], callback})
      {conn, responses}
    end
  end

  ## General helpers

  defp send_connection_error!(conn, error_code, debug_data) do
    frame =
      goaway(stream_id: 0, last_stream_id: 2, error_code: error_code, debug_data: debug_data)

    conn = send!(conn, Frame.encode(frame))
    _ = conn.transport.close(conn.socket)
    conn = put_in(conn.state, :closed)
    throw({:mint, conn, wrap_error({error_code, debug_data})})
  end

  defp close_stream!(conn, stream_id, error_code) do
    stream = Map.fetch!(conn.streams, stream_id)

    # First of all we send a RST_STREAM with the given error code so that we
    # move the stream to the :closed state (that is, we remove it).
    rst_stream_frame = rst_stream(stream_id: stream_id, error_code: error_code)

    conn =
      if open?(conn) do
        send!(conn, Frame.encode(rst_stream_frame))
      else
        conn
      end

    delete_stream(conn, stream)
  end

  defp delete_stream(conn, stream) do
    conn = update_in(conn.streams, &Map.delete(&1, stream.id))
    conn = update_in(conn.ref_to_stream_id, &Map.delete(&1, stream.ref))

    stream_open? = stream.state in [:open, :half_closed_local, :half_closed_remote]

    conn =
      cond do
        # Stream initiated by the client.
        stream_open? and Integer.is_odd(stream.id) ->
          update_in(conn.open_client_stream_count, &(&1 - 1))

        # Stream initiated by the server.
        stream_open? and Integer.is_even(stream.id) ->
          update_in(conn.open_server_stream_count, &(&1 - 1))

        true ->
          conn
      end

    conn
  end

  defp fetch_stream!(conn, stream_id) do
    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} -> stream
      :error -> throw({:mint, conn, wrap_error({:stream_not_found, stream_id})})
    end
  end

  defp assert_stream_in_state(conn, %{state: state}, expected_states) do
    if state not in expected_states do
      debug_data =
        "stream was in state #{inspect(state)} and not in one of the expected states: " <>
          Enum.map_join(expected_states, ", ", &inspect/1)

      send_connection_error!(conn, :protocol_error, debug_data)
    end
  end

  defp send!(%Mint.HTTP2{transport: transport, socket: socket} = conn, bytes) do
    case transport.send(socket, bytes) do
      :ok ->
        conn

      {:error, %TransportError{reason: :closed} = error} ->
        throw({:mint, %{conn | state: :closed}, error})

      {:error, reason} ->
        throw({:mint, conn, reason})
    end
  end

  defp wrap_error(reason) do
    %HTTPError{reason: reason, module: __MODULE__}
  end

  @doc false
  def format_error(reason)

  def format_error(:closed) do
    "the connection is closed"
  end

  def format_error(:closed_for_writing) do
    "the connection is closed for writing, which means that you cannot issue any more " <>
      "requests on the connection but you can expect responses to still be delivered for " <>
      "part of the requests that are in flight. If a connection is closed for writing, " <>
      "it usually means that you got a :server_closed_request error already."
  end

  def format_error(:too_many_concurrent_requests) do
    "the number of max concurrent HTTP/2 requests supported by the server has been reached. " <>
      "Use Mint.HTTP2.get_server_setting/2 with the :max_concurrent_streams setting name " <>
      "to find out the maximum number of concurrent requests supported by the server."
  end

  def format_error({:max_header_list_size_exceeded, size, max_size}) do
    "the given header list (of size #{size}) goes over the max header list size of " <>
      "#{max_size} supported by the server. In HTTP/2, the header list size is calculated " <>
      "by summing up the size in bytes of each header name, value, plus 32 for each header."
  end

  def format_error({:exceeds_window_size, what, window_size}) do
    what =
      case what do
        :request -> "request"
        :connection -> "connection"
      end

    "the given data exceeds the #{what} window size, which is #{window_size}. " <>
      "The server will refill the window size of the #{what} when ready. This will be " <>
      "handled transparently by stream/2."
  end

  def format_error({:stream_not_found, stream_id}) do
    "request not found (with stream_id #{inspect(stream_id)})"
  end

  def format_error(:unknown_request_to_stream) do
    "can't stream chunk of data because the request is unknown"
  end

  def format_error(:request_is_not_streaming) do
    "can't send more data on this request since it's not streaming"
  end

  def format_error({:unallowed_trailing_header, {name, value}}) do
    "header #{inspect(name)} (with value #{inspect(value)}) is not allowed as a trailing header"
  end

  def format_error(:missing_status_header) do
    "the :status pseudo-header (which is required in HTTP/2) is missing from the response"
  end

  def format_error({:server_closed_request, error_code}) do
    "server closed request with error code #{inspect(error_code)}"
  end

  def format_error({:server_closed_connection, error, debug_data}) do
    "server closed connection with error code #{inspect(error)} and debug data: " <> debug_data
  end

  def format_error(:unprocessed) do
    "request was not processed by the server, which means that it's safe to retry on a " <>
      "different or new connection"
  end

  def format_error({:frame_size_error, frame}) do
    "frame size error for #{inspect(frame)} frame"
  end

  def format_error({:protocol_error, debug_data}) do
    "protocol error: " <> debug_data
  end

  def format_error({:compression_error, debug_data}) do
    "compression error: " <> debug_data
  end

  def format_error({:flow_control_error, debug_data}) do
    "flow control error: " <> debug_data
  end
end