zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

http1.ex (32953B)


      1 defmodule Mint.HTTP1 do
      2   @moduledoc """
      3   Processless HTTP client with support for HTTP/1 and HTTP/1.1.
      4 
      5   This module provides a data structure that represents an HTTP/1 or HTTP/1.1 connection to
      6   a given server. The connection is represented as an opaque struct `%Mint.HTTP1{}`.
      7   The connection is a data structure and is not backed by a process, and all the
      8   connection handling happens in the process that creates the struct.
      9 
     10   This module and data structure work exactly like the ones described in the `Mint`
     11   module, with the exception that `Mint.HTTP1` specifically deals with HTTP/1 and HTTP/1.1 while
     12   `Mint` deals seamlessly with HTTP/1, HTTP/1.1, and HTTP/2. For more information on
     13   how to use the data structure and client architecture, see `Mint`.
     14   """
     15 
     16   import Mint.Core.Util
     17 
     18   alias Mint.Core.Util
     19   alias Mint.HTTP1.{Parse, Request, Response}
     20   alias Mint.{HTTPError, TransportError, Types}
     21 
     22   require Logger
     23 
     24   @behaviour Mint.Core.Conn
     25 
     26   @opaque t() :: %__MODULE__{}
     27 
     28   @user_agent "mint/" <> Mix.Project.config()[:version]
     29 
     30   @typedoc """
     31   An HTTP/1-specific error reason.
     32 
     33   The values can be:
     34 
     35     * `:closed` - when you try to make a request or stream a body chunk but the connection
     36       is closed.
     37 
     38     * `:request_body_is_streaming` - when you call `request/5` to send a new
     39       request but another request is already streaming.
     40 
     41     * `{:unexpected_data, data}` - when unexpected data is received from the server.
     42 
     43     * `:invalid_status_line` - when the HTTP/1 status line is invalid.
     44 
     45     * `{:invalid_request_target, target}` - when the request target is invalid.
     46 
     47     * `:invalid_header` - when headers can't be parsed correctly.
     48 
     49     * `{:invalid_header_name, name}` - when a header name is invalid.
     50 
     51     * `{:invalid_header_value, name, value}` - when a header value is invalid. `name`
     52       is the name of the header and `value` is the invalid value.
     53 
     54     * `:invalid_chunk_size` - when the chunk size is invalid.
     55 
     56     * `:missing_crlf_after_chunk` - when the CRLF after a chunk is missing.
     57 
     58     * `:invalid_trailer_header` - when trailer headers can't be parsed.
     59 
     60     * `:more_than_one_content_length_header` - when more than one `content-length`
     61       headers are in the response.
     62 
     63     * `:transfer_encoding_and_content_length` - when both the `content-length` as well
     64       as the `transfer-encoding` headers are in the response.
     65 
     66     * `{:invalid_content_length_header, value}` - when the value of the `content-length`
     67       header is invalid, that is, is not an non-negative integer.
     68 
     69     * `:empty_token_list` - when a header that is supposed to contain a list of tokens
     70       (such as the `connection` header) doesn't contain any.
     71 
     72     * `{:invalid_token_list, string}` - when a header that is supposed to contain a list
     73       of tokens (such as the `connection` header) contains a malformed list of tokens.
     74 
     75     * `:trailing_headers_but_not_chunked_encoding` - when you try to send trailing
     76       headers through `stream_request_body/3` but the transfer encoding of the request
     77       was not `chunked`.
     78 
     79   """
     80   @type error_reason() :: term()
     81 
     82   defstruct [
     83     :host,
     84     :port,
     85     :request,
     86     :streaming_request,
     87     :socket,
     88     :transport,
     89     :mode,
     90     :scheme_as_string,
     91     requests: :queue.new(),
     92     state: :closed,
     93     buffer: "",
     94     proxy_headers: [],
     95     private: %{}
     96   ]
     97 
     98   @doc """
     99   Same as `Mint.HTTP.connect/4`, but forces an HTTP/1 or HTTP/1.1 connection.
    100 
    101   This function doesn't support proxying.
    102   """
    103   @spec connect(Types.scheme(), Types.address(), :inet.port_number(), keyword()) ::
    104           {:ok, t()} | {:error, Types.error()}
    105   def connect(scheme, address, port, opts \\ []) do
    106     # TODO: Also ALPN negotiate HTTP1?
    107 
    108     hostname = Mint.Core.Util.hostname(opts, address)
    109     transport = scheme_to_transport(scheme)
    110 
    111     transport_opts =
    112       Keyword.get(opts, :transport_opts, [])
    113       |> Keyword.put(:hostname, hostname)
    114 
    115     with {:ok, socket} <- transport.connect(address, port, transport_opts) do
    116       initiate(scheme, socket, hostname, port, opts)
    117     end
    118   end
    119 
    120   @doc false
    121   @spec upgrade(
    122           Types.scheme(),
    123           Types.socket(),
    124           Types.scheme(),
    125           String.t(),
    126           :inet.port_number(),
    127           keyword()
    128         ) :: {:ok, t()} | {:error, Types.error()}
    129   def upgrade(old_scheme, socket, new_scheme, hostname, port, opts) do
    130     # TODO: Also ALPN negotiate HTTP1?
    131 
    132     transport = scheme_to_transport(new_scheme)
    133 
    134     transport_opts =
    135       Keyword.get(opts, :transport_opts, [])
    136       |> Keyword.put(:hostname, hostname)
    137 
    138     with {:ok, socket} <- transport.upgrade(socket, old_scheme, hostname, port, transport_opts) do
    139       initiate(new_scheme, socket, hostname, port, opts)
    140     end
    141   end
    142 
    143   @doc false
    144   @impl true
    145   @spec initiate(
    146           Types.scheme(),
    147           Types.socket(),
    148           String.t(),
    149           :inet.port_number(),
    150           keyword()
    151         ) :: {:ok, t()} | {:error, Types.error()}
    152   def initiate(scheme, socket, hostname, port, opts) do
    153     transport = scheme_to_transport(scheme)
    154     mode = Keyword.get(opts, :mode, :active)
    155 
    156     unless mode in [:active, :passive] do
    157       raise ArgumentError,
    158             "the :mode option must be either :active or :passive, got: #{inspect(mode)}"
    159     end
    160 
    161     with :ok <- inet_opts(transport, socket),
    162          :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
    163       conn = %__MODULE__{
    164         transport: transport,
    165         socket: socket,
    166         mode: mode,
    167         host: hostname,
    168         port: port,
    169         scheme_as_string: Atom.to_string(scheme),
    170         state: :open
    171       }
    172 
    173       {:ok, conn}
    174     else
    175       {:error, reason} ->
    176         :ok = transport.close(socket)
    177         {:error, reason}
    178     end
    179   end
    180 
    181   @doc """
    182   See `Mint.HTTP.close/1`.
    183   """
    184   @impl true
    185   @spec close(t()) :: {:ok, t()}
    186   def close(conn)
    187 
    188   def close(%__MODULE__{state: :open} = conn) do
    189     conn = internal_close(conn)
    190     {:ok, conn}
    191   end
    192 
    193   def close(%__MODULE__{state: :closed} = conn) do
    194     {:ok, conn}
    195   end
    196 
    197   @doc """
    198   See `Mint.HTTP.open?/1`.
    199   """
    200   @impl true
    201   @spec open?(t(), :read | :write | :read_write) :: boolean()
    202   def open?(conn, type \\ :read_write)
    203 
    204   def open?(%__MODULE__{state: state}, type) when type in [:read, :write, :read_write] do
    205     state == :open
    206   end
    207 
    208   @doc """
    209   See `Mint.HTTP.request/5`.
    210 
    211   In HTTP/1 and HTTP/1.1, you can't open a new request if you're streaming the body of
    212   another request. If you try, an error will be returned.
    213   """
    214   @impl true
    215   @spec request(
    216           t(),
    217           method :: String.t(),
    218           path :: String.t(),
    219           Types.headers(),
    220           body :: iodata() | nil | :stream
    221         ) ::
    222           {:ok, t(), Types.request_ref()}
    223           | {:error, t(), Types.error()}
    224   def request(conn, method, path, headers, body)
    225 
    226   def request(%__MODULE__{state: :closed} = conn, _method, _path, _headers, _body) do
    227     {:error, conn, wrap_error(:closed)}
    228   end
    229 
    230   def request(%__MODULE__{streaming_request: %{}} = conn, _method, _path, _headers, _body) do
    231     {:error, conn, wrap_error(:request_body_is_streaming)}
    232   end
    233 
    234   def request(%__MODULE__{} = conn, method, path, headers, body) do
    235     %__MODULE__{transport: transport, socket: socket} = conn
    236 
    237     headers =
    238       headers
    239       |> lower_header_keys()
    240       |> add_default_headers(conn)
    241 
    242     with {:ok, headers, encoding} <- add_content_length_or_transfer_encoding(headers, body),
    243          {:ok, iodata} <- Request.encode(method, path, headers, body),
    244          :ok <- transport.send(socket, iodata) do
    245       request_ref = make_ref()
    246       request = new_request(request_ref, method, body, encoding)
    247 
    248       case request.state do
    249         {:stream_request, _} ->
    250           conn = %__MODULE__{conn | streaming_request: request}
    251           {:ok, conn, request_ref}
    252 
    253         _ ->
    254           conn = enqueue_request(conn, request)
    255           {:ok, conn, request_ref}
    256       end
    257     else
    258       {:error, %TransportError{reason: :closed} = error} ->
    259         {:error, %{conn | state: :closed}, error}
    260 
    261       {:error, %error_module{} = error} when error_module in [HTTPError, TransportError] ->
    262         {:error, conn, error}
    263 
    264       {:error, reason} ->
    265         {:error, conn, wrap_error(reason)}
    266     end
    267   end
    268 
    269   @doc """
    270   See `Mint.HTTP.stream_request_body/3`.
    271 
    272   In HTTP/1, sending an empty chunk is a no-op.
    273 
    274   ## Transfer encoding and content length
    275 
    276   When streaming the request body, Mint cannot send a precalculated `content-length`
    277   request header because it doesn't know the body that you'll stream. However, Mint
    278   will transparently handle the presence of a `content-length` header using this logic:
    279 
    280     * if you specifically set a `content-length` header, then transfer encoding and
    281       making sure the content length is correct for what you'll stream is up to you.
    282 
    283     * if you specifically set the transfer encoding (`transfer-encoding` header)
    284       to `chunked`, then it's up to you to
    285       [properly encode chunks](https://en.wikipedia.org/wiki/Chunked_transfer_encoding).
    286 
    287     * if you don't set the transfer encoding to `chunked` and don't provide a
    288       `content-length` header, Mint will do implicit `chunked` transfer encoding
    289       (setting the `transfer-encoding` header appropriately) and will take care
    290       of properly encoding the chunks.
    291 
    292   """
    293   @impl true
    294   @spec stream_request_body(
    295           t(),
    296           Types.request_ref(),
    297           iodata() | :eof | {:eof, trailing_headers :: Types.headers()}
    298         ) ::
    299           {:ok, t()} | {:error, t(), Types.error()}
    300   def stream_request_body(
    301         %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn,
    302         ref,
    303         :eof
    304       ) do
    305     request = %{conn.streaming_request | state: :status}
    306     conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request)
    307     {:ok, conn}
    308   end
    309 
    310   def stream_request_body(
    311         %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn,
    312         ref,
    313         {:eof, _trailing_headers}
    314       ) do
    315     {:error, conn, wrap_error(:trailing_headers_but_not_chunked_encoding)}
    316   end
    317 
    318   def stream_request_body(
    319         %__MODULE__{streaming_request: %{state: {:stream_request, :identity}, ref: ref}} = conn,
    320         ref,
    321         body
    322       ) do
    323     case conn.transport.send(conn.socket, body) do
    324       :ok ->
    325         {:ok, conn}
    326 
    327       {:error, %TransportError{reason: :closed} = error} ->
    328         {:error, %{conn | state: :closed}, error}
    329 
    330       {:error, error} ->
    331         {:error, conn, error}
    332     end
    333   end
    334 
    335   def stream_request_body(
    336         %__MODULE__{streaming_request: %{state: {:stream_request, :chunked}, ref: ref}} = conn,
    337         ref,
    338         chunk
    339       ) do
    340     with {:ok, chunk} <- validate_chunk(chunk),
    341          :ok <- conn.transport.send(conn.socket, Request.encode_chunk(chunk)) do
    342       case chunk do
    343         :eof ->
    344           request = %{conn.streaming_request | state: :status}
    345           conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request)
    346           {:ok, conn}
    347 
    348         {:eof, _trailing_headers} ->
    349           request = %{conn.streaming_request | state: :status}
    350           conn = enqueue_request(%__MODULE__{conn | streaming_request: nil}, request)
    351           {:ok, conn}
    352 
    353         _other ->
    354           {:ok, conn}
    355       end
    356     else
    357       :empty_chunk ->
    358         {:ok, conn}
    359 
    360       {:error, %TransportError{reason: :closed} = error} ->
    361         {:error, %{conn | state: :closed}, error}
    362 
    363       {:error, error} ->
    364         {:error, conn, error}
    365     end
    366   end
    367 
    368   defp validate_chunk({:eof, trailing_headers}) do
    369     headers = lower_header_keys(trailing_headers)
    370 
    371     if unallowed_header = find_unallowed_trailing_header(headers) do
    372       {:error, wrap_error({:unallowed_trailing_header, unallowed_header})}
    373     else
    374       {:ok, {:eof, headers}}
    375     end
    376   end
    377 
    378   defp validate_chunk(:eof) do
    379     {:ok, :eof}
    380   end
    381 
    382   defp validate_chunk(chunk) do
    383     if IO.iodata_length(chunk) == 0 do
    384       :empty_chunk
    385     else
    386       {:ok, chunk}
    387     end
    388   end
    389 
    390   @doc """
    391   See `Mint.HTTP.stream/2`.
    392   """
    393   @impl true
    394   @spec stream(t(), term()) ::
    395           {:ok, t(), [Types.response()]}
    396           | {:error, t(), Types.error(), [Types.response()]}
    397           | :unknown
    398   def stream(conn, message)
    399 
    400   def stream(%__MODULE__{transport: transport, socket: socket} = conn, {tag, socket, data})
    401       when tag in [:tcp, :ssl] do
    402     case handle_data(conn, data) do
    403       {:ok, %{mode: mode, state: state} = conn, responses}
    404       when mode == :active and state != :closed ->
    405         case transport.setopts(socket, active: :once) do
    406           :ok -> {:ok, conn, responses}
    407           {:error, reason} -> {:error, put_in(conn.state, :closed), reason, responses}
    408         end
    409 
    410       other ->
    411         other
    412     end
    413   end
    414 
    415   def stream(%__MODULE__{socket: socket} = conn, {tag, socket})
    416       when tag in [:tcp_closed, :ssl_closed] do
    417     handle_close(conn)
    418   end
    419 
    420   def stream(%__MODULE__{socket: socket} = conn, {tag, socket, reason})
    421       when tag in [:tcp_error, :ssl_error] do
    422     handle_error(conn, conn.transport.wrap_error(reason))
    423   end
    424 
    425   def stream(%__MODULE__{}, _message) do
    426     :unknown
    427   end
    428 
    429   defp handle_data(%__MODULE__{request: nil} = conn, data) do
    430     conn = internal_close(conn)
    431     {:error, conn, wrap_error({:unexpected_data, data}), []}
    432   end
    433 
    434   defp handle_data(%__MODULE__{request: request} = conn, data) do
    435     data = maybe_concat(conn.buffer, data)
    436 
    437     case decode(request.state, conn, data, []) do
    438       {:ok, conn, responses} ->
    439         {:ok, conn, Enum.reverse(responses)}
    440 
    441       {:error, conn, reason, responses} ->
    442         conn = put_in(conn.state, :closed)
    443         {:error, conn, reason, responses}
    444     end
    445   end
    446 
    447   defp handle_close(%__MODULE__{request: request} = conn) do
    448     conn = put_in(conn.state, :closed)
    449     conn = request_done(conn)
    450 
    451     if request && request.body == :until_closed do
    452       conn = put_in(conn.state, :closed)
    453       {:ok, conn, [{:done, request.ref}]}
    454     else
    455       {:error, conn, conn.transport.wrap_error(:closed), []}
    456     end
    457   end
    458 
    459   defp handle_error(conn, error) do
    460     conn = put_in(conn.state, :closed)
    461     {:error, conn, error, []}
    462   end
    463 
    464   @doc """
    465   See `Mint.HTTP.recv/3`.
    466   """
    467   @impl true
    468   @spec recv(t(), non_neg_integer(), timeout()) ::
    469           {:ok, t(), [Types.response()]}
    470           | {:error, t(), Types.error(), [Types.response()]}
    471   def recv(conn, byte_count, timeout)
    472 
    473   def recv(%__MODULE__{mode: :passive} = conn, byte_count, timeout) do
    474     case conn.transport.recv(conn.socket, byte_count, timeout) do
    475       {:ok, data} -> handle_data(conn, data)
    476       {:error, %Mint.TransportError{reason: :closed}} -> handle_close(conn)
    477       {:error, error} -> handle_error(conn, error)
    478     end
    479   end
    480 
    481   def recv(_conn, _byte_count, _timeout) do
    482     raise ArgumentError,
    483           "can't use recv/3 to synchronously receive data when the mode is :active. " <>
    484             "Use Mint.HTTP.set_mode/2 to set the connection to passive mode"
    485   end
    486 
    487   @doc """
    488   See `Mint.HTTP.set_mode/2`.
    489   """
    490   @impl true
    491   @spec set_mode(t(), :active | :passive) :: {:ok, t()} | {:error, Types.error()}
    492   def set_mode(%__MODULE__{} = conn, mode) when mode in [:active, :passive] do
    493     active =
    494       case mode do
    495         :active -> :once
    496         :passive -> false
    497       end
    498 
    499     with :ok <- conn.transport.setopts(conn.socket, active: active) do
    500       {:ok, put_in(conn.mode, mode)}
    501     end
    502   end
    503 
    504   @doc """
    505   See `Mint.HTTP.controlling_process/2`.
    506   """
    507   @impl true
    508   @spec controlling_process(t(), pid()) :: {:ok, t()} | {:error, Types.error()}
    509   def controlling_process(%__MODULE__{} = conn, new_pid) when is_pid(new_pid) do
    510     with :ok <- conn.transport.controlling_process(conn.socket, new_pid) do
    511       {:ok, conn}
    512     end
    513   end
    514 
    515   @doc """
    516   See `Mint.HTTP.open_request_count/1`.
    517 
    518   In HTTP/1, the number of open requests is the number of pipelined requests.
    519   """
    520   @impl true
    521   @spec open_request_count(t()) :: non_neg_integer()
    522   def open_request_count(%__MODULE__{} = conn) do
    523     case conn do
    524       %{request: nil, streaming_request: nil} -> 0
    525       %{request: nil} -> 1
    526       %{streaming_request: nil} -> 1 + :queue.len(conn.requests)
    527       _ -> 2 + :queue.len(conn.requests)
    528     end
    529   end
    530 
    531   @doc """
    532   See `Mint.HTTP.put_private/3`.
    533   """
    534   @impl true
    535   @spec put_private(t(), atom(), term()) :: t()
    536   def put_private(%__MODULE__{private: private} = conn, key, value) when is_atom(key) do
    537     %{conn | private: Map.put(private, key, value)}
    538   end
    539 
    540   @doc """
    541   See `Mint.HTTP.get_private/3`.
    542   """
    543   @impl true
    544   @spec get_private(t(), atom(), term()) :: term()
    545   def get_private(%__MODULE__{private: private} = _conn, key, default \\ nil) when is_atom(key) do
    546     Map.get(private, key, default)
    547   end
    548 
    549   @doc """
    550   See `Mint.HTTP.delete_private/2`.
    551   """
    552   @impl true
    553   @spec delete_private(t(), atom()) :: t()
    554   def delete_private(%__MODULE__{private: private} = conn, key) when is_atom(key) do
    555     %{conn | private: Map.delete(private, key)}
    556   end
    557 
    558   @doc """
    559   See `Mint.HTTP.get_socket/1`.
    560   """
    561   @impl true
    562   @spec get_socket(t()) :: Mint.Types.socket()
    563   def get_socket(%__MODULE__{socket: socket} = _conn) do
    564     socket
    565   end
    566 
    567   @doc """
    568   See `Mint.HTTP.get_proxy_headers/1`.
    569   """
    570   if Version.compare(System.version(), "1.7.0") in [:eq, :gt] do
    571     @doc since: "1.4.0"
    572   end
    573 
    574   @impl true
    575   @spec get_proxy_headers(t()) :: Mint.Types.headers()
    576   def get_proxy_headers(%__MODULE__{proxy_headers: proxy_headers}), do: proxy_headers
    577 
    578   ## Helpers
    579 
    580   defp decode(:status, %{request: request} = conn, data, responses) do
    581     case Response.decode_status_line(data) do
    582       {:ok, {version, status, _reason}, rest} ->
    583         request = %{request | version: version, status: status, state: :headers}
    584         conn = %{conn | request: request}
    585         responses = [{:status, request.ref, status} | responses]
    586         decode(:headers, conn, rest, responses)
    587 
    588       :more ->
    589         conn = put_in(conn.buffer, data)
    590         {:ok, conn, responses}
    591 
    592       :error ->
    593         {:error, conn, wrap_error(:invalid_status_line), responses}
    594     end
    595   end
    596 
    597   defp decode(:headers, %{request: request} = conn, data, responses) do
    598     decode_headers(conn, request, data, responses, request.headers_buffer)
    599   end
    600 
    601   defp decode(:body, conn, data, responses) do
    602     case message_body(conn.request) do
    603       {:ok, body} ->
    604         conn = put_in(conn.request.body, body)
    605         decode_body(body, conn, data, conn.request.ref, responses)
    606 
    607       {:error, reason} ->
    608         {:error, conn, wrap_error(reason), responses}
    609     end
    610   end
    611 
    612   defp decode_headers(conn, request, data, responses, headers) do
    613     case Response.decode_header(data) do
    614       {:ok, {name, value}, rest} ->
    615         headers = [{name, value} | headers]
    616 
    617         case store_header(request, name, value) do
    618           {:ok, request} -> decode_headers(conn, request, rest, responses, headers)
    619           {:error, reason} -> {:error, conn, wrap_error(reason), responses}
    620         end
    621 
    622       {:ok, :eof, rest} ->
    623         responses = [{:headers, request.ref, Enum.reverse(headers)} | responses]
    624         request = %{request | state: :body, headers_buffer: []}
    625         conn = %{conn | buffer: "", request: request}
    626         decode(:body, conn, rest, responses)
    627 
    628       :more ->
    629         request = %{request | headers_buffer: headers}
    630         conn = %{conn | buffer: data, request: request}
    631         {:ok, conn, responses}
    632 
    633       :error ->
    634         {:error, conn, wrap_error(:invalid_header), responses}
    635     end
    636   end
    637 
    638   defp decode_body(:none, conn, data, request_ref, responses) do
    639     conn = put_in(conn.buffer, data)
    640     conn = request_done(conn)
    641     responses = [{:done, request_ref} | responses]
    642     {:ok, conn, responses}
    643   end
    644 
    645   defp decode_body(:single, conn, data, request_ref, responses) do
    646     {conn, responses} = add_body(conn, data, responses)
    647     conn = request_done(conn)
    648     responses = [{:done, request_ref} | responses]
    649     {:ok, conn, responses}
    650   end
    651 
    652   defp decode_body(:until_closed, conn, data, _request_ref, responses) do
    653     {conn, responses} = add_body(conn, data, responses)
    654     {:ok, conn, responses}
    655   end
    656 
    657   defp decode_body({:content_length, length}, conn, data, request_ref, responses) do
    658     cond do
    659       length > byte_size(data) ->
    660         conn = put_in(conn.request.body, {:content_length, length - byte_size(data)})
    661         {conn, responses} = add_body(conn, data, responses)
    662         {:ok, conn, responses}
    663 
    664       length <= byte_size(data) ->
    665         <<body::binary-size(length), rest::binary>> = data
    666         {conn, responses} = add_body(conn, body, responses)
    667         conn = request_done(conn)
    668         responses = [{:done, request_ref} | responses]
    669         next_request(conn, rest, responses)
    670     end
    671   end
    672 
    673   defp decode_body({:chunked, nil}, conn, "", _request_ref, responses) do
    674     conn = put_in(conn.buffer, "")
    675     conn = put_in(conn.request.body, {:chunked, nil})
    676     {:ok, conn, responses}
    677   end
    678 
    679   defp decode_body({:chunked, nil}, conn, data, request_ref, responses) do
    680     case Integer.parse(data, 16) do
    681       {_size, ""} ->
    682         conn = put_in(conn.buffer, data)
    683         conn = put_in(conn.request.body, {:chunked, nil})
    684         {:ok, conn, responses}
    685 
    686       {0, rest} ->
    687         # Manually collapse the body buffer since we're done with the body
    688         {conn, responses} = collapse_body_buffer(conn, responses)
    689         decode_body({:chunked, :metadata, :trailer}, conn, rest, request_ref, responses)
    690 
    691       {size, rest} when size > 0 ->
    692         decode_body({:chunked, :metadata, size}, conn, rest, request_ref, responses)
    693 
    694       _other ->
    695         {:error, conn, wrap_error(:invalid_chunk_size), responses}
    696     end
    697   end
    698 
    699   defp decode_body({:chunked, :metadata, size}, conn, data, request_ref, responses) do
    700     case Parse.ignore_until_crlf(data) do
    701       {:ok, rest} ->
    702         decode_body({:chunked, size}, conn, rest, request_ref, responses)
    703 
    704       :more ->
    705         conn = put_in(conn.buffer, data)
    706         conn = put_in(conn.request.body, {:chunked, :metadata, size})
    707         {:ok, conn, responses}
    708     end
    709   end
    710 
    711   defp decode_body({:chunked, :trailer}, conn, data, _request_ref, responses) do
    712     decode_trailer_headers(conn, data, responses, conn.request.headers_buffer)
    713   end
    714 
    715   defp decode_body({:chunked, :crlf}, conn, data, request_ref, responses) do
    716     case data do
    717       <<"\r\n", rest::binary>> ->
    718         conn = put_in(conn.request.body, {:chunked, nil})
    719         decode_body({:chunked, nil}, conn, rest, request_ref, responses)
    720 
    721       _other when byte_size(data) < 2 ->
    722         conn = put_in(conn.buffer, data)
    723         {:ok, conn, responses}
    724 
    725       _other ->
    726         {:error, conn, wrap_error(:missing_crlf_after_chunk), responses}
    727     end
    728   end
    729 
    730   defp decode_body({:chunked, length}, conn, data, request_ref, responses) do
    731     cond do
    732       length > byte_size(data) ->
    733         conn = put_in(conn.buffer, "")
    734         conn = put_in(conn.request.body, {:chunked, length - byte_size(data)})
    735         conn = add_body_to_buffer(conn, data)
    736         {:ok, conn, responses}
    737 
    738       length <= byte_size(data) ->
    739         <<body::binary-size(length), rest::binary>> = data
    740         {conn, responses} = add_body(conn, body, responses)
    741         conn = put_in(conn.request.body, {:chunked, :crlf})
    742         decode_body({:chunked, :crlf}, conn, rest, request_ref, responses)
    743     end
    744   end
    745 
    746   defp decode_trailer_headers(conn, data, responses, headers) do
    747     case Response.decode_header(data) do
    748       {:ok, {name, value}, rest} ->
    749         headers = [{name, value} | headers]
    750         decode_trailer_headers(conn, rest, responses, headers)
    751 
    752       {:ok, :eof, rest} ->
    753         headers = Util.remove_unallowed_trailing_headers(headers)
    754 
    755         responses = [
    756           {:done, conn.request.ref}
    757           | add_trailing_headers(headers, conn.request.ref, responses)
    758         ]
    759 
    760         conn = request_done(conn)
    761         next_request(conn, rest, responses)
    762 
    763       :more ->
    764         request = %{conn.request | body: {:chunked, :trailer}, headers_buffer: headers}
    765         conn = %{conn | buffer: data, request: request}
    766         {:ok, conn, responses}
    767 
    768       :error ->
    769         {:error, conn, wrap_error(:invalid_trailer_header), responses}
    770     end
    771   end
    772 
    773   defp next_request(%{request: nil} = conn, data, responses) do
    774     # TODO: Figure out if we should keep buffering even though there are no
    775     # requests in flight
    776     {:ok, %{conn | buffer: data}, responses}
    777   end
    778 
    779   defp next_request(conn, data, responses) do
    780     decode(:status, %{conn | state: :status}, data, responses)
    781   end
    782 
    783   defp add_trailing_headers([], _request_ref, responses), do: responses
    784 
    785   defp add_trailing_headers(headers, request_ref, responses),
    786     do: [{:headers, request_ref, Enum.reverse(headers)} | responses]
    787 
    788   defp add_body(conn, data, responses) do
    789     conn = add_body_to_buffer(conn, data)
    790     collapse_body_buffer(conn, responses)
    791   end
    792 
    793   defp add_body_to_buffer(conn, data) do
    794     update_in(conn.request.data_buffer, &[&1 | data])
    795   end
    796 
    797   defp collapse_body_buffer(conn, responses) do
    798     case IO.iodata_to_binary(conn.request.data_buffer) do
    799       "" ->
    800         {conn, responses}
    801 
    802       data ->
    803         conn = put_in(conn.request.data_buffer, [])
    804         {conn, [{:data, conn.request.ref, data} | responses]}
    805     end
    806   end
    807 
    808   defp store_header(%{content_length: nil} = request, "content-length", value) do
    809     with {:ok, content_length} <- Parse.content_length_header(value),
    810          do: {:ok, %{request | content_length: content_length}}
    811   end
    812 
    813   defp store_header(%{connection: connection} = request, "connection", value) do
    814     with {:ok, connection_header} <- Parse.connection_header(value),
    815          do: {:ok, %{request | connection: connection ++ connection_header}}
    816   end
    817 
    818   defp store_header(%{transfer_encoding: transfer_encoding} = request, "transfer-encoding", value) do
    819     with {:ok, transfer_encoding_header} <- Parse.transfer_encoding_header(value),
    820          do: {:ok, %{request | transfer_encoding: transfer_encoding ++ transfer_encoding_header}}
    821   end
    822 
    823   defp store_header(_request, "content-length", _value) do
    824     {:error, :more_than_one_content_length_header}
    825   end
    826 
    827   defp store_header(request, _name, _value) do
    828     {:ok, request}
    829   end
    830 
    831   defp request_done(%{request: request} = conn) do
    832     conn = pop_request(conn)
    833 
    834     cond do
    835       !request -> conn
    836       "close" in request.connection -> internal_close(conn)
    837       request.version >= {1, 1} -> conn
    838       "keep-alive" in request.connection -> conn
    839       true -> internal_close(conn)
    840     end
    841   end
    842 
    843   defp pop_request(conn) do
    844     case :queue.out(conn.requests) do
    845       {{:value, request}, requests} ->
    846         %{conn | request: request, requests: requests}
    847 
    848       {:empty, requests} ->
    849         %{conn | request: nil, requests: requests}
    850     end
    851   end
    852 
    853   defp enqueue_request(%{request: nil} = conn, request) do
    854     %__MODULE__{conn | request: request}
    855   end
    856 
    857   defp enqueue_request(conn, request) do
    858     requests = :queue.in(request, conn.requests)
    859     %__MODULE__{conn | requests: requests}
    860   end
    861 
    862   defp internal_close(conn) do
    863     if conn.buffer != "" do
    864       _ = Logger.debug(["Connection closed with data left in the buffer: ", inspect(conn.buffer)])
    865     end
    866 
    867     _ = conn.transport.close(conn.socket)
    868     %{conn | state: :closed}
    869   end
    870 
    871   # RFC7230 3.3.3:
    872   # > If a message is received with both a Transfer-Encoding and a
    873   # > Content-Length header field, the Transfer-Encoding overrides the
    874   # > Content-Length.  Such a message might indicate an attempt to
    875   # > perform request smuggling (Section 9.5) or response splitting
    876   # > (Section 9.4) and ought to be handled as an error.  A sender MUST
    877   # > remove the received Content-Length field prior to forwarding such
    878   # > a message downstream.
    879   defp message_body(%{body: nil, method: method, status: status} = request) do
    880     cond do
    881       status == 101 ->
    882         {:ok, :single}
    883 
    884       method == "HEAD" or status in 100..199 or status in [204, 304] ->
    885         {:ok, :none}
    886 
    887       # method == "CONNECT" and status in 200..299 -> nil
    888 
    889       request.transfer_encoding != [] && request.content_length ->
    890         {:error, :transfer_encoding_and_content_length}
    891 
    892       "chunked" == List.first(request.transfer_encoding) ->
    893         {:ok, {:chunked, nil}}
    894 
    895       request.content_length ->
    896         {:ok, {:content_length, request.content_length}}
    897 
    898       true ->
    899         {:ok, :until_closed}
    900     end
    901   end
    902 
    903   defp message_body(%{body: body}) do
    904     {:ok, body}
    905   end
    906 
    907   defp new_request(ref, method, body, encoding) do
    908     state =
    909       if body == :stream do
    910         {:stream_request, encoding}
    911       else
    912         :status
    913       end
    914 
    915     %{
    916       ref: ref,
    917       state: state,
    918       method: method,
    919       version: nil,
    920       status: nil,
    921       headers_buffer: [],
    922       data_buffer: [],
    923       content_length: nil,
    924       connection: [],
    925       transfer_encoding: [],
    926       body: nil
    927     }
    928   end
    929 
    930   defp lower_header_keys(headers) do
    931     for {name, value} <- headers, do: {Util.downcase_ascii(name), value}
    932   end
    933 
    934   defp add_default_headers(headers, conn) do
    935     headers
    936     |> Util.put_new_header("user-agent", @user_agent)
    937     |> Util.put_new_header("host", default_host_header(conn))
    938   end
    939 
    940   # If the port is the default for the scheme, don't add it to the host header
    941   defp default_host_header(%__MODULE__{scheme_as_string: scheme, host: host, port: port}) do
    942     if URI.default_port(scheme) == port do
    943       host
    944     else
    945       "#{host}:#{port}"
    946     end
    947   end
    948 
    949   defp add_content_length_or_transfer_encoding(headers, :stream) do
    950     cond do
    951       List.keymember?(headers, "content-length", 0) ->
    952         {:ok, headers, :identity}
    953 
    954       found = List.keyfind(headers, "transfer-encoding", 0) ->
    955         {"transfer-encoding", value} = found
    956 
    957         with {:ok, tokens} <- Parse.transfer_encoding_header(value) do
    958           if "chunked" in tokens or "identity" in tokens do
    959             {:ok, headers, :identity}
    960           else
    961             new_transfer_encoding = {"transfer-encoding", value <> ",chunked"}
    962             headers = List.keyreplace(headers, "transfer-encoding", 0, new_transfer_encoding)
    963             {:ok, headers, :chunked}
    964           end
    965         end
    966 
    967       # If no content-length or transfer-encoding are present, assume
    968       # chunked transfer-encoding and handle the encoding ourselves.
    969       true ->
    970         headers = Util.put_new_header(headers, "transfer-encoding", "chunked")
    971         {:ok, headers, :chunked}
    972     end
    973   end
    974 
    975   defp add_content_length_or_transfer_encoding(headers, nil) do
    976     {:ok, headers, :identity}
    977   end
    978 
    979   defp add_content_length_or_transfer_encoding(headers, body) do
    980     length_fun = fn -> body |> IO.iodata_length() |> Integer.to_string() end
    981     {:ok, Util.put_new_header_lazy(headers, "content-length", length_fun), :identity}
    982   end
    983 
    984   defp wrap_error(reason) do
    985     %HTTPError{reason: reason, module: __MODULE__}
    986   end
    987 
    988   @doc false
    989   def format_error(reason)
    990 
    991   def format_error(:closed) do
    992     "the connection is closed"
    993   end
    994 
    995   def format_error(:request_body_is_streaming) do
    996     "a request body is currently streaming, so no new requests can be issued"
    997   end
    998 
    999   def format_error({:unexpected_data, data}) do
   1000     "received unexpected data: " <> inspect(data)
   1001   end
   1002 
   1003   def format_error(:invalid_status_line) do
   1004     "invalid status line"
   1005   end
   1006 
   1007   def format_error(:invalid_header) do
   1008     "invalid header"
   1009   end
   1010 
   1011   def format_error({:invalid_request_target, target}) do
   1012     "invalid request target: #{inspect(target)}"
   1013   end
   1014 
   1015   def format_error({:invalid_header_name, name}) do
   1016     "invalid header name: #{inspect(name)}"
   1017   end
   1018 
   1019   def format_error({:invalid_header_value, name, value}) do
   1020     "invalid value for header (only printable ASCII characters are allowed) " <>
   1021       "#{inspect(name)}: #{inspect(value)}"
   1022   end
   1023 
   1024   def format_error(:invalid_chunk_size) do
   1025     "invalid chunk size"
   1026   end
   1027 
   1028   def format_error(:missing_crlf_after_chunk) do
   1029     "missing CRLF after chunk"
   1030   end
   1031 
   1032   def format_error(:invalid_trailer_header) do
   1033     "invalid trailer header"
   1034   end
   1035 
   1036   def format_error(:more_than_one_content_length_header) do
   1037     "the response contains two or more Content-Length headers"
   1038   end
   1039 
   1040   def format_error(:transfer_encoding_and_content_length) do
   1041     "the response contained both a Transfer-Encoding header as well as a Content-Length header"
   1042   end
   1043 
   1044   def format_error({:invalid_content_length_header, value}) do
   1045     "invalid Content-Length header: #{inspect(value)}"
   1046   end
   1047 
   1048   def format_error(:empty_token_list) do
   1049     "header should contain a list of values, but it doesn't"
   1050   end
   1051 
   1052   def format_error({:invalid_token_list, string}) do
   1053     "header contains invalid tokens: #{inspect(string)}"
   1054   end
   1055 
   1056   def format_error(:trailing_headers_but_not_chunked_encoding) do
   1057     "trailing headers can only be sent when using chunked transfer-encoding"
   1058   end
   1059 
   1060   def format_error({:unallowed_trailing_header, {name, value}}) do
   1061     "header #{inspect(name)} (with value #{inspect(value)}) is not allowed as a trailing header"
   1062   end
   1063 end