zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_http2.erl (50062B)


      1 %% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_http2).
     16 
     17 -export([init/6]).
     18 -export([init/10]).
     19 -export([init/12]).
     20 
     21 -export([system_continue/3]).
     22 -export([system_terminate/4]).
     23 -export([system_code_change/4]).
     24 
     25 -type opts() :: #{
     26 	active_n => pos_integer(),
     27 	compress_buffering => boolean(),
     28 	compress_threshold => non_neg_integer(),
     29 	connection_type => worker | supervisor,
     30 	connection_window_margin_size => 0..16#7fffffff,
     31 	connection_window_update_threshold => 0..16#7fffffff,
     32 	enable_connect_protocol => boolean(),
     33 	env => cowboy_middleware:env(),
     34 	goaway_initial_timeout => timeout(),
     35 	goaway_complete_timeout => timeout(),
     36 	idle_timeout => timeout(),
     37 	inactivity_timeout => timeout(),
     38 	initial_connection_window_size => 65535..16#7fffffff,
     39 	initial_stream_window_size => 0..16#7fffffff,
     40 	linger_timeout => timeout(),
     41 	logger => module(),
     42 	max_concurrent_streams => non_neg_integer() | infinity,
     43 	max_connection_buffer_size => non_neg_integer(),
     44 	max_connection_window_size => 0..16#7fffffff,
     45 	max_decode_table_size => non_neg_integer(),
     46 	max_encode_table_size => non_neg_integer(),
     47 	max_frame_size_received => 16384..16777215,
     48 	max_frame_size_sent => 16384..16777215 | infinity,
     49 	max_received_frame_rate => {pos_integer(), timeout()},
     50 	max_reset_stream_rate => {pos_integer(), timeout()},
     51 	max_stream_buffer_size => non_neg_integer(),
     52 	max_stream_window_size => 0..16#7fffffff,
     53 	metrics_callback => cowboy_metrics_h:metrics_callback(),
     54 	metrics_req_filter => fun((cowboy_req:req()) -> map()),
     55 	metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
     56 	middlewares => [module()],
     57 	preface_timeout => timeout(),
     58 	proxy_header => boolean(),
     59 	sendfile => boolean(),
     60 	settings_timeout => timeout(),
     61 	shutdown_timeout => timeout(),
     62 	stream_handlers => [module()],
     63 	stream_window_data_threshold => 0..16#7fffffff,
     64 	stream_window_margin_size => 0..16#7fffffff,
     65 	stream_window_update_threshold => 0..16#7fffffff,
     66 	tracer_callback => cowboy_tracer_h:tracer_callback(),
     67 	tracer_flags => [atom()],
     68 	tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
     69 	%% Open ended because configured stream handlers might add options.
     70 	_ => _
     71 }.
     72 -export_type([opts/0]).
     73 
     74 -record(stream, {
     75 	%% Whether the stream is currently stopping.
     76 	status = running :: running | stopping,
     77 
     78 	%% Flow requested for this stream.
     79 	flow = 0 :: non_neg_integer(),
     80 
     81 	%% Stream state.
     82 	state :: {module, any()}
     83 }).
     84 
     85 -record(state, {
     86 	parent = undefined :: pid(),
     87 	ref :: ranch:ref(),
     88 	socket = undefined :: inet:socket(),
     89 	transport :: module(),
     90 	proxy_header :: undefined | ranch_proxy_header:proxy_info(),
     91 	opts = #{} :: opts(),
     92 
     93 	%% Timer for idle_timeout; also used for goaway timers.
     94 	timer = undefined :: undefined | reference(),
     95 
     96 	%% Remote address and port for the connection.
     97 	peer = undefined :: {inet:ip_address(), inet:port_number()},
     98 
     99 	%% Local address and port for the connection.
    100 	sock = undefined :: {inet:ip_address(), inet:port_number()},
    101 
    102 	%% Client certificate (TLS only).
    103 	cert :: undefined | binary(),
    104 
    105 	%% HTTP/2 state machine.
    106 	http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing,
    107 	http2_machine :: cow_http2_machine:http2_machine(),
    108 
    109 	%% HTTP/2 frame rate flood protection.
    110 	frame_rate_num :: undefined | pos_integer(),
    111 	frame_rate_time :: undefined | integer(),
    112 
    113 	%% HTTP/2 reset stream flood protection.
    114 	reset_rate_num :: undefined | pos_integer(),
    115 	reset_rate_time :: undefined | integer(),
    116 
    117 	%% Flow requested for all streams.
    118 	flow = 0 :: non_neg_integer(),
    119 
    120 	%% Currently active HTTP/2 streams. Streams may be initiated either
    121 	%% by the client or by the server through PUSH_PROMISE frames.
    122 	streams = #{} :: #{cow_http2:streamid() => #stream{}},
    123 
    124 	%% Streams can spawn zero or more children which are then managed
    125 	%% by this module if operating as a supervisor.
    126 	children = cowboy_children:init() :: cowboy_children:children()
    127 }).
    128 
    129 -spec init(pid(), ranch:ref(), inet:socket(), module(),
    130 	ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
    131 init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
    132 	Peer0 = Transport:peername(Socket),
    133 	Sock0 = Transport:sockname(Socket),
    134 	Cert1 = case Transport:name() of
    135 		ssl ->
    136 			case ssl:peercert(Socket) of
    137 				{error, no_peercert} ->
    138 					{ok, undefined};
    139 				Cert0 ->
    140 					Cert0
    141 			end;
    142 		_ ->
    143 			{ok, undefined}
    144 	end,
    145 	case {Peer0, Sock0, Cert1} of
    146 		{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
    147 			init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>);
    148 		{{error, Reason}, _, _} ->
    149 			terminate(undefined, {socket_error, Reason,
    150 				'A socket error occurred when retrieving the peer name.'});
    151 		{_, {error, Reason}, _} ->
    152 			terminate(undefined, {socket_error, Reason,
    153 				'A socket error occurred when retrieving the sock name.'});
    154 		{_, _, {error, Reason}} ->
    155 			terminate(undefined, {socket_error, Reason,
    156 				'A socket error occurred when retrieving the client TLS certificate.'})
    157 	end.
    158 
    159 -spec init(pid(), ranch:ref(), inet:socket(), module(),
    160 	ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
    161 	{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
    162 	binary() | undefined, binary()) -> ok.
    163 init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
    164 	{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
    165 	State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
    166 		transport=Transport, proxy_header=ProxyHeader,
    167 		opts=Opts, peer=Peer, sock=Sock, cert=Cert,
    168 		http2_status=sequence, http2_machine=HTTP2Machine})),
    169 	Transport:send(Socket, Preface),
    170 	setopts_active(State),
    171 	case Buffer of
    172 		<<>> -> loop(State, Buffer);
    173 		_ -> parse(State, Buffer)
    174 	end.
    175 
    176 init_rate_limiting(State) ->
    177 	CurrentTime = erlang:monotonic_time(millisecond),
    178 	init_reset_rate_limiting(init_frame_rate_limiting(State, CurrentTime), CurrentTime).
    179 
    180 init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
    181 	{FrameRateNum, FrameRatePeriod} = maps:get(max_received_frame_rate, Opts, {10000, 10000}),
    182 	State#state{
    183 		frame_rate_num=FrameRateNum, frame_rate_time=add_period(CurrentTime, FrameRatePeriod)
    184 	}.
    185 
    186 init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
    187 	{ResetRateNum, ResetRatePeriod} = maps:get(max_reset_stream_rate, Opts, {10, 10000}),
    188 	State#state{
    189 		reset_rate_num=ResetRateNum, reset_rate_time=add_period(CurrentTime, ResetRatePeriod)
    190 	}.
    191 
    192 add_period(_, infinity) -> infinity;
    193 add_period(Time, Period) -> Time + Period.
    194 
    195 %% @todo Add an argument for the request body.
    196 -spec init(pid(), ranch:ref(), inet:socket(), module(),
    197 	ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
    198 	{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
    199 	binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
    200 init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
    201 		_Settings, Req=#{method := Method}) ->
    202 	{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
    203 	{ok, StreamID, HTTP2Machine}
    204 		= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
    205 	State0 = #state{parent=Parent, ref=Ref, socket=Socket,
    206 		transport=Transport, proxy_header=ProxyHeader,
    207 		opts=Opts, peer=Peer, sock=Sock, cert=Cert,
    208 		http2_status=upgrade, http2_machine=HTTP2Machine},
    209 	State1 = headers_frame(State0#state{
    210 		http2_machine=HTTP2Machine}, StreamID, Req),
    211 	%% We assume that the upgrade will be applied. A stream handler
    212 	%% must not prevent the normal operations of the server.
    213 	State2 = info(State1, 1, {switch_protocol, #{
    214 		<<"connection">> => <<"Upgrade">>,
    215 		<<"upgrade">> => <<"h2c">>
    216 	}, ?MODULE, undefined}), %% @todo undefined or #{}?
    217 	State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
    218 	Transport:send(Socket, Preface),
    219 	setopts_active(State),
    220 	case Buffer of
    221 		<<>> -> loop(State, Buffer);
    222 		_ -> parse(State, Buffer)
    223 	end.
    224 
    225 %% Because HTTP/2 has flow control and Cowboy has other rate limiting
    226 %% mechanisms implemented, a very large active_n value should be fine,
    227 %% as long as the stream handlers do their work in a timely manner.
    228 setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
    229 	N = maps:get(active_n, Opts, 100),
    230 	Transport:setopts(Socket, [{active, N}]).
    231 
    232 loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
    233 		opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
    234 	Messages = Transport:messages(),
    235 	InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
    236 	receive
    237 		%% Socket messages.
    238 		{OK, Socket, Data} when OK =:= element(1, Messages) ->
    239 			parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>);
    240 		{Closed, Socket} when Closed =:= element(2, Messages) ->
    241 			Reason = case State#state.http2_status of
    242 				closing -> {stop, closed, 'The client is going away.'};
    243 				_ -> {socket_error, closed, 'The socket has been closed.'}
    244 			end,
    245 			terminate(State, Reason);
    246 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
    247 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
    248 		{Passive, Socket} when Passive =:= element(4, Messages);
    249 				%% Hardcoded for compatibility with Ranch 1.x.
    250 				Passive =:= tcp_passive; Passive =:= ssl_passive ->
    251 			setopts_active(State),
    252 			loop(State, Buffer);
    253 		%% System messages.
    254 		{'EXIT', Parent, shutdown} ->
    255 			Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
    256 			loop(initiate_closing(State, Reason), Buffer);
    257 		{'EXIT', Parent, Reason} ->
    258 			terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
    259 		{system, From, Request} ->
    260 			sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
    261 		%% Timeouts.
    262 		{timeout, TimerRef, idle_timeout} ->
    263 			terminate(State, {stop, timeout,
    264 				'Connection idle longer than configuration allows.'});
    265 		{timeout, Ref, {shutdown, Pid}} ->
    266 			cowboy_children:shutdown_timeout(Children, Ref, Pid),
    267 			loop(State, Buffer);
    268 		{timeout, TRef, {cow_http2_machine, Name}} ->
    269 			loop(timeout(State, Name, TRef), Buffer);
    270 		{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
    271 			loop(closing(State, Reason), Buffer);
    272 		{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
    273 			terminate(State, {stop, stop_reason(Reason),
    274 				'Graceful shutdown timed out.'});
    275 		%% Messages pertaining to a stream.
    276 		{{Pid, StreamID}, Msg} when Pid =:= self() ->
    277 			loop(info(State, StreamID, Msg), Buffer);
    278 		%% Exit signal from children.
    279 		Msg = {'EXIT', Pid, _} ->
    280 			loop(down(State, Pid, Msg), Buffer);
    281 		%% Calls from supervisor module.
    282 		{'$gen_call', From, Call} ->
    283 			cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
    284 			loop(State, Buffer);
    285 		Msg ->
    286 			cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
    287 			loop(State, Buffer)
    288 	after InactivityTimeout ->
    289 		terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
    290 	end.
    291 
    292 set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef})
    293 		when Status =:= closing_initiated orelse Status =:= closing,
    294 			TimerRef =/= undefined ->
    295 	State;
    296 set_idle_timeout(State=#state{opts=Opts}) ->
    297 	set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout).
    298 
    299 set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
    300 	ok = case TimerRef0 of
    301 		undefined -> ok;
    302 		_ -> erlang:cancel_timer(TimerRef0, [{async, true}, {info, false}])
    303 	end,
    304 	TimerRef = case Timeout of
    305 		infinity -> undefined;
    306 		Timeout -> erlang:start_timer(Timeout, self(), Message)
    307 	end,
    308 	State#state{timer=TimerRef}.
    309 
    310 %% HTTP/2 protocol parsing.
    311 
    312 parse(State=#state{http2_status=sequence}, Data) ->
    313 	case cow_http2:parse_sequence(Data) of
    314 		{ok, Rest} ->
    315 			parse(State#state{http2_status=settings}, Rest);
    316 		more ->
    317 			loop(State, Data);
    318 		Error = {connection_error, _, _} ->
    319 			terminate(State, Error)
    320 	end;
    321 parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Streams}, Data) ->
    322 	MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
    323 	case cow_http2:parse(Data, MaxFrameSize) of
    324 		{ok, Frame, Rest} ->
    325 			parse(frame_rate(State, Frame), Rest);
    326 		{ignore, Rest} ->
    327 			parse(frame_rate(State, ignore), Rest);
    328 		{stream_error, StreamID, Reason, Human, Rest} ->
    329 			parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest);
    330 		Error = {connection_error, _, _} ->
    331 			terminate(State, Error);
    332 		%% Terminate the connection if we are closing and all streams have completed.
    333 		more when Status =:= closing, Streams =:= #{} ->
    334 			terminate(State, {stop, normal, 'The connection is going away.'});
    335 		more ->
    336 			loop(State, Data)
    337 	end.
    338 
    339 %% Frame rate flood protection.
    340 
    341 frame_rate(State0=#state{frame_rate_num=Num0, frame_rate_time=Time}, Frame) ->
    342 	{Result, State} = case Num0 - 1 of
    343 		0 ->
    344 			CurrentTime = erlang:monotonic_time(millisecond),
    345 			if
    346 				CurrentTime < Time ->
    347 					{error, State0};
    348 				true ->
    349 					%% When the option has a period of infinity we cannot reach this clause.
    350 					{ok, init_frame_rate_limiting(State0, CurrentTime)}
    351 			end;
    352 		Num ->
    353 			{ok, State0#state{frame_rate_num=Num}}
    354 	end,
    355 	case {Result, Frame} of
    356 		{ok, ignore} -> ignored_frame(State);
    357 		{ok, _} -> frame(State, Frame);
    358 		{error, _} -> terminate(State, {connection_error, enhance_your_calm,
    359 			'Frame rate larger than configuration allows. Flood? (CVE-2019-9512, CVE-2019-9515, CVE-2019-9518)'})
    360 	end.
    361 
    362 %% Frames received.
    363 
    364 %% We do nothing when receiving a lingering DATA frame.
    365 %% We already removed the stream flow from the connection
    366 %% flow and are therefore already accounting for the window
    367 %% being reduced by these frames.
    368 frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
    369 	case cow_http2_machine:frame(Frame, HTTP2Machine0) of
    370 		{ok, HTTP2Machine} ->
    371 			maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
    372 		{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
    373 			data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
    374 		{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
    375 			headers_frame(State#state{http2_machine=HTTP2Machine},
    376 				StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
    377 		{ok, {trailers, _StreamID, _Trailers}, HTTP2Machine} ->
    378 			%% @todo Propagate trailers.
    379 			State#state{http2_machine=HTTP2Machine};
    380 		{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
    381 			rst_stream_frame(State#state{http2_machine=HTTP2Machine}, StreamID, Reason);
    382 		{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
    383 			goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
    384 		{send, SendData, HTTP2Machine} ->
    385 			%% We may need to send an alarm for each of the streams sending data.
    386 			lists:foldl(
    387 				fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end,
    388 				send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []),
    389 				SendData);
    390 		{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
    391 			reset_stream(State#state{http2_machine=HTTP2Machine},
    392 				StreamID, {stream_error, Reason, Human});
    393 		{error, Error={connection_error, _, _}, HTTP2Machine} ->
    394 			terminate(State#state{http2_machine=HTTP2Machine}, Error)
    395 	end.
    396 
    397 %% We use this opportunity to mark the HTTP/2 status as connected
    398 %% if we were still waiting for a SETTINGS frame.
    399 maybe_ack(State=#state{http2_status=settings}, Frame) ->
    400 	maybe_ack(State#state{http2_status=connected}, Frame);
    401 maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
    402 	case Frame of
    403 		{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
    404 		{ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque));
    405 		_ -> ok
    406 	end,
    407 	State.
    408 
    409 data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
    410 	case Streams of
    411 		#{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
    412 			try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
    413 				{Commands, StreamState} ->
    414 					%% Remove the amount of data received from the flow.
    415 					%% We may receive more data than we requested. We ensure
    416 					%% that the flow value doesn't go lower than 0.
    417 					Size = byte_size(Data),
    418 					State = update_window(State0#state{flow=max(0, Flow - Size),
    419 						streams=Streams#{StreamID => Stream#stream{
    420 							flow=max(0, StreamFlow - Size), state=StreamState}}},
    421 						StreamID),
    422 					commands(State, StreamID, Commands)
    423 			catch Class:Exception:Stacktrace ->
    424 				cowboy:log(cowboy_stream:make_error_log(data,
    425 					[StreamID, IsFin, Data, StreamState0],
    426 					Class, Exception, Stacktrace), Opts),
    427 				reset_stream(State0, StreamID, {internal_error, {Class, Exception},
    428 					'Unhandled exception in cowboy_stream:data/4.'})
    429 			end;
    430 		%% We ignore DATA frames for streams that are stopping.
    431 		#{} ->
    432 			State0
    433 	end.
    434 
    435 headers_frame(State, StreamID, IsFin, Headers,
    436 		PseudoHeaders=#{method := <<"CONNECT">>}, _)
    437 		when map_size(PseudoHeaders) =:= 2 ->
    438 	early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501,
    439 		'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
    440 headers_frame(State, StreamID, IsFin, Headers,
    441 		PseudoHeaders=#{method := <<"TRACE">>}, _) ->
    442 	early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501,
    443 		'The TRACE method is currently not implemented. (RFC7231 4.3.8)');
    444 headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
    445 	headers_frame_parse_host(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
    446 headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen) ->
    447 	case lists:keyfind(<<"host">>, 1, Headers) of
    448 		{_, Authority} ->
    449 			headers_frame_parse_host(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
    450 		_ ->
    451 			reset_stream(State, StreamID, {stream_error, protocol_error,
    452 				'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
    453 	end.
    454 
    455 headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert, proxy_header=ProxyHeader},
    456 		StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
    457 		BodyLen, Authority) ->
    458 	try cow_http_hd:parse_host(Authority) of
    459 		{Host, Port0} ->
    460 			Port = ensure_port(Scheme, Port0),
    461 			try cow_http:parse_fullpath(PathWithQs) of
    462 				{<<>>, _} ->
    463 					reset_stream(State, StreamID, {stream_error, protocol_error,
    464 						'The path component must not be empty. (RFC7540 8.1.2.3)'});
    465 				{Path, Qs} ->
    466 					Req0 = #{
    467 						ref => Ref,
    468 						pid => self(),
    469 						streamid => StreamID,
    470 						peer => Peer,
    471 						sock => Sock,
    472 						cert => Cert,
    473 						method => Method,
    474 						scheme => Scheme,
    475 						host => Host,
    476 						port => Port,
    477 						path => Path,
    478 						qs => Qs,
    479 						version => 'HTTP/2',
    480 						headers => headers_to_map(Headers, #{}),
    481 						has_body => IsFin =:= nofin,
    482 						body_length => BodyLen
    483 					},
    484 					%% We add the PROXY header information if any.
    485 					Req1 = case ProxyHeader of
    486 						undefined -> Req0;
    487 						_ -> Req0#{proxy_header => ProxyHeader}
    488 					end,
    489 					%% We add the protocol information for extended CONNECTs.
    490 					Req = case PseudoHeaders of
    491 						#{protocol := Protocol} -> Req1#{protocol => Protocol};
    492 						_ -> Req1
    493 					end,
    494 					headers_frame(State, StreamID, Req)
    495 			catch _:_ ->
    496 				reset_stream(State, StreamID, {stream_error, protocol_error,
    497 					'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
    498 			end
    499 	catch _:_ ->
    500 		reset_stream(State, StreamID, {stream_error, protocol_error,
    501 			'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
    502 	end.
    503 
    504 ensure_port(<<"http">>, undefined) -> 80;
    505 ensure_port(<<"https">>, undefined) -> 443;
    506 ensure_port(_, Port) -> Port.
    507 
    508 %% This function is necessary to properly handle duplicate headers
    509 %% and the special-case cookie header.
    510 headers_to_map([], Acc) ->
    511 	Acc;
    512 headers_to_map([{Name, Value}|Tail], Acc0) ->
    513 	Acc = case Acc0 of
    514 		%% The cookie header does not use proper HTTP header lists.
    515 		#{Name := Value0} when Name =:= <<"cookie">> ->
    516 			Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
    517 		#{Name := Value0} ->
    518 			Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
    519 		_ ->
    520 			Acc0#{Name => Value}
    521 	end,
    522 	headers_to_map(Tail, Acc).
    523 
    524 headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
    525 	try cowboy_stream:init(StreamID, Req, Opts) of
    526 		{Commands, StreamState} ->
    527 			commands(State#state{
    528 				streams=Streams#{StreamID => #stream{state=StreamState}}},
    529 				StreamID, Commands)
    530 	catch Class:Exception:Stacktrace ->
    531 		cowboy:log(cowboy_stream:make_error_log(init,
    532 			[StreamID, Req, Opts],
    533 			Class, Exception, Stacktrace), Opts),
    534 		reset_stream(State, StreamID, {internal_error, {Class, Exception},
    535 			'Unhandled exception in cowboy_stream:init/3.'})
    536 	end.
    537 
    538 early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
    539 		StreamID, _IsFin, Headers, #{method := Method},
    540 		StatusCode0, HumanReadable) ->
    541 	%% We automatically terminate the stream but it is not an error
    542 	%% per se (at least not in the first implementation).
    543 	Reason = {stream_error, no_error, HumanReadable},
    544 	%% The partial Req is minimal for now. We only have one case
    545 	%% where it can be called (when a method is completely disabled).
    546 	%% @todo Fill in the other elements.
    547 	PartialReq = #{
    548 		ref => Ref,
    549 		peer => Peer,
    550 		method => Method,
    551 		headers => headers_to_map(Headers, #{})
    552 	},
    553 	Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
    554 	try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
    555 		{response, StatusCode, RespHeaders, RespBody} ->
    556 			send_response(State0, StreamID, StatusCode, RespHeaders, RespBody)
    557 	catch Class:Exception:Stacktrace ->
    558 		cowboy:log(cowboy_stream:make_error_log(early_error,
    559 			[StreamID, Reason, PartialReq, Resp, Opts],
    560 			Class, Exception, Stacktrace), Opts),
    561 		%% We still need to send an error response, so send what we initially
    562 		%% wanted to send. It's better than nothing.
    563 		send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
    564 	end.
    565 
    566 rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
    567 	case maps:take(StreamID, Streams0) of
    568 		{#stream{state=StreamState}, Streams} ->
    569 			terminate_stream_handler(State, StreamID, Reason, StreamState),
    570 			Children = cowboy_children:shutdown(Children0, StreamID),
    571 			State#state{streams=Streams, children=Children};
    572 		error ->
    573 			State
    574 	end.
    575 
    576 ignored_frame(State=#state{http2_machine=HTTP2Machine0}) ->
    577 	case cow_http2_machine:ignored_frame(HTTP2Machine0) of
    578 		{ok, HTTP2Machine} ->
    579 			State#state{http2_machine=HTTP2Machine};
    580 		{error, Error={connection_error, _, _}, HTTP2Machine} ->
    581 			terminate(State#state{http2_machine=HTTP2Machine}, Error)
    582 	end.
    583 
    584 %% HTTP/2 timeouts.
    585 
    586 timeout(State=#state{http2_machine=HTTP2Machine0}, Name, TRef) ->
    587 	case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of
    588 		{ok, HTTP2Machine} ->
    589 			State#state{http2_machine=HTTP2Machine};
    590 		{error, Error={connection_error, _, _}, HTTP2Machine} ->
    591 			terminate(State#state{http2_machine=HTTP2Machine}, Error)
    592 	end.
    593 
    594 %% Erlang messages.
    595 
    596 down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
    597 	State = case cowboy_children:down(Children0, Pid) of
    598 		%% The stream was terminated already.
    599 		{ok, undefined, Children} ->
    600 			State0#state{children=Children};
    601 		%% The stream is still running.
    602 		{ok, StreamID, Children} ->
    603 			info(State0#state{children=Children}, StreamID, Msg);
    604 		%% The process was unknown.
    605 		error ->
    606 			cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
    607 				[Msg, Pid], Opts),
    608 			State0
    609 	end,
    610 	if
    611 		State#state.http2_status =:= closing, State#state.streams =:= #{} ->
    612 			terminate(State, {stop, normal, 'The connection is going away.'});
    613 		true ->
    614 			State
    615 	end.
    616 
    617 info(State=#state{opts=Opts, http2_machine=HTTP2Machine, streams=Streams}, StreamID, Msg) ->
    618 	case Streams of
    619 		#{StreamID := Stream=#stream{state=StreamState0}} ->
    620 			try cowboy_stream:info(StreamID, Msg, StreamState0) of
    621 				{Commands, StreamState} ->
    622 					commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
    623 						StreamID, Commands)
    624 			catch Class:Exception:Stacktrace ->
    625 				cowboy:log(cowboy_stream:make_error_log(info,
    626 					[StreamID, Msg, StreamState0],
    627 					Class, Exception, Stacktrace), Opts),
    628 				reset_stream(State, StreamID, {internal_error, {Class, Exception},
    629 					'Unhandled exception in cowboy_stream:info/3.'})
    630 			end;
    631 		_ ->
    632 			case cow_http2_machine:is_lingering_stream(StreamID, HTTP2Machine) of
    633 				true ->
    634 					ok;
    635 				false ->
    636 					cowboy:log(warning, "Received message ~p for unknown stream ~p.",
    637 						[Msg, StreamID], Opts)
    638 			end,
    639 			State
    640 	end.
    641 
    642 %% Stream handler commands.
    643 %%
    644 %% @todo Kill the stream if it tries to send a response, headers,
    645 %% data or push promise when the stream is closed or half-closed.
    646 
    647 commands(State, _, []) ->
    648 	State;
    649 %% Error responses are sent only if a response wasn't sent already.
    650 commands(State=#state{http2_machine=HTTP2Machine}, StreamID,
    651 		[{error_response, StatusCode, Headers, Body}|Tail]) ->
    652 	case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
    653 		{ok, idle, _} ->
    654 			commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]);
    655 		_ ->
    656 			commands(State, StreamID, Tail)
    657 	end;
    658 %% Send an informational response.
    659 commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) ->
    660 	State = send_headers(State0, StreamID, idle, StatusCode, Headers),
    661 	commands(State, StreamID, Tail);
    662 %% Send response headers.
    663 commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) ->
    664 	State = send_response(State0, StreamID, StatusCode, Headers, Body),
    665 	commands(State, StreamID, Tail);
    666 %% Send response headers.
    667 commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
    668 	State = send_headers(State0, StreamID, nofin, StatusCode, Headers),
    669 	commands(State, StreamID, Tail);
    670 %% Send a response body chunk.
    671 commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
    672 	State = maybe_send_data(State0, StreamID, IsFin, Data, []),
    673 	commands(State, StreamID, Tail);
    674 %% Send trailers.
    675 commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
    676 	State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []),
    677 	commands(State, StreamID, Tail);
    678 %% Send a push promise.
    679 %%
    680 %% @todo Responses sent as a result of a push_promise request
    681 %% must not send push_promise frames themselves.
    682 %%
    683 %% @todo We should not send push_promise frames when we are
    684 %% in the closing http2_status.
    685 commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
    686 		StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
    687 	Authority = case {Scheme, Port} of
    688 		{<<"http">>, 80} -> Host;
    689 		{<<"https">>, 443} -> Host;
    690 		_ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
    691 	end,
    692 	PathWithQs = iolist_to_binary(case Qs of
    693 		<<>> -> Path;
    694 		_ -> [Path, $?, Qs]
    695 	end),
    696 	PseudoHeaders = #{
    697 		method => Method,
    698 		scheme => Scheme,
    699 		authority => Authority,
    700 		path => PathWithQs
    701 	},
    702 	%% We need to make sure the header value is binary before we can
    703 	%% create the Req object, as it expects them to be flat.
    704 	Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
    705 	State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0,
    706 			PseudoHeaders, Headers) of
    707 		{ok, PromisedStreamID, HeaderBlock, HTTP2Machine} ->
    708 			Transport:send(Socket, cow_http2:push_promise(
    709 				StreamID, PromisedStreamID, HeaderBlock)),
    710 			headers_frame(State0#state{http2_machine=HTTP2Machine},
    711 				PromisedStreamID, fin, Headers, PseudoHeaders, 0);
    712 		{error, no_push} ->
    713 			State0
    714 	end,
    715 	commands(State, StreamID, Tail);
    716 %% Read the request body.
    717 commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
    718 	#{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
    719 	State = update_window(State0#state{flow=Flow + Size,
    720 		streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
    721 		StreamID),
    722 	commands(State, StreamID, Tail);
    723 %% Supervise a child process.
    724 commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
    725 	 commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
    726 		StreamID, Tail);
    727 %% Error handling.
    728 commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
    729 	%% @todo Do we want to run the commands after an internal_error?
    730 	%% @todo Do we even allow commands after?
    731 	%% @todo Only reset when the stream still exists.
    732 	reset_stream(State, StreamID, Error);
    733 %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
    734 commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
    735 		StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
    736 	%% @todo This 101 response needs to be passed through stream handlers.
    737 	Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
    738 	commands(State, StreamID, Tail);
    739 %% Use a different protocol within the stream (CONNECT :protocol).
    740 %% @todo Make sure we error out when the feature is disabled.
    741 commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
    742 	State = info(State0, StreamID, {headers, 200, Headers}),
    743 	commands(State, StreamID, Tail);
    744 %% Set options dynamically.
    745 commands(State, StreamID, [{set_options, _Opts}|Tail]) ->
    746 	commands(State, StreamID, Tail);
    747 commands(State, StreamID, [stop|_Tail]) ->
    748 	%% @todo Do we want to run the commands after a stop?
    749 	%% @todo Do we even allow commands after?
    750 	stop_stream(State, StreamID);
    751 %% Log event.
    752 commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
    753 	cowboy:log(Log, Opts),
    754 	commands(State, StreamID, Tail).
    755 
    756 %% Tentatively update the window after the flow was updated.
    757 
    758 update_window(State=#state{socket=Socket, transport=Transport,
    759 		http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
    760 	#{StreamID := #stream{flow=StreamFlow}} = Streams,
    761 	{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
    762 		ok -> {<<>>, HTTP2Machine0};
    763 		{ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
    764 	end,
    765 	{Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
    766 		ok -> {<<>>, HTTP2Machine2};
    767 		{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
    768 	end,
    769 	case {Data1, Data2} of
    770 		{<<>>, <<>>} -> ok;
    771 		_ -> Transport:send(Socket, [Data1, Data2])
    772 	end,
    773 	State#state{http2_machine=HTTP2Machine}.
    774 
    775 %% Send the response, trailers or data.
    776 
    777 send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, Headers, Body) ->
    778 	Size = case Body of
    779 		{sendfile, _, Bytes, _} -> Bytes;
    780 		_ -> iolist_size(Body)
    781 	end,
    782 	case Size of
    783 		0 ->
    784 			State = send_headers(State0, StreamID, fin, StatusCode, Headers),
    785 			maybe_terminate_stream(State, StreamID, fin);
    786 		_ ->
    787 			%% @todo Add a test for HEAD to make sure we don't send the body when
    788 			%% returning {response...} from a stream handler (or {headers...} then {data...}).
    789 			{ok, _IsFin, HeaderBlock, HTTP2Machine}
    790 				= cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin,
    791 					#{status => cow_http:status_to_integer(StatusCode)},
    792 					headers_to_list(Headers)),
    793 			maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body,
    794 				[cow_http2:headers(StreamID, nofin, HeaderBlock)])
    795 	end.
    796 
    797 send_headers(State=#state{socket=Socket, transport=Transport,
    798 		http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) ->
    799 	{ok, IsFin, HeaderBlock, HTTP2Machine}
    800 		= cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0,
    801 			#{status => cow_http:status_to_integer(StatusCode)},
    802 			headers_to_list(Headers)),
    803 	Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
    804 	State#state{http2_machine=HTTP2Machine}.
    805 
    806 %% The set-cookie header is special; we can only send one cookie per header.
    807 headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
    808 	Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
    809 	Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
    810 headers_to_list(Headers) ->
    811 	maps:to_list(Headers).
    812 
    813 maybe_send_data(State0=#state{socket=Socket, transport=Transport,
    814 		http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, Prefix) ->
    815 	Data = case is_tuple(Data0) of
    816 		false -> {data, Data0};
    817 		true -> Data0
    818 	end,
    819 	case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
    820 		{ok, HTTP2Machine} ->
    821 			%% If we have prefix data (like a HEADERS frame) we need to send it
    822 			%% even if we do not send any DATA frames.
    823 			case Prefix of
    824 				[] -> ok;
    825 				_ -> Transport:send(Socket, Prefix)
    826 			end,
    827 			maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID);
    828 		{send, SendData, HTTP2Machine} ->
    829 			State = #state{http2_status=Status, streams=Streams}
    830 				= send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix),
    831 			%% Terminate the connection if we are closing and all streams have completed.
    832 			if
    833 				Status =:= closing, Streams =:= #{} ->
    834 					terminate(State, {stop, normal, 'The connection is going away.'});
    835 				true ->
    836 					maybe_send_data_alarm(State, HTTP2Machine0, StreamID)
    837 			end
    838 	end.
    839 
    840 send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData, Prefix) ->
    841 	{Acc, State} = prepare_data(State0, SendData, [], Prefix),
    842 	_ = [case Data of
    843 		{sendfile, Offset, Bytes, Path} ->
    844 			%% When sendfile is disabled we explicitly use the fallback.
    845 			_ = case maps:get(sendfile, Opts, true) of
    846 				true -> Transport:sendfile(Socket, Path, Offset, Bytes);
    847 				false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
    848 			end;
    849 		_ ->
    850 			Transport:send(Socket, Data)
    851 	end || Data <- Acc],
    852 	State.
    853 
    854 prepare_data(State, [], Acc, []) ->
    855 	{lists:reverse(Acc), State};
    856 prepare_data(State, [], Acc, Buffer) ->
    857 	{lists:reverse([lists:reverse(Buffer)|Acc]), State};
    858 prepare_data(State0, [{StreamID, IsFin, SendData}|Tail], Acc0, Buffer0) ->
    859 	{Acc, Buffer, State} = prepare_data(State0, StreamID, IsFin, SendData, Acc0, Buffer0),
    860 	prepare_data(State, Tail, Acc, Buffer).
    861 
    862 prepare_data(State0, StreamID, IsFin, [], Acc, Buffer) ->
    863 	State = maybe_terminate_stream(State0, StreamID, IsFin),
    864 	{Acc, Buffer, State};
    865 prepare_data(State0, StreamID, IsFin, [FrameData|Tail], Acc, Buffer) ->
    866 	FrameIsFin = case Tail of
    867 		[] -> IsFin;
    868 		_ -> nofin
    869 	end,
    870 	case prepare_data_frame(State0, StreamID, FrameIsFin, FrameData) of
    871 		{{MoreData, Sendfile}, State} when is_tuple(Sendfile) ->
    872 			case Buffer of
    873 				[] ->
    874 					prepare_data(State, StreamID, IsFin, Tail,
    875 						[Sendfile, MoreData|Acc], []);
    876 				_ ->
    877 					prepare_data(State, StreamID, IsFin, Tail,
    878 						[Sendfile, lists:reverse([MoreData|Buffer])|Acc], [])
    879 			end;
    880 		{MoreData, State} ->
    881 			prepare_data(State, StreamID, IsFin, Tail,
    882 				Acc, [MoreData|Buffer])
    883 	end.
    884 
    885 prepare_data_frame(State, StreamID, IsFin, {data, Data}) ->
    886 	{cow_http2:data(StreamID, IsFin, Data),
    887 		State};
    888 prepare_data_frame(State, StreamID, IsFin, Sendfile={sendfile, _, Bytes, _}) ->
    889 	{{cow_http2:data_header(StreamID, IsFin, Bytes), Sendfile},
    890 		State};
    891 %% The stream is terminated in cow_http2_machine:prepare_trailers.
    892 prepare_data_frame(State=#state{http2_machine=HTTP2Machine0},
    893 		StreamID, nofin, {trailers, Trailers}) ->
    894 	{ok, HeaderBlock, HTTP2Machine}
    895 		= cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
    896 	{cow_http2:headers(StreamID, fin, HeaderBlock),
    897 		State#state{http2_machine=HTTP2Machine}}.
    898 
    899 %% After we have sent or queued data we may need to set or clear an alarm.
    900 %% We do this by comparing the HTTP2Machine buffer state before/after for
    901 %% the relevant streams.
    902 maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
    903 	ConnBufferSizeBefore = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine0),
    904 	ConnBufferSizeAfter = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine),
    905 	{ok, StreamBufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0),
    906 	%% When the stream ends up closed after it finished sending data,
    907 	%% we do not want to trigger an alarm. We act as if the buffer
    908 	%% size did not change.
    909 	StreamBufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
    910 		{ok, BSA} -> BSA;
    911 		{error, closed} -> StreamBufferSizeBefore
    912 	end,
    913 	MaxConnBufferSize = maps:get(max_connection_buffer_size, Opts, 16000000),
    914 	MaxStreamBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000),
    915 	%% I do not want to document these internal events yet. I am not yet
    916 	%% convinced it should be {alarm, Name, on|off} and not {internal_event, E}
    917 	%% or something else entirely. Though alarms are probably right.
    918 	if
    919 		ConnBufferSizeBefore >= MaxConnBufferSize, ConnBufferSizeAfter < MaxConnBufferSize ->
    920 			connection_alarm(State, connection_buffer_full, off);
    921 		ConnBufferSizeBefore < MaxConnBufferSize, ConnBufferSizeAfter >= MaxConnBufferSize ->
    922 			connection_alarm(State, connection_buffer_full, on);
    923 		StreamBufferSizeBefore >= MaxStreamBufferSize, StreamBufferSizeAfter < MaxStreamBufferSize ->
    924 			stream_alarm(State, StreamID, stream_buffer_full, off);
    925 		StreamBufferSizeBefore < MaxStreamBufferSize, StreamBufferSizeAfter >= MaxStreamBufferSize ->
    926 			stream_alarm(State, StreamID, stream_buffer_full, on);
    927 		true ->
    928 			State
    929 	end.
    930 
    931 connection_alarm(State0=#state{streams=Streams}, Name, Value) ->
    932 	lists:foldl(fun(StreamID, State) ->
    933 		stream_alarm(State, StreamID, Name, Value)
    934 	end, State0, maps:keys(Streams)).
    935 
    936 stream_alarm(State, StreamID, Name, Value) ->
    937 	info(State, StreamID, {alarm, Name, Value}).
    938 
    939 %% Terminate a stream or the connection.
    940 
    941 %% We may have to cancel streams even if we receive multiple
    942 %% GOAWAY frames as the LastStreamID value may be lower than
    943 %% the one previously received.
    944 goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0,
    945 		http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
    946 		when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
    947 	Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
    948 		{stop, {goaway, Reason}, 'The connection is going away.'}, []),
    949 	State = State0#state{streams=maps:from_list(Streams)},
    950 	if
    951 		Status =:= connected; Status =:= closing_initiated ->
    952 			{OurLastStreamID, HTTP2Machine} =
    953 				cow_http2_machine:set_last_streamid(HTTP2Machine0),
    954 			Transport:send(Socket, cow_http2:goaway(
    955 				OurLastStreamID, no_error, <<>>)),
    956 			State#state{http2_status=closing,
    957 				http2_machine=HTTP2Machine};
    958 		true ->
    959 			State
    960 	end;
    961 %% We terminate the connection immediately if it hasn't fully been initialized.
    962 goaway(State, {goaway, _, Reason, _}) ->
    963 	terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}).
    964 
    965 %% Cancel client-initiated streams that are above LastStreamID.
    966 goaway_streams(_, [], _, _, Acc) ->
    967 	Acc;
    968 goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
    969 		when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
    970 	terminate_stream_handler(State, StreamID, Reason, StreamState),
    971 	goaway_streams(State, Tail, LastStreamID, Reason, Acc);
    972 goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
    973 	goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]).
    974 
    975 %% A server that is attempting to gracefully shut down a connection SHOULD send
    976 %% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a
    977 %% NO_ERROR code. This signals to the client that a shutdown is imminent and
    978 %% that initiating further requests is prohibited. After allowing time for any
    979 %% in-flight stream creation (at least one round-trip time), the server can send
    980 %% another GOAWAY frame with an updated last stream identifier. This ensures
    981 %% that a connection can be cleanly shut down without losing requests.
    982 -spec initiate_closing(#state{}, _) -> #state{}.
    983 initiate_closing(State=#state{http2_status=connected, socket=Socket,
    984 		transport=Transport, opts=Opts}, Reason) ->
    985 	Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)),
    986 	Timeout = maps:get(goaway_initial_timeout, Opts, 1000),
    987 	Message = {goaway_initial_timeout, Reason},
    988 	set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message);
    989 initiate_closing(State=#state{http2_status=Status}, _Reason)
    990 		when Status =:= closing_initiated; Status =:= closing ->
    991 	%% This happens if sys:terminate/2,3 is called twice or if the supervisor
    992 	%% tells us to shutdown after sys:terminate/2,3 is called or vice versa.
    993 	State;
    994 initiate_closing(State, Reason) ->
    995 	terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
    996 
    997 %% Switch to 'closing' state and stop accepting new streams.
    998 -spec closing(#state{}, Reason :: term()) -> #state{}.
    999 closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
   1000 	terminate(State, Reason);
   1001 closing(State=#state{http2_status=closing_initiated,
   1002 		http2_machine=HTTP2Machine0, socket=Socket, transport=Transport},
   1003 		Reason) ->
   1004 	%% Stop accepting new streams.
   1005 	{LastStreamID, HTTP2Machine} =
   1006 		cow_http2_machine:set_last_streamid(HTTP2Machine0),
   1007 	Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)),
   1008 	closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason);
   1009 closing(State=#state{http2_status=closing, opts=Opts}, Reason) ->
   1010 	%% If client sent GOAWAY, we may already be in 'closing' but without the
   1011 	%% goaway complete timeout set.
   1012 	Timeout = maps:get(goaway_complete_timeout, Opts, 3000),
   1013 	Message = {goaway_complete_timeout, Reason},
   1014 	set_timeout(State, Timeout, Message).
   1015 
   1016 stop_reason({stop, Reason, _}) -> Reason;
   1017 stop_reason(Reason) -> Reason.
   1018 
   1019 -spec terminate(#state{}, _) -> no_return().
   1020 terminate(undefined, Reason) ->
   1021 	exit({shutdown, Reason});
   1022 terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
   1023 		http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
   1024 		when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
   1025 	%% @todo We might want to optionally send the Reason value
   1026 	%% as debug data in the GOAWAY frame here. Perhaps more.
   1027 	if
   1028 		Status =:= connected; Status =:= closing_initiated ->
   1029 			Transport:send(Socket, cow_http2:goaway(
   1030 				cow_http2_machine:get_last_streamid(HTTP2Machine),
   1031 				terminate_reason(Reason), <<>>));
   1032 		%% We already sent the GOAWAY frame.
   1033 		Status =:= closing ->
   1034 			ok
   1035 	end,
   1036 	terminate_all_streams(State, maps:to_list(Streams), Reason),
   1037 	cowboy_children:terminate(Children),
   1038 	terminate_linger(State),
   1039 	exit({shutdown, Reason});
   1040 terminate(#state{socket=Socket, transport=Transport}, Reason) ->
   1041 	Transport:close(Socket),
   1042 	exit({shutdown, Reason}).
   1043 
   1044 terminate_reason({connection_error, Reason, _}) -> Reason;
   1045 terminate_reason({stop, _, _}) -> no_error;
   1046 terminate_reason({socket_error, _, _}) -> internal_error;
   1047 terminate_reason({internal_error, _, _}) -> internal_error.
   1048 
   1049 terminate_all_streams(_, [], _) ->
   1050 	ok;
   1051 terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
   1052 	terminate_stream_handler(State, StreamID, Reason, StreamState),
   1053 	terminate_all_streams(State, Tail, Reason).
   1054 
   1055 %% This code is copied from cowboy_http.
   1056 terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
   1057 	case Transport:shutdown(Socket, write) of
   1058 		ok ->
   1059 			case maps:get(linger_timeout, Opts, 1000) of
   1060 				0 ->
   1061 					ok;
   1062 				infinity ->
   1063 					terminate_linger_before_loop(State, undefined, Transport:messages());
   1064 				Timeout ->
   1065 					TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
   1066 					terminate_linger_before_loop(State, TimerRef, Transport:messages())
   1067 			end;
   1068 		{error, _} ->
   1069 			ok
   1070 	end.
   1071 
   1072 terminate_linger_before_loop(State, TimerRef, Messages) ->
   1073 	%% We may already be in active mode when we do this
   1074 	%% but it's OK because we are shutting down anyway.
   1075 	case setopts_active(State) of
   1076 		ok ->
   1077 			terminate_linger_loop(State, TimerRef, Messages);
   1078 		{error, _} ->
   1079 			ok
   1080 	end.
   1081 
   1082 terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
   1083 	receive
   1084 		{OK, Socket, _} when OK =:= element(1, Messages) ->
   1085 			terminate_linger_loop(State, TimerRef, Messages);
   1086 		{Closed, Socket} when Closed =:= element(2, Messages) ->
   1087 			ok;
   1088 		{Error, Socket, _} when Error =:= element(3, Messages) ->
   1089 			ok;
   1090 		{Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
   1091 			terminate_linger_before_loop(State, TimerRef, Messages);
   1092 		{timeout, TimerRef, linger_timeout} ->
   1093 			ok;
   1094 		_ ->
   1095 			terminate_linger_loop(State, TimerRef, Messages)
   1096 	end.
   1097 
   1098 %% @todo Don't send an RST_STREAM if one was already sent.
   1099 reset_stream(State0=#state{socket=Socket, transport=Transport,
   1100 		http2_machine=HTTP2Machine0}, StreamID, Error) ->
   1101 	Reason = case Error of
   1102 		{internal_error, _, _} -> internal_error;
   1103 		{stream_error, Reason0, _} -> Reason0
   1104 	end,
   1105 	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
   1106 	State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of
   1107 		{ok, HTTP2Machine} ->
   1108 			terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error);
   1109 		{error, not_found} ->
   1110 			terminate_stream(State0, StreamID, Error)
   1111 	end,
   1112 	case reset_rate(State1) of
   1113 		{ok, State} ->
   1114 			State;
   1115 		error ->
   1116 			terminate(State1, {connection_error, enhance_your_calm,
   1117 				'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
   1118 	end.
   1119 
   1120 reset_rate(State0=#state{reset_rate_num=Num0, reset_rate_time=Time}) ->
   1121 	case Num0 - 1 of
   1122 		0 ->
   1123 			CurrentTime = erlang:monotonic_time(millisecond),
   1124 			if
   1125 				CurrentTime < Time ->
   1126 					error;
   1127 				true ->
   1128 					%% When the option has a period of infinity we cannot reach this clause.
   1129 					{ok, init_reset_rate_limiting(State0, CurrentTime)}
   1130 			end;
   1131 		Num ->
   1132 			{ok, State0#state{reset_rate_num=Num}}
   1133 	end.
   1134 
   1135 stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
   1136 	case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
   1137 		%% When the stream terminates normally (without sending RST_STREAM)
   1138 		%% and no response was sent, we need to send a proper response back to the client.
   1139 		%% We delay the termination of the stream until the response is fully sent.
   1140 		{ok, idle, _} ->
   1141 			info(stopping(State, StreamID), StreamID, {response, 204, #{}, <<>>});
   1142 		%% When a response was sent but not terminated, we need to close the stream.
   1143 		%% We delay the termination of the stream until the response is fully sent.
   1144 		{ok, nofin, fin} ->
   1145 			stopping(State, StreamID);
   1146 		%% We only send a final DATA frame if there isn't one queued yet.
   1147 		{ok, nofin, _} ->
   1148 			info(stopping(State, StreamID), StreamID, {data, fin, <<>>});
   1149 		%% When a response was sent fully we can terminate the stream,
   1150 		%% regardless of the stream being in half-closed or closed state.
   1151 		_ ->
   1152 			terminate_stream(State, StreamID)
   1153 	end.
   1154 
   1155 stopping(State=#state{streams=Streams}, StreamID) ->
   1156 	#{StreamID := Stream} = Streams,
   1157 	State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
   1158 
   1159 %% If we finished sending data and the stream is stopping, terminate it.
   1160 maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
   1161 	case Streams of
   1162 		#{StreamID := #stream{status=stopping}} ->
   1163 			terminate_stream(State, StreamID);
   1164 		_ ->
   1165 			State
   1166 	end;
   1167 maybe_terminate_stream(State, _, _) ->
   1168 	State.
   1169 
   1170 %% When the stream stops normally without reading the request
   1171 %% body fully we need to tell the client to stop sending it.
   1172 %% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1.0)
   1173 terminate_stream(State0=#state{socket=Socket, transport=Transport,
   1174 		http2_machine=HTTP2Machine0}, StreamID) ->
   1175 	State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of
   1176 		{ok, fin, _} ->
   1177 			Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)),
   1178 			{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
   1179 			State0#state{http2_machine=HTTP2Machine};
   1180 		{error, closed} ->
   1181 			State0
   1182 	end,
   1183 	terminate_stream(State, StreamID, normal).
   1184 
   1185 %% We remove the stream flow from the connection flow. Any further
   1186 %% data received for this stream is therefore fully contained within
   1187 %% the extra window we allocated for this stream.
   1188 terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
   1189 	case maps:take(StreamID, Streams0) of
   1190 		{#stream{flow=StreamFlow, state=StreamState}, Streams} ->
   1191 			terminate_stream_handler(State, StreamID, Reason, StreamState),
   1192 			Children = cowboy_children:shutdown(Children0, StreamID),
   1193 			State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
   1194 		error ->
   1195 			State
   1196 	end.
   1197 
   1198 terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
   1199 	try
   1200 		cowboy_stream:terminate(StreamID, Reason, StreamState)
   1201 	catch Class:Exception:Stacktrace ->
   1202 		cowboy:log(cowboy_stream:make_error_log(terminate,
   1203 			[StreamID, Reason, StreamState],
   1204 			Class, Exception, Stacktrace), Opts)
   1205 	end.
   1206 
   1207 %% System callbacks.
   1208 
   1209 -spec system_continue(_, _, {#state{}, binary()}) -> ok.
   1210 system_continue(_, _, {State, Buffer}) ->
   1211 	loop(State, Buffer).
   1212 
   1213 -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
   1214 system_terminate(Reason0, _, _, {State, Buffer}) ->
   1215 	Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
   1216 	loop(initiate_closing(State, Reason), Buffer).
   1217 
   1218 -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
   1219 system_code_change(Misc, _, _, _) ->
   1220 	{ok, Misc}.