zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cow_http2_machine.erl (68837B)


      1 %% Copyright (c) 2018, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cow_http2_machine).
     16 
     17 -export([init/2]).
     18 -export([init_stream/2]).
     19 -export([init_upgrade_stream/2]).
     20 -export([frame/2]).
     21 -export([ignored_frame/1]).
     22 -export([timeout/3]).
     23 -export([prepare_headers/5]).
     24 -export([prepare_push_promise/4]).
     25 -export([prepare_trailers/3]).
     26 -export([send_or_queue_data/4]).
     27 -export([ensure_window/2]).
     28 -export([ensure_window/3]).
     29 -export([update_window/2]).
     30 -export([update_window/3]).
     31 -export([reset_stream/2]).
     32 -export([get_connection_local_buffer_size/1]).
     33 -export([get_local_setting/2]).
     34 -export([get_remote_settings/1]).
     35 -export([get_last_streamid/1]).
     36 -export([set_last_streamid/1]).
     37 -export([get_stream_local_buffer_size/2]).
     38 -export([get_stream_local_state/2]).
     39 -export([get_stream_remote_state/2]).
     40 -export([is_lingering_stream/2]).
     41 
     42 -type opts() :: #{
     43 	connection_window_margin_size => 0..16#7fffffff,
     44 	connection_window_update_threshold => 0..16#7fffffff,
     45 	enable_connect_protocol => boolean(),
     46 	initial_connection_window_size => 65535..16#7fffffff,
     47 	initial_stream_window_size => 0..16#7fffffff,
     48 	max_connection_window_size => 0..16#7fffffff,
     49 	max_concurrent_streams => non_neg_integer() | infinity,
     50 	max_decode_table_size => non_neg_integer(),
     51 	max_encode_table_size => non_neg_integer(),
     52 	max_frame_size_received => 16384..16777215,
     53 	max_frame_size_sent => 16384..16777215 | infinity,
     54 	max_stream_window_size => 0..16#7fffffff,
     55 	message_tag => any(),
     56 	preface_timeout => timeout(),
     57 	settings_timeout => timeout(),
     58 	stream_window_data_threshold => 0..16#7fffffff,
     59 	stream_window_margin_size => 0..16#7fffffff,
     60 	stream_window_update_threshold => 0..16#7fffffff
     61 }.
     62 -export_type([opts/0]).
     63 
     64 %% The order of the fields is significant.
     65 -record(sendfile, {
     66 	offset :: non_neg_integer(),
     67 	bytes :: pos_integer(),
     68 	path :: file:name_all()
     69 }).
     70 
     71 -record(stream, {
     72 	id = undefined :: cow_http2:streamid(),
     73 
     74 	%% Request method.
     75 	method = undefined :: binary(),
     76 
     77 	%% Whether we finished sending data.
     78 	local = idle :: idle | cow_http2:fin(),
     79 
     80 	%% Local flow control window (how much we can send).
     81 	local_window :: integer(),
     82 
     83 	%% Buffered data waiting for the flow control window to increase.
     84 	local_buffer = queue:new() ::
     85 		queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}),
     86 	local_buffer_size = 0 :: non_neg_integer(),
     87 	local_trailers = undefined :: undefined | cow_http:headers(),
     88 
     89 	%% Whether we finished receiving data.
     90 	remote = idle :: idle | cow_http2:fin(),
     91 
     92 	%% Remote flow control window (how much we accept to receive).
     93 	remote_window :: integer(),
     94 
     95 	%% Size expected and read from the request body.
     96 	remote_expected_size = undefined :: undefined | non_neg_integer(),
     97 	remote_read_size = 0 :: non_neg_integer(),
     98 
     99 	%% Unparsed te header. Used to know if we can send trailers.
    100 	%% Note that we can always send trailers to the server.
    101 	te :: undefined | binary()
    102 }).
    103 
    104 -type stream() :: #stream{}.
    105 
    106 -type continued_frame() ::
    107 	{headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} |
    108 	{push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}.
    109 
    110 -record(http2_machine, {
    111 	%% Whether the HTTP/2 endpoint is a client or a server.
    112 	mode :: client | server,
    113 
    114 	%% HTTP/2 SETTINGS customization.
    115 	opts = #{} :: opts(),
    116 
    117 	%% Connection-wide frame processing state.
    118 	state = settings :: settings | normal
    119 		| {continuation, request | response | trailers | push_promise, continued_frame()},
    120 
    121 	%% Timer for the connection preface.
    122 	preface_timer = undefined :: undefined | reference(),
    123 
    124 	%% Timer for the ack for a SETTINGS frame we sent.
    125 	settings_timer = undefined :: undefined | reference(),
    126 
    127 	%% Settings are separate for each endpoint. In addition, settings
    128 	%% must be acknowledged before they can be expected to be applied.
    129 	local_settings = #{
    130 %		header_table_size => 4096,
    131 %		enable_push => true,
    132 %		max_concurrent_streams => infinity,
    133 		initial_window_size => 65535
    134 %		max_frame_size => 16384
    135 %		max_header_list_size => infinity
    136 	} :: map(),
    137 	next_settings = undefined :: undefined | map(),
    138 	remote_settings = #{
    139 		initial_window_size => 65535
    140 	} :: map(),
    141 
    142 	%% Connection-wide flow control window.
    143 	local_window = 65535 :: integer(), %% How much we can send.
    144 	remote_window = 65535 :: integer(), %% How much we accept to receive.
    145 
    146 	%% Stream identifiers.
    147 	local_streamid :: pos_integer(), %% The next streamid to be used.
    148 	remote_streamid = 0 :: non_neg_integer(), %% The last streamid received.
    149 	last_remote_streamid = 16#7fffffff :: non_neg_integer(), %% Used in GOAWAY.
    150 
    151 	%% Currently active HTTP/2 streams. Streams may be initiated either
    152 	%% by the client or by the server through PUSH_PROMISE frames.
    153 	streams = #{} :: #{cow_http2:streamid() => stream()},
    154 
    155 	%% HTTP/2 streams that have recently been reset locally.
    156 	%% We are expected to keep receiving additional frames after
    157 	%% sending an RST_STREAM.
    158 	local_lingering_streams = [] :: [cow_http2:streamid()],
    159 
    160 	%% HTTP/2 streams that have recently been reset remotely.
    161 	%% We keep a few of these around in order to reject subsequent
    162 	%% frames on these streams.
    163 	remote_lingering_streams = [] :: [cow_http2:streamid()],
    164 
    165 	%% HPACK decoding and encoding state.
    166 	decode_state = cow_hpack:init() :: cow_hpack:state(),
    167 	encode_state = cow_hpack:init() :: cow_hpack:state()
    168 }).
    169 
    170 -opaque http2_machine() :: #http2_machine{}.
    171 -export_type([http2_machine/0]).
    172 
    173 -type pseudo_headers() :: #{} %% Trailers
    174 	| #{ %% Responses.
    175 		status := cow_http:status()
    176 	} | #{ %% Normal CONNECT requests.
    177 		method := binary(),
    178 		authority := binary()
    179 	} | #{ %% Other requests and extended CONNECT requests.
    180 		method := binary(),
    181 		scheme := binary(),
    182 		authority := binary(),
    183 		path := binary(),
    184 		protocol => binary()
    185 	}.
    186 
    187 %% Returns true when the given StreamID is for a local-initiated stream.
    188 -define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)).
    189 -define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)).
    190 -define(IS_LOCAL(Mode, StreamID), (
    191 	((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID))
    192 	orelse
    193 	((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID))
    194 )).
    195 
    196 -spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}.
    197 init(client, Opts) ->
    198 	NextSettings = settings_init(Opts),
    199 	client_preface(#http2_machine{
    200 		mode=client,
    201 		opts=Opts,
    202 		preface_timer=start_timer(preface_timeout, Opts),
    203 		settings_timer=start_timer(settings_timeout, Opts),
    204 		next_settings=NextSettings,
    205 		local_streamid=1
    206 	});
    207 init(server, Opts) ->
    208 	NextSettings = settings_init(Opts),
    209 	common_preface(#http2_machine{
    210 		mode=server,
    211 		opts=Opts,
    212 		preface_timer=start_timer(preface_timeout, Opts),
    213 		settings_timer=start_timer(settings_timeout, Opts),
    214 		next_settings=NextSettings,
    215 		local_streamid=2
    216 	}).
    217 
    218 %% @todo In Cowlib 3.0 we should always include MessageTag in the message.
    219 %% It can be set to 'undefined' if the option is missing.
    220 start_timer(Name, Opts=#{message_tag := MessageTag}) ->
    221 	case maps:get(Name, Opts, 5000) of
    222 		infinity -> undefined;
    223 		Timeout -> erlang:start_timer(Timeout, self(), {?MODULE, MessageTag, Name})
    224 	end;
    225 start_timer(Name, Opts) ->
    226 	case maps:get(Name, Opts, 5000) of
    227 		infinity -> undefined;
    228 		Timeout -> erlang:start_timer(Timeout, self(), {?MODULE, Name})
    229 	end.
    230 
    231 client_preface(State0) ->
    232 	{ok, CommonPreface, State} = common_preface(State0),
    233 	{ok, [
    234 		<<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
    235 		CommonPreface
    236 	], State}.
    237 
    238 %% We send next_settings and use defaults until we get an ack.
    239 %%
    240 %% We also send a WINDOW_UPDATE frame for the connection when
    241 %% the user specified an initial_connection_window_size.
    242 common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) ->
    243 	case maps:get(initial_connection_window_size, Opts, 65535) of
    244 		65535 ->
    245 			{ok, cow_http2:settings(NextSettings), State};
    246 		Size ->
    247 			{ok, [
    248 				cow_http2:settings(NextSettings),
    249 				cow_http2:window_update(Size - 65535)
    250 			], update_window(Size - 65535, State)}
    251 	end.
    252 
    253 settings_init(Opts) ->
    254 	S0 = setting_from_opt(#{}, Opts, max_decode_table_size,
    255 		header_table_size, 4096),
    256 	S1 = setting_from_opt(S0, Opts, max_concurrent_streams,
    257 		max_concurrent_streams, infinity),
    258 	S2 = setting_from_opt(S1, Opts, initial_stream_window_size,
    259 		initial_window_size, 65535),
    260 	S3 = setting_from_opt(S2, Opts, max_frame_size_received,
    261 		max_frame_size, 16384),
    262 	%% @todo max_header_list_size
    263 	setting_from_opt(S3, Opts, enable_connect_protocol,
    264 		enable_connect_protocol, false).
    265 
    266 setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
    267 	case maps:get(OptName, Opts, Default) of
    268 		Default -> Settings;
    269 		Value -> Settings#{SettingName => Value}
    270 	end.
    271 
    272 -spec init_stream(binary(), State)
    273 	-> {ok, cow_http2:streamid(), State} when State::http2_machine().
    274 init_stream(Method, State=#http2_machine{mode=client, local_streamid=LocalStreamID,
    275 		local_settings=#{initial_window_size := RemoteWindow},
    276 		remote_settings=#{initial_window_size := LocalWindow}}) ->
    277 	Stream = #stream{id=LocalStreamID, method=Method,
    278 		local_window=LocalWindow, remote_window=RemoteWindow},
    279 	{ok, LocalStreamID, stream_store(Stream, State#http2_machine{
    280 		local_streamid=LocalStreamID + 2})}.
    281 
    282 -spec init_upgrade_stream(binary(), State)
    283 	-> {ok, cow_http2:streamid(), State} when State::http2_machine().
    284 init_upgrade_stream(Method, State=#http2_machine{mode=server, remote_streamid=0,
    285 		local_settings=#{initial_window_size := RemoteWindow},
    286 		remote_settings=#{initial_window_size := LocalWindow}}) ->
    287 	Stream = #stream{id=1, method=Method,
    288 		remote=fin, remote_expected_size=0,
    289 		local_window=LocalWindow, remote_window=RemoteWindow, te=undefined},
    290 	{ok, 1, stream_store(Stream, State#http2_machine{remote_streamid=1})}.
    291 
    292 -spec frame(cow_http2:frame(), State)
    293 	-> {ok, State}
    294 	| {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State}
    295 	| {ok, {headers, cow_http2:streamid(), cow_http2:fin(),
    296 		cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State}
    297 	| {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State}
    298 	| {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State}
    299 	| {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(),
    300 		cow_http:headers(), pseudo_headers()}, State}
    301 	| {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State}
    302 	| {send, [{cow_http2:streamid(), cow_http2:fin(),
    303 		[{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State}
    304 	| {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State}
    305 	| {error, {connection_error, cow_http2:error(), atom()}, State}
    306 	when State::http2_machine().
    307 frame(Frame, State=#http2_machine{state=settings, preface_timer=TRef}) ->
    308 	ok = case TRef of
    309 		undefined -> ok;
    310 		_ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
    311 	end,
    312 	settings_frame(Frame, State#http2_machine{state=normal, preface_timer=undefined});
    313 frame(Frame, State=#http2_machine{state={continuation, _, _}}) ->
    314 	maybe_discard_result(continuation_frame(Frame, State));
    315 frame(settings_ack, State=#http2_machine{state=normal}) ->
    316 	settings_ack_frame(State);
    317 frame(Frame, State=#http2_machine{state=normal}) ->
    318 	Result = case element(1, Frame) of
    319 		data -> data_frame(Frame, State);
    320 		headers -> headers_frame(Frame, State);
    321 		priority -> priority_frame(Frame, State);
    322 		rst_stream -> rst_stream_frame(Frame, State);
    323 		settings -> settings_frame(Frame, State);
    324 		push_promise -> push_promise_frame(Frame, State);
    325 		ping -> ping_frame(Frame, State);
    326 		ping_ack -> ping_ack_frame(Frame, State);
    327 		goaway -> goaway_frame(Frame, State);
    328 		window_update -> window_update_frame(Frame, State);
    329 		continuation -> unexpected_continuation_frame(Frame, State);
    330 		_ -> ignored_frame(State)
    331 	end,
    332 	maybe_discard_result(Result).
    333 
    334 %% RFC7540 6.9. After sending a GOAWAY frame, the sender can discard frames for
    335 %% streams initiated by the receiver with identifiers higher than the identified
    336 %% last stream. However, any frames that alter connection state cannot be
    337 %% completely ignored. For instance, HEADERS, PUSH_PROMISE, and CONTINUATION
    338 %% frames MUST be minimally processed to ensure the state maintained for header
    339 %% compression is consistent.
    340 maybe_discard_result(FrameResult={ok, Result, State=#http2_machine{mode=Mode,
    341 		last_remote_streamid=MaxID}})
    342 		when element(1, Result) =/= goaway ->
    343 	case element(2, Result) of
    344 		StreamID when StreamID > MaxID, not ?IS_LOCAL(Mode, StreamID) ->
    345 			{ok, State};
    346 		_StreamID ->
    347 			FrameResult
    348 	end;
    349 maybe_discard_result(FrameResult) ->
    350 	FrameResult.
    351 
    352 %% DATA frame.
    353 
    354 data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode,
    355 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
    356 		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
    357 		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
    358 	{error, {connection_error, protocol_error,
    359 		'DATA frame received on a stream in idle state. (RFC7540 5.1)'},
    360 		State};
    361 data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow})
    362 		when byte_size(Data) > ConnWindow ->
    363 	{error, {connection_error, flow_control_error,
    364 		'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'},
    365 		State};
    366 data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{
    367 		remote_window=ConnWindow, local_lingering_streams=Lingering}) ->
    368 	DataLen = byte_size(Data),
    369 	State = State0#http2_machine{remote_window=ConnWindow - DataLen},
    370 	case stream_get(StreamID, State) of
    371 		#stream{remote_window=StreamWindow} when StreamWindow < DataLen ->
    372 			stream_reset(StreamID, State, flow_control_error,
    373 				'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)');
    374 		Stream = #stream{remote=nofin} ->
    375 			data_frame(Frame, State, Stream, DataLen);
    376 		#stream{remote=idle} ->
    377 			stream_reset(StreamID, State, protocol_error,
    378 				'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)');
    379 		#stream{remote=fin} ->
    380 			stream_reset(StreamID, State, stream_closed,
    381 				'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)');
    382 		undefined ->
    383 			%% After we send an RST_STREAM frame and terminate a stream,
    384 			%% the remote endpoint might still be sending us some more
    385 			%% frames until it can process this RST_STREAM.
    386 			case lists:member(StreamID, Lingering) of
    387 				true ->
    388 					{ok, State};
    389 				false ->
    390 					{error, {connection_error, stream_closed,
    391 						'DATA frame received for a closed stream. (RFC7540 5.1)'},
    392 						State}
    393 			end
    394 	end.
    395 
    396 data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID,
    397 		remote_window=StreamWindow, remote_read_size=StreamRead}, DataLen) ->
    398 	Stream = Stream0#stream{remote=IsFin,
    399 		remote_window=StreamWindow - DataLen,
    400 		remote_read_size=StreamRead + DataLen},
    401 	State = stream_store(Stream, State0),
    402 	case is_body_size_valid(Stream) of
    403 		true ->
    404 			{ok, Frame, State};
    405 		false ->
    406 			stream_reset(StreamID, State, protocol_error,
    407 				'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
    408 	end.
    409 
    410 %% It's always valid when no content-length header was specified.
    411 is_body_size_valid(#stream{remote_expected_size=undefined}) ->
    412 	true;
    413 %% We didn't finish reading the body but the size is already larger than expected.
    414 is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected,
    415 		remote_read_size=Read}) when Read > Expected ->
    416 	false;
    417 is_body_size_valid(#stream{remote=nofin}) ->
    418 	true;
    419 is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected,
    420 		remote_read_size=Expected}) ->
    421 	true;
    422 %% We finished reading the body and the size read is not the one expected.
    423 is_body_size_valid(_) ->
    424 	false.
    425 
    426 %% HEADERS frame.
    427 %%
    428 %% We always close the connection when we detect errors before
    429 %% decoding the headers to not waste resources on non-compliant
    430 %% endpoints, making us stricter than the RFC requires.
    431 
    432 %% Convenience record to manipulate the tuple.
    433 %% The order of the fields matter.
    434 -record(headers, {
    435 	id :: cow_http2:streamid(),
    436 	fin :: cow_http2:fin(),
    437 	head :: cow_http2:head_fin(),
    438 	data :: binary()
    439 }).
    440 
    441 headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) ->
    442 	case Mode of
    443 		server -> server_headers_frame(Frame, State);
    444 		client -> client_headers_frame(Frame, State)
    445 	end;
    446 %% @todo Handle the PRIORITY data, but only if this returns an ok tuple.
    447 %% @todo Do not lose the PRIORITY information if CONTINUATION frames follow.
    448 headers_frame({headers, StreamID, IsFin, IsHeadFin,
    449 		_IsExclusive, _DepStreamID, _Weight, HeaderData},
    450 		State=#http2_machine{mode=Mode}) ->
    451 	HeadersFrame = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData},
    452 	case Mode of
    453 		server -> server_headers_frame(HeadersFrame, State);
    454 		client -> client_headers_frame(HeadersFrame, State)
    455 	end.
    456 
    457 %% Reject HEADERS frames with even-numbered streamid.
    458 server_headers_frame(#headers{id=StreamID}, State)
    459 		when ?IS_SERVER_LOCAL(StreamID) ->
    460 	{error, {connection_error, protocol_error,
    461 		'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'},
    462 		State};
    463 %% HEADERS frame on an idle stream: new request.
    464 server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin},
    465 		State=#http2_machine{mode=server, remote_streamid=RemoteStreamID})
    466 		when StreamID > RemoteStreamID ->
    467 	case IsHeadFin of
    468 		head_fin ->
    469 			headers_decode(Frame, State, request, undefined);
    470 		head_nofin ->
    471 			{ok, State#http2_machine{state={continuation, request, Frame}}}
    472 	end;
    473 %% Either a HEADERS frame received on (half-)closed stream,
    474 %% or a HEADERS frame containing the trailers.
    475 server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) ->
    476 	case stream_get(StreamID, State) of
    477 		%% Trailers.
    478 		Stream = #stream{remote=nofin} when IsFin =:= fin ->
    479 			case IsHeadFin of
    480 				head_fin ->
    481 					headers_decode(Frame, State, trailers, Stream);
    482 				head_nofin ->
    483 					{ok, State#http2_machine{state={continuation, trailers, Frame}}}
    484 			end;
    485 		#stream{remote=nofin} ->
    486 			{error, {connection_error, protocol_error,
    487 				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
    488 				State};
    489 		_ ->
    490 			{error, {connection_error, stream_closed,
    491 				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
    492 				State}
    493 	end.
    494 
    495 %% Either a HEADERS frame received on an (half-)closed stream,
    496 %% or a HEADERS frame containing the response or the trailers.
    497 client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin},
    498 		State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
    499 		when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID))
    500 		orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) ->
    501 	case stream_get(StreamID, State) of
    502 		Stream = #stream{remote=idle} ->
    503 			case IsHeadFin of
    504 				head_fin ->
    505 					headers_decode(Frame, State, response, Stream);
    506 				head_nofin ->
    507 					{ok, State#http2_machine{state={continuation, response, Frame}}}
    508 			end;
    509 		Stream = #stream{remote=nofin} when IsFin =:= fin ->
    510 			case IsHeadFin of
    511 				head_fin ->
    512 					headers_decode(Frame, State, trailers, Stream);
    513 				head_nofin ->
    514 					{ok, State#http2_machine{state={continuation, trailers, Frame}}}
    515 			end;
    516 		#stream{remote=nofin} ->
    517 			{error, {connection_error, protocol_error,
    518 				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
    519 				State};
    520 		_ ->
    521 			{error, {connection_error, stream_closed,
    522 				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
    523 				State}
    524 	end;
    525 %% Reject HEADERS frames received on idle streams.
    526 client_headers_frame(_, State) ->
    527 	{error, {connection_error, protocol_error,
    528 		'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'},
    529 		State}.
    530 
    531 headers_decode(Frame=#headers{head=head_fin, data=HeaderData},
    532 		State=#http2_machine{decode_state=DecodeState0}, Type, Stream) ->
    533 	try cow_hpack:decode(HeaderData, DecodeState0) of
    534 		{Headers, DecodeState} when Type =:= request ->
    535 			headers_enforce_concurrency_limit(Frame,
    536 				State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers);
    537 		{Headers, DecodeState} ->
    538 			headers_pseudo_headers(Frame,
    539 				State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers)
    540 	catch _:_ ->
    541 		{error, {connection_error, compression_error,
    542 			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
    543 			State}
    544 	end.
    545 
    546 headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
    547 		State=#http2_machine{local_settings=LocalSettings, streams=Streams},
    548 		Type, Stream, Headers) ->
    549 	MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
    550 	%% Using < is correct because this new stream is not included
    551 	%% in the Streams variable yet and so we'll end up with +1 stream.
    552 	case map_size(Streams) < MaxConcurrentStreams of
    553 		true ->
    554 			headers_pseudo_headers(Frame, State, Type, Stream, Headers);
    555 		false ->
    556 			{error, {stream_error, StreamID, refused_stream,
    557 				'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'},
    558 				State}
    559 	end.
    560 
    561 headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings},
    562 		Type, Stream, Headers0) when Type =:= request; Type =:= push_promise ->
    563 	IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false),
    564 	case request_pseudo_headers(Headers0, #{}) of
    565 		%% Extended CONNECT method (RFC8441).
    566 		{ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _,
    567 			authority := _, path := _, protocol := _}, Headers}
    568 			when IsExtendedConnectEnabled ->
    569 			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
    570 		{ok, #{method := <<"CONNECT">>, scheme := _,
    571 			authority := _, path := _}, _}
    572 			when IsExtendedConnectEnabled ->
    573 			headers_malformed(Frame, State,
    574 				'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
    575 		{ok, #{protocol := _}, _} ->
    576 			headers_malformed(Frame, State,
    577 				'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
    578 		%% Normal CONNECT (no scheme/path).
    579 		{ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers}
    580 				when map_size(PseudoHeaders) =:= 2 ->
    581 			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
    582 		{ok, #{method := <<"CONNECT">>}, _} ->
    583 			headers_malformed(Frame, State,
    584 				'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
    585 		%% Other requests.
    586 		{ok, PseudoHeaders=#{method := _, scheme := _, path := _}, Headers} ->
    587 			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
    588 		{ok, _, _} ->
    589 			headers_malformed(Frame, State,
    590 				'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
    591 		{error, HumanReadable} ->
    592 			headers_malformed(Frame, State, HumanReadable)
    593 	end;
    594 headers_pseudo_headers(Frame=#headers{id=StreamID},
    595 		State, Type=response, Stream, Headers0) ->
    596 	case response_pseudo_headers(Headers0, #{}) of
    597 		{ok, PseudoHeaders=#{status := _}, Headers} ->
    598 			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
    599 		{ok, _, _} ->
    600 			stream_reset(StreamID, State, protocol_error,
    601 				'A required pseudo-header was not found. (RFC7540 8.1.2.4)');
    602 		{error, HumanReadable} ->
    603 			stream_reset(StreamID, State, protocol_error, HumanReadable)
    604 	end;
    605 headers_pseudo_headers(Frame=#headers{id=StreamID},
    606 		State, Type=trailers, Stream, Headers) ->
    607 	case trailers_contain_pseudo_headers(Headers) of
    608 		false ->
    609 			headers_regular_headers(Frame, State, Type, Stream, #{}, Headers);
    610 		true ->
    611 			stream_reset(StreamID, State, protocol_error,
    612 				'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)')
    613 	end.
    614 
    615 headers_malformed(#headers{id=StreamID}, State, HumanReadable) ->
    616 	{error, {stream_error, StreamID, protocol_error, HumanReadable}, State}.
    617 
    618 request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) ->
    619 	{error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
    620 request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) ->
    621 	request_pseudo_headers(Tail, PseudoHeaders#{method => Method});
    622 request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) ->
    623 	{error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
    624 request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) ->
    625 	request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme});
    626 request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) ->
    627 	{error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
    628 request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) ->
    629 	request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority});
    630 request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) ->
    631 	{error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
    632 request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) ->
    633 	request_pseudo_headers(Tail, PseudoHeaders#{path => Path});
    634 request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) ->
    635 	{error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
    636 request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) ->
    637 	request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol});
    638 request_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
    639 	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
    640 request_pseudo_headers(Headers, PseudoHeaders) ->
    641 	{ok, PseudoHeaders, Headers}.
    642 
    643 response_pseudo_headers([{<<":status">>, _}|_], #{status := _}) ->
    644 	{error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
    645 response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
    646 	try cow_http:status_to_integer(Status) of
    647 		IntStatus ->
    648 			response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
    649 	catch _:_ ->
    650 		{error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
    651 	end;
    652 response_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
    653 	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
    654 response_pseudo_headers(Headers, PseudoHeaders) ->
    655 	{ok, PseudoHeaders, Headers}.
    656 
    657 trailers_contain_pseudo_headers([]) ->
    658 	false;
    659 trailers_contain_pseudo_headers([{<<":", _/bits>>, _}|_]) ->
    660 	true;
    661 trailers_contain_pseudo_headers([_|Tail]) ->
    662 	trailers_contain_pseudo_headers(Tail).
    663 
    664 %% Rejecting invalid regular headers might be a bit too strong for clients.
    665 headers_regular_headers(Frame=#headers{id=StreamID},
    666 		State, Type, Stream, PseudoHeaders, Headers) ->
    667 	case regular_headers(Headers, Type) of
    668 		ok when Type =:= request ->
    669 			request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
    670 		ok when Type =:= push_promise ->
    671 			push_promise_frame(Frame, State, Stream, PseudoHeaders, Headers);
    672 		ok when Type =:= response ->
    673 			response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
    674 		ok when Type =:= trailers ->
    675 			trailers_frame(Frame, State, Stream, Headers);
    676 		{error, HumanReadable} when Type =:= request ->
    677 			headers_malformed(Frame, State, HumanReadable);
    678 		{error, HumanReadable} ->
    679 			stream_reset(StreamID, State, protocol_error, HumanReadable)
    680 	end.
    681 
    682 regular_headers([{<<>>, _}|_], _) ->
    683 	{error, 'Empty header names are not valid regular headers. (CVE-2019-9516)'};
    684 regular_headers([{<<":", _/bits>>, _}|_], _) ->
    685 	{error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
    686 regular_headers([{<<"connection">>, _}|_], _) ->
    687 	{error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
    688 regular_headers([{<<"keep-alive">>, _}|_], _) ->
    689 	{error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
    690 regular_headers([{<<"proxy-authenticate">>, _}|_], _) ->
    691 	{error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
    692 regular_headers([{<<"proxy-authorization">>, _}|_], _) ->
    693 	{error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
    694 regular_headers([{<<"transfer-encoding">>, _}|_], _) ->
    695 	{error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
    696 regular_headers([{<<"upgrade">>, _}|_], _) ->
    697 	{error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
    698 regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> ->
    699 	{error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
    700 regular_headers([{<<"te">>, _}|_], Type) when Type =/= request ->
    701 	{error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
    702 regular_headers([{Name, _}|Tail], Type) ->
    703 	Pattern = [
    704 		<<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>,
    705 		<<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>,
    706 		<<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>>
    707 	],
    708 	case binary:match(Name, Pattern) of
    709 		nomatch -> regular_headers(Tail, Type);
    710 		_ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
    711 	end;
    712 regular_headers([], _) ->
    713 	ok.
    714 
    715 request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) ->
    716 	case [CL || {<<"content-length">>, CL} <- Headers] of
    717 		[] when IsFin =:= fin ->
    718 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    719 		[] ->
    720 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
    721 		[<<"0">>] when IsFin =:= fin ->
    722 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    723 		[_] when IsFin =:= fin ->
    724 			headers_malformed(Frame, State,
    725 				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
    726 		[BinLen] ->
    727 			headers_parse_expected_size(Frame, State, Type, Stream,
    728 				PseudoHeaders, Headers, BinLen);
    729 		_ ->
    730 			headers_malformed(Frame, State,
    731 				'Multiple content-length headers were received. (RFC7230 3.3.2)')
    732 	end.
    733 
    734 response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type,
    735 		Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) ->
    736 	case [CL || {<<"content-length">>, CL} <- Headers] of
    737 		[] when IsFin =:= fin ->
    738 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    739 		[] ->
    740 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
    741 		[_] when Status >= 100, Status =< 199 ->
    742 			stream_reset(StreamID, State, protocol_error,
    743 				'Content-length header received in a 1xx response. (RFC7230 3.3.2)');
    744 		[_] when Status =:= 204 ->
    745 			stream_reset(StreamID, State, protocol_error,
    746 				'Content-length header received in a 204 response. (RFC7230 3.3.2)');
    747 		[_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> ->
    748 			stream_reset(StreamID, State, protocol_error,
    749 				'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).');
    750 		%% Responses to HEAD requests, and 304 responses may contain
    751 		%% a content-length header that must be ignored. (RFC7230 3.3.2)
    752 		[_] when Method =:= <<"HEAD">> ->
    753 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    754 		[_] when Status =:= 304 ->
    755 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    756 		[<<"0">>] when IsFin =:= fin ->
    757 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
    758 		[_] when IsFin =:= fin ->
    759 			stream_reset(StreamID, State, protocol_error,
    760 				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
    761 		[BinLen] ->
    762 			headers_parse_expected_size(Frame, State, Type, Stream,
    763 				PseudoHeaders, Headers, BinLen);
    764 		_ ->
    765 			stream_reset(StreamID, State, protocol_error,
    766 				'Multiple content-length headers were received. (RFC7230 3.3.2)')
    767 	end.
    768 
    769 headers_parse_expected_size(Frame=#headers{id=StreamID},
    770 		State, Type, Stream, PseudoHeaders, Headers, BinLen) ->
    771 	try cow_http_hd:parse_content_length(BinLen) of
    772 		Len ->
    773 			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len)
    774 	catch
    775 		_:_ ->
    776 			HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)',
    777 			case Type of
    778 				request -> headers_malformed(Frame, State, HumanReadable);
    779 				response -> stream_reset(StreamID, State, protocol_error, HumanReadable)
    780 			end
    781 	end.
    782 
    783 headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{
    784 		local_settings=#{initial_window_size := RemoteWindow},
    785 		remote_settings=#{initial_window_size := LocalWindow}},
    786 		Type, Stream0, PseudoHeaders, Headers, Len) ->
    787 	{Stream, State1} = case Type of
    788 		request ->
    789 			TE = case lists:keyfind(<<"te">>, 1, Headers) of
    790 				{_, TE0} -> TE0;
    791 				false -> undefined
    792 			end,
    793 			{#stream{id=StreamID, method=maps:get(method, PseudoHeaders),
    794 				remote=IsFin, remote_expected_size=Len,
    795 				local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
    796 				State0#http2_machine{remote_streamid=StreamID}};
    797 		response ->
    798 			Stream1 = case PseudoHeaders of
    799 				#{status := Status} when Status >= 100, Status =< 199 -> Stream0;
    800 				_ -> Stream0#stream{remote=IsFin, remote_expected_size=Len}
    801 			end,
    802 			{Stream1, State0}
    803 	end,
    804 	State = stream_store(Stream, State1),
    805 	{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}.
    806 
    807 trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) ->
    808 	Stream = Stream0#stream{remote=fin},
    809 	State = stream_store(Stream, State0),
    810 	case is_body_size_valid(Stream) of
    811 		true ->
    812 			{ok, {trailers, StreamID, Headers}, State};
    813 		false ->
    814 			stream_reset(StreamID, State, protocol_error,
    815 				'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
    816 	end.
    817 
    818 %% PRIORITY frame.
    819 %%
    820 %% @todo Handle PRIORITY frames.
    821 
    822 priority_frame(_Frame, State) ->
    823 	{ok, State}.
    824 
    825 %% RST_STREAM frame.
    826 
    827 rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
    828 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
    829 		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
    830 		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
    831 	{error, {connection_error, protocol_error,
    832 		'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'},
    833 		State};
    834 rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
    835 		streams=Streams0, remote_lingering_streams=Lingering0}) ->
    836 	Streams = maps:remove(StreamID, Streams0),
    837 	%% We only keep up to 10 streams in this state. @todo Make it configurable?
    838 	Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
    839 	{ok, {rst_stream, StreamID, Reason},
    840 		State#http2_machine{streams=Streams, remote_lingering_streams=Lingering}}.
    841 
    842 %% SETTINGS frame.
    843 
    844 settings_frame({settings, Settings}, State0=#http2_machine{
    845 		opts=Opts, remote_settings=Settings0}) ->
    846 	State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)},
    847 	State2 = maps:fold(fun
    848 		(header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) ->
    849 			MaxSize = maps:get(max_encode_table_size, Opts, 4096),
    850 			EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0),
    851 			State#http2_machine{encode_state=EncodeState};
    852 		(initial_window_size, NewWindowSize, State) ->
    853 			OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
    854 			streams_update_local_window(State, NewWindowSize - OldWindowSize);
    855 		(_, _, State) ->
    856 			State
    857 	end, State1, Settings),
    858 	case Settings of
    859 		#{initial_window_size := _} -> send_data(State2);
    860 		_ -> {ok, State2}
    861 	end;
    862 %% We expect to receive a SETTINGS frame as part of the preface.
    863 settings_frame(_F, State=#http2_machine{mode=server}) ->
    864 	{error, {connection_error, protocol_error,
    865 		'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'},
    866 		State};
    867 settings_frame(_F, State) ->
    868 	{error, {connection_error, protocol_error,
    869 		'The preface must begin with a SETTINGS frame. (RFC7540 3.5)'},
    870 		State}.
    871 
    872 %% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
    873 %% the local stream windows for all active streams and perhaps
    874 %% resume sending data.
    875 streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
    876 	Streams = maps:map(fun(_, S=#stream{local_window=StreamWindow}) ->
    877 		S#stream{local_window=StreamWindow + Increment}
    878 	end, Streams0),
    879 	State#http2_machine{streams=Streams}.
    880 
    881 %% Ack for a previously sent SETTINGS frame.
    882 
    883 settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
    884 		local_settings=Local0, next_settings=NextSettings}) ->
    885 	ok = case TRef of
    886 		undefined -> ok;
    887 		_ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
    888 	end,
    889 	Local = maps:merge(Local0, NextSettings),
    890 	State1 = State0#http2_machine{settings_timer=undefined,
    891 		local_settings=Local, next_settings=#{}},
    892 	{ok, maps:fold(fun
    893 		(header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) ->
    894 			DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
    895 			State#http2_machine{decode_state=DecodeState};
    896 		(initial_window_size, NewWindowSize, State) ->
    897 			OldWindowSize = maps:get(initial_window_size, Local0, 65535),
    898 			streams_update_remote_window(State, NewWindowSize - OldWindowSize);
    899 		(_, _, State) ->
    900 			State
    901 	end, State1, NextSettings)}.
    902 
    903 %% When we receive an ack to a SETTINGS frame we sent we need to update
    904 %% the remote stream windows for all active streams.
    905 streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
    906 	Streams = maps:map(fun(_, S=#stream{remote_window=StreamWindow}) ->
    907 		S#stream{remote_window=StreamWindow + Increment}
    908 	end, Streams0),
    909 	State#http2_machine{streams=Streams}.
    910 
    911 %% PUSH_PROMISE frame.
    912 
    913 %% Convenience record to manipulate the tuple.
    914 %% The order of the fields matter.
    915 -record(push_promise, {
    916 	id :: cow_http2:streamid(),
    917 	head :: cow_http2:head_fin(),
    918 	promised_id :: cow_http2:streamid(),
    919 	data :: binary()
    920 }).
    921 
    922 push_promise_frame(_, State=#http2_machine{mode=server}) ->
    923 	{error, {connection_error, protocol_error,
    924 		'PUSH_PROMISE frames MUST NOT be sent by the client. (RFC7540 6.6)'},
    925 		State};
    926 push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) ->
    927 	{error, {connection_error, protocol_error,
    928 		'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'},
    929 		State};
    930 push_promise_frame(#push_promise{promised_id=PromisedStreamID},
    931 		State=#http2_machine{remote_streamid=RemoteStreamID})
    932 		when PromisedStreamID =< RemoteStreamID ->
    933 	{error, {connection_error, protocol_error,
    934 		'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
    935 		State};
    936 push_promise_frame(#push_promise{id=StreamID}, State)
    937 		when not ?IS_CLIENT_LOCAL(StreamID) ->
    938 	{error, {connection_error, protocol_error,
    939 		'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'},
    940 		State};
    941 push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin,
    942 		promised_id=PromisedStreamID, data=HeaderData}, State) ->
    943 	case stream_get(StreamID, State) of
    944 		Stream=#stream{remote=idle} ->
    945 			case IsHeadFin of
    946 				head_fin ->
    947 					headers_decode(#headers{id=PromisedStreamID,
    948 						fin=fin, head=IsHeadFin, data=HeaderData},
    949 						State, push_promise, Stream);
    950 				head_nofin ->
    951 					{ok, State#http2_machine{state={continuation, push_promise, Frame}}}
    952 			end;
    953 		_ ->
    954 %% @todo Check if the stream is lingering. If it is, decode the frame
    955 %% and do what? That's the big question and why it's not implemented yet.
    956 %   However, an endpoint that
    957 %   has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE
    958 %   frames that might have been created before the RST_STREAM frame is
    959 %   received and processed. (RFC7540 6.6)
    960 			{error, {connection_error, stream_closed,
    961 				'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
    962 				State}
    963 	end.
    964 
    965 push_promise_frame(#headers{id=PromisedStreamID},
    966 		State0=#http2_machine{
    967 			local_settings=#{initial_window_size := RemoteWindow},
    968 			remote_settings=#{initial_window_size := LocalWindow}},
    969 		#stream{id=StreamID}, PseudoHeaders=#{method := Method}, Headers) ->
    970 	TE = case lists:keyfind(<<"te">>, 1, Headers) of
    971 		{_, TE0} -> TE0;
    972 		false -> undefined
    973 	end,
    974 	PromisedStream = #stream{id=PromisedStreamID, method=Method,
    975 		local=fin, local_window=LocalWindow,
    976 		remote_window=RemoteWindow, te=TE},
    977 	State = stream_store(PromisedStream,
    978 		State0#http2_machine{remote_streamid=PromisedStreamID}),
    979 	{ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, State}.
    980 
    981 %% PING frame.
    982 
    983 ping_frame({ping, _}, State) ->
    984 	{ok, State}.
    985 
    986 %% Ack for a previously sent PING frame.
    987 %%
    988 %% @todo Might want to check contents but probably a waste of time.
    989 
    990 ping_ack_frame({ping_ack, _}, State) ->
    991 	{ok, State}.
    992 
    993 %% GOAWAY frame.
    994 
    995 goaway_frame(Frame={goaway, _, _, _}, State) ->
    996 	{ok, Frame, State}.
    997 
    998 %% WINDOW_UPDATE frame.
    999 
   1000 %% Connection-wide WINDOW_UPDATE frame.
   1001 window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow})
   1002 		when ConnWindow + Increment > 16#7fffffff ->
   1003 	{error, {connection_error, flow_control_error,
   1004 		'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'},
   1005 		State};
   1006 window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) ->
   1007 	send_data(State#http2_machine{local_window=ConnWindow + Increment});
   1008 %% Stream-specific WINDOW_UPDATE frame.
   1009 window_update_frame({window_update, StreamID, _}, State=#http2_machine{mode=Mode,
   1010 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
   1011 		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
   1012 		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
   1013 	{error, {connection_error, protocol_error,
   1014 		'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'},
   1015 		State};
   1016 window_update_frame({window_update, StreamID, Increment},
   1017 		State0=#http2_machine{remote_lingering_streams=Lingering}) ->
   1018 	case stream_get(StreamID, State0) of
   1019 		#stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
   1020 			stream_reset(StreamID, State0, flow_control_error,
   1021 				'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)');
   1022 		Stream0 = #stream{local_window=StreamWindow} ->
   1023 			send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0);
   1024 		undefined ->
   1025 			%% WINDOW_UPDATE frames may be received for a short period of time
   1026 			%% after a stream is closed. They must be ignored.
   1027 			case lists:member(StreamID, Lingering) of
   1028 				false -> {ok, State0};
   1029 				true -> stream_reset(StreamID, State0, stream_closed,
   1030 					'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)')
   1031 			end
   1032 	end.
   1033 
   1034 %% CONTINUATION frame.
   1035 
   1036 %% Convenience record to manipulate the tuple.
   1037 %% The order of the fields matter.
   1038 -record(continuation, {
   1039 	id :: cow_http2:streamid(),
   1040 	head :: cow_http2:head_fin(),
   1041 	data :: binary()
   1042 }).
   1043 
   1044 unexpected_continuation_frame(#continuation{}, State) ->
   1045 	{error, {connection_error, protocol_error,
   1046 		'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'},
   1047 		State}.
   1048 
   1049 continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
   1050 		State=#http2_machine{state={continuation, Type,
   1051 			Frame=#headers{id=StreamID, data=HeaderFragment0}}}) ->
   1052 	HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
   1053 	headers_decode(Frame#headers{head=head_fin, data=HeaderData},
   1054 		State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
   1055 continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
   1056 		State=#http2_machine{state={continuation, Type, #push_promise{
   1057 			id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) ->
   1058 	HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
   1059 	headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData},
   1060 		State#http2_machine{state=normal}, Type, undefined);
   1061 continuation_frame(#continuation{id=StreamID, data=HeaderFragment1},
   1062 		State=#http2_machine{state={continuation, Type, ContinuedFrame0}})
   1063 		when element(2, ContinuedFrame0) =:= StreamID ->
   1064 	ContinuedFrame = case ContinuedFrame0 of
   1065 		#headers{data=HeaderFragment0} ->
   1066 			HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
   1067 			ContinuedFrame0#headers{data=HeaderData};
   1068 		#push_promise{data=HeaderFragment0} ->
   1069 			HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
   1070 			ContinuedFrame0#push_promise{data=HeaderData}
   1071 	end,
   1072 	{ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}};
   1073 continuation_frame(_F, State) ->
   1074 	{error, {connection_error, protocol_error,
   1075 		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
   1076 		State}.
   1077 
   1078 %% Ignored frames.
   1079 
   1080 -spec ignored_frame(State)
   1081 	-> {ok, State}
   1082 	| {error, {connection_error, protocol_error, atom()}, State}
   1083 	when State::http2_machine().
   1084 ignored_frame(State=#http2_machine{state={continuation, _, _}}) ->
   1085 	{error, {connection_error, protocol_error,
   1086 		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
   1087 		State};
   1088 %% @todo It might be useful to error out when we receive
   1089 %% too many unknown frames. (RFC7540 10.5)
   1090 ignored_frame(State) ->
   1091 	{ok, State}.
   1092 
   1093 %% Timeouts.
   1094 
   1095 -spec timeout(preface_timeout | settings_timeout, reference(), State)
   1096 	-> {ok, State}
   1097 	| {error, {connection_error, cow_http2:error(), atom()}, State}
   1098 	when State::http2_machine().
   1099 timeout(preface_timeout, TRef, State=#http2_machine{preface_timer=TRef}) ->
   1100 	{error, {connection_error, protocol_error,
   1101 		'The preface was not received in a reasonable amount of time.'},
   1102 		State};
   1103 timeout(settings_timeout, TRef, State=#http2_machine{settings_timer=TRef}) ->
   1104 	{error, {connection_error, settings_timeout,
   1105 		'The SETTINGS ack was not received within the configured time. (RFC7540 6.5.3)'},
   1106 		State};
   1107 timeout(_, _, State) ->
   1108 	{ok, State}.
   1109 
   1110 %% Functions for sending a message header or body. Note that
   1111 %% this module does not send data directly, instead it returns
   1112 %% a value that can then be used to send the frames.
   1113 
   1114 -spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(),
   1115 	pseudo_headers(), cow_http:headers())
   1116 	-> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
   1117 prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0},
   1118 		IsFin0, PseudoHeaders, Headers0) ->
   1119 	Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
   1120 	IsFin = case {IsFin0, Method} of
   1121 		{idle, _} -> nofin;
   1122 		{_, <<"HEAD">>} -> fin;
   1123 		_ -> IsFin0
   1124 	end,
   1125 	Headers = merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
   1126 	{HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
   1127 	{ok, IsFin, HeaderBlock, stream_store(Stream#stream{local=IsFin0},
   1128 		State#http2_machine{encode_state=EncodeState})}.
   1129 
   1130 -spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers())
   1131 	-> {ok, cow_http2:streamid(), iodata(), State}
   1132 	| {error, no_push} when State::http2_machine().
   1133 prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) ->
   1134 	{error, no_push};
   1135 prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0,
   1136 		local_settings=#{initial_window_size := RemoteWindow},
   1137 		remote_settings=#{initial_window_size := LocalWindow},
   1138 		local_streamid=LocalStreamID}, PseudoHeaders, Headers0) ->
   1139 	#stream{local=idle} = stream_get(StreamID, State),
   1140 	TE = case lists:keyfind(<<"te">>, 1, Headers0) of
   1141 		{_, TE0} -> TE0;
   1142 		false -> undefined
   1143 	end,
   1144 	Headers = merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
   1145 	{HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
   1146 	{ok, LocalStreamID, HeaderBlock, stream_store(
   1147 		#stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders),
   1148 			remote=fin, remote_expected_size=0,
   1149 			local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
   1150 		State#http2_machine{encode_state=EncodeState, local_streamid=LocalStreamID + 2})}.
   1151 
   1152 remove_http11_headers(Headers) ->
   1153 	RemoveHeaders0 = [
   1154 		<<"keep-alive">>,
   1155 		<<"proxy-connection">>,
   1156 		<<"transfer-encoding">>,
   1157 		<<"upgrade">>
   1158 	],
   1159 	RemoveHeaders = case lists:keyfind(<<"connection">>, 1, Headers) of
   1160 		false ->
   1161 			RemoveHeaders0;
   1162 		{_, ConnHd} ->
   1163 			%% We do not need to worry about any "close" header because
   1164 			%% that header name is reserved.
   1165 			Connection = cow_http_hd:parse_connection(ConnHd),
   1166 			Connection ++ [<<"connection">>|RemoveHeaders0]
   1167 	end,
   1168 	lists:filter(fun({Name, _}) ->
   1169 		not lists:member(Name, RemoveHeaders)
   1170 	end, Headers).
   1171 
   1172 merge_pseudo_headers(PseudoHeaders, Headers0) ->
   1173 	lists:foldl(fun
   1174 		({status, Status}, Acc) when is_integer(Status) ->
   1175 			[{<<":status">>, integer_to_binary(Status)}|Acc];
   1176 		({Name, Value}, Acc) ->
   1177 			[{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc]
   1178 		end, Headers0, maps:to_list(PseudoHeaders)).
   1179 
   1180 -spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers())
   1181 	-> {ok, iodata(), State} when State::http2_machine().
   1182 prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) ->
   1183 	Stream = #stream{local=nofin} = stream_get(StreamID, State),
   1184 	{HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
   1185 	{ok, HeaderBlock, stream_store(Stream#stream{local=fin},
   1186 		State#http2_machine{encode_state=EncodeState})}.
   1187 
   1188 -spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers)
   1189 	-> {ok, State}
   1190 	| {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
   1191 	when State::http2_machine(), DataOrFileOrTrailers::
   1192 		{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
   1193 send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnWindow},
   1194 		IsFin0, DataOrFileOrTrailers0) ->
   1195 	%% @todo Probably just ignore if the method was HEAD.
   1196 	Stream0 = #stream{
   1197 		local=nofin,
   1198 		local_window=StreamWindow,
   1199 		local_buffer_size=BufferSize,
   1200 		te=TE0
   1201 	} = stream_get(StreamID, State0),
   1202 	DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
   1203 		{trailers, _} ->
   1204 			%% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
   1205 			TE = try cow_http_hd:parse_te(TE0) of
   1206 				{trailers, []} -> trailers;
   1207 				_ -> no_trailers
   1208 			catch _:_ ->
   1209 				%% If we can't parse the TE header, assume we can't send trailers.
   1210 				no_trailers
   1211 			end,
   1212 			case TE of
   1213 				trailers ->
   1214 					DataOrFileOrTrailers0;
   1215 				no_trailers ->
   1216 					{data, <<>>}
   1217 			end;
   1218 		_ ->
   1219 			DataOrFileOrTrailers0
   1220 	end,
   1221 	SendSize = case DataOrFileOrTrailers of
   1222 		{data, D} -> BufferSize + iolist_size(D);
   1223 		#sendfile{bytes=B} -> BufferSize + B;
   1224 		{trailers, _} -> 0
   1225 	end,
   1226 	MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
   1227 	if
   1228 		%% If we cannot send the data all at once and the window
   1229 		%% is smaller than we are willing to send at a minimum,
   1230 		%% we queue the data directly.
   1231 		(StreamWindow < MinSendSize)
   1232 				andalso ((StreamWindow < SendSize) orelse (ConnWindow < SendSize)) ->
   1233 			{ok, stream_store(queue_data(Stream0, IsFin0, DataOrFileOrTrailers, in), State0)};
   1234 		true ->
   1235 			case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
   1236 				{ok, Stream, State, []} ->
   1237 					{ok, stream_store(Stream, State)};
   1238 				{ok, Stream=#stream{local=IsFin}, State, SendData} ->
   1239 					{send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
   1240 			end
   1241 	end.
   1242 
   1243 %% Internal data sending/queuing functions.
   1244 
   1245 %% @todo Should we ever want to implement the PRIORITY mechanism,
   1246 %% this would be the place to do it. Right now, we just go over
   1247 %% all streams and send what we can until either everything is
   1248 %% sent or we run out of space in the window.
   1249 send_data(State0=#http2_machine{streams=Streams0}) ->
   1250 	Iterator = maps:iterator(Streams0),
   1251 	case send_data_for_all_streams(maps:next(Iterator), Streams0, State0, []) of
   1252 		{ok, Streams, State, []} ->
   1253 			{ok, State#http2_machine{streams=Streams}};
   1254 		{ok, Streams, State, Send} ->
   1255 			{send, Send, State#http2_machine{streams=Streams}}
   1256 	end.
   1257 
   1258 send_data_for_all_streams(none, Streams, State, Send) ->
   1259 	{ok, Streams, State, Send};
   1260 %% While technically we should never get < 0 here, let's be on the safe side.
   1261 send_data_for_all_streams(_, Streams, State=#http2_machine{local_window=ConnWindow}, Send)
   1262 		when ConnWindow =< 0 ->
   1263 	{ok, Streams, State, Send};
   1264 %% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
   1265 send_data_for_all_streams({StreamID, Stream0, Iterator}, Streams, State0, Send) ->
   1266 	case send_data_for_one_stream(Stream0, State0, []) of
   1267 		{ok, Stream, State, []} ->
   1268 			send_data_for_all_streams(maps:next(Iterator),
   1269 				Streams#{StreamID => Stream}, State, Send);
   1270 		%% We need to remove the stream here because we do not use stream_store/2.
   1271 		{ok, #stream{local=fin, remote=fin}, State, SendData} ->
   1272 			send_data_for_all_streams(maps:next(Iterator),
   1273 				maps:remove(StreamID, Streams), State, [{StreamID, fin, SendData}|Send]);
   1274 		{ok, Stream=#stream{local=IsFin}, State, SendData} ->
   1275 			send_data_for_all_streams(maps:next(Iterator),
   1276 				Streams#{StreamID => Stream}, State, [{StreamID, IsFin, SendData}|Send])
   1277 	end.
   1278 
   1279 send_data(Stream0, State0) ->
   1280 	case send_data_for_one_stream(Stream0, State0, []) of
   1281 		{ok, Stream, State, []} ->
   1282 			{ok, stream_store(Stream, State)};
   1283 		{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
   1284 			{send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)}
   1285 	end.
   1286 
   1287 send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0,
   1288 		local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined ->
   1289 	{ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])};
   1290 send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer=Q0, local_buffer_size=0},
   1291 		State, SendAcc) ->
   1292 	case queue:len(Q0) of
   1293 		0 ->
   1294 			{ok, Stream, State, lists:reverse(SendAcc)};
   1295 		1 ->
   1296 			%% We know there is a final empty data frame in the queue.
   1297 			%% We need to mark the stream as complete.
   1298 			{{value, {fin, 0, _}}, Q} = queue:out(Q0),
   1299 			{ok, Stream#stream{local=fin, local_buffer=Q}, State, lists:reverse(SendAcc)}
   1300 	end;
   1301 send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
   1302 		local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
   1303 		when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
   1304 	{ok, Stream, State, lists:reverse(SendAcc)};
   1305 send_data_for_one_stream(Stream0=#stream{local_window=StreamWindow,
   1306 		local_buffer=Q0, local_buffer_size=BufferSize},
   1307 		State0=#http2_machine{opts=Opts, local_window=ConnWindow}, SendAcc0) ->
   1308 	MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
   1309 	if
   1310 		%% If we cannot send the entire buffer at once and the window
   1311 		%% is smaller than we are willing to send at a minimum, do nothing.
   1312 		%%
   1313 		%% We only do this check the first time we go through this function;
   1314 		%% we want to send as much data as possible IF we send some.
   1315 		(SendAcc0 =:= []) andalso (StreamWindow < MinSendSize)
   1316 				andalso ((StreamWindow < BufferSize) orelse (ConnWindow < BufferSize)) ->
   1317 			{ok, Stream0, State0, []};
   1318 		true ->
   1319 			%% We know there is an item in the queue.
   1320 			{{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
   1321 			Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
   1322 			{ok, Stream, State, SendAcc}
   1323 				= send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
   1324 			send_data_for_one_stream(Stream, State, SendAcc)
   1325 	end.
   1326 
   1327 %% We can send trailers immediately if the queue is empty, otherwise we queue.
   1328 %% We always send trailer frames even if the window is empty.
   1329 send_or_queue_data(Stream=#stream{local_buffer_size=0},
   1330 		State, SendAcc, fin, {trailers, Trailers}, _) ->
   1331 	{ok, Stream, State, [{trailers, Trailers}|SendAcc]};
   1332 send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) ->
   1333 	{ok, Stream#stream{local_trailers=Trailers}, State, SendAcc};
   1334 %% Send data immediately if we can, buffer otherwise.
   1335 send_or_queue_data(Stream=#stream{local_window=StreamWindow},
   1336 		State=#http2_machine{local_window=ConnWindow},
   1337 		SendAcc, IsFin, Data, In)
   1338 		when ConnWindow =< 0; StreamWindow =< 0 ->
   1339 	{ok, queue_data(Stream, IsFin, Data, In), State, SendAcc};
   1340 send_or_queue_data(Stream=#stream{local_window=StreamWindow},
   1341 		State=#http2_machine{opts=Opts, remote_settings=RemoteSettings,
   1342 		local_window=ConnWindow}, SendAcc, IsFin, Data, In) ->
   1343 	RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384),
   1344 	ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity),
   1345 	MaxSendSize = min(
   1346 		min(ConnWindow, StreamWindow),
   1347 		min(RemoteMaxFrameSize, ConfiguredMaxFrameSize)
   1348 	),
   1349 	case Data of
   1350 		File = #sendfile{bytes=Bytes} when Bytes =< MaxSendSize ->
   1351 			{ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes},
   1352 				State#http2_machine{local_window=ConnWindow - Bytes},
   1353 				[File|SendAcc]};
   1354 		File = #sendfile{offset=Offset, bytes=Bytes} ->
   1355 			send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
   1356 				State#http2_machine{local_window=ConnWindow - MaxSendSize},
   1357 				[File#sendfile{bytes=MaxSendSize}|SendAcc], IsFin,
   1358 				File#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize}, In);
   1359 		{data, Iolist0} ->
   1360 			IolistSize = iolist_size(Iolist0),
   1361 			if
   1362 				IolistSize =< MaxSendSize ->
   1363 					{ok, Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize},
   1364 						State#http2_machine{local_window=ConnWindow - IolistSize},
   1365 						[{data, Iolist0}|SendAcc]};
   1366 				true ->
   1367 					{Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
   1368 					send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
   1369 						State#http2_machine{local_window=ConnWindow - MaxSendSize},
   1370 						[{data, Iolist}|SendAcc], IsFin, {data, More}, In)
   1371 			end
   1372 	end.
   1373 
   1374 queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
   1375 	DataSize = case Data of
   1376 		{sendfile, _, Bytes, _} -> Bytes;
   1377 		{data, Iolist} -> iolist_size(Iolist)
   1378 	end,
   1379 	%% Never queue non-final empty data frames.
   1380 	case {DataSize, IsFin} of
   1381 		{0, nofin} ->
   1382 			Stream;
   1383 		_ ->
   1384 			Q = queue:In({IsFin, DataSize, Data}, Q0),
   1385 			Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}
   1386 	end.
   1387 
   1388 %% Public interface to update the flow control window.
   1389 %%
   1390 %% The ensure_window function applies heuristics to avoid updating the
   1391 %% window when it is not necessary. The update_window function updates
   1392 %% the window unconditionally.
   1393 %%
   1394 %% The ensure_window function should be called when requesting more
   1395 %% data (for example when reading a request or response body) as well
   1396 %% as when receiving new data. Failure to do so may result in the
   1397 %% window being depleted.
   1398 %%
   1399 %% The heuristics dictating whether the window must be updated and
   1400 %% what the window size is depends on three options (margin, max
   1401 %% and threshold) along with the Size argument. The window increment
   1402 %% returned by this function may therefore be smaller than the Size
   1403 %% argument. On the other hand the total window allocated over many
   1404 %% calls may end up being larger than the initial Size argument. As
   1405 %% a result, it is the responsibility of the caller to ensure that
   1406 %% the Size argument is never lower than 0.
   1407 
   1408 -spec ensure_window(non_neg_integer(), State)
   1409 	-> ok | {ok, pos_integer(), State} when State::http2_machine().
   1410 ensure_window(Size, State=#http2_machine{opts=Opts, remote_window=RemoteWindow}) ->
   1411 	case ensure_window(Size, RemoteWindow, connection, Opts) of
   1412 		ok ->
   1413 			ok;
   1414 		{ok, Increment} ->
   1415 			{ok, Increment, State#http2_machine{remote_window=RemoteWindow + Increment}}
   1416 	end.
   1417 
   1418 -spec ensure_window(cow_http2:streamid(), non_neg_integer(), State)
   1419 	-> ok | {ok, pos_integer(), State} when State::http2_machine().
   1420 ensure_window(StreamID, Size, State=#http2_machine{opts=Opts}) ->
   1421 	case stream_get(StreamID, State) of
   1422 		%% For simplicity's sake, we do not consider attempts to ensure the window
   1423 		%% of a terminated stream to be errors. We simply act as if the stream
   1424 		%% window is large enough.
   1425 		undefined ->
   1426 			ok;
   1427 		Stream = #stream{remote_window=RemoteWindow} ->
   1428 			case ensure_window(Size, RemoteWindow, stream, Opts) of
   1429 				ok ->
   1430 					ok;
   1431 				{ok, Increment} ->
   1432 					{ok, Increment, stream_store(Stream#stream{remote_window=RemoteWindow + Increment}, State)}
   1433 			end
   1434 	end.
   1435 
   1436 %% No need to update the window when we are not expecting data.
   1437 ensure_window(0, _, _, _) ->
   1438 	ok;
   1439 %% No need to update the window when it is already high enough.
   1440 ensure_window(Size, Window, _, _) when Size =< Window ->
   1441 	ok;
   1442 ensure_window(Size0, Window, Type, Opts) ->
   1443 	Threshold = ensure_window_threshold(Type, Opts),
   1444 	if
   1445 		%% We do not update the window when it is higher than the threshold.
   1446 		Window > Threshold ->
   1447 			ok;
   1448 		true ->
   1449 			Margin = ensure_window_margin(Type, Opts),
   1450 			Size = Size0 + Margin,
   1451 			MaxWindow = ensure_window_max(Type, Opts),
   1452 			Increment = if
   1453 				%% We cannot go above the maximum window size.
   1454 				Size > MaxWindow -> MaxWindow - Window;
   1455 				true -> Size - Window
   1456 			end,
   1457 			case Increment of
   1458 				0 -> ok;
   1459 				_ -> {ok, Increment}
   1460 			end
   1461 	end.
   1462 
   1463 %% Margin defaults to the default initial window size.
   1464 ensure_window_margin(connection, Opts) ->
   1465 	maps:get(connection_window_margin_size, Opts, 65535);
   1466 ensure_window_margin(stream, Opts) ->
   1467 	maps:get(stream_window_margin_size, Opts, 65535).
   1468 
   1469 %% Max window defaults to the max value allowed by the protocol.
   1470 ensure_window_max(connection, Opts) ->
   1471 	maps:get(max_connection_window_size, Opts, 16#7fffffff);
   1472 ensure_window_max(stream, Opts) ->
   1473 	maps:get(max_stream_window_size, Opts, 16#7fffffff).
   1474 
   1475 %% Threshold defaults to 10 times the default frame size.
   1476 ensure_window_threshold(connection, Opts) ->
   1477 	maps:get(connection_window_update_threshold, Opts, 163840);
   1478 ensure_window_threshold(stream, Opts) ->
   1479 	maps:get(stream_window_update_threshold, Opts, 163840).
   1480 
   1481 -spec update_window(1..16#7fffffff, State)
   1482 	-> State when State::http2_machine().
   1483 update_window(Size, State=#http2_machine{remote_window=RemoteWindow})
   1484 		when Size > 0 ->
   1485 	State#http2_machine{remote_window=RemoteWindow + Size}.
   1486 
   1487 -spec update_window(cow_http2:streamid(), 1..16#7fffffff, State)
   1488 	-> State when State::http2_machine().
   1489 update_window(StreamID, Size, State)
   1490 		when Size > 0 ->
   1491 	Stream = #stream{remote_window=RemoteWindow} = stream_get(StreamID, State),
   1492 	stream_store(Stream#stream{remote_window=RemoteWindow + Size}, State).
   1493 
   1494 %% Public interface to reset streams.
   1495 
   1496 -spec reset_stream(cow_http2:streamid(), State)
   1497 	-> {ok, State} | {error, not_found} when State::http2_machine().
   1498 reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
   1499 	case maps:take(StreamID, Streams0) of
   1500 		{_, Streams} ->
   1501 			{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
   1502 		error ->
   1503 			{error, not_found}
   1504 	end.
   1505 
   1506 %% Retrieve the buffer size for all streams.
   1507 
   1508 -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
   1509 get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
   1510 	maps:fold(fun(_, #stream{local_buffer_size=Size}, Acc) ->
   1511 		Acc + Size
   1512 	end, 0, Streams).
   1513 
   1514 %% Retrieve a setting value, or its default value if not set.
   1515 
   1516 -spec get_local_setting(atom(), http2_machine()) -> atom() | integer().
   1517 get_local_setting(Key, #http2_machine{local_settings=Settings}) ->
   1518 	maps:get(Key, Settings, default_setting_value(Key)).
   1519 
   1520 -spec get_remote_settings(http2_machine()) -> map().
   1521 get_remote_settings(#http2_machine{mode=Mode, remote_settings=Settings}) ->
   1522 	Defaults0 = #{
   1523 		header_table_size => default_setting_value(header_table_size),
   1524 		enable_push => default_setting_value(enable_push),
   1525 		max_concurrent_streams => default_setting_value(max_concurrent_streams),
   1526 		initial_window_size => default_setting_value(initial_window_size),
   1527 		max_frame_size => default_setting_value(max_frame_size),
   1528 		max_header_list_size => default_setting_value(max_header_list_size)
   1529 	},
   1530 	Defaults = case Mode of
   1531 		server ->
   1532 			Defaults0#{enable_connect_protocol => default_setting_value(enable_connect_protocol)};
   1533 		client ->
   1534 			Defaults0
   1535 	end,
   1536 	maps:merge(Defaults, Settings).
   1537 
   1538 default_setting_value(header_table_size) -> 4096;
   1539 default_setting_value(enable_push) -> true;
   1540 default_setting_value(max_concurrent_streams) -> infinity;
   1541 default_setting_value(initial_window_size) -> 65535;
   1542 default_setting_value(max_frame_size) -> 16384;
   1543 default_setting_value(max_header_list_size) -> infinity;
   1544 default_setting_value(enable_connect_protocol) -> false.
   1545 
   1546 %% Function to obtain the last known streamid received
   1547 %% for the purposes of sending a GOAWAY frame and closing the connection.
   1548 
   1549 -spec get_last_streamid(http2_machine()) -> cow_http2:streamid().
   1550 get_last_streamid(#http2_machine{remote_streamid=RemoteStreamID}) ->
   1551 	RemoteStreamID.
   1552 
   1553 %% Set last accepted streamid to the last known streamid, for the purpose
   1554 %% ignoring frames for remote streams created after sending GOAWAY.
   1555 
   1556 -spec set_last_streamid(http2_machine()) -> {cow_http2:streamid(), http2_machine()}.
   1557 set_last_streamid(State=#http2_machine{remote_streamid=StreamID,
   1558 		last_remote_streamid=LastStreamID}) when StreamID =< LastStreamID->
   1559 	{StreamID, State#http2_machine{last_remote_streamid = StreamID}}.
   1560 
   1561 %% Retrieve the local buffer size for a stream.
   1562 
   1563 -spec get_stream_local_buffer_size(cow_http2:streamid(), http2_machine())
   1564 	-> {ok, non_neg_integer()} | {error, not_found | closed}.
   1565 get_stream_local_buffer_size(StreamID, State=#http2_machine{mode=Mode,
   1566 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
   1567 	case stream_get(StreamID, State) of
   1568 		#stream{local_buffer_size=Size} ->
   1569 			{ok, Size};
   1570 		undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID))
   1571 				orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) ->
   1572 			{error, closed};
   1573 		undefined ->
   1574 			{error, not_found}
   1575 	end.
   1576 
   1577 %% Retrieve the local state for a stream, including the state in the queue.
   1578 
   1579 -spec get_stream_local_state(cow_http2:streamid(), http2_machine())
   1580 	-> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}.
   1581 get_stream_local_state(StreamID, State=#http2_machine{mode=Mode,
   1582 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
   1583 	case stream_get(StreamID, State) of
   1584 		#stream{local=IsFin, local_buffer=Q, local_trailers=undefined} ->
   1585 			IsQueueFin = case queue:peek_r(Q) of
   1586 				empty -> empty;
   1587 				{value, {IsQueueFin0, _, _}} -> IsQueueFin0
   1588 			end,
   1589 			{ok, IsFin, IsQueueFin};
   1590 		%% Trailers are queued so the local state is fin after the queue is drained.
   1591 		#stream{local=IsFin} ->
   1592 			{ok, IsFin, fin};
   1593 		undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID))
   1594 				orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) ->
   1595 			{error, closed};
   1596 		undefined ->
   1597 			{error, not_found}
   1598 	end.
   1599 
   1600 %% Retrieve the remote state for a stream.
   1601 
   1602 -spec get_stream_remote_state(cow_http2:streamid(), http2_machine())
   1603 	-> {ok, idle | cow_http2:fin()} | {error, not_found | closed}.
   1604 get_stream_remote_state(StreamID, State=#http2_machine{mode=Mode,
   1605 		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
   1606 	case stream_get(StreamID, State) of
   1607 		#stream{remote=IsFin} ->
   1608 			{ok, IsFin};
   1609 		undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID))
   1610 				orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) ->
   1611 			{error, closed};
   1612 		undefined ->
   1613 			{error, not_found}
   1614 	end.
   1615 
   1616 %% Query whether the stream was reset recently by the remote endpoint.
   1617 
   1618 -spec is_lingering_stream(cow_http2:streamid(), http2_machine()) -> boolean().
   1619 is_lingering_stream(StreamID, #http2_machine{
   1620 		local_lingering_streams=Local, remote_lingering_streams=Remote}) ->
   1621 	case lists:member(StreamID, Local) of
   1622 		true -> true;
   1623 		false -> lists:member(StreamID, Remote)
   1624 	end.
   1625 
   1626 %% Stream-related functions.
   1627 
   1628 stream_get(StreamID, #http2_machine{streams=Streams}) ->
   1629 	maps:get(StreamID, Streams, undefined).
   1630 
   1631 stream_store(#stream{id=StreamID, local=fin, remote=fin},
   1632 		State=#http2_machine{streams=Streams0}) ->
   1633 	Streams = maps:remove(StreamID, Streams0),
   1634 	State#http2_machine{streams=Streams};
   1635 stream_store(Stream=#stream{id=StreamID},
   1636 		State=#http2_machine{streams=Streams}) ->
   1637 	State#http2_machine{streams=Streams#{StreamID => Stream}}.
   1638 
   1639 %% @todo Don't send an RST_STREAM if one was already sent.
   1640 stream_reset(StreamID, State, Reason, HumanReadable) ->
   1641 	{error, {stream_error, StreamID, Reason, HumanReadable},
   1642 		stream_linger(StreamID, State)}.
   1643 
   1644 stream_linger(StreamID, State=#http2_machine{local_lingering_streams=Lingering0}) ->
   1645 	%% We only keep up to 100 streams in this state. @todo Make it configurable?
   1646 	Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
   1647 	State#http2_machine{local_lingering_streams=Lingering}.