zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

tunnel_proxy.ex (4609B)


      1 defmodule Mint.TunnelProxy do
      2   @moduledoc false
      3 
      4   alias Mint.{HTTP1, HTTPError, Negotiate, TransportError}
      5 
      6   @tunnel_timeout 30_000
      7 
      8   def connect(proxy, host) do
      9     with {:ok, conn} <- establish_proxy(proxy, host) do
     10       upgrade_connection(conn, proxy, host)
     11     end
     12   end
     13 
     14   defp establish_proxy(proxy, host) do
     15     {proxy_scheme, proxy_address, proxy_port, proxy_opts} = proxy
     16     {_scheme, address, port, opts} = host
     17     hostname = Mint.Core.Util.hostname(opts, address)
     18 
     19     path = "#{hostname}:#{port}"
     20 
     21     with {:ok, conn} <- HTTP1.connect(proxy_scheme, proxy_address, proxy_port, proxy_opts),
     22          timeout_deadline = timeout_deadline(proxy_opts),
     23          headers = Keyword.get(opts, :proxy_headers, []),
     24          {:ok, conn, ref} <- HTTP1.request(conn, "CONNECT", path, headers, nil),
     25          {:ok, proxy_headers} <- receive_response(conn, ref, timeout_deadline) do
     26       {:ok, struct!(conn, proxy_headers: proxy_headers)}
     27     else
     28       {:error, reason} ->
     29         {:error, wrap_in_proxy_error(reason)}
     30 
     31       {:error, conn, reason} ->
     32         {:ok, _conn} = HTTP1.close(conn)
     33         {:error, wrap_in_proxy_error(reason)}
     34     end
     35   end
     36 
     37   defp upgrade_connection(
     38          %{proxy_headers: proxy_headers} = conn,
     39          {proxy_scheme, _proxy_address, _proxy_port, _proxy_opts} = _proxy,
     40          {scheme, hostname, port, opts} = _host
     41        ) do
     42     socket = HTTP1.get_socket(conn)
     43 
     44     # Note that we may leak messages if the server sent data after the CONNECT response
     45     case Negotiate.upgrade(proxy_scheme, socket, scheme, hostname, port, opts) do
     46       {:ok, conn} -> {:ok, struct!(conn, proxy_headers: proxy_headers)}
     47       {:error, reason} -> wrap_in_proxy_error(reason)
     48     end
     49   end
     50 
     51   defp receive_response(conn, ref, timeout_deadline) do
     52     timeout = timeout_deadline - System.monotonic_time(:millisecond)
     53     socket = HTTP1.get_socket(conn)
     54 
     55     receive do
     56       {tag, ^socket, _data} = msg when tag in [:tcp, :ssl] ->
     57         stream(conn, ref, timeout_deadline, msg)
     58 
     59       {tag, ^socket} = msg when tag in [:tcp_closed, :ssl_closed] ->
     60         stream(conn, ref, timeout_deadline, msg)
     61 
     62       {tag, ^socket, _reason} = msg when tag in [:tcp_error, :ssl_error] ->
     63         stream(conn, ref, timeout_deadline, msg)
     64     after
     65       timeout ->
     66         {:error, conn, wrap_error({:proxy, :tunnel_timeout})}
     67     end
     68   end
     69 
     70   defp stream(conn, ref, timeout_deadline, msg) do
     71     case HTTP1.stream(conn, msg) do
     72       {:ok, conn, responses} ->
     73         case handle_responses(ref, timeout_deadline, responses) do
     74           {:done, proxy_headers} -> {:ok, proxy_headers}
     75           :more -> receive_response(conn, ref, timeout_deadline)
     76           {:error, reason} -> {:error, conn, reason}
     77         end
     78 
     79       {:error, conn, reason, _responses} ->
     80         {:error, conn, wrap_in_proxy_error(reason)}
     81     end
     82   end
     83 
     84   defp handle_responses(ref, timeout_deadline, [response | responses]) do
     85     case response do
     86       {:status, ^ref, status} when status in 200..299 ->
     87         handle_responses(ref, timeout_deadline, responses)
     88 
     89       {:status, ^ref, status} ->
     90         {:error, wrap_error({:proxy, {:unexpected_status, status}})}
     91 
     92       {:headers, ^ref, headers} when responses == [] ->
     93         {:done, headers}
     94 
     95       {:headers, ^ref, _headers} ->
     96         {:error, wrap_error({:proxy, {:unexpected_trailing_responses, responses}})}
     97 
     98       {:error, ^ref, reason} ->
     99         {:error, wrap_in_proxy_error(reason)}
    100     end
    101   end
    102 
    103   defp handle_responses(_ref, _timeout_deadline, []) do
    104     :more
    105   end
    106 
    107   defp timeout_deadline(opts) do
    108     timeout = Keyword.get(opts, :tunnel_timeout, @tunnel_timeout)
    109     System.monotonic_time(:millisecond) + timeout
    110   end
    111 
    112   defp wrap_error(reason) do
    113     %HTTPError{module: __MODULE__, reason: reason}
    114   end
    115 
    116   defp wrap_in_proxy_error(%HTTPError{reason: {:proxy, _}} = error) do
    117     error
    118   end
    119 
    120   defp wrap_in_proxy_error(%HTTPError{reason: reason}) do
    121     %HTTPError{module: __MODULE__, reason: {:proxy, reason}}
    122   end
    123 
    124   defp wrap_in_proxy_error(%TransportError{} = error) do
    125     error
    126   end
    127 
    128   @doc false
    129   def format_error({:proxy, reason}) do
    130     case reason do
    131       :tunnel_timeout ->
    132         "proxy tunnel timeout"
    133 
    134       {:unexpected_status, status} ->
    135         "expected tunnel proxy to return a status between 200 and 299, got: #{inspect(status)}"
    136 
    137       {:unexpected_trailing_responses, responses} ->
    138         "tunnel proxy returned unexpected trailing responses: #{inspect(responses)}"
    139 
    140       http_reason ->
    141         "error when establishing the tunnel proxy connection: " <>
    142           HTTP1.format_error(http_reason)
    143     end
    144   end
    145 end