zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

frame.ex (14275B)


      1 defmodule Mint.HTTP2.Frame do
      2   @moduledoc false
      3 
      4   import Bitwise, only: [band: 2, bor: 2]
      5   import Record
      6 
      7   shared_stream = [:stream_id, {:flags, 0x00}]
      8   shared_conn = [stream_id: 0, flags: 0x00]
      9 
     10   defrecord :data, shared_stream ++ [:data, :padding]
     11   defrecord :headers, shared_stream ++ [:exclusive?, :stream_dependency, :weight, :hbf, :padding]
     12   defrecord :priority, shared_stream ++ [:exclusive?, :stream_dependency, :weight]
     13   defrecord :rst_stream, shared_stream ++ [:error_code]
     14   defrecord :settings, shared_conn ++ [:params]
     15   defrecord :push_promise, shared_stream ++ [:promised_stream_id, :hbf, :padding]
     16   defrecord :ping, shared_conn ++ [:opaque_data]
     17   defrecord :goaway, shared_conn ++ [:last_stream_id, :error_code, :debug_data]
     18   defrecord :window_update, shared_stream ++ [:window_size_increment]
     19   defrecord :continuation, shared_stream ++ [:hbf]
     20   defrecord :unknown, []
     21 
     22   @types %{
     23     data: 0x00,
     24     headers: 0x01,
     25     priority: 0x02,
     26     rst_stream: 0x03,
     27     settings: 0x04,
     28     push_promise: 0x05,
     29     ping: 0x06,
     30     goaway: 0x07,
     31     window_update: 0x08,
     32     continuation: 0x09
     33   }
     34 
     35   ## Flag handling
     36 
     37   @flags %{
     38     data: [end_stream: 0x01, padded: 0x08],
     39     headers: [end_stream: 0x01, end_headers: 0x04, padded: 0x08, priority: 0x20],
     40     settings: [ack: 0x01],
     41     push_promise: [end_headers: 0x04, padded: 0x08],
     42     ping: [ack: 0x01],
     43     continuation: [end_headers: 0x04]
     44   }
     45 
     46   @spec set_flags(byte(), atom(), [flag_name :: atom()]) :: byte()
     47   def set_flags(initial_flags \\ 0x00, frame_name, flags_to_set)
     48       when is_integer(initial_flags) and is_list(flags_to_set) do
     49     Enum.reduce(flags_to_set, initial_flags, &set_flag(&2, frame_name, &1))
     50   end
     51 
     52   @spec flag_set?(byte(), atom(), atom()) :: boolean()
     53   def flag_set?(flags, frame, flag_name)
     54 
     55   for {frame, flags} <- @flags,
     56       {flag_name, flag_value} <- flags do
     57     defp set_flag(flags, unquote(frame), unquote(flag_name)), do: bor(flags, unquote(flag_value))
     58     defp set_flag(unquote(frame), unquote(flag_name)), do: unquote(flag_value)
     59 
     60     def flag_set?(flags, unquote(frame), unquote(flag_name)),
     61       do: band(flags, unquote(flag_value)) == unquote(flag_value)
     62   end
     63 
     64   defmacrop is_flag_set(flags, flag) do
     65     quote do
     66       band(unquote(flags), unquote(flag)) == unquote(flag)
     67     end
     68   end
     69 
     70   ## Parsing
     71 
     72   @doc """
     73   Decodes the next frame of the given binary.
     74 
     75   Returns `{:ok, frame, rest}` if successful, `{:error, reason}` if not.
     76   """
     77   @spec decode_next(binary()) :: {:ok, tuple(), binary()} | :more | {:error, reason}
     78         when reason:
     79                {:frame_size_error, atom()}
     80                | {:protocol_error, binary()}
     81                | :payload_too_big
     82   def decode_next(bin, max_frame_size \\ 16_384) when is_binary(bin) do
     83     case decode_next_raw(bin) do
     84       {:ok, {_type, _flags, _stream_id, payload}, _rest}
     85       when byte_size(payload) > max_frame_size ->
     86         {:error, :payload_too_big}
     87 
     88       {:ok, {type, flags, stream_id, payload}, rest} ->
     89         {:ok, decode_contents(type, flags, stream_id, payload), rest}
     90 
     91       :more ->
     92         :more
     93     end
     94   catch
     95     :throw, {:mint, reason} -> {:error, reason}
     96   end
     97 
     98   defp decode_next_raw(<<
     99          length::24,
    100          type,
    101          flags,
    102          _reserved::1,
    103          stream_id::31,
    104          payload::size(length)-binary,
    105          rest::binary
    106        >>) do
    107     {:ok, {type, flags, stream_id, payload}, rest}
    108   end
    109 
    110   defp decode_next_raw(_other) do
    111     :more
    112   end
    113 
    114   for {frame, type} <- @types do
    115     function = :"decode_#{frame}"
    116 
    117     defp decode_contents(unquote(type), flags, stream_id, payload) do
    118       unquote(function)(flags, stream_id, payload)
    119     end
    120   end
    121 
    122   defp decode_contents(_type, _flags, _stream_id, _payload) do
    123     unknown()
    124   end
    125 
    126   # Parsing of specific frames
    127 
    128   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.1
    129   defp decode_data(flags, stream_id, payload) do
    130     {data, padding} = decode_padding(:data, flags, payload)
    131     data(stream_id: stream_id, flags: flags, data: data, padding: padding)
    132   end
    133 
    134   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.2
    135   defp decode_headers(flags, stream_id, payload) do
    136     {data, padding} = decode_padding(:headers, flags, payload)
    137 
    138     {exclusive?, stream_dependency, weight, data} =
    139       if flag_set?(flags, :headers, :priority) do
    140         <<exclusive::1, stream_dependency::31, weight::8, rest::binary>> = data
    141         {exclusive == 1, stream_dependency, weight + 1, rest}
    142       else
    143         {nil, nil, nil, data}
    144       end
    145 
    146     headers(
    147       stream_id: stream_id,
    148       flags: flags,
    149       padding: padding,
    150       exclusive?: exclusive?,
    151       stream_dependency: stream_dependency,
    152       weight: weight,
    153       hbf: data
    154     )
    155   end
    156 
    157   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.3
    158   defp decode_priority(_flags, _stream_id, payload) when byte_size(payload) != 5 do
    159     throw({:mint, {:frame_size_error, :priority}})
    160   end
    161 
    162   defp decode_priority(flags, stream_id, payload) do
    163     <<exclusive::1, stream_dependency::31, weight::8>> = payload
    164 
    165     priority(
    166       stream_id: stream_id,
    167       flags: flags,
    168       exclusive?: exclusive == 1,
    169       stream_dependency: stream_dependency,
    170       weight: weight + 1
    171     )
    172   end
    173 
    174   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.4
    175   defp decode_rst_stream(_flags, _stream_id, payload) when byte_size(payload) != 4 do
    176     throw({:mint, {:frame_size_error, :rst_stream}})
    177   end
    178 
    179   defp decode_rst_stream(flags, stream_id, <<error_code::32>>) do
    180     rst_stream(
    181       stream_id: stream_id,
    182       flags: flags,
    183       error_code: humanize_error_code(error_code)
    184     )
    185   end
    186 
    187   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.5
    188   defp decode_settings(_flags, _stream_id, payload) when rem(byte_size(payload), 6) != 0 do
    189     throw({:mint, {:frame_size_error, :settings}})
    190   end
    191 
    192   defp decode_settings(flags, stream_id, payload) do
    193     settings(stream_id: stream_id, flags: flags, params: decode_settings_params(payload))
    194   end
    195 
    196   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.6
    197   defp decode_push_promise(flags, stream_id, payload) do
    198     {data, padding} = decode_padding(:push_promise, flags, payload)
    199     <<_reserved::1, promised_stream_id::31, header_block_fragment::binary>> = data
    200 
    201     push_promise(
    202       stream_id: stream_id,
    203       flags: flags,
    204       promised_stream_id: promised_stream_id,
    205       hbf: header_block_fragment,
    206       padding: padding
    207     )
    208   end
    209 
    210   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.7
    211   defp decode_ping(_flags, _stream_id, payload) when byte_size(payload) != 8 do
    212     throw({:mint, {:frame_size_error, :ping}})
    213   end
    214 
    215   defp decode_ping(flags, stream_id, payload) do
    216     ping(stream_id: stream_id, flags: flags, opaque_data: payload)
    217   end
    218 
    219   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.8
    220   defp decode_goaway(flags, stream_id, payload) do
    221     <<_reserved::1, last_stream_id::31, error_code::32, debug_data::binary>> = payload
    222 
    223     goaway(
    224       stream_id: stream_id,
    225       flags: flags,
    226       last_stream_id: last_stream_id,
    227       error_code: humanize_error_code(error_code),
    228       debug_data: debug_data
    229     )
    230   end
    231 
    232   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.9
    233   defp decode_window_update(_flags, _stream_id, payload) when byte_size(payload) != 4 do
    234     throw({:mint, {:frame_size_error, :window_update}})
    235   end
    236 
    237   defp decode_window_update(_flags, _stream_id, <<_reserved::1, 0::31>>) do
    238     throw({:mint, {:protocol_error, "bad WINDOW_SIZE increment"}})
    239   end
    240 
    241   defp decode_window_update(flags, stream_id, <<_reserved::1, window_size_increment::31>>) do
    242     window_update(
    243       stream_id: stream_id,
    244       flags: flags,
    245       window_size_increment: window_size_increment
    246     )
    247   end
    248 
    249   # http://httpwg.org/specs/rfc7540.html#rfc.section.6.10
    250   defp decode_continuation(flags, stream_id, payload) do
    251     continuation(stream_id: stream_id, flags: flags, hbf: payload)
    252   end
    253 
    254   defp decode_padding(frame, flags, <<pad_length, rest::binary>> = payload)
    255        when is_flag_set(flags, unquote(@flags[:data][:padded])) do
    256     if pad_length >= byte_size(payload) do
    257       debug_data =
    258         "the padding length of a #{inspect(frame)} frame is bigger than the payload length"
    259 
    260       throw({:mint, {:protocol_error, debug_data}})
    261     else
    262       # 1 byte is for the space taken by pad_length
    263       data_length = byte_size(payload) - pad_length - 1
    264       <<data::size(data_length)-binary, padding::size(pad_length)-binary>> = rest
    265       {data, padding}
    266     end
    267   end
    268 
    269   defp decode_padding(_frame, _flags, payload) do
    270     {payload, nil}
    271   end
    272 
    273   defp decode_settings_params(payload) do
    274     decode_settings_params(payload, _acc = [])
    275   end
    276 
    277   defp decode_settings_params(<<>>, acc) do
    278     Enum.reverse(acc)
    279   end
    280 
    281   defp decode_settings_params(<<identifier::16, value::32, rest::binary>>, acc) do
    282     # From http://httpwg.org/specs/rfc7540.html#SettingValues:
    283     # An endpoint that receives a SETTINGS frame with any unknown or unsupported identifier MUST
    284     # ignore that setting.
    285     acc =
    286       case identifier do
    287         0x01 -> [{:header_table_size, value} | acc]
    288         0x02 -> [{:enable_push, value == 1} | acc]
    289         0x03 -> [{:max_concurrent_streams, value} | acc]
    290         0x04 -> [{:initial_window_size, value} | acc]
    291         0x05 -> [{:max_frame_size, value} | acc]
    292         0x06 -> [{:max_header_list_size, value} | acc]
    293         0x08 -> [{:enable_connect_protocol, value == 1} | acc]
    294         _other -> acc
    295       end
    296 
    297     decode_settings_params(rest, acc)
    298   end
    299 
    300   ## Encoding
    301 
    302   @doc """
    303   Encodes the given `frame`.
    304   """
    305   @spec encode(tuple()) :: iodata()
    306   def encode(frame)
    307 
    308   def encode(data(stream_id: stream_id, flags: flags, data: data, padding: nil)) do
    309     encode_raw(@types[:data], flags, stream_id, data)
    310   end
    311 
    312   def encode(data(stream_id: stream_id, flags: flags, data: data, padding: padding)) do
    313     flags = set_flags(flags, :data, [:padded])
    314     payload = [byte_size(padding), data, padding]
    315     encode_raw(@types[:data], flags, stream_id, payload)
    316   end
    317 
    318   def encode(headers() = frame) do
    319     headers(
    320       flags: flags,
    321       stream_id: stream_id,
    322       exclusive?: exclusive?,
    323       stream_dependency: stream_dependency,
    324       weight: weight,
    325       hbf: hbf,
    326       padding: padding
    327     ) = frame
    328 
    329     payload = hbf
    330 
    331     {payload, flags} =
    332       if stream_dependency && weight && is_boolean(exclusive?) do
    333         {
    334           [<<if(exclusive?, do: 1, else: 0)::1, stream_dependency::31>>, weight - 1, payload],
    335           set_flags(flags, :headers, [:priority])
    336         }
    337       else
    338         {payload, flags}
    339       end
    340 
    341     {payload, flags} =
    342       if padding do
    343         {[byte_size(padding), payload, padding], set_flags(flags, :headers, [:padded])}
    344       else
    345         {payload, flags}
    346       end
    347 
    348     encode_raw(@types[:headers], flags, stream_id, payload)
    349   end
    350 
    351   def encode(priority() = frame) do
    352     priority(
    353       stream_id: stream_id,
    354       flags: flags,
    355       exclusive?: exclusive?,
    356       stream_dependency: stream_dependency,
    357       weight: weight
    358     ) = frame
    359 
    360     payload = [
    361       <<if(exclusive?, do: 1, else: 0)::1, stream_dependency::31>>,
    362       weight - 1
    363     ]
    364 
    365     encode_raw(@types[:priority], flags, stream_id, payload)
    366   end
    367 
    368   def encode(rst_stream(stream_id: stream_id, flags: flags, error_code: error_code)) do
    369     payload = <<dehumanize_error_code(error_code)::32>>
    370     encode_raw(@types[:rst_stream], flags, stream_id, payload)
    371   end
    372 
    373   def encode(settings(stream_id: stream_id, flags: flags, params: params)) do
    374     payload =
    375       Enum.map(params, fn
    376         {:header_table_size, value} -> <<0x01::16, value::32>>
    377         {:enable_push, value} -> <<0x02::16, if(value, do: 1, else: 0)::32>>
    378         {:max_concurrent_streams, value} -> <<0x03::16, value::32>>
    379         {:initial_window_size, value} -> <<0x04::16, value::32>>
    380         {:max_frame_size, value} -> <<0x05::16, value::32>>
    381         {:max_header_list_size, value} -> <<0x06::16, value::32>>
    382         {:enable_connect_protocol, value} -> <<0x08::16, if(value, do: 1, else: 0)::32>>
    383       end)
    384 
    385     encode_raw(@types[:settings], flags, stream_id, payload)
    386   end
    387 
    388   def encode(push_promise() = frame) do
    389     push_promise(
    390       stream_id: stream_id,
    391       flags: flags,
    392       promised_stream_id: promised_stream_id,
    393       hbf: hbf,
    394       padding: padding
    395     ) = frame
    396 
    397     payload = [<<0::1, promised_stream_id::31>>, hbf]
    398 
    399     {payload, flags} =
    400       if padding do
    401         {
    402           [byte_size(padding), payload, padding],
    403           set_flags(flags, :push_promise, [:padded])
    404         }
    405       else
    406         {payload, flags}
    407       end
    408 
    409     encode_raw(@types[:push_promise], flags, stream_id, payload)
    410   end
    411 
    412   def encode(ping(stream_id: 0, flags: flags, opaque_data: opaque_data)) do
    413     encode_raw(@types[:ping], flags, 0, opaque_data)
    414   end
    415 
    416   def encode(goaway() = frame) do
    417     goaway(
    418       stream_id: 0,
    419       flags: flags,
    420       last_stream_id: last_stream_id,
    421       error_code: error_code,
    422       debug_data: debug_data
    423     ) = frame
    424 
    425     payload = [<<0::1, last_stream_id::31, dehumanize_error_code(error_code)::32>>, debug_data]
    426     encode_raw(@types[:goaway], flags, 0, payload)
    427   end
    428 
    429   def encode(window_update(stream_id: stream_id, flags: flags, window_size_increment: wsi)) do
    430     payload = <<0::1, wsi::31>>
    431     encode_raw(@types[:window_update], flags, stream_id, payload)
    432   end
    433 
    434   def encode(continuation(stream_id: stream_id, flags: flags, hbf: hbf)) do
    435     encode_raw(@types[:continuation], flags, stream_id, _payload = hbf)
    436   end
    437 
    438   def encode_raw(type, flags, stream_id, payload) do
    439     [<<IO.iodata_length(payload)::24>>, type, flags, <<0::1, stream_id::31>>, payload]
    440   end
    441 
    442   ## Helpers
    443 
    444   error_codes = %{
    445     0x00 => :no_error,
    446     0x01 => :protocol_error,
    447     0x02 => :internal_error,
    448     0x03 => :flow_control_error,
    449     0x04 => :settings_timeout,
    450     0x05 => :stream_closed,
    451     0x06 => :frame_size_error,
    452     0x07 => :refused_stream,
    453     0x08 => :cancel,
    454     0x09 => :compression_error,
    455     0x0A => :connect_error,
    456     0x0B => :enhance_your_calm,
    457     0x0C => :inadequate_security,
    458     0x0D => :http_1_1_required
    459   }
    460 
    461   for {code, human_code} <- error_codes do
    462     defp humanize_error_code(unquote(code)), do: unquote(human_code)
    463     defp dehumanize_error_code(unquote(human_code)), do: unquote(code)
    464   end
    465 end