http1.ex (32953B)
1 defmodule Mint.HTTP1 do 2 @moduledoc """ 3 Processless HTTP client with support for HTTP/1 and HTTP/1.1. 4 5 This module provides a data structure that represents an HTTP/1 or HTTP/1.1 connection to 6 a given server. The connection is represented as an opaque struct `%Mint.HTTP1{}`. 7 The connection is a data structure and is not backed by a process, and all the 8 connection handling happens in the process that creates the struct. 9 10 This module and data structure work exactly like the ones described in the `Mint` 11 module, with the exception that `Mint.HTTP1` specifically deals with HTTP/1 and HTTP/1.1 while 12 `Mint` deals seamlessly with HTTP/1, HTTP/1.1, and HTTP/2. For more information on 13 how to use the data structure and client architecture, see `Mint`. 14 """ 15 16 import Mint.Core.Util 17 18 alias Mint.Core.Util 19 alias Mint.HTTP1.{Parse, Request, Response} 20 alias Mint.{HTTPError, TransportError, Types} 21 22 require Logger 23 24 @behaviour Mint.Core.Conn 25 26 @opaque t() :: %__MODULE__{} 27 28 @user_agent "mint/" <> Mix.Project.config()[:version] 29 30 @typedoc """ 31 An HTTP/1-specific error reason. 32 33 The values can be: 34 35 * `:closed` - when you try to make a request or stream a body chunk but the connection 36 is closed. 37 38 * `:request_body_is_streaming` - when you call `request/5` to send a new 39 request but another request is already streaming. 40 41 * `{:unexpected_data, data}` - when unexpected data is received from the server. 42 43 * `:invalid_status_line` - when the HTTP/1 status line is invalid. 44 45 * `{:invalid_request_target, target}` - when the request target is invalid. 46 47 * `:invalid_header` - when headers can't be parsed correctly. 48 49 * `{:invalid_header_name, name}` - when a header name is invalid. 50 51 * `{:invalid_header_value, name, value}` - when a header value is invalid. `name` 52 is the name of the header and `value` is the invalid value. 53 54 * `:invalid_chunk_size` - when the chunk size is invalid. 55 56 * `:missing_crlf_after_chunk` - when the CRLF after a chunk is missing. 57 58 * `:invalid_trailer_header` - when trailer headers can't be parsed. 59 60 * `:more_than_one_content_length_header` - when more than one `content-length` 61 headers are in the response. 62 63 * `:transfer_encoding_and_content_length` - when both the `content-length` as well 64 as the `transfer-encoding` headers are in the response. 65 66 * `{:invalid_content_length_header, value}` - when the value of the `content-length` 67 header is invalid, that is, is not an non-negative integer. 68 69 * `:empty_token_list` - when a header that is supposed to contain a list of tokens 70 (such as the `connection` header) doesn't contain any. 71 72 * `{:invalid_token_list, string}` - when a header that is supposed to contain a list 73 of tokens (such as the `connection` header) contains a malformed list of tokens. 74 75 * `:trailing_headers_but_not_chunked_encoding` - when you try to send trailing 76 headers through `stream_request_body/3` but the transfer encoding of the request 77 was not `chunked`. 78 79 """ 80 @type error_reason() :: term() 81 82 defstruct [ 83 :host, 84 :port, 85 :request, 86 :streaming_request, 87 :socket, 88 :transport, 89 :mode, 90 :scheme_as_string, 91 requests: :queue.new(), 92 state: :closed, 93 buffer: "", 94 proxy_headers: [], 95 private: %{} 96 ] 97 98 @doc """ 99 Same as `Mint.HTTP.connect/4`, but forces an HTTP/1 or HTTP/1.1 connection. 100 101 This function doesn't support proxying. 102 """ 103 @spec connect(Types.scheme(), Types.address(), :inet.port_number(), keyword()) :: 104 {:ok, t()} | {:error, Types.error()} 105 def connect(scheme, address, port, opts \\ []) do 106 # TODO: Also ALPN negotiate HTTP1? 107 108 hostname = Mint.Core.Util.hostname(opts, address) 109 transport = scheme_to_transport(scheme) 110 111 transport_opts = 112 Keyword.get(opts, :transport_opts, []) 113 |> Keyword.put(:hostname, hostname) 114 115 with {:ok, socket} <- transport.connect(address, port, transport_opts) do 116 initiate(scheme, socket, hostname, port, opts) 117 end 118 end 119 120 @doc false 121 @spec upgrade( 122 Types.scheme(), 123 Types.socket(), 124 Types.scheme(), 125 String.t(), 126 :inet.port_number(), 127 keyword() 128 ) :: {:ok, t()} | {:error, Types.error()} 129 def upgrade(old_scheme, socket, new_scheme, hostname, port, opts) do 130 # TODO: Also ALPN negotiate HTTP1? 131 132 transport = scheme_to_transport(new_scheme) 133 134 transport_opts = 135 Keyword.get(opts, :transport_opts, []) 136 |> Keyword.put(:hostname, hostname) 137 138 with {:ok, socket} <- transport.upgrade(socket, old_scheme, hostname, port, transport_opts) do 139 initiate(new_scheme, socket, hostname, port, opts) 140 end 141 end 142 143 @doc false 144 @impl true 145 @spec initiate( 146 Types.scheme(), 147 Types.socket(), 148 String.t(), 149 :inet.port_number(), 150 keyword() 151 ) :: {:ok, t()} | {:error, Types.error()} 152 def initiate(scheme, socket, hostname, port, opts) do 153 transport = scheme_to_transport(scheme) 154 mode = Keyword.get(opts, :mode, :active) 155 156 unless mode in [:active, :passive] do 157 raise ArgumentError, 158 "the :mode option must be either :active or :passive, got: #{inspect(mode)}" 159 end 160 161 with :ok <- inet_opts(transport, socket), 162 :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do 163 conn = %__MODULE__{ 164 transport: transport, 165 socket: socket, 166 mode: mode, 167 host: hostname, 168 port: port, 169 scheme_as_string: Atom.to_string(scheme), 170 state: :open 171 } 172 173 {:ok, conn} 174 else 175 {:error, reason} -> 176 :ok = transport.close(socket) 177 {:error, reason} 178 end 179 end 180 181 @doc """ 182 See `Mint.HTTP.close/1`. 183 """ 184 @impl true 185 @spec close(t()) :: {:ok, t()} 186 def close(conn) 187 188 def close(%__MODULE__{state: :open} = conn) do 189 conn = internal_close(conn) 190 {:ok, conn} 191 end 192 193 def close(%__MODULE__{state: :closed} = conn) do 194 {:ok, conn} 195 end 196 197 @doc """ 198 See `Mint.HTTP.open?/1`. 199 """ 200 @impl true 201 @spec open?(t(), :read | :write | :read_write) :: boolean() 202 def open?(conn, type \\ :read_write) 203 204 def open?(%__MODULE__{state: state}, type) when type in [:read, :write, :read_write] do 205 state == :open 206 end 207 208 @doc """ 209 See `Mint.HTTP.request/5`. 210 211 In HTTP/1 and HTTP/1.1, you can't open a new request if you're streaming the body of 212 another request. If you try, an error will be returned. 213 """ 214 @impl true 215 @spec request( 216 t(), 217 method :: String.t(), 218 path :: String.t(), 219 Types.headers(), 220 body :: iodata() | nil | :stream 221 ) :: 222 {:ok, t(), Types.request_ref()} 223 | {:error, t(), Types.error()} 224 def request(conn, method, path, headers, body) 225 226 def request(%__MODULE__{state: :closed} = conn, _method, _path, _headers, _body) do 227 {:error, conn, wrap_error(:closed)} 228 end 229 230 def request(%__MODULE__{streaming_request: %{}} = conn, _method, _path, _headers, _body) do 231 {:error, conn, wrap_error(:request_body_is_streaming)} 232 end 233 234 def request(%__MODULE__{} = conn, method, path, headers, body) do 235 %__MODULE__{transport: transport, socket: socket} = conn 236 237 headers = 238 headers 239 |> lower_header_keys() 240 |> add_default_headers(conn) 241 242 with {:ok, headers, encoding} <- add_content_length_or_transfer_encoding(headers, body), 243 {:ok, iodata} <- Request.encode(method, path, headers, body), 244 :ok <- transport.send(socket, iodata) do 245 request_ref = make_ref() 246 request = new_request(request_ref, method, body, encoding) 247 248 case request.state do 249 {:stream_request, _} -> 250 conn = %__MODULE__{conn | streaming_request: request} 251 {:ok, conn, request_ref} 252 253 _ -> 254 conn = enqueue_request(conn, request) 255 {:ok, conn, request_ref} 256 end 257 else 258 {:error, %TransportError{reason: :closed} = error} -> 259 {:error, %{conn | state: :closed}, error} 260 261 {:error, %error_module{} = error} when error_module in [HTTPError, TransportError] -> 262 {:error, conn, error} 263 264 {:error, reason} -> 265 {:error, conn, wrap_error(reason)} 266 end 267 end 268 269 @doc """ 270 See `Mint.HTTP.stream_request_body/3`. 271 272 In HTTP/1, sending an empty chunk is a no-op. 273 274 ## Transfer encoding and content length 275 276 When streaming the request body, Mint cannot send a precalculated `content-length` 277 request header because it doesn't know the body that you'll stream. However, Mint 278 will transparently handle the presence of a `content-length` header using this logic: 279 280 * if you specifically set a `content-length` header, then transfer encoding and 281 making sure the content length is correct for what you'll stream is up to you. 282 283 * if you specifically set the transfer encoding (`transfer-encoding` header) 284 to `chunked`, then it's up to you to 285 [properly encode chunks](https://en.wikipedia.org/wiki/Chunked_transfer_encoding). 286 287 * if you don't set the transfer encoding to `chunked` and don't provide a 288 `content-length` header, Mint will do implicit `chunked` transfer encoding 289 (setting the `transfer-encoding` header appropriately) and will take care 290 of properly encoding the chunks. 291 292 """ 293 @impl true 294 @spec stream_request_body( 295 t(), 296 Types.request_ref(), 297 iodata() | :eof | {:eof, trailing_headers :: Types.headers()} 298 ) :: 299 {:ok, t()} | {:error, t(), Types.error()} 300 def stream_request_body( 301 %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn, 302 ref, 303 :eof 304 ) do 305 request = %{conn.streaming_request | state: :status} 306 conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request) 307 {:ok, conn} 308 end 309 310 def stream_request_body( 311 %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn, 312 ref, 313 {:eof, _trailing_headers} 314 ) do 315 {:error, conn, wrap_error(:trailing_headers_but_not_chunked_encoding)} 316 end 317 318 def stream_request_body( 319 %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn, 320 ref, 321 body 322 ) do 323 case conn.transport.send(conn.socket, body) do 324 :ok -> 325 {:ok, conn} 326 327 {:error, %TransportError{reason: :closed} = error} -> 328 {:error, %{conn | state: :closed}, error} 329 330 {:error, error} -> 331 {:error, conn, error} 332 end 333 end 334 335 def stream_request_body( 336 %__MODULE__{streaming_request: %{state: {:stream_request, :chunked}, ref: ref}} = conn, 337 ref, 338 chunk 339 ) do 340 with {:ok, chunk} <- validate_chunk(chunk), 341 :ok <- conn.transport.send(conn.socket, Request.encode_chunk(chunk)) do 342 case chunk do 343 :eof -> 344 request = %{conn.streaming_request | state: :status} 345 conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request) 346 {:ok, conn} 347 348 {:eof, _trailing_headers} -> 349 request = %{conn.streaming_request | state: :status} 350 conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request) 351 {:ok, conn} 352 353 _other -> 354 {:ok, conn} 355 end 356 else 357 :empty_chunk -> 358 {:ok, conn} 359 360 {:error, %TransportError{reason: :closed} = error} -> 361 {:error, %{conn | state: :closed}, error} 362 363 {:error, error} -> 364 {:error, conn, error} 365 end 366 end 367 368 defp validate_chunk({:eof, trailing_headers}) do 369 headers = lower_header_keys(trailing_headers) 370 371 if unallowed_header = find_unallowed_trailing_header(headers) do 372 {:error, wrap_error({:unallowed_trailing_header, unallowed_header})} 373 else 374 {:ok, {:eof, headers}} 375 end 376 end 377 378 defp validate_chunk(:eof) do 379 {:ok, :eof} 380 end 381 382 defp validate_chunk(chunk) do 383 if IO.iodata_length(chunk) == 0 do 384 :empty_chunk 385 else 386 {:ok, chunk} 387 end 388 end 389 390 @doc """ 391 See `Mint.HTTP.stream/2`. 392 """ 393 @impl true 394 @spec stream(t(), term()) :: 395 {:ok, t(), [Types.response()]} 396 | {:error, t(), Types.error(), [Types.response()]} 397 | :unknown 398 def stream(conn, message) 399 400 def stream(%__MODULE__{transport: transport, socket: socket} = conn, {tag, socket, data}) 401 when tag in [:tcp, :ssl] do 402 case handle_data(conn, data) do 403 {:ok, %{mode: mode, state: state} = conn, responses} 404 when mode == :active and state != :closed -> 405 case transport.setopts(socket, active: :once) do 406 :ok -> {:ok, conn, responses} 407 {:error, reason} -> {:error, put_in(conn.state, :closed), reason, responses} 408 end 409 410 other -> 411 other 412 end 413 end 414 415 def stream(%__MODULE__{socket: socket} = conn, {tag, socket}) 416 when tag in [:tcp_closed, :ssl_closed] do 417 handle_close(conn) 418 end 419 420 def stream(%__MODULE__{socket: socket} = conn, {tag, socket, reason}) 421 when tag in [:tcp_error, :ssl_error] do 422 handle_error(conn, conn.transport.wrap_error(reason)) 423 end 424 425 def stream(%__MODULE__{}, _message) do 426 :unknown 427 end 428 429 defp handle_data(%__MODULE__{request: nil} = conn, data) do 430 conn = internal_close(conn) 431 {:error, conn, wrap_error({:unexpected_data, data}), []} 432 end 433 434 defp handle_data(%__MODULE__{request: request} = conn, data) do 435 data = maybe_concat(conn.buffer, data) 436 437 case decode(request.state, conn, data, []) do 438 {:ok, conn, responses} -> 439 {:ok, conn, Enum.reverse(responses)} 440 441 {:error, conn, reason, responses} -> 442 conn = put_in(conn.state, :closed) 443 {:error, conn, reason, responses} 444 end 445 end 446 447 defp handle_close(%__MODULE__{request: request} = conn) do 448 conn = put_in(conn.state, :closed) 449 conn = request_done(conn) 450 451 if request && request.body == :until_closed do 452 conn = put_in(conn.state, :closed) 453 {:ok, conn, [{:done, request.ref}]} 454 else 455 {:error, conn, conn.transport.wrap_error(:closed), []} 456 end 457 end 458 459 defp handle_error(conn, error) do 460 conn = put_in(conn.state, :closed) 461 {:error, conn, error, []} 462 end 463 464 @doc """ 465 See `Mint.HTTP.recv/3`. 466 """ 467 @impl true 468 @spec recv(t(), non_neg_integer(), timeout()) :: 469 {:ok, t(), [Types.response()]} 470 | {:error, t(), Types.error(), [Types.response()]} 471 def recv(conn, byte_count, timeout) 472 473 def recv(%__MODULE__{mode: :passive} = conn, byte_count, timeout) do 474 case conn.transport.recv(conn.socket, byte_count, timeout) do 475 {:ok, data} -> handle_data(conn, data) 476 {:error, %Mint.TransportError{reason: :closed}} -> handle_close(conn) 477 {:error, error} -> handle_error(conn, error) 478 end 479 end 480 481 def recv(_conn, _byte_count, _timeout) do 482 raise ArgumentError, 483 "can't use recv/3 to synchronously receive data when the mode is :active. " <> 484 "Use Mint.HTTP.set_mode/2 to set the connection to passive mode" 485 end 486 487 @doc """ 488 See `Mint.HTTP.set_mode/2`. 489 """ 490 @impl true 491 @spec set_mode(t(), :active | :passive) :: {:ok, t()} | {:error, Types.error()} 492 def set_mode(%__MODULE__{} = conn, mode) when mode in [:active, :passive] do 493 active = 494 case mode do 495 :active -> :once 496 :passive -> false 497 end 498 499 with :ok <- conn.transport.setopts(conn.socket, active: active) do 500 {:ok, put_in(conn.mode, mode)} 501 end 502 end 503 504 @doc """ 505 See `Mint.HTTP.controlling_process/2`. 506 """ 507 @impl true 508 @spec controlling_process(t(), pid()) :: {:ok, t()} | {:error, Types.error()} 509 def controlling_process(%__MODULE__{} = conn, new_pid) when is_pid(new_pid) do 510 with :ok <- conn.transport.controlling_process(conn.socket, new_pid) do 511 {:ok, conn} 512 end 513 end 514 515 @doc """ 516 See `Mint.HTTP.open_request_count/1`. 517 518 In HTTP/1, the number of open requests is the number of pipelined requests. 519 """ 520 @impl true 521 @spec open_request_count(t()) :: non_neg_integer() 522 def open_request_count(%__MODULE__{} = conn) do 523 case conn do 524 %{request: nil, streaming_request: nil} -> 0 525 %{request: nil} -> 1 526 %{streaming_request: nil} -> 1 + :queue.len(conn.requests) 527 _ -> 2 + :queue.len(conn.requests) 528 end 529 end 530 531 @doc """ 532 See `Mint.HTTP.put_private/3`. 533 """ 534 @impl true 535 @spec put_private(t(), atom(), term()) :: t() 536 def put_private(%__MODULE__{private: private} = conn, key, value) when is_atom(key) do 537 %{conn | private: Map.put(private, key, value)} 538 end 539 540 @doc """ 541 See `Mint.HTTP.get_private/3`. 542 """ 543 @impl true 544 @spec get_private(t(), atom(), term()) :: term() 545 def get_private(%__MODULE__{private: private} = _conn, key, default \\ nil) when is_atom(key) do 546 Map.get(private, key, default) 547 end 548 549 @doc """ 550 See `Mint.HTTP.delete_private/2`. 551 """ 552 @impl true 553 @spec delete_private(t(), atom()) :: t() 554 def delete_private(%__MODULE__{private: private} = conn, key) when is_atom(key) do 555 %{conn | private: Map.delete(private, key)} 556 end 557 558 @doc """ 559 See `Mint.HTTP.get_socket/1`. 560 """ 561 @impl true 562 @spec get_socket(t()) :: Mint.Types.socket() 563 def get_socket(%__MODULE__{socket: socket} = _conn) do 564 socket 565 end 566 567 @doc """ 568 See `Mint.HTTP.get_proxy_headers/1`. 569 """ 570 if Version.compare(System.version(), "1.7.0") in [:eq, :gt] do 571 @doc since: "1.4.0" 572 end 573 574 @impl true 575 @spec get_proxy_headers(t()) :: Mint.Types.headers() 576 def get_proxy_headers(%__MODULE__{proxy_headers: proxy_headers}), do: proxy_headers 577 578 ## Helpers 579 580 defp decode(:status, %{request: request} = conn, data, responses) do 581 case Response.decode_status_line(data) do 582 {:ok, {version, status, _reason}, rest} -> 583 request = %{request | version: version, status: status, state: :headers} 584 conn = %{conn | request: request} 585 responses = [{:status, request.ref, status} | responses] 586 decode(:headers, conn, rest, responses) 587 588 :more -> 589 conn = put_in(conn.buffer, data) 590 {:ok, conn, responses} 591 592 :error -> 593 {:error, conn, wrap_error(:invalid_status_line), responses} 594 end 595 end 596 597 defp decode(:headers, %{request: request} = conn, data, responses) do 598 decode_headers(conn, request, data, responses, request.headers_buffer) 599 end 600 601 defp decode(:body, conn, data, responses) do 602 case message_body(conn.request) do 603 {:ok, body} -> 604 conn = put_in(conn.request.body, body) 605 decode_body(body, conn, data, conn.request.ref, responses) 606 607 {:error, reason} -> 608 {:error, conn, wrap_error(reason), responses} 609 end 610 end 611 612 defp decode_headers(conn, request, data, responses, headers) do 613 case Response.decode_header(data) do 614 {:ok, {name, value}, rest} -> 615 headers = [{name, value} | headers] 616 617 case store_header(request, name, value) do 618 {:ok, request} -> decode_headers(conn, request, rest, responses, headers) 619 {:error, reason} -> {:error, conn, wrap_error(reason), responses} 620 end 621 622 {:ok, :eof, rest} -> 623 responses = [{:headers, request.ref, Enum.reverse(headers)} | responses] 624 request = %{request | state: :body, headers_buffer: []} 625 conn = %{conn | buffer: "", request: request} 626 decode(:body, conn, rest, responses) 627 628 :more -> 629 request = %{request | headers_buffer: headers} 630 conn = %{conn | buffer: data, request: request} 631 {:ok, conn, responses} 632 633 :error -> 634 {:error, conn, wrap_error(:invalid_header), responses} 635 end 636 end 637 638 defp decode_body(:none, conn, data, request_ref, responses) do 639 conn = put_in(conn.buffer, data) 640 conn = request_done(conn) 641 responses = [{:done, request_ref} | responses] 642 {:ok, conn, responses} 643 end 644 645 defp decode_body(:single, conn, data, request_ref, responses) do 646 {conn, responses} = add_body(conn, data, responses) 647 conn = request_done(conn) 648 responses = [{:done, request_ref} | responses] 649 {:ok, conn, responses} 650 end 651 652 defp decode_body(:until_closed, conn, data, _request_ref, responses) do 653 {conn, responses} = add_body(conn, data, responses) 654 {:ok, conn, responses} 655 end 656 657 defp decode_body({:content_length, length}, conn, data, request_ref, responses) do 658 cond do 659 length > byte_size(data) -> 660 conn = put_in(conn.request.body, {:content_length, length - byte_size(data)}) 661 {conn, responses} = add_body(conn, data, responses) 662 {:ok, conn, responses} 663 664 length <= byte_size(data) -> 665 <<body::binary-size(length), rest::binary>> = data 666 {conn, responses} = add_body(conn, body, responses) 667 conn = request_done(conn) 668 responses = [{:done, request_ref} | responses] 669 next_request(conn, rest, responses) 670 end 671 end 672 673 defp decode_body({:chunked, nil}, conn, "", _request_ref, responses) do 674 conn = put_in(conn.buffer, "") 675 conn = put_in(conn.request.body, {:chunked, nil}) 676 {:ok, conn, responses} 677 end 678 679 defp decode_body({:chunked, nil}, conn, data, request_ref, responses) do 680 case Integer.parse(data, 16) do 681 {_size, ""} -> 682 conn = put_in(conn.buffer, data) 683 conn = put_in(conn.request.body, {:chunked, nil}) 684 {:ok, conn, responses} 685 686 {0, rest} -> 687 # Manually collapse the body buffer since we're done with the body 688 {conn, responses} = collapse_body_buffer(conn, responses) 689 decode_body({:chunked, :metadata, :trailer}, conn, rest, request_ref, responses) 690 691 {size, rest} when size > 0 -> 692 decode_body({:chunked, :metadata, size}, conn, rest, request_ref, responses) 693 694 _other -> 695 {:error, conn, wrap_error(:invalid_chunk_size), responses} 696 end 697 end 698 699 defp decode_body({:chunked, :metadata, size}, conn, data, request_ref, responses) do 700 case Parse.ignore_until_crlf(data) do 701 {:ok, rest} -> 702 decode_body({:chunked, size}, conn, rest, request_ref, responses) 703 704 :more -> 705 conn = put_in(conn.buffer, data) 706 conn = put_in(conn.request.body, {:chunked, :metadata, size}) 707 {:ok, conn, responses} 708 end 709 end 710 711 defp decode_body({:chunked, :trailer}, conn, data, _request_ref, responses) do 712 decode_trailer_headers(conn, data, responses, conn.request.headers_buffer) 713 end 714 715 defp decode_body({:chunked, :crlf}, conn, data, request_ref, responses) do 716 case data do 717 <<"\r\n", rest::binary>> -> 718 conn = put_in(conn.request.body, {:chunked, nil}) 719 decode_body({:chunked, nil}, conn, rest, request_ref, responses) 720 721 _other when byte_size(data) < 2 -> 722 conn = put_in(conn.buffer, data) 723 {:ok, conn, responses} 724 725 _other -> 726 {:error, conn, wrap_error(:missing_crlf_after_chunk), responses} 727 end 728 end 729 730 defp decode_body({:chunked, length}, conn, data, request_ref, responses) do 731 cond do 732 length > byte_size(data) -> 733 conn = put_in(conn.buffer, "") 734 conn = put_in(conn.request.body, {:chunked, length - byte_size(data)}) 735 conn = add_body_to_buffer(conn, data) 736 {:ok, conn, responses} 737 738 length <= byte_size(data) -> 739 <<body::binary-size(length), rest::binary>> = data 740 {conn, responses} = add_body(conn, body, responses) 741 conn = put_in(conn.request.body, {:chunked, :crlf}) 742 decode_body({:chunked, :crlf}, conn, rest, request_ref, responses) 743 end 744 end 745 746 defp decode_trailer_headers(conn, data, responses, headers) do 747 case Response.decode_header(data) do 748 {:ok, {name, value}, rest} -> 749 headers = [{name, value} | headers] 750 decode_trailer_headers(conn, rest, responses, headers) 751 752 {:ok, :eof, rest} -> 753 headers = Util.remove_unallowed_trailing_headers(headers) 754 755 responses = [ 756 {:done, conn.request.ref} 757 | add_trailing_headers(headers, conn.request.ref, responses) 758 ] 759 760 conn = request_done(conn) 761 next_request(conn, rest, responses) 762 763 :more -> 764 request = %{conn.request | body: {:chunked, :trailer}, headers_buffer: headers} 765 conn = %{conn | buffer: data, request: request} 766 {:ok, conn, responses} 767 768 :error -> 769 {:error, conn, wrap_error(:invalid_trailer_header), responses} 770 end 771 end 772 773 defp next_request(%{request: nil} = conn, data, responses) do 774 # TODO: Figure out if we should keep buffering even though there are no 775 # requests in flight 776 {:ok, %{conn | buffer: data}, responses} 777 end 778 779 defp next_request(conn, data, responses) do 780 decode(:status, %{conn | state: :status}, data, responses) 781 end 782 783 defp add_trailing_headers([], _request_ref, responses), do: responses 784 785 defp add_trailing_headers(headers, request_ref, responses), 786 do: [{:headers, request_ref, Enum.reverse(headers)} | responses] 787 788 defp add_body(conn, data, responses) do 789 conn = add_body_to_buffer(conn, data) 790 collapse_body_buffer(conn, responses) 791 end 792 793 defp add_body_to_buffer(conn, data) do 794 update_in(conn.request.data_buffer, &[&1 | data]) 795 end 796 797 defp collapse_body_buffer(conn, responses) do 798 case IO.iodata_to_binary(conn.request.data_buffer) do 799 "" -> 800 {conn, responses} 801 802 data -> 803 conn = put_in(conn.request.data_buffer, []) 804 {conn, [{:data, conn.request.ref, data} | responses]} 805 end 806 end 807 808 defp store_header(%{content_length: nil} = request, "content-length", value) do 809 with {:ok, content_length} <- Parse.content_length_header(value), 810 do: {:ok, %{request | content_length: content_length}} 811 end 812 813 defp store_header(%{connection: connection} = request, "connection", value) do 814 with {:ok, connection_header} <- Parse.connection_header(value), 815 do: {:ok, %{request | connection: connection ++ connection_header}} 816 end 817 818 defp store_header(%{transfer_encoding: transfer_encoding} = request, "transfer-encoding", value) do 819 with {:ok, transfer_encoding_header} <- Parse.transfer_encoding_header(value), 820 do: {:ok, %{request | transfer_encoding: transfer_encoding ++ transfer_encoding_header}} 821 end 822 823 defp store_header(_request, "content-length", _value) do 824 {:error, :more_than_one_content_length_header} 825 end 826 827 defp store_header(request, _name, _value) do 828 {:ok, request} 829 end 830 831 defp request_done(%{request: request} = conn) do 832 conn = pop_request(conn) 833 834 cond do 835 !request -> conn 836 "close" in request.connection -> internal_close(conn) 837 request.version >= {1, 1} -> conn 838 "keep-alive" in request.connection -> conn 839 true -> internal_close(conn) 840 end 841 end 842 843 defp pop_request(conn) do 844 case :queue.out(conn.requests) do 845 {{:value, request}, requests} -> 846 %{conn | request: request, requests: requests} 847 848 {:empty, requests} -> 849 %{conn | request: nil, requests: requests} 850 end 851 end 852 853 defp enqueue_request(%{request: nil} = conn, request) do 854 %__MODULE__{conn | request: request} 855 end 856 857 defp enqueue_request(conn, request) do 858 requests = :queue.in(request, conn.requests) 859 %__MODULE__{conn | requests: requests} 860 end 861 862 defp internal_close(conn) do 863 if conn.buffer != "" do 864 _ = Logger.debug(["Connection closed with data left in the buffer: ", inspect(conn.buffer)]) 865 end 866 867 _ = conn.transport.close(conn.socket) 868 %{conn | state: :closed} 869 end 870 871 # RFC7230 3.3.3: 872 # > If a message is received with both a Transfer-Encoding and a 873 # > Content-Length header field, the Transfer-Encoding overrides the 874 # > Content-Length. Such a message might indicate an attempt to 875 # > perform request smuggling (Section 9.5) or response splitting 876 # > (Section 9.4) and ought to be handled as an error. A sender MUST 877 # > remove the received Content-Length field prior to forwarding such 878 # > a message downstream. 879 defp message_body(%{body: nil, method: method, status: status} = request) do 880 cond do 881 status == 101 -> 882 {:ok, :single} 883 884 method == "HEAD" or status in 100..199 or status in [204, 304] -> 885 {:ok, :none} 886 887 # method == "CONNECT" and status in 200..299 -> nil 888 889 request.transfer_encoding != [] && request.content_length -> 890 {:error, :transfer_encoding_and_content_length} 891 892 "chunked" == List.first(request.transfer_encoding) -> 893 {:ok, {:chunked, nil}} 894 895 request.content_length -> 896 {:ok, {:content_length, request.content_length}} 897 898 true -> 899 {:ok, :until_closed} 900 end 901 end 902 903 defp message_body(%{body: body}) do 904 {:ok, body} 905 end 906 907 defp new_request(ref, method, body, encoding) do 908 state = 909 if body == :stream do 910 {:stream_request, encoding} 911 else 912 :status 913 end 914 915 %{ 916 ref: ref, 917 state: state, 918 method: method, 919 version: nil, 920 status: nil, 921 headers_buffer: [], 922 data_buffer: [], 923 content_length: nil, 924 connection: [], 925 transfer_encoding: [], 926 body: nil 927 } 928 end 929 930 defp lower_header_keys(headers) do 931 for {name, value} <- headers, do: {Util.downcase_ascii(name), value} 932 end 933 934 defp add_default_headers(headers, conn) do 935 headers 936 |> Util.put_new_header("user-agent", @user_agent) 937 |> Util.put_new_header("host", default_host_header(conn)) 938 end 939 940 # If the port is the default for the scheme, don't add it to the host header 941 defp default_host_header(%__MODULE__{scheme_as_string: scheme, host: host, port: port}) do 942 if URI.default_port(scheme) == port do 943 host 944 else 945 "#{host}:#{port}" 946 end 947 end 948 949 defp add_content_length_or_transfer_encoding(headers, :stream) do 950 cond do 951 List.keymember?(headers, "content-length", 0) -> 952 {:ok, headers, :identity} 953 954 found = List.keyfind(headers, "transfer-encoding", 0) -> 955 {"transfer-encoding", value} = found 956 957 with {:ok, tokens} <- Parse.transfer_encoding_header(value) do 958 if "chunked" in tokens or "identity" in tokens do 959 {:ok, headers, :identity} 960 else 961 new_transfer_encoding = {"transfer-encoding", value <> ",chunked"} 962 headers = List.keyreplace(headers, "transfer-encoding", 0, new_transfer_encoding) 963 {:ok, headers, :chunked} 964 end 965 end 966 967 # If no content-length or transfer-encoding are present, assume 968 # chunked transfer-encoding and handle the encoding ourselves. 969 true -> 970 headers = Util.put_new_header(headers, "transfer-encoding", "chunked") 971 {:ok, headers, :chunked} 972 end 973 end 974 975 defp add_content_length_or_transfer_encoding(headers, nil) do 976 {:ok, headers, :identity} 977 end 978 979 defp add_content_length_or_transfer_encoding(headers, body) do 980 length_fun = fn -> body |> IO.iodata_length() |> Integer.to_string() end 981 {:ok, Util.put_new_header_lazy(headers, "content-length", length_fun), :identity} 982 end 983 984 defp wrap_error(reason) do 985 %HTTPError{reason: reason, module: __MODULE__} 986 end 987 988 @doc false 989 def format_error(reason) 990 991 def format_error(:closed) do 992 "the connection is closed" 993 end 994 995 def format_error(:request_body_is_streaming) do 996 "a request body is currently streaming, so no new requests can be issued" 997 end 998 999 def format_error({:unexpected_data, data}) do 1000 "received unexpected data: " <> inspect(data) 1001 end 1002 1003 def format_error(:invalid_status_line) do 1004 "invalid status line" 1005 end 1006 1007 def format_error(:invalid_header) do 1008 "invalid header" 1009 end 1010 1011 def format_error({:invalid_request_target, target}) do 1012 "invalid request target: #{inspect(target)}" 1013 end 1014 1015 def format_error({:invalid_header_name, name}) do 1016 "invalid header name: #{inspect(name)}" 1017 end 1018 1019 def format_error({:invalid_header_value, name, value}) do 1020 "invalid value for header (only printable ASCII characters are allowed) " <> 1021 "#{inspect(name)}: #{inspect(value)}" 1022 end 1023 1024 def format_error(:invalid_chunk_size) do 1025 "invalid chunk size" 1026 end 1027 1028 def format_error(:missing_crlf_after_chunk) do 1029 "missing CRLF after chunk" 1030 end 1031 1032 def format_error(:invalid_trailer_header) do 1033 "invalid trailer header" 1034 end 1035 1036 def format_error(:more_than_one_content_length_header) do 1037 "the response contains two or more Content-Length headers" 1038 end 1039 1040 def format_error(:transfer_encoding_and_content_length) do 1041 "the response contained both a Transfer-Encoding header as well as a Content-Length header" 1042 end 1043 1044 def format_error({:invalid_content_length_header, value}) do 1045 "invalid Content-Length header: #{inspect(value)}" 1046 end 1047 1048 def format_error(:empty_token_list) do 1049 "header should contain a list of values, but it doesn't" 1050 end 1051 1052 def format_error({:invalid_token_list, string}) do 1053 "header contains invalid tokens: #{inspect(string)}" 1054 end 1055 1056 def format_error(:trailing_headers_but_not_chunked_encoding) do 1057 "trailing headers can only be sent when using chunked transfer-encoding" 1058 end 1059 1060 def format_error({:unallowed_trailing_header, {name, value}}) do 1061 "header #{inspect(name)} (with value #{inspect(value)}) is not allowed as a trailing header" 1062 end 1063 end