zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_http.erl (60202B)


      1 %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_http).
     16 
     17 -export([init/6]).
     18 
     19 -export([system_continue/3]).
     20 -export([system_terminate/4]).
     21 -export([system_code_change/4]).
     22 
     %% Options map accepted by this protocol module. Every key is declared
     %% with `=>` and is therefore optional. Defaults visible in this part of
     %% the file (applied via maps:get/3 where the option is read):
     %%   active_n 100, idle_timeout 60000, inactivity_timeout 300000,
     %%   initial_stream_flow_size 65535, max_authority_length 255,
     %%   max_empty_lines 5, max_header_name_length 64,
     %%   max_header_value_length 4096, max_headers 100, max_keepalive 1000,
     %%   max_method_length 32, max_request_line_length 8000,
     %%   request_timeout 5000.
     %% Options not listed above are read outside this excerpt.
     23 -type opts() :: #{
     24 	active_n => pos_integer(),
     25 	chunked => boolean(),
     26 	compress_buffering => boolean(),
     27 	compress_threshold => non_neg_integer(),
     28 	connection_type => worker | supervisor,
     29 	env => cowboy_middleware:env(),
     30 	http10_keepalive => boolean(),
     31 	idle_timeout => timeout(),
     32 	inactivity_timeout => timeout(),
     33 	initial_stream_flow_size => non_neg_integer(),
     34 	linger_timeout => timeout(),
     35 	logger => module(),
     36 	max_authority_length => non_neg_integer(),
     37 	max_empty_lines => non_neg_integer(),
     38 	max_header_name_length => non_neg_integer(),
     39 	max_header_value_length => non_neg_integer(),
     40 	max_headers => non_neg_integer(),
     41 	max_keepalive => non_neg_integer(),
     42 	max_method_length => non_neg_integer(),
     43 	max_request_line_length => non_neg_integer(),
     44 	metrics_callback => cowboy_metrics_h:metrics_callback(),
     45 	metrics_req_filter => fun((cowboy_req:req()) -> map()),
     46 	metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
     47 	middlewares => [module()],
     48 	proxy_header => boolean(),
     49 	request_timeout => timeout(),
     50 	sendfile => boolean(),
     51 	shutdown_timeout => timeout(),
     52 	stream_handlers => [module()],
     53 	tracer_callback => cowboy_tracer_h:tracer_callback(),
     54 	tracer_flags => [atom()],
     55 	tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
     56 	%% Open ended because configured stream handlers might add options.
     57 	_ => _
     58 }.
     59 -export_type([opts/0]).
     60 
     %% Parser state while waiting for a request-line. empty_lines counts the
     %% empty lines already skipped before the request-line; parse_request/3
     %% increments it and compares it against the max_empty_lines option.
     61 -record(ps_request_line, {
     62 	empty_lines = 0 :: non_neg_integer()
     63 }).
     64 
     %% Parser state while reading request headers. method, authority, path,
     %% qs and version are the request-line components filled in by
     %% before_parse_headers/7. headers holds the header map accumulated so far
     %% and name holds a header name whose value has not been fully received;
     %% both are only set when parsing is suspended to wait for more data.
     65 -record(ps_header, {
     66 	method = undefined :: binary(),
     67 	authority = undefined :: binary() | undefined,
     68 	path = undefined :: binary(),
     69 	qs = undefined :: binary(),
     70 	version = undefined :: cowboy:http_version(),
     71 	headers = undefined :: cowboy:http_headers() | undefined,
     72 	name = undefined :: binary() | undefined
     73 }).
     74 
     %% Parser state while reading (or skipping) a request body.
     %% NOTE(review): the code filling and consuming these fields is outside
     %% this excerpt. Presumably length is the expected body size when known,
     %% received the number of bytes read so far, and the transfer_decode_*
     %% fields the cow_http_te decoder and its state - confirm in parse_body.
     75 -record(ps_body, {
     76 	length :: non_neg_integer() | undefined,
     77 	received = 0 :: non_neg_integer(),
     78 	transfer_decode_fun :: fun((binary(), cow_http_te:state()) -> cow_http_te:decode_ret()),
     79 	transfer_decode_state :: cow_http_te:state()
     80 }).
     81 
     %% One entry of the #state.streams list: a request for which
     %% cowboy_stream:init/3 succeeded. after_parse/1 creates the entry with
     %% id, state, method, version and te; the remaining fields keep their
     %% defaults at creation.
     82 -record(stream, {
     83 	id = undefined :: cowboy_stream:streamid(),
     84 	%% Stream handlers and their state.
     85 	state = undefined :: {module(), any()},
     86 	%% Request method.
     87 	method = undefined :: binary(),
     88 	%% Client HTTP version for this stream.
     89 	version = undefined :: cowboy:http_version(),
     90 	%% Unparsed te header. Used to know if we can send trailers.
     91 	te :: undefined | binary(),
     92 	%% Expected body size.
     93 	local_expected_size = undefined :: undefined | non_neg_integer(),
     94 	%% Sent body size.
     95 	local_sent_size = 0 :: non_neg_integer(),
     96 	%% Commands queued.
     97 	queue = [] :: cowboy_stream:commands()
     98 }).
     99 
    100 -type stream() :: #stream{}.
    101 
    %% State of one HTTP/1.1 connection process. It is created in init/6 and
    %% threaded through loop/1, the parse_* functions and after_parse/1.
    %% buffer holds bytes received from the socket that have not been parsed
    %% yet. last_streamid is initialised from the max_keepalive option and is
    %% lowered to the current stream when a request asks for the connection
    %% to be closed.
    102 -record(state, {
    103 	parent :: pid(),
    104 	ref :: ranch:ref(),
    105 	socket :: inet:socket(),
    106 	transport :: module(),
    107 	proxy_header :: undefined | ranch_proxy_header:proxy_info(),
    108 	opts = #{} :: cowboy:opts(),
    109 	buffer = <<>> :: binary(),
    110 
    111 	%% Some options may be overridden for the current stream.
    112 	overriden_opts = #{} :: cowboy:opts(),
    113 
    114 	%% Remote address and port for the connection.
    115 	peer = undefined :: {inet:ip_address(), inet:port_number()},
    116 
    117 	%% Local address and port for the connection.
    118 	sock = undefined :: {inet:ip_address(), inet:port_number()},
    119 
    120 	%% Client certificate (TLS only).
    121 	cert :: undefined | binary(),
    122 
    123 	timer = undefined :: undefined | reference(),
    124 
    125 	%% Whether we are currently receiving data from the socket.
    126 	active = true :: boolean(),
    127 
    128 	%% Identifier for the stream currently being read (or waiting to be received).
    129 	in_streamid = 1 :: pos_integer(),
    130 
    131 	%% Parsing state for the current stream or stream-to-be.
    132 	in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
    133 
    134 	%% Flow requested for the current stream.
    135 	flow = infinity :: non_neg_integer() | infinity,
    136 
    137 	%% Identifier for the stream currently being written.
    138 	%% Note that out_streamid =< in_streamid.
    139 	out_streamid = 1 :: pos_integer(),
    140 
    141 	%% Whether we finished writing data for the current stream.
    142 	out_state = wait :: wait | chunked | streaming | done,
    143 
    144 	%% The connection will be closed after this stream.
    145 	last_streamid = undefined :: pos_integer(),
    146 
    147 	%% Currently active HTTP/1.1 streams.
    148 	streams = [] :: [stream()],
    149 
    150 	%% Children processes created by streams.
    151 	children = cowboy_children:init() :: cowboy_children:children()
    152 }).
    153 
    154 -include_lib("cowlib/include/cow_inline.hrl").
    155 -include_lib("cowlib/include/cow_parse.hrl").
    156 
    %% Entry point of the connection process. Collects the peer address, the
    %% local address and (for TLS transports) the client certificate, then
    %% switches the socket to active mode, arms request_timeout and enters
    %% loop/1. Any failure to obtain that information terminates the
    %% connection with a socket_error reason.
    -spec init(pid(), ranch:ref(), inet:socket(), module(),
    	ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
    init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
    	PeerResult = Transport:peername(Socket),
    	SockResult = Transport:sockname(Socket),
    	CertResult = case Transport:name() of
    		ssl ->
    			case ssl:peercert(Socket) of
    				%% A client that sent no certificate is not an error.
    				{error, no_peercert} -> {ok, undefined};
    				PeerCert -> PeerCert
    			end;
    		_ ->
    			{ok, undefined}
    	end,
    	case {PeerResult, SockResult, CertResult} of
    		{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
    			MaxKeepalive = maps:get(max_keepalive, Opts, 1000),
    			State = #state{
    				parent=Parent,
    				ref=Ref,
    				socket=Socket,
    				transport=Transport,
    				proxy_header=ProxyHeader,
    				opts=Opts,
    				peer=Peer,
    				sock=Sock,
    				cert=Cert,
    				last_streamid=MaxKeepalive},
    			setopts_active(State),
    			loop(set_timeout(State, request_timeout));
    		{{error, PeerError}, _, _} ->
    			terminate(undefined, {socket_error, PeerError,
    				'A socket error occurred when retrieving the peer name.'});
    		{_, {error, SockError}, _} ->
    			terminate(undefined, {socket_error, SockError,
    				'A socket error occurred when retrieving the sock name.'});
    		{_, _, {error, CertError}} ->
    			terminate(undefined, {socket_error, CertError,
    				'A socket error occurred when retrieving the client TLS certificate.'})
    	end.
    192 
    %% Puts the socket in {active, N} mode, N being the active_n option
    %% (100 when not configured). Returns whatever Transport:setopts/2 returns.
    setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
    	Transport:setopts(Socket, [{active, maps:get(active_n, Opts, 100)}]).
    196 
    %% Re-enables active mode on the socket and records it in the state.
    active(State0) ->
    	setopts_active(State0),
    	State0#state{active=true}.
    200 
    %% Switches the socket to passive mode, drops any "passive" notification
    %% already sitting in the mailbox and records the mode in the state.
    passive(State=#state{socket=Socket, transport=Transport}) ->
    	Transport:setopts(Socket, [{active, false}]),
    	flush_passive(Socket, Transport:messages()),
    	State#state{active=false}.
    206 
    %% Removes every pending {Passive, Socket} notification from the mailbox
    %% without blocking. Always returns ok.
    flush_passive(Socket, Messages) ->
    	receive
    		%% The guard alternatives are separated by ';' on purpose: when
    		%% Messages has no fourth element that alternative simply fails
    		%% and the hardcoded tags (kept for Ranch 1.x) are still tried.
    		{Tag, Socket} when Tag =:= tcp_passive; Tag =:= ssl_passive;
    				Tag =:= element(4, Messages) ->
    			flush_passive(Socket, Messages)
    	after 0 ->
    		ok
    	end.
    216 
    %% Main receive loop of the connection process.
    %%
    %% Socket data is appended to the buffer and handed to parse/2, except
    %% once the last allowed request has been read, in which case it is
    %% discarded. A socket close or socket error terminates the connection.
    %% A passive notification re-arms active mode. The current timer firing
    %% is handled by timeout/2 while other timeout messages are ignored,
    %% apart from child shutdown timeouts which are forwarded to
    %% cowboy_children. Parent exits, system messages, stream messages,
    %% child exits and supervisor calls are dispatched to their handlers.
    %% Anything else is logged as a stray message.
    %%
    %% The order of the receive clauses matters: the more specific patterns
    %% must come before the more general ones that follow them.
    %% When no message at all arrives within inactivity_timeout (default
    %% 300000 ms) the connection is terminated.
    217 loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
    218 		buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
    219 		last_streamid=LastStreamID}) ->
    220 	Messages = Transport:messages(),
    221 	InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
    222 	receive
    223 		%% Discard data coming in after the last request
    224 		%% we want to process was received fully.
    225 		{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
    226 			loop(State);
    227 		%% Socket messages.
    228 		{OK, Socket, Data} when OK =:= element(1, Messages) ->
    229 			parse(<< Buffer/binary, Data/binary >>, State);
    230 		{Closed, Socket} when Closed =:= element(2, Messages) ->
    231 			terminate(State, {socket_error, closed, 'The socket has been closed.'});
    232 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
    233 			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
    234 		{Passive, Socket} when Passive =:= element(4, Messages);
    235 				%% Hardcoded for compatibility with Ranch 1.x.
    236 				Passive =:= tcp_passive; Passive =:= ssl_passive ->
    237 			setopts_active(State),
    238 			loop(State);
    239 		%% Timeouts.
    240 		{timeout, Ref, {shutdown, Pid}} ->
    241 			cowboy_children:shutdown_timeout(Children, Ref, Pid),
    242 			loop(State);
    243 		{timeout, TimerRef, Reason} ->
    244 			timeout(State, Reason);
    245 		{timeout, _, _} ->
    246 			loop(State);
    247 		%% System messages.
    248 		{'EXIT', Parent, shutdown} ->
    249 			Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
    250 			loop(initiate_closing(State, Reason));
    251 		{'EXIT', Parent, Reason} ->
    252 			terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
    253 		{system, From, Request} ->
    254 			sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
    255 		%% Messages pertaining to a stream.
    256 		{{Pid, StreamID}, Msg} when Pid =:= self() ->
    257 			loop(info(State, StreamID, Msg));
    258 		%% Exit signal from children.
    259 		Msg = {'EXIT', Pid, _} ->
    260 			loop(down(State, Pid, Msg));
    261 		%% Calls from supervisor module.
    262 		{'$gen_call', From, Call} ->
    263 			cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
    264 			loop(State);
    265 		%% Unknown messages.
    266 		Msg ->
    267 			cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
    268 			loop(State)
    269 	after InactivityTimeout ->
    270 		terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
    271 	end.
    272 
    %% Arms the timer named Name (request_timeout or idle_timeout) and
    %% returns the updated state. The first three clauses list the cases
    %% where the state is returned untouched.
    %%
    %% request_timeout is not armed while streams are active.
    set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
    	State;
    %% request_timeout is not armed while a body is being skipped.
    set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
    	State;
    %% idle_timeout is not armed when there are no active streams,
    %% except while a body is being skipped.
    set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
    		when element(1, InState) =/= ps_body ->
    	State;
    %% In every other case the previous timer is replaced.
    set_timeout(State0=#state{opts=Opts, overriden_opts=Overrides}, Name) ->
    	State = cancel_timeout(State0),
    	Fallback = case Name of
    		request_timeout -> 5000;
    		idle_timeout -> 60000
    	end,
    	Duration = case Overrides of
    		%% A value overridden for the current stream takes precedence.
    		#{Name := Overridden} -> Overridden;
    		_ -> maps:get(Name, Opts, Fallback)
    	end,
    	case Duration of
    		%% An infinite timeout means no timer at all.
    		infinity -> State#state{timer=undefined};
    		_ -> State#state{timer=erlang:start_timer(Duration, self(), Name)}
    	end.
    301 
    %% Cancels the current timer, if any, and clears it from the state.
    cancel_timeout(State=#state{timer=undefined}) ->
    	State#state{timer=undefined};
    cancel_timeout(State=#state{timer=TimerRef}) ->
    	%% The cancel is synchronous. A timeout message that was already
    	%% delivered is removed from the mailbox so that it is not seen
    	%% later as a stray message.
    	_ = erlang:cancel_timer(TimerRef),
    	receive
    		{timeout, TimerRef, _} -> ok
    	after 0 ->
    		ok
    	end,
    	State#state{timer=undefined}.
    317 
    %% Reacts to the current timer firing. A request_timeout while waiting for
    %% the request-line closes the connection silently; while reading headers
    %% it goes through error_terminate/3 with status 408. An idle_timeout
    %% always closes the connection.
    -spec timeout(_, _) -> no_return().
    timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
    	Reason = {connection_error, timeout,
    		'No request-line received before timeout.'},
    	terminate(State, Reason);
    timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
    	Reason = {connection_error, timeout,
    		'Request headers not received before timeout.'},
    	error_terminate(408, State, Reason);
    timeout(State, idle_timeout) ->
    	Reason = {connection_error, timeout,
    		'Connection idle longer than configuration allows.'},
    	terminate(State, Reason).
    328 
    %% Resumes parsing of Buffer according to the current parsing state and
    %% passes the outcome to after_parse/1.
    %%
    %% Nothing left to parse: wait for more data.
    parse(<<>>, State) ->
    	loop(State#state{buffer= <<>>});
    %% Requests arriving after the last one we accept are not processed;
    %% the buffer is dropped to save memory.
    parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
    		last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
    	loop(State#state{buffer= <<>>});
    parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=Skipped}}) ->
    	after_parse(parse_request(Buffer, State, Skipped));
    %% Header parsing was suspended. The accumulated headers (and the pending
    %% header name, if any) travel as arguments from here on, so they are
    %% cleared from the stored parsing state.
    parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
    	Resumed = State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
    	case Name of
    		undefined ->
    			after_parse(parse_header(Buffer, Resumed, Headers));
    		_ ->
    			after_parse(parse_hd_before_value(Buffer, Resumed, Headers, Name))
    	end;
    parse(Buffer, State=#state{in_state=#ps_body{}}) ->
    	after_parse(parse_body(Buffer, State)).
    348 
    %% Acts on the result of one parsing step.
    %%
    %% {request, Req, State}: a full request head was parsed. The stream is
    %% initialised with cowboy_stream:init/3 and pushed at the head of the
    %% streams list, last_streamid is set to this stream when
    %% maybe_req_close/3 answers close, idle_timeout is armed, the returned
    %% commands are executed and parsing continues with the remaining buffer.
    %% If init/3 raises, the error is logged, early_error/4 is called with
    %% status 500 and parsing continues with the unmodified state.
    %%
    %% {data, StreamID, IsFin, Data, State}: body data. When the stream at
    %% the head of the list has that id, the data goes to
    %% cowboy_stream:data/4, the flow is updated and parsing continues;
    %% otherwise the data is dropped and we go back to loop/1.
    %%
    %% {more, State}: more data is needed; idle_timeout is requested and we
    %% go back to loop/1.
    349 after_parse({request, Req=#{streamid := StreamID, method := Method,
    350 		headers := Headers, version := Version},
    351 		State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
    352 	try cowboy_stream:init(StreamID, Req, Opts) of
    353 		{Commands, StreamState} ->
    354 			Flow = maps:get(initial_stream_flow_size, Opts, 65535),
    355 			TE = maps:get(<<"te">>, Headers, undefined),
    356 			Streams = [#stream{id=StreamID, state=StreamState,
    357 				method=Method, version=Version, te=TE}|Streams0],
    358 			State1 = case maybe_req_close(State0, Headers, Version) of
    359 				close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
    360 				keepalive -> State0#state{streams=Streams, flow=Flow}
    361 			end,
    362 			State = set_timeout(State1, idle_timeout),
    363 			parse(Buffer, commands(State, StreamID, Commands))
    364 	catch Class:Exception:Stacktrace ->
    365 		cowboy:log(cowboy_stream:make_error_log(init,
    366 			[StreamID, Req, Opts],
    367 			Class, Exception, Stacktrace), Opts),
    368 		early_error(500, State0, {internal_error, {Class, Exception},
    369 			'Unhandled exception in cowboy_stream:init/3.'}, Req),
    370 		parse(Buffer, State0)
    371 	end;
    372 %% Streams are sequential so the body is always about the last stream created
    373 %% unless that stream has terminated.
    374 after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
    375 		streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
    376 	try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
    377 		{Commands, StreamState} ->
    378 			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
    379 				Stream#stream{state=StreamState}),
    380 			State1 = set_timeout(State0, case IsFin of
    381 				fin -> request_timeout;
    382 				nofin -> idle_timeout
    383 			end),
    384 			State = update_flow(IsFin, Data, State1#state{streams=Streams}),
    385 			parse(Buffer, commands(State, StreamID, Commands))
    386 	catch Class:Exception:Stacktrace ->
    387 		cowboy:log(cowboy_stream:make_error_log(data,
    388 			[StreamID, IsFin, Data, StreamState0],
    389 			Class, Exception, Stacktrace), Opts),
    390 		%% @todo Should call parse after this.
    391 		stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
    392 			'Unhandled exception in cowboy_stream:data/4.'})
    393 	end;
    394 %% No corresponding stream. We must skip the body of the previous request
    395 %% in order to process the next one.
    396 after_parse({data, _, IsFin, _, State}) ->
    397 	loop(set_timeout(State, case IsFin of
    398 		fin -> request_timeout;
    399 		nofin -> idle_timeout
    400 	end));
    401 after_parse({more, State}) ->
    402 	loop(set_timeout(State, idle_timeout)).
    403 
    %% Accounts for body data that was just handed to the stream handler.
    %% Once the body is complete the flow is reset to infinity. Otherwise the
    %% size of Data is subtracted from the flow, and the socket is switched
    %% to passive mode when the flow goes from positive to zero or below.
    update_flow(fin, _, State) ->
    	%% Only called after parsing, so active mode is expected already.
    	State#state{flow=infinity};
    update_flow(nofin, Data, State=#state{flow=Before}) ->
    	Remaining = Before - byte_size(Data),
    	Updated = State#state{flow=Remaining},
    	case Before > 0 andalso Remaining =< 0 of
    		true -> passive(Updated);
    		false -> Updated
    	end.
    417 
    418 %% Request-line.
    419 
    %% Parses the request-line found at the start of Buffer. EmptyLines is the
    %% number of empty lines already skipped before it.
    -spec parse_request(Buffer, State, non_neg_integer())
    	-> {request, cowboy_req:req(), State}
    	| {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State}
    	| {more, State}
    	when Buffer::binary(), State::#state{}.
    %% An empty line must be CRLF, never a bare LF.
    parse_request(<< $\n, _/bits >>, State, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
    parse_request(<< $\s, _/bits >>, State, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
    %% The request-line length is capped so that we do not keep reading from
    %% the socket forever and eventually crash.
    parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
    	LineLimit = maps:get(max_request_line_length, Opts, 8000),
    	EmptyLimit = maps:get(max_empty_lines, Opts, 5),
    	case match_eol(Buffer, 0) of
    		nomatch when byte_size(Buffer) > LineLimit ->
    			error_terminate(414, State, {connection_error, limit_reached,
    				'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
    		nomatch ->
    			%% No complete line yet: remember where we are and wait.
    			{more, State#state{buffer=Buffer, in_state=#ps_request_line{empty_lines=EmptyLines}}};
    		%% An LF at offset 1 is taken as an empty line. The byte in front
    		%% of it is not inspected here.
    		1 when EmptyLines =:= EmptyLimit ->
    			error_terminate(400, State, {connection_error, limit_reached,
    				'More empty lines were received than configuration allows. (RFC7230 3.5)'});
    		1 ->
    			<< _:16, AfterEmptyLine/bits >> = Buffer,
    			parse_request(AfterEmptyLine, State, EmptyLines + 1);
    		_ ->
    			case Buffer of
    				%% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
    				<< "OPTIONS * ", AfterTarget/bits >> ->
    					parse_version(AfterTarget, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
    				<<"CONNECT ", _/bits>> ->
    					error_terminate(501, State, {connection_error, no_error,
    						'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
    				<<"TRACE ", _/bits>> ->
    					error_terminate(501, State, {connection_error, no_error,
    						'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
    				%% Direct HTTP/2 is only accepted as the very first request.
    				<< "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
    					%% @todo Might be worth throwing to get a clean stacktrace.
    					http2_upgrade(State, Buffer);
    				_ ->
    					MethodLimit = maps:get(max_method_length, Opts, 32),
    					parse_method(Buffer, State, <<>>, MethodLimit)
    			end
    	end.
    469 
    %% Returns N plus the offset of the first LF in the given binary, or
    %% nomatch when there is none.
    match_eol(Bin, N) ->
    	case Bin of
    		<< $\n, _/bits >> -> N;
    		<< _, Tail/bits >> -> match_eol(Tail, N + 1);
    		_ -> nomatch
    	end.
    476 
    %% Reads the method one byte at a time up to the first space, then
    %% continues with parse_uri/3. The last argument is the number of bytes
    %% the method may still grow by.
    parse_method(_, State, _, 0) ->
    	error_terminate(501, State, {connection_error, limit_reached,
    		'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
    parse_method(<< $\r, _/bits >>, State, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
    parse_method(<< $\s, Rest/bits >>, State, Method, _) ->
    	parse_uri(Rest, State, Method);
    parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) when ?IS_TOKEN(C) ->
    	parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
    parse_method(<< _, _/bits >>, State, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The method name must contain only valid token characters. (RFC7230 3.1.1)'}).
    489 
    %% Parses the start of the request-target, which comes right after the
    %% method and its trailing space.
    %%
    %% An absolute URI with the http or https scheme continues with the
    %% authority component; a target starting with "/" continues with the
    %% path; anything else is rejected with a 400 response.
    %%
    %% The scheme is compared case-insensitively, one letter at a time
    %% (RFC3986 3.1). Every letter test is its own guard expression and they
    %% are joined with ',' so that ALL of them must hold. Joining them with
    %% ';' would accept a target as soon as a single alternative matched
    %% (for example "abbp://"), and matching both "t" bytes with one
    %% variable would reject valid mixed-case spellings such as "hTtp://".
    parse_uri(<< H, T1, T2, P, "://", Rest/bits >>, State, Method)
    		when (H =:= $h orelse H =:= $H), (T1 =:= $t orelse T1 =:= $T),
    			(T2 =:= $t orelse T2 =:= $T), (P =:= $p orelse P =:= $P) ->
    	parse_uri_authority(Rest, State, Method);
    parse_uri(<< H, T1, T2, P, S, "://", Rest/bits >>, State, Method)
    		when (H =:= $h orelse H =:= $H), (T1 =:= $t orelse T1 =:= $T),
    			(T2 =:= $t orelse T2 =:= $T), (P =:= $p orelse P =:= $P),
    			(S =:= $s orelse S =:= $S) ->
    	parse_uri_authority(Rest, State, Method);
    parse_uri(<< $/, Rest/bits >>, State, Method) ->
    	parse_uri_path(Rest, State, Method, undefined, <<$/>>);
    parse_uri(_, State, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
    503 
    %% Starts parsing the authority of an absolute URI with an empty
    %% accumulator and the max_authority_length option (255 by default) as
    %% the number of bytes allowed.
    %%
    %% @todo We probably want to apply max_authority_length also
    %% to the host header and to document this option. It might
    %% also be useful for HTTP/2 requests.
    parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
    	MaxLength = maps:get(max_authority_length, Opts, 255),
    	parse_uri_authority(Rest, State, Method, <<>>, MaxLength).
    510 
    %% Accumulates the authority one byte at a time until the path, query,
    %% fragment or the end of the request-target starts. The last argument
    %% is the number of bytes the authority may still grow by.
    parse_uri_authority(_, State, _, _, 0) ->
    	error_terminate(414, State, {connection_error, limit_reached,
    		'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
    parse_uri_authority(<<$\r, _/bits>>, State, _, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
    parse_uri_authority(<<$@, _/bits>>, State, _, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
    %% A delimiter or a port separator showing up before any host byte.
    parse_uri_authority(<<C, _/bits>>, State, _, <<>>, _)
    		when C =:= $/; C =:= $\s; C =:= $?; C =:= $#; C =:= $: ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
    %% When the path is missing from the absolute URI it defaults to "/".
    parse_uri_authority(<<$/, Rest/bits>>, State, Method, Authority, _) ->
    	parse_uri_path(Rest, State, Method, Authority, <<"/">>);
    parse_uri_authority(<<$\s, Rest/bits>>, State, Method, Authority, _) ->
    	parse_version(Rest, State, Method, Authority, <<"/">>, <<>>);
    parse_uri_authority(<<$?, Rest/bits>>, State, Method, Authority, _) ->
    	parse_uri_query(Rest, State, Method, Authority, <<"/">>, <<>>);
    parse_uri_authority(<<$#, Rest/bits>>, State, Method, Authority, _) ->
    	skip_uri_fragment(Rest, State, Method, Authority, <<"/">>, <<>>);
    parse_uri_authority(<<C, Rest/bits>>, State, Method, SoFar, Remaining) ->
    	parse_uri_authority(Rest, State, Method, <<SoFar/binary, C>>, Remaining - 1).
    535 
    %% Accumulates the path until a space (end of the request-target),
    %% "?" (query) or "#" (fragment) is found.
    parse_uri_path(<<$\r, _/bits>>, State, _, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
    parse_uri_path(<<$\s, Rest/bits>>, State, Method, Authority, Path) ->
    	parse_version(Rest, State, Method, Authority, Path, <<>>);
    parse_uri_path(<<$?, Rest/bits>>, State, Method, Authority, Path) ->
    	parse_uri_query(Rest, State, Method, Authority, Path, <<>>);
    parse_uri_path(<<$#, Rest/bits>>, State, Method, Authority, Path) ->
    	skip_uri_fragment(Rest, State, Method, Authority, Path, <<>>);
    parse_uri_path(<<C, Rest/bits>>, State, Method, Authority, Path) ->
    	parse_uri_path(Rest, State, Method, Authority, <<Path/binary, C>>).
    545 
    %% Accumulates the query string until a space (end of the request-target)
    %% or "#" (fragment) is found.
    parse_uri_query(<<$\r, _/bits>>, State, _, _, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
    parse_uri_query(<<$\s, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    	parse_version(Rest, State, Method, Authority, Path, Qs);
    parse_uri_query(<<$#, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    	skip_uri_fragment(Rest, State, Method, Authority, Path, Qs);
    parse_uri_query(<<C, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    	parse_uri_query(Rest, State, Method, Authority, Path, <<Qs/binary, C>>).
    554 
    %% Drops the bytes of the fragment up to the space that ends the
    %% request-target; the fragment itself is not kept.
    skip_uri_fragment(<<$\r, _/bits>>, State, _, _, _, _) ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
    skip_uri_fragment(<<$\s, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    	parse_version(Rest, State, Method, Authority, Path, Qs);
    skip_uri_fragment(<<_, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    	skip_uri_fragment(Rest, State, Method, Authority, Path, Qs).
    562 
    %% Parses the HTTP version that ends the request-line. Only HTTP/1.1 and
    %% HTTP/1.0, immediately followed by CRLF, are accepted.
    parse_version(<< "HTTP/1.", Minor, "\r\n", Rest/bits >>, State, Method, Authority, Path, Qs)
    		when Minor =:= $1; Minor =:= $0 ->
    	Version = case Minor of
    		$1 -> 'HTTP/1.1';
    		$0 -> 'HTTP/1.0'
    	end,
    	before_parse_headers(Rest, State, Method, Authority, Path, Qs, Version);
    parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _, _)
    		when C =:= $\s orelse C =:= $\t ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
    parse_version(<< C, _/bits >>, State, _, _, _, _)
    		when C =:= $\s orelse C =:= $\t ->
    	error_terminate(400, State, {connection_error, protocol_error,
    		'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
    parse_version(_, State, _, _, _, _) ->
    	error_terminate(505, State, {connection_error, protocol_error,
    		'Unsupported HTTP version. (RFC7230 2.6)'}).
    576 
    %% Stores the parsed request-line components in a fresh #ps_header{}
    %% parsing state and starts parsing headers with an empty header map.
    before_parse_headers(Rest, State, Method, Authority, Path, Qs, Version) ->
    	HeaderState = #ps_header{
    		method=Method,
    		authority=Authority,
    		path=Path,
    		qs=Qs,
    		version=Version},
    	parse_header(Rest, State#state{in_state=HeaderState}, #{}).
    580 
    581 %% Headers.
    582 
    %% Parses the next header line, or detects the empty line that ends the
    %% header section.
    %%
    %% At least two bytes are required to tell an empty line from a header.
    parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
    	{more, State#state{buffer=Rest, in_state=PS#ps_header{headers=Headers}}};
    %% End of the header section.
    parse_header(<< "\r\n", Rest/bits >>, State, Headers) ->
    	request(Rest, State, Headers);
    parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
    	HeaderLimit = maps:get(max_headers, Opts, 100),
    	case maps:size(Headers) >= HeaderLimit of
    		true ->
    			error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
    				{connection_error, limit_reached,
    					'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
    		false ->
    			parse_header_colon(Buffer, State, Headers)
    	end.
    599 
    %% Makes sure the buffer holds a complete header name (that is, a colon)
    %% before parsing it; otherwise reports an error or asks for more data.
    parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
    	NameLimit = maps:get(max_header_name_length, Opts, 64),
    	case match_colon(Buffer, 0) of
    		nomatch ->
    			%% In all outcomes below the accumulated headers go back
    			%% into the parsing state.
    			Suspended = State#state{in_state=PS#ps_header{headers=Headers}},
    			case byte_size(Buffer) > NameLimit of
    				true ->
    					error_terminate(431, Suspended,
    						{connection_error, limit_reached,
    							'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
    				false ->
    					%% No colon, but the line may simply be invalid:
    					%% a line feed without a colon is an error.
    					case match_eol(Buffer, 0) of
    						nomatch ->
    							{more, Suspended#state{buffer=Buffer}};
    						_ ->
    							error_terminate(400, Suspended,
    								{connection_error, protocol_error,
    									'A header line is missing a colon separator. (RFC7230 3.2.4)'})
    					end
    			end;
    		_ ->
    			parse_hd_name(Buffer, State, Headers, <<>>)
    	end.
    621 
    %% Returns N plus the offset of the first colon in the given binary, or
    %% nomatch when there is none.
    match_colon(Bin, N) ->
    	case Bin of
    		<< $:, _/bits >> -> N;
    		<< _, Tail/bits >> -> match_colon(Tail, N + 1);
    		_ -> nomatch
    	end.
    628 
    %% Reads the header name up to the colon, lowercasing it on the way.
    %% Whitespace anywhere before the colon is rejected.
    parse_hd_name(<< $:, Rest/bits >>, State, Headers, Name) ->
    	parse_hd_before_value(Rest, State, Headers, Name);
    parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, Headers, SoFar) when ?IS_WS(C) ->
    	%% The message depends on whether any name byte was read yet.
    	Message = case SoFar of
    		<<>> ->
    			'Whitespace is not allowed before the header name. (RFC7230 3.2)';
    		_ ->
    			'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'
    	end,
    	error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
    		{connection_error, protocol_error, Message});
    %% The ?LOWER macro reads the variable C bound in this clause head.
    parse_hd_name(<< C, Rest/bits >>, State, Headers, SoFar) ->
    	?LOWER(parse_hd_name, Rest, State, Headers, SoFar).
    641 
    %% Skips the optional whitespace in front of a header value, then makes
    %% sure the whole line is available before the value is parsed.
    parse_hd_before_value(<< C, Rest/bits >>, State, Headers, Name)
    		when C =:= $\s; C =:= $\t ->
    	parse_hd_before_value(Rest, State, Headers, Name);
    parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, Headers, Name) ->
    	ValueLimit = maps:get(max_header_value_length, Opts, 4096),
    	case match_eol(Buffer, 0) of
    		nomatch ->
    			Pending = PS#ps_header{headers=Headers},
    			case byte_size(Buffer) > ValueLimit of
    				true ->
    					error_terminate(431, State#state{in_state=Pending},
    						{connection_error, limit_reached,
    							'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
    				false ->
    					%% Keep the header name so that parsing resumes
    					%% at the value once more data has arrived.
    					{more, State#state{buffer=Buffer,
    						in_state=Pending#ps_header{name=Name}}}
    			end;
    		_ ->
    			parse_hd_value(Buffer, State, Headers, Name, <<>>)
    	end.
    658 
    %% Accumulates the header value up to CRLF, trims its trailing whitespace
    %% and stores it in the header map. A header seen more than once has its
    %% values joined into a single binary.
    parse_hd_value(<< "\r\n", Rest/bits >>, State, Headers0, Name, Acc) ->
    	Value = clean_value_ws_end(Acc, byte_size(Acc) - 1),
    	Headers = case maps:get(Name, Headers0, undefined) of
    		undefined ->
    			Headers0#{Name => Value};
    		Existing ->
    			%% The cookie header does not use proper HTTP header lists.
    			Separator = case Name of
    				<<"cookie">> -> <<"; ">>;
    				_ -> <<", ">>
    			end,
    			Headers0#{Name => << Existing/binary, Separator/binary, Value/binary >>}
    	end,
    	parse_header(Rest, State, Headers);
    parse_hd_value(<< C, Rest/bits >>, State, Headers, Name, Acc) ->
    	parse_hd_value(Rest, State, Headers, Name, << Acc/binary, C >>).
    670 
    671 clean_value_ws_end(_, -1) ->
    672 	<<>>;
    673 clean_value_ws_end(Value, N) ->
    674 	case binary:at(Value, N) of
    675 		$\s -> clean_value_ws_end(Value, N - 1);
    676 		$\t -> clean_value_ws_end(Value, N - 1);
    677 		_ ->
    678 			S = N + 1,
    679 			<< Value2:S/binary, _/bits >> = Value,
    680 			Value2
    681 	end.
    682 
    683 -ifdef(TEST).
    684 clean_value_ws_end_test_() ->
    685 	Tests = [
    686 		{<<>>, <<>>},
    687 		{<<"     ">>, <<>>},
    688 		{<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
    689 			"text/html;level=2;q=0.4, */*;q=0.5   \t   \t    ">>,
    690 			<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
    691 				"text/html;level=2;q=0.4, */*;q=0.5">>}
    692 	],
    693 	[{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
    694 
    695 horse_clean_value_ws_end() ->
    696 	horse:repeat(200000,
    697 		clean_value_ws_end(
    698 			<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
    699 				"text/html;level=2;q=0.4, */*;q=0.5          ">>,
    700 			byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
    701 				"text/html;level=2;q=0.4, */*;q=0.5          ">>) - 1)
    702 	).
    703 -endif.
    704 
    705 request(Buffer, State=#state{transport=Transport,
    706 		in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
    707 	case maps:get(<<"host">>, Headers, undefined) of
    708 		undefined when Version =:= 'HTTP/1.1' ->
    709 			%% @todo Might want to not close the connection on this and next one.
    710 			error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
    711 				{stream_error, protocol_error,
    712 					'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
    713 		undefined ->
    714 			request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
    715 		%% @todo When CONNECT requests come in we need to ignore the RawHost
    716 		%% and instead use the Authority as the source of host.
    717 		RawHost when Authority =:= undefined; Authority =:= RawHost ->
    718 			request_parse_host(Buffer, State, Headers, RawHost);
    719 		%% RFC7230 does not explicitly ask us to reject requests
    720 		%% that have a different authority component and host header.
    721 		%% However it DOES ask clients to set them to the same value,
    722 		%% so we enforce that.
    723 		_ ->
    724 			error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
    725 				{stream_error, protocol_error,
    726 					'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
    727 	end.
    728 
    729 request_parse_host(Buffer, State=#state{transport=Transport, in_state=PS}, Headers, RawHost) ->
    730 	try cow_http_hd:parse_host(RawHost) of
    731 		{Host, undefined} ->
    732 			request(Buffer, State, Headers, Host, default_port(Transport:secure()));
    733 		{Host, Port} when Port > 0, Port =< 65535 ->
    734 			request(Buffer, State, Headers, Host, Port);
    735 		_ ->
    736 			error_terminate(400, State, {stream_error, protocol_error,
    737 				'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
    738 	catch _:_ ->
    739 		error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
    740 			{stream_error, protocol_error,
    741 				'The host header is invalid. (RFC7230 5.4)'})
    742 	end.
    743 
    744 -spec default_port(boolean()) -> 80 | 443.
    745 default_port(true) -> 443;
    746 default_port(_) -> 80.
    747 
    748 %% End of request parsing.
    749 
    750 request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
    751 		proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
    752 			PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
    753 		Headers0, Host, Port) ->
    754 	Scheme = case Transport:secure() of
    755 		true -> <<"https">>;
    756 		false -> <<"http">>
    757 	end,
    758 	{Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
    759 		#{<<"transfer-encoding">> := TransferEncoding0} ->
    760 			try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
    761 				[<<"chunked">>] ->
    762 					{maps:remove(<<"content-length">>, Headers0),
    763 						true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
    764 				_ ->
    765 					error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
    766 						{stream_error, protocol_error,
    767 							'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
    768 			catch _:_ ->
    769 				error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
    770 					{stream_error, protocol_error,
    771 						'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
    772 			end;
    773 		#{<<"content-length">> := <<"0">>} ->
    774 			{Headers0, false, 0, undefined, undefined};
    775 		#{<<"content-length">> := BinLength} ->
    776 			Length = try
    777 				cow_http_hd:parse_content_length(BinLength)
    778 			catch _:_ ->
    779 				error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
    780 					{stream_error, protocol_error,
    781 						'The content-length header is invalid. (RFC7230 3.3.2)'})
    782 			end,
    783 			{Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
    784 		_ ->
    785 			{Headers0, false, 0, undefined, undefined}
    786 	end,
    787 	Req0 = #{
    788 		ref => Ref,
    789 		pid => self(),
    790 		streamid => StreamID,
    791 		peer => Peer,
    792 		sock => Sock,
    793 		cert => Cert,
    794 		method => Method,
    795 		scheme => Scheme,
    796 		host => Host,
    797 		port => Port,
    798 		path => Path,
    799 		qs => Qs,
    800 		version => Version,
    801 		%% We are transparently taking care of transfer-encodings so
    802 		%% the user code has no need to know about it.
    803 		headers => maps:remove(<<"transfer-encoding">>, Headers),
    804 		has_body => HasBody,
    805 		body_length => BodyLength
    806 	},
    807 	%% We add the PROXY header information if any.
    808 	Req = case ProxyHeader of
    809 		undefined -> Req0;
    810 		_ -> Req0#{proxy_header => ProxyHeader}
    811 	end,
    812 	case is_http2_upgrade(Headers, Version) of
    813 		false ->
    814 			State = case HasBody of
    815 				true ->
    816 					State0#state{in_state=#ps_body{
    817 						length = BodyLength,
    818 						transfer_decode_fun = TDecodeFun,
    819 						transfer_decode_state = TDecodeState
    820 					}};
    821 				false ->
    822 					State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
    823 			end,
    824 			{request, Req, State#state{buffer=Buffer}};
    825 		{true, HTTP2Settings} ->
    826 			%% We save the headers in case the upgrade will fail
    827 			%% and we need to pass them to cowboy_stream:early_error.
    828 			http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
    829 				Buffer, HTTP2Settings, Req)
    830 	end.
    831 
    832 %% HTTP/2 upgrade.
    833 
    834 %% @todo We must not upgrade to h2c over a TLS connection.
    835 is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
    836 		<<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
    837 	Conns = cow_http_hd:parse_connection(Conn),
    838 	case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
    839 		{true, true} ->
    840 			Protocols = cow_http_hd:parse_upgrade(Upgrade),
    841 			case lists:member(<<"h2c">>, Protocols) of
    842 				true ->
    843 					{true, HTTP2Settings};
    844 				false ->
    845 					false
    846 			end;
    847 		_ ->
    848 			false
    849 	end;
    850 is_http2_upgrade(_, _) ->
    851 	false.
    852 
    853 %% Prior knowledge upgrade, without an HTTP/1.1 request.
    854 http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
    855 		proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
    856 	case Transport:secure() of
    857 		false ->
    858 			_ = cancel_timeout(State),
    859 			cowboy_http2:init(Parent, Ref, Socket, Transport,
    860 				ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
    861 		true ->
    862 			error_terminate(400, State, {connection_error, protocol_error,
    863 				'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
    864 	end.
    865 
    866 %% Upgrade via an HTTP/1.1 request.
    867 http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
    868 		proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
    869 		Buffer, HTTP2Settings, Req) ->
    870 	%% @todo
    871 	%% However if the client sent a body, we need to read the body in full
    872 	%% and if we can't do that, return a 413 response. Some options are in order.
    873 	%% Always half-closed stream coming from this side.
    874 	try cow_http_hd:parse_http2_settings(HTTP2Settings) of
    875 		Settings ->
    876 			_ = cancel_timeout(State),
    877 			cowboy_http2:init(Parent, Ref, Socket, Transport,
    878 				ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
    879 	catch _:_ ->
    880 		error_terminate(400, State, {connection_error, protocol_error,
    881 			'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
    882 	end.
    883 
    884 %% Request body parsing.
    885 
    886 parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
    887 		PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
    888 			transfer_decode_state=TState0}}) ->
    889 	%% @todo Proper trailers.
    890 	try TDecode(Buffer, TState0) of
    891 		more ->
    892 			{more, State#state{buffer=Buffer}};
    893 		{more, Data, TState} ->
    894 			{data, StreamID, nofin, Data, State#state{buffer= <<>>,
    895 				in_state=PS#ps_body{received=Received + byte_size(Data),
    896 					transfer_decode_state=TState}}};
    897 		{more, Data, _Length, TState} when is_integer(_Length) ->
    898 			{data, StreamID, nofin, Data, State#state{buffer= <<>>,
    899 				in_state=PS#ps_body{received=Received + byte_size(Data),
    900 					transfer_decode_state=TState}}};
    901 		{more, Data, Rest, TState} ->
    902 			{data, StreamID, nofin, Data, State#state{buffer=Rest,
    903 				in_state=PS#ps_body{received=Received + byte_size(Data),
    904 					transfer_decode_state=TState}}};
    905 		{done, _HasTrailers, Rest} ->
    906 			{data, StreamID, fin, <<>>,
    907 				State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
    908 		{done, Data, _HasTrailers, Rest} ->
    909 			{data, StreamID, fin, Data,
    910 				State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
    911 	catch _:_ ->
    912 		Reason = {connection_error, protocol_error,
    913 			'Failure to decode the content. (RFC7230 4)'},
    914 		terminate(stream_terminate(State, StreamID, Reason), Reason)
    915 	end.
    916 
    917 %% Message handling.
    918 
    919 down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
    920 	case cowboy_children:down(Children0, Pid) of
    921 		%% The stream was terminated already.
    922 		{ok, undefined, Children} ->
    923 			State#state{children=Children};
    924 		%% The stream is still running.
    925 		{ok, StreamID, Children} ->
    926 			info(State#state{children=Children}, StreamID, Msg);
    927 		%% The process was unknown.
    928 		error ->
    929 			cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
    930 				[Msg, Pid], Opts),
    931 			State
    932 	end.
    933 
    934 info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
    935 	case lists:keyfind(StreamID, #stream.id, Streams0) of
    936 		Stream = #stream{state=StreamState0} ->
    937 			try cowboy_stream:info(StreamID, Msg, StreamState0) of
    938 				{Commands, StreamState} ->
    939 					Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
    940 						Stream#stream{state=StreamState}),
    941 					commands(State#state{streams=Streams}, StreamID, Commands)
    942 			catch Class:Exception:Stacktrace ->
    943 				cowboy:log(cowboy_stream:make_error_log(info,
    944 					[StreamID, Msg, StreamState0],
    945 					Class, Exception, Stacktrace), Opts),
    946 				stream_terminate(State, StreamID, {internal_error, {Class, Exception},
    947 					'Unhandled exception in cowboy_stream:info/3.'})
    948 			end;
    949 		false ->
    950 			cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
    951 				[Msg, StreamID], Opts),
    952 			State
    953 	end.
    954 
    955 %% Commands.
    956 
    957 commands(State, _, []) ->
    958 	State;
    959 %% Supervise a child process.
    960 commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
    961 	commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
    962 		StreamID, Tail);
    963 %% Error handling.
    964 commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
    965 	commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
    966 %% Commands for a stream currently inactive.
    967 commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
    968 		when Current =/= StreamID ->
    969 
    970 	%% @todo We still want to handle some commands...
    971 
    972 	Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
    973 	Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
    974 		Stream#stream{queue=Queue ++ Commands}),
    975 	State#state{streams=Streams};
    976 %% When we have finished reading the request body, do nothing.
    977 commands(State=#state{flow=infinity}, StreamID, [{flow, _}|Tail]) ->
    978 	commands(State, StreamID, Tail);
    979 %% Read the request body.
    980 commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
    981 	%% We must read *at least* Size of data otherwise functions
    982 	%% like cowboy_req:read_body/1,2 will wait indefinitely.
    983 	Flow = if
    984 		Flow0 < 0 -> Size;
    985 		true -> Flow0 + Size
    986 	end,
    987 	%% Reenable active mode if necessary.
    988 	State = if
    989 		Flow0 =< 0, Flow > 0 ->
    990 			active(State0);
    991 		true ->
    992 			State0
    993 	end,
    994 	commands(State#state{flow=Flow}, StreamID, Tail);
    995 %% Error responses are sent only if a response wasn't sent already.
    996 commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
    997 		[{error_response, Status, Headers0, Body}|Tail]) ->
    998 	%% We close the connection when the error response is 408, as it
    999 	%% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
   1000 	Headers = case Status of
   1001 		408 -> Headers0#{<<"connection">> => <<"close">>};
   1002 		<<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
   1003 		_ -> Headers0
   1004 	end,
   1005 	commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
   1006 commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
   1007 	commands(State, StreamID, Tail);
   1008 %% Send an informational response.
   1009 commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
   1010 		StreamID, [{inform, StatusCode, Headers}|Tail]) ->
   1011 	%% @todo I'm pretty sure the last stream in the list is the one we want
   1012 	%% considering all others are queued.
   1013 	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
   1014 	_ = case Version of
   1015 		'HTTP/1.1' ->
   1016 			Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
   1017 				headers_to_list(Headers)));
   1018 		%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
   1019 		'HTTP/1.0' ->
   1020 			ok
   1021 	end,
   1022 	commands(State, StreamID, Tail);
   1023 %% Send a full response.
   1024 %%
   1025 %% @todo Kill the stream if it sent a response when one has already been sent.
   1026 %% @todo Keep IsFin in the state.
   1027 %% @todo Same two things above apply to DATA, possibly promise too.
   1028 commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
   1029 		[{response, StatusCode, Headers0, Body}|Tail]) ->
   1030 	%% @todo I'm pretty sure the last stream in the list is the one we want
   1031 	%% considering all others are queued.
   1032 	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
   1033 	{State1, Headers} = connection(State0, Headers0, StreamID, Version),
   1034 	State = State1#state{out_state=done},
   1035 	%% @todo Ensure content-length is set. 204 must never have content-length set.
   1036 	Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
   1037 	%% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2)
   1038 	case Body of
   1039 		{sendfile, _, _, _} ->
   1040 			Transport:send(Socket, Response),
   1041 			sendfile(State, Body);
   1042 		_ ->
   1043 			Transport:send(Socket, [Response, Body])
   1044 	end,
   1045 	commands(State, StreamID, Tail);
   1046 %% Send response headers and initiate chunked encoding or streaming.
   1047 commands(State0=#state{socket=Socket, transport=Transport,
   1048 		opts=Opts, overriden_opts=Override, streams=Streams0, out_state=OutState},
   1049 		StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
   1050 	%% @todo Same as above (about the last stream in the list).
   1051 	Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
   1052 	Status = cow_http:status_to_integer(StatusCode),
   1053 	ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
   1054 	%% Chunked transfer-encoding can be disabled on a per-request basis.
   1055 	Chunked = case Override of
   1056 		#{chunked := Chunked0} -> Chunked0;
   1057 		_ -> maps:get(chunked, Opts, true)
   1058 	end,
   1059 	{State1, Headers1} = case {Status, ContentLength, Version} of
   1060 		{204, _, 'HTTP/1.1'} ->
   1061 			{State0#state{out_state=done}, Headers0};
   1062 		{304, _, 'HTTP/1.1'} ->
   1063 			{State0#state{out_state=done}, Headers0};
   1064 		{_, undefined, 'HTTP/1.1'} when Chunked ->
   1065 			{State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
   1066 		%% Close the connection after streaming without content-length
   1067 		%% to all HTTP/1.0 clients and to HTTP/1.1 clients when chunked is disabled.
   1068 		{_, undefined, _} ->
   1069 			{State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
   1070 		%% Stream the response body without chunked transfer-encoding.
   1071 		_ ->
   1072 			ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
   1073 			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
   1074 				Stream#stream{local_expected_size=ExpectedSize}),
   1075 			{State0#state{out_state=streaming, streams=Streams}, Headers0}
   1076 	end,
   1077 	Headers2 = case stream_te(OutState, Stream) of
   1078 		trailers -> Headers1;
   1079 		_ -> maps:remove(<<"trailer">>, Headers1)
   1080 	end,
   1081 	{State, Headers} = connection(State1, Headers2, StreamID, Version),
   1082 	Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
   1083 	commands(State, StreamID, Tail);
   1084 %% Send a response body chunk.
   1085 %% @todo We need to kill the stream if it tries to send data before headers.
   1086 commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
   1087 		StreamID, [{data, IsFin, Data}|Tail]) ->
   1088 	%% Do not send anything when the user asks to send an empty
   1089 	%% data frame, as that would break the protocol.
   1090 	Size = case Data of
   1091 		{sendfile, _, B, _} -> B;
   1092 		_ -> iolist_size(Data)
   1093 	end,
   1094 	%% Depending on the current state we may need to send nothing,
   1095 	%% the last chunk, chunked data with/without the last chunk,
   1096 	%% or just the data as-is.
   1097 	Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
   1098 		Stream0=#stream{method= <<"HEAD">>} ->
   1099 			Stream0;
   1100 		Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
   1101 			Transport:send(Socket, <<"0\r\n\r\n">>),
   1102 			Stream0;
   1103 		Stream0 when Size =:= 0 ->
   1104 			Stream0;
   1105 		Stream0 when is_tuple(Data), OutState =:= chunked ->
   1106 			Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
   1107 			sendfile(State0, Data),
   1108 			Transport:send(Socket,
   1109 				case IsFin of
   1110 					fin -> <<"\r\n0\r\n\r\n">>;
   1111 					nofin -> <<"\r\n">>
   1112 				end),
   1113 			Stream0;
   1114 		Stream0 when OutState =:= chunked ->
   1115 			Transport:send(Socket, [
   1116 				integer_to_binary(Size, 16), <<"\r\n">>, Data,
   1117 				case IsFin of
   1118 					fin -> <<"\r\n0\r\n\r\n">>;
   1119 					nofin -> <<"\r\n">>
   1120 				end
   1121 			]),
   1122 			Stream0;
   1123 		Stream0 when OutState =:= streaming ->
   1124 			#stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
   1125 			SentSize = SentSize0 + Size,
   1126 			if
   1127 				%% ExpectedSize may be undefined, which is > any integer value.
   1128 				SentSize > ExpectedSize ->
   1129 					terminate(State0, response_body_too_large);
   1130 				is_tuple(Data) ->
   1131 					sendfile(State0, Data);
   1132 				true ->
   1133 					Transport:send(Socket, Data)
   1134 			end,
   1135 			Stream0#stream{local_sent_size=SentSize}
   1136 	end,
   1137 	State = case IsFin of
   1138 		fin -> State0#state{out_state=done};
   1139 		nofin -> State0
   1140 	end,
   1141 	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
   1142 	commands(State#state{streams=Streams}, StreamID, Tail);
   1143 commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
   1144 		StreamID, [{trailers, Trailers}|Tail]) ->
   1145 	case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
   1146 		trailers ->
   1147 			Transport:send(Socket, [
   1148 				<<"0\r\n">>,
   1149 				cow_http:headers(maps:to_list(Trailers)),
   1150 				<<"\r\n">>
   1151 			]);
   1152 		no_trailers ->
   1153 			Transport:send(Socket, <<"0\r\n\r\n">>);
   1154 		not_chunked ->
   1155 			ok
   1156 	end,
   1157 	commands(State#state{out_state=done}, StreamID, Tail);
   1158 %% Protocol takeover.
   1159 commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
   1160 		out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
   1161 		[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
   1162 	%% @todo If there's streams opened after this one, fail instead of 101.
   1163 	State1 = cancel_timeout(State0),
   1164 	%% Before we send the 101 response we need to stop receiving data
   1165 	%% from the socket, otherwise the data might be receive before the
   1166 	%% call to flush/0 and we end up inadvertently dropping a packet.
   1167 	%%
   1168 	%% @todo Handle cases where the request came with a body. We need
   1169 	%% to process or skip the body before the upgrade can be completed.
   1170 	State = passive(State1),
   1171 	%% Send a 101 response if necessary, then terminate the stream.
   1172 	#state{streams=Streams} = case OutState of
   1173 		wait -> info(State, StreamID, {inform, 101, Headers});
   1174 		_ -> State
   1175 	end,
   1176 	#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
   1177 	%% @todo We need to shutdown processes here first.
   1178 	stream_call_terminate(StreamID, switch_protocol, StreamState, State),
   1179 	%% Terminate children processes and flush any remaining messages from the mailbox.
   1180 	cowboy_children:terminate(Children),
   1181 	flush(Parent),
   1182 	Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
   1183 %% Set options dynamically.
   1184 commands(State0=#state{overriden_opts=Opts},
   1185 		StreamID, [{set_options, SetOpts}|Tail]) ->
   1186 	State1 = case SetOpts of
   1187 		#{idle_timeout := IdleTimeout} ->
   1188 			set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
   1189 				idle_timeout);
   1190 		_ ->
   1191 			State0
   1192 	end,
   1193 	State = case SetOpts of
   1194 		#{chunked := Chunked} ->
   1195 			State1#state{overriden_opts=Opts#{chunked => Chunked}};
   1196 		_ ->
   1197 			State1
   1198 	end,
   1199 	commands(State, StreamID, Tail);
   1200 %% Stream shutdown.
   1201 commands(State, StreamID, [stop|Tail]) ->
   1202 	%% @todo Do we want to run the commands after a stop?
   1203 	%% @todo We currently wait for the stop command before we
   1204 	%% continue with the next request/response. In theory, if
   1205 	%% the request body was read fully and the response body
   1206 	%% was sent fully we should be able to start working on
   1207 	%% the next request concurrently. This can be done as a
   1208 	%% future optimization.
   1209 	maybe_terminate(State, StreamID, Tail);
   1210 %% Log event.
   1211 commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
   1212 	cowboy:log(Log, Opts),
   1213 	commands(State, StreamID, Tail);
   1214 %% HTTP/1.1 does not support push; ignore.
   1215 commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
   1216 	commands(State, StreamID, Tail).
   1217 
   1218 %% The set-cookie header is special; we can only send one cookie per header.
   1219 headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
   1220 	Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
   1221 	Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
   1222 headers_to_list(Headers) ->
   1223 	maps:to_list(Headers).
   1224 
   1225 %% We wrap the sendfile call into a try/catch because on OTP-20
   1226 %% and earlier a few different crashes could occur for sockets
   1227 %% that were closing or closed. For example a badarg in
   1228 %% erlang:port_get_data(#Port<...>) or a badmatch like
   1229 %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
   1230 %%
   1231 %% OTP-21 uses a NIF instead of a port so the implementation
   1232 %% and behavior has dramatically changed and it is unclear
   1233 %% whether it will be necessary in the future.
   1234 %%
   1235 %% This try/catch prevents some noisy logs to be written
   1236 %% when these errors occur.
   1237 sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
   1238 		{sendfile, Offset, Bytes, Path}) ->
   1239 	try
   1240 		%% When sendfile is disabled we explicitly use the fallback.
   1241 		_ = case maps:get(sendfile, Opts, true) of
   1242 			true -> Transport:sendfile(Socket, Path, Offset, Bytes);
   1243 			false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
   1244 		end,
   1245 		ok
   1246 	catch _:_ ->
   1247 		terminate(State, {socket_error, sendfile_crash,
   1248 			'An error occurred when using the sendfile function.'})
   1249 	end.
   1250 
   1251 %% Flush messages specific to cowboy_http before handing over the
   1252 %% connection to another protocol.
   1253 flush(Parent) ->
   1254 	receive
   1255 		{timeout, _, _} ->
   1256 			flush(Parent);
   1257 		{{Pid, _}, _} when Pid =:= self() ->
   1258 			flush(Parent);
   1259 		{'EXIT', Pid, _} when Pid =/= Parent ->
   1260 			flush(Parent)
   1261 	after 0 ->
   1262 		ok
   1263 	end.
   1264 
   1265 %% @todo In these cases I'm not sure if we should continue processing commands.
   1266 maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
   1267 	terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
   1268 maybe_terminate(State, StreamID, _Tail) ->
   1269 	stream_terminate(State, StreamID, normal).
   1270 
   1271 stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
   1272 		out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
   1273 		children=Children0}, StreamID, Reason) ->
   1274 	#stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
   1275 		= lists:keyfind(StreamID, #stream.id, Streams0),
   1276 	%% Send a response or terminate chunks depending on the current output state.
   1277 	State1 = #state{streams=Streams1} = case OutState of
   1278 		wait when element(1, Reason) =:= internal_error ->
   1279 			info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
   1280 		wait when element(1, Reason) =:= connection_error ->
   1281 			info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
   1282 		wait ->
   1283 			info(State0, StreamID, {response, 204, #{}, <<>>});
   1284 		chunked when Version =:= 'HTTP/1.1' ->
   1285 			info(State0, StreamID, {data, fin, <<>>});
   1286 		streaming when SentSize < ExpectedSize ->
   1287 			terminate(State0, response_body_too_small);
   1288 		_ -> %% done or Version =:= 'HTTP/1.0'
   1289 			State0
   1290 	end,
   1291 	%% Stop the stream, shutdown children and reset overriden options.
   1292 	{value, #stream{state=StreamState}, Streams}
   1293 		= lists:keytake(StreamID, #stream.id, Streams1),
   1294 	stream_call_terminate(StreamID, Reason, StreamState, State1),
   1295 	Children = cowboy_children:shutdown(Children0, StreamID),
   1296 	State = State1#state{overriden_opts=#{}, streams=Streams, children=Children},
   1297 	%% We want to drop the connection if the body was not read fully
   1298 	%% and we don't know its length or more remains to be read than
   1299 	%% configuration allows.
   1300 	MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
   1301 	case InState of
   1302 		#ps_body{length=undefined}
   1303 				when InStreamID =:= OutStreamID ->
   1304 			terminate(State, skip_body_unknown_length);
   1305 		#ps_body{length=Len, received=Received}
   1306 				when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
   1307 			terminate(State, skip_body_too_large);
   1308 		#ps_body{} when InStreamID =:= OutStreamID ->
   1309 			stream_next(State#state{flow=infinity});
   1310 		_ ->
   1311 			stream_next(State)
   1312 	end.
   1313 
   %% Advance the output side of the connection to the next stream ID.
   %%
   %% When no stream with that ID is known yet, only out_streamid and
   %% out_state are updated. Otherwise the connection is switched to
   %% active mode if it was not already, the flow is reset and the
   %% commands queued by that stream are executed through commands/3.
   stream_next(State0=#state{out_streamid=OutStreamID, streams=Streams}) ->
   	NextID = OutStreamID + 1,
   	case lists:keyfind(NextID, #stream.id, Streams) of
   		#stream{queue=Queued} ->
   			#state{opts=Opts, active=Active} = State0,
   			State1 = case Active of
   				false -> active(State0);
   				true -> State0
   			end,
   			%% @todo Remove queue from the stream.
   			%% The flow is reset to the initial flow size even though
   			%% some data may already have gone through due to pipelining.
   			InitialFlow = maps:get(initial_stream_flow_size, Opts, 65535),
   			State = State1#state{flow=InitialFlow, out_streamid=NextID, out_state=wait},
   			commands(State, NextID, Queued);
   		false ->
   			State0#state{out_streamid=NextID, out_state=wait}
   	end.
   1331 
   %% Call cowboy_stream:terminate/3 for the given stream. Any exception
   %% raised by that call is caught and logged with cowboy:log/2 using
   %% the connection options, instead of being propagated.
   stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
   	try
   		cowboy_stream:terminate(StreamID, Reason, StreamState)
   	catch Class:Exception:Stacktrace ->
   		Args = [StreamID, Reason, StreamState],
   		Log = cowboy_stream:make_error_log(terminate, Args,
   			Class, Exception, Stacktrace),
   		cowboy:log(Log, Opts)
   	end.
   1340 
   %% Return 'close' or 'keepalive' for a request, based on the
   %% http10_keepalive option, the request headers and the HTTP version.
   %%
   %% HTTP/1.0: 'close' unless keep-alive is allowed by the options and
   %% the connection header lists the keep-alive token.
   %% HTTP/1.1: 'close' only when the connection header lists close.
   %% Anything else: 'keepalive'.
   maybe_req_close(#state{opts=#{http10_keepalive := false}}, _Headers, 'HTTP/1.0') ->
   	close;
   maybe_req_close(_State, Headers, 'HTTP/1.0') ->
   	case Headers of
   		#{<<"connection">> := Conn} ->
   			Tokens = cow_http_hd:parse_connection(Conn),
   			case lists:member(<<"keep-alive">>, Tokens) of
   				true -> keepalive;
   				false -> close
   			end;
   		_ ->
   			close
   	end;
   maybe_req_close(_State, Headers, Version) ->
   	case {Version, Headers} of
   		{'HTTP/1.1', #{<<"connection">> := Conn}} ->
   			case connection_hd_is_close(Conn) of
   				true -> close;
   				false -> keepalive
   			end;
   		_ ->
   			keepalive
   	end.
   1358 
   %% Compute the connection response header for a stream and return
   %% {State, Headers}, updating last_streamid when the response
   %% itself asks for the connection to be closed.
   %%
   %% Clauses 1 and 2: StreamID is the recorded last stream, so the
   %% response must carry the close token.
   connection(State=#state{last_streamid=StreamID},
   		Headers=#{<<"connection">> := Conn}, StreamID, _Version) ->
   	NewHeaders = case connection_hd_is_close(Conn) of
   		%% @todo Here we need to remove keep-alive and add close, not just add close.
   		false -> Headers#{<<"connection">> => [<<"close, ">>, Conn]};
   		true -> Headers
   	end,
   	{State, NewHeaders};
   connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _Version) ->
   	CloseHeaders = Headers#{<<"connection">> => <<"close">>},
   	{State, CloseHeaders};
   %% The response already has a connection header: when it lists close,
   %% this stream is recorded as the last one.
   connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _Version) ->
   	NewState = case connection_hd_is_close(Conn) of
   		%% @todo Here we need to set keep-alive only if it wasn't set before.
   		false -> State;
   		true -> State#state{last_streamid=StreamID}
   	end,
   	{NewState, Headers};
   %% No connection header in the response: HTTP/1.0 gets an explicit
   %% keep-alive, other versions are left untouched.
   connection(State, Headers, _StreamID, 'HTTP/1.0') ->
   	KeepAliveHeaders = Headers#{<<"connection">> => <<"keep-alive">>},
   	{State, KeepAliveHeaders};
   connection(State, Headers, _StreamID, _Version) ->
   	{State, Headers}.
   1377 
   %% Tell whether a connection header value (given as iodata) contains
   %% the close token once parsed by cow_http_hd:parse_connection/1.
   connection_hd_is_close(Conn) ->
   	ConnBin = iolist_to_binary(Conn),
   	lists:member(<<"close">>, cow_http_hd:parse_connection(ConnBin)).
   1381 
   %% Derive a value from the first argument and the stream's TE header:
   %% not_chunked when the first argument is 'streaming', no_trailers
   %% when the header is missing or unparseable, otherwise the first
   %% element of what cow_http_hd:parse_te/1 returns.
   stream_te(streaming, _Stream) ->
   	not_chunked;
   %% No TE header was sent.
   stream_te(_OutState, #stream{te=TE}) when TE =:= undefined ->
   	no_trailers;
   stream_te(_OutState, #stream{te=TE}) ->
   	try cow_http_hd:parse_te(TE) of
   		{Parsed, _Rest} ->
   			Parsed
   	catch _:_ ->
   		%% A TE header that cannot be parsed is treated as if
   		%% trailers could not be sent.
   		no_trailers
   	end.
   1394 
   %% This function is only called when an error occurs on a new stream.
   %%
   %% It builds a partial request from whatever the parser state holds
   %% (ref and peer only while on the request line; method, path, qs,
   %% version and headers too once in the headers), sends an early error
   %% response carrying "connection: close", then terminates.
   -spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
   error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=InState}, Reason) ->
   	Base = #{ref => Ref, peer => Peer},
   	PartialReq = case InState of
   		#ps_header{method=Method, path=Path, qs=Qs,
   				version=Version, headers=ReqHeaders} ->
   			%% Headers may not have been collected yet.
   			Headers = case ReqHeaders of
   				undefined -> #{};
   				_ -> ReqHeaders
   			end,
   			Base#{
   				method => Method,
   				path => Path,
   				qs => Qs,
   				version => Version,
   				headers => Headers
   			};
   		#ps_request_line{} ->
   			Base
   	end,
   	CloseHeaders = #{<<"connection">> => <<"close">>},
   	early_error(StatusCode, State, Reason, PartialReq, CloseHeaders),
   	terminate(State, Reason).
   1419 
   %% Same as early_error/5 with no extra response headers.
   early_error(StatusCode, State, Reason, PartialReq) ->
   	NoExtraHeaders = #{},
   	early_error(StatusCode, State, Reason, PartialReq, NoExtraHeaders).
   1422 
   %% Send an early error response on the socket and return ok.
   %%
   %% A default empty-bodied response (content-length forced to 0) is
   %% handed to cowboy_stream:early_error/5, and whatever response it
   %% returns is sent as HTTP/1.1. If that call raises, the exception is
   %% logged and the default response is sent instead.
   early_error(StatusCode0, #state{socket=Socket, transport=Transport,
   		opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
   	DefaultHeaders = RespHeaders0#{<<"content-length">> => <<"0">>},
   	DefaultResp = {response, StatusCode0, DefaultHeaders, <<>>},
   	try cowboy_stream:early_error(StreamID, Reason, PartialReq, DefaultResp, Opts) of
   		{response, StatusCode, RespHeaders, RespBody} ->
   			Head = cow_http:response(StatusCode, 'HTTP/1.1',
   				maps:to_list(RespHeaders)),
   			%% @todo We shouldn't send the body when the method is HEAD.
   			%% @todo Technically we allow the sendfile tuple.
   			Transport:send(Socket, [Head, RespBody])
   	catch Class:Exception:Stacktrace ->
   		Args = [StreamID, Reason, PartialReq, DefaultResp, Opts],
   		Log = cowboy_stream:make_error_log(early_error, Args,
   			Class, Exception, Stacktrace),
   		cowboy:log(Log, Opts),
   		%% An error response is still owed to the client, so fall back
   		%% to the response we initially intended to send.
   		Fallback = cow_http:response(StatusCode0, 'HTTP/1.1',
   			maps:to_list(DefaultHeaders)),
   		Transport:send(Socket, Fallback)
   	end,
   	ok.
   1445 
   %% Begin closing the connection.
   %%
   %% With at least one stream in the list, every stream after the first
   %% is terminated and out_streamid is recorded as last_streamid; the
   %% updated state is returned. With no stream at all, the connection
   %% is terminated right away.
   initiate_closing(State=#state{streams=[_First|Others],
   		out_streamid=OutStreamID}, Reason) ->
   	terminate_all_streams(State, Others, Reason),
   	State#state{last_streamid=OutStreamID};
   initiate_closing(State=#state{streams=[]}, Reason) ->
   	terminate(State, Reason).
   1452 
   %% Exit the process with {shutdown, Reason}.
   %%
   %% When a state is available, all streams are terminated, the
   %% children are terminated and the socket is lingered on first.
   -spec terminate(_, _) -> no_return().
   terminate(undefined, Reason) ->
   	exit({shutdown, Reason});
   terminate(State=#state{streams=Streams, children=Children}, Reason) ->
   	terminate_all_streams(State, Streams, Reason),
   	cowboy_children:terminate(Children),
   	terminate_linger(State),
   	ExitReason = {shutdown, Reason},
   	exit(ExitReason).
   1461 
   %% Call stream_call_terminate/4 for each stream of the list, in
   %% order, with the same reason. Always returns ok.
   terminate_all_streams(State, Streams, Reason) ->
   	lists:foreach(fun(#stream{id=StreamID, state=StreamState}) ->
   		stream_call_terminate(StreamID, Reason, StreamState, State)
   	end, Streams).
   1467 
   %% Shut down the write side of the socket, then enter the linger loop
   %% as configured by the linger_timeout option (default 1000).
   %%
   %% A timeout of 0 skips the loop entirely; infinity enters the loop
   %% with no timer; any other value starts a linger_timeout timer
   %% first. Nothing more is done when the shutdown itself fails.
   terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
   	case Transport:shutdown(Socket, write) of
   		{error, _} ->
   			ok;
   		ok ->
   			case maps:get(linger_timeout, Opts, 1000) of
   				0 ->
   					ok;
   				infinity ->
   					Messages = Transport:messages(),
   					terminate_linger_before_loop(State, undefined, Messages);
   				Timeout ->
   					TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
   					Messages = Transport:messages(),
   					terminate_linger_before_loop(State, TimerRef, Messages)
   			end
   	end.
   1483 
   %% Call setopts_active/1 and, if that succeeds, enter the linger
   %% loop; give up with ok otherwise.
   terminate_linger_before_loop(State, TimerRef, Messages) ->
   	%% The socket may already be in active mode at this point,
   	%% which is fine since we are shutting down anyway.
   	case setopts_active(State) of
   		{error, _} ->
   			ok;
   		ok ->
   			terminate_linger_loop(State, TimerRef, Messages)
   	end.
   1493 
   %% Receive loop used while lingering on the socket.
   %%
   %% Messages is the tuple of transport message tags; its first three
   %% elements are matched against data, closed and error messages for
   %% this socket. The loop returns ok on a closed or error message and
   %% on the linger timer firing; every other message is dropped.
   terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
   	receive
   		%% Data from the socket: discard it and keep waiting.
   		{DataTag, Socket, _Data} when element(1, Messages) =:= DataTag ->
   			terminate_linger_loop(State, TimerRef, Messages);
   		{ClosedTag, Socket} when element(2, Messages) =:= ClosedTag ->
   			ok;
   		{ErrorTag, Socket, _Reason} when element(3, Messages) =:= ErrorTag ->
   			ok;
   		%% The socket went passive: go back through
   		%% terminate_linger_before_loop/3 before waiting again.
   		{PassiveTag, Socket} when PassiveTag =:= tcp_passive
   				orelse PassiveTag =:= ssl_passive ->
   			terminate_linger_before_loop(State, TimerRef, Messages);
   		{timeout, TimerRef, linger_timeout} ->
   			ok;
   		_Other ->
   			terminate_linger_loop(State, TimerRef, Messages)
   	end.
   1509 
   1510 %% System callbacks.
   1511 
   %% Re-enter the connection loop with the given state; the first two
   %% arguments are ignored.
   -spec system_continue(_, _, #state{}) -> ok.
   system_continue(_Parent, _Debug, State) ->
   	loop(State).
   1515 
   %% Wrap the given reason in a stop reason, initiate closing with it,
   %% then hand the resulting state to the loop.
   -spec system_terminate(any(), _, _, #state{}) -> no_return().
   system_terminate(ExitReason, _Parent, _Debug, State) ->
   	StopReason = {stop, {exit, ExitReason}, 'sys:terminate/2,3 was called.'},
   	ClosingState = initiate_closing(State, StopReason),
   	loop(ClosingState).
   1520 
   %% Code change callback: the first argument is returned unchanged
   %% and the other three are ignored.
   -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
   system_code_change(Misc, _Module, _OldVsn, _Extra) ->
   	{ok, Misc}.