cowboy_http.erl (60202B)
%% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

%% HTTP/1.x connection process: reads from the socket, parses requests
%% and hands them to the configured stream handlers.
-module(cowboy_http).

%% Connection process entry point.
-export([init/6]).

%% sys callbacks.
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% Protocol options.
%%
%% Defaults applied by the code in this module when a key is absent:
%% active_n 100, idle_timeout 60000, inactivity_timeout 300000,
%% initial_stream_flow_size 65535, max_authority_length 255,
%% max_empty_lines 5, max_header_name_length 64,
%% max_header_value_length 4096, max_headers 100, max_keepalive 1000,
%% max_method_length 32, max_request_line_length 8000,
%% request_timeout 5000.
-type opts() :: #{
    active_n => pos_integer(),
    chunked => boolean(),
    compress_buffering => boolean(),
    compress_threshold => non_neg_integer(),
    connection_type => worker | supervisor,
    env => cowboy_middleware:env(),
    http10_keepalive => boolean(),
    idle_timeout => timeout(),
    inactivity_timeout => timeout(),
    initial_stream_flow_size => non_neg_integer(),
    linger_timeout => timeout(),
    logger => module(),
    max_authority_length => non_neg_integer(),
    max_empty_lines => non_neg_integer(),
    max_header_name_length => non_neg_integer(),
    max_header_value_length => non_neg_integer(),
    max_headers => non_neg_integer(),
    max_keepalive => non_neg_integer(),
    max_method_length => non_neg_integer(),
    max_request_line_length => non_neg_integer(),
    metrics_callback => cowboy_metrics_h:metrics_callback(),
    metrics_req_filter => fun((cowboy_req:req()) -> map()),
    metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
    middlewares => [module()],
    proxy_header => boolean(),
    request_timeout => timeout(),
    sendfile => boolean(),
    shutdown_timeout => timeout(),
    stream_handlers => [module()],
    tracer_callback => cowboy_tracer_h:tracer_callback(),
    tracer_flags => [atom()],
    tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
    %% Open ended because configured stream handlers might add options.
    _ => _
}.
-export_type([opts/0]).

%% Parsing state: waiting for (or in the middle of) a request-line.
-record(ps_request_line, {
    %% Number of empty lines skipped so far before the request-line.
    empty_lines = 0 :: non_neg_integer()
}).

%% Parsing state: request-line parsed, headers being read.
-record(ps_header, {
    method = undefined :: binary(),
    authority = undefined :: binary() | undefined,
    path = undefined :: binary(),
    qs = undefined :: binary(),
    version = undefined :: cowboy:http_version(),
    %% Headers parsed so far. Set when parsing is suspended or when an
    %% error is about to be reported; undefined while parsing is running.
    headers = undefined :: cowboy:http_headers() | undefined,
    %% Name of the header whose value is still incomplete, if any.
    name = undefined :: binary() | undefined
}).

%% Parsing state: reading a request body.
-record(ps_body, {
    length :: non_neg_integer() | undefined,
    received = 0 :: non_neg_integer(),
    transfer_decode_fun :: fun((binary(), cow_http_te:state()) -> cow_http_te:decode_ret()),
    transfer_decode_state :: cow_http_te:state()
}).

-record(stream, {
    id = undefined :: cowboy_stream:streamid(),
    %% Stream handlers and their state.
    state = undefined :: {module(), any()},
    %% Request method.
    method = undefined :: binary(),
    %% Client HTTP version for this stream.
    version = undefined :: cowboy:http_version(),
    %% Unparsed te header. Used to know if we can send trailers.
    te :: undefined | binary(),
    %% Expected body size.
    local_expected_size = undefined :: undefined | non_neg_integer(),
    %% Sent body size.
    local_sent_size = 0 :: non_neg_integer(),
    %% Commands queued.
    queue = [] :: cowboy_stream:commands()
}).

-type stream() :: #stream{}.

%% Connection state threaded through the receive loop and the parser.
-record(state, {
    %% Parent process; its exit signals and system messages are handled in loop/1.
    parent :: pid(),
    ref :: ranch:ref(),
    socket :: inet:socket(),
    %% Transport module used for all socket operations.
    transport :: module(),
    %% PROXY protocol information; added to the request map when defined.
    proxy_header :: undefined | ranch_proxy_header:proxy_info(),
    opts = #{} :: cowboy:opts(),
    %% Data received from the socket that has not been parsed yet.
    buffer = <<>> :: binary(),

    %% Some options may be overridden for the current stream.
    overriden_opts = #{} :: cowboy:opts(),

    %% Remote address and port for the connection.
    peer = undefined :: {inet:ip_address(), inet:port_number()},

    %% Local address and port for the connection.
    sock = undefined :: {inet:ip_address(), inet:port_number()},

    %% Client certificate (TLS only).
    cert :: undefined | binary(),

    %% Reference of the currently running request/idle timer, if any.
    timer = undefined :: undefined | reference(),

    %% Whether we are currently receiving data from the socket.
    active = true :: boolean(),

    %% Identifier for the stream currently being read (or waiting to be received).
    in_streamid = 1 :: pos_integer(),

    %% Parsing state for the current stream or stream-to-be.
    in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},

    %% Flow requested for the current stream.
    flow = infinity :: non_neg_integer() | infinity,

    %% Identifier for the stream currently being written.
    %% Note that out_streamid =< in_streamid.
    out_streamid = 1 :: pos_integer(),

    %% Whether we finished writing data for the current stream.
    out_state = wait :: wait | chunked | streaming | done,

    %% The connection will be closed after this stream.
    last_streamid = undefined :: pos_integer(),

    %% Currently active HTTP/1.1 streams.
    streams = [] :: [stream()],

    %% Children processes created by streams.
    children = cowboy_children:init() :: cowboy_children:children()
}).

-include_lib("cowlib/include/cow_inline.hrl").
-include_lib("cowlib/include/cow_parse.hrl").

%% Connection process entry point.
%%
%% Retrieves the peer name, the local socket name and (for the ssl
%% transport) the client certificate, then enters the receive loop with
%% the request timeout armed. Terminates immediately when any of these
%% lookups fails.
-spec init(pid(), ranch:ref(), inet:socket(), module(),
    ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
    Peer0 = Transport:peername(Socket),
    Sock0 = Transport:sockname(Socket),
    Cert1 = case Transport:name() of
        ssl ->
            case ssl:peercert(Socket) of
                %% A missing client certificate is not an error.
                {error, no_peercert} ->
                    {ok, undefined};
                Cert0 ->
                    Cert0
            end;
        _ ->
            {ok, undefined}
    end,
    case {Peer0, Sock0, Cert1} of
        {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
            State = #state{
                parent=Parent, ref=Ref, socket=Socket,
                transport=Transport, proxy_header=ProxyHeader, opts=Opts,
                peer=Peer, sock=Sock, cert=Cert,
                last_streamid=maps:get(max_keepalive, Opts, 1000)},
            setopts_active(State),
            loop(set_timeout(State, request_timeout));
        {{error, Reason}, _, _} ->
            terminate(undefined, {socket_error, Reason,
                'A socket error occurred when retrieving the peer name.'});
        {_, {error, Reason}, _} ->
            terminate(undefined, {socket_error, Reason,
                'A socket error occurred when retrieving the sock name.'});
        {_, _, {error, Reason}} ->
            terminate(undefined, {socket_error, Reason,
                'A socket error occurred when retrieving the client TLS certificate.'})
    end.

%% Put the socket in {active, N} mode, N being the active_n option
%% (100 when not configured).
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
    N = maps:get(active_n, Opts, 100),
    Transport:setopts(Socket, [{active, N}]).

%% Re-enable {active, N} mode and record it in the state.
active(State) ->
    setopts_active(State),
    State#state{active=true}.

%% Switch the socket to passive mode, drop any pending "passive"
%% notification from the mailbox and record the mode in the state.
passive(State=#state{socket=Socket, transport=Transport}) ->
    Transport:setopts(Socket, [{active, false}]),
    Messages = Transport:messages(),
    flush_passive(Socket, Messages),
    State#state{active=false}.

%% Remove every passive notification for Socket currently in the
%% mailbox, without blocking.
flush_passive(Socket, Messages) ->
    receive
        {Passive, Socket} when Passive =:= element(4, Messages);
                %% Hardcoded for compatibility with Ranch 1.x.
                Passive =:= tcp_passive; Passive =:= ssl_passive ->
            flush_passive(Socket, Messages)
    after 0 ->
        ok
    end.

%% Main receive loop of the connection process.
%%
%% Clause order matters: data arriving after the last request to be
%% processed is discarded before the generic socket data clause, and the
%% parent 'EXIT' clauses come before the generic child 'EXIT' clause.
%% The connection is terminated when nothing at all is received for
%% inactivity_timeout milliseconds (300000 when not configured).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
        buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
        last_streamid=LastStreamID}) ->
    Messages = Transport:messages(),
    InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
    receive
        %% Discard data coming in after the last request
        %% we want to process was received fully.
        {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
            loop(State);
        %% Socket messages.
        {OK, Socket, Data} when OK =:= element(1, Messages) ->
            parse(<< Buffer/binary, Data/binary >>, State);
        {Closed, Socket} when Closed =:= element(2, Messages) ->
            terminate(State, {socket_error, closed, 'The socket has been closed.'});
        {Error, Socket, Reason} when Error =:= element(3, Messages) ->
            terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
        {Passive, Socket} when Passive =:= element(4, Messages);
                %% Hardcoded for compatibility with Ranch 1.x.
                Passive =:= tcp_passive; Passive =:= ssl_passive ->
            setopts_active(State),
            loop(State);
        %% Timeouts.
        {timeout, Ref, {shutdown, Pid}} ->
            cowboy_children:shutdown_timeout(Children, Ref, Pid),
            loop(State);
        %% Only the timer currently stored in the state is acted upon.
        {timeout, TimerRef, Reason} ->
            timeout(State, Reason);
        %% Any other timer message is ignored.
        {timeout, _, _} ->
            loop(State);
        %% System messages.
        {'EXIT', Parent, shutdown} ->
            Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
            loop(initiate_closing(State, Reason));
        {'EXIT', Parent, Reason} ->
            terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
        %% Messages pertaining to a stream.
        {{Pid, StreamID}, Msg} when Pid =:= self() ->
            loop(info(State, StreamID, Msg));
        %% Exit signal from children.
        Msg = {'EXIT', Pid, _} ->
            loop(down(State, Pid, Msg));
        %% Calls from supervisor module.
        {'$gen_call', From, Call} ->
            cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
            loop(State);
        %% Unknown messages.
        Msg ->
            cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
            loop(State)
    after InactivityTimeout ->
        terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
    end.

%% Arm the request_timeout or idle_timeout timer, replacing any timer
%% currently running. The first three clauses are the cases in which
%% the requested timer is deliberately left untouched.

%% We do not set request_timeout if there are active streams.
set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
    State;
%% We do not set request_timeout if we are skipping a body.
set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
    State;
%% We do not set idle_timeout if there are no active streams,
%% unless when we are skipping a body.
set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
        when element(1, InState) =/= ps_body ->
    State;
%% Otherwise we can set the timeout.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
    State = cancel_timeout(State0),
    Default = case Name of
        request_timeout -> 5000;
        idle_timeout -> 60000
    end,
    Timeout = case Override of
        %% The timeout may have been overriden for the current stream.
        #{Name := Timeout0} -> Timeout0;
        _ -> maps:get(Name, Opts, Default)
    end,
    %% An infinity timeout means no timer is started at all.
    TimerRef = case Timeout of
        infinity -> undefined;
        Timeout -> erlang:start_timer(Timeout, self(), Name)
    end,
    State#state{timer=TimerRef}.

%% Cancel the timer stored in the state, if any, and clear it.
cancel_timeout(State=#state{timer=TimerRef}) ->
    ok = case TimerRef of
        undefined ->
            ok;
        _ ->
            %% Do a synchronous cancel and remove the message if any
            %% to avoid receiving stray messages.
            _ = erlang:cancel_timer(TimerRef),
            receive
                {timeout, TimerRef, _} -> ok
            after 0 ->
                ok
            end
    end,
    State#state{timer=undefined}.

%% Handle the expiry of the timer stored in the state.
%%
%% A request timeout before any request-line byte closes the connection
%% without a response; during headers a 408 is sent through
%% error_terminate/3. An idle timeout closes the connection.
-spec timeout(_, _) -> no_return().
timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
    terminate(State, {connection_error, timeout,
        'No request-line received before timeout.'});
timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
    error_terminate(408, State, {connection_error, timeout,
        'Request headers not received before timeout.'});
timeout(State, idle_timeout) ->
    terminate(State, {connection_error, timeout,
        'Connection idle longer than configuration allows.'}).

%% Dispatch the buffered data to the parser matching the current
%% parsing state, then continue in after_parse/1.
parse(<<>>, State) ->
    loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
        last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
    loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
    after_parse(parse_request(Buffer, State, EmptyLines));
%% Resume header parsing at the start of a header line. The headers
%% accumulated so far are taken out of the parsing state while parsing runs.
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
    after_parse(parse_header(Buffer,
        State#state{in_state=PS#ps_header{headers=undefined}},
        Headers));
%% Resume header parsing in the middle of a header: the name is known,
%% the value was incomplete.
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
    after_parse(parse_hd_before_value(Buffer,
        State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
        Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
    after_parse(parse_body(Buffer, State)).

%% Act on the result of a parsing step.
%%
%% {request, Req, State}: a complete request head was parsed; the stream
%% handlers are initialised and their commands executed.
%% {data, StreamID, IsFin, Data, State}: a piece of request body was decoded.
%% {more, State}: more data is needed from the socket.
after_parse({request, Req=#{streamid := StreamID, method := Method,
        headers := Headers, version := Version},
        State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
    try cowboy_stream:init(StreamID, Req, Opts) of
        {Commands, StreamState} ->
            Flow = maps:get(initial_stream_flow_size, Opts, 65535),
            TE = maps:get(<<"te">>, Headers, undefined),
            Streams = [#stream{id=StreamID, state=StreamState,
                method=Method, version=Version, te=TE}|Streams0],
            %% When the request asks for the connection to be closed,
            %% this stream becomes the last one we will process.
            State1 = case maybe_req_close(State0, Headers, Version) of
                close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
                keepalive -> State0#state{streams=Streams, flow=Flow}
            end,
            State = set_timeout(State1, idle_timeout),
            parse(Buffer, commands(State, StreamID, Commands))
    catch Class:Exception:Stacktrace ->
        cowboy:log(cowboy_stream:make_error_log(init,
            [StreamID, Req, Opts],
            Class, Exception, Stacktrace), Opts),
        early_error(500, State0, {internal_error, {Class, Exception},
            'Unhandled exception in cowboy_stream:init/3.'}, Req),
        parse(Buffer, State0)
    end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
        streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
    try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
        {Commands, StreamState} ->
            Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
                Stream#stream{state=StreamState}),
            State1 = set_timeout(State0, case IsFin of
                fin -> request_timeout;
                nofin -> idle_timeout
            end),
            State = update_flow(IsFin, Data, State1#state{streams=Streams}),
            parse(Buffer, commands(State, StreamID, Commands))
    catch Class:Exception:Stacktrace ->
        cowboy:log(cowboy_stream:make_error_log(data,
            [StreamID, IsFin, Data, StreamState0],
            Class, Exception, Stacktrace), Opts),
        %% @todo Should call parse after this.
        %% NOTE(review): unlike every other branch, this one does not
        %% re-enter parse/2 or loop/1 itself; confirm what
        %% stream_terminate/3 returns before relying on it.
        stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
            'Unhandled exception in cowboy_stream:data/4.'})
    end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, IsFin, _, State}) ->
    loop(set_timeout(State, case IsFin of
        fin -> request_timeout;
        nofin -> idle_timeout
    end));
after_parse({more, State}) ->
    loop(set_timeout(State, idle_timeout)).

%% Account for received body data against the flow requested for the
%% current stream. The socket is switched to passive mode at the moment
%% the remaining flow drops from positive to zero or below.
update_flow(fin, _, State) ->
    %% This function is only called after parsing, therefore we
    %% are expecting to be in active mode already.
    State#state{flow=infinity};
update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
    Flow = Flow0 - byte_size(Data),
    State = State0#state{flow=Flow},
    if
        Flow0 > 0, Flow =< 0 ->
            passive(State);
        true ->
            State
    end.

%% Request-line.

-spec parse_request(Buffer, State, non_neg_integer())
    -> {request, cowboy_req:req(), State}
    | {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State}
    | {more, State}
    when Buffer::binary(), State::#state{}.
%% Empty lines must be using \r\n.
parse_request(<< $\n, _/bits >>, State, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
parse_request(<< $\s, _/bits >>, State, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
%% We limit the length of the Request-line to MaxLength to avoid endlessly
%% reading from the socket and eventually crashing.
parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
    MaxLength = maps:get(max_request_line_length, Opts, 8000),
    MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
    case match_eol(Buffer, 0) of
        nomatch when byte_size(Buffer) > MaxLength ->
            error_terminate(414, State, {connection_error, limit_reached,
                'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
        nomatch ->
            {more, State#state{buffer=Buffer, in_state=#ps_request_line{empty_lines=EmptyLines}}};
        %% An LF at offset 1 is handled as an empty line: the two bytes
        %% are dropped and counted against max_empty_lines.
        1 when EmptyLines =:= MaxEmptyLines ->
            error_terminate(400, State, {connection_error, limit_reached,
                'More empty lines were received than configuration allows. (RFC7230 3.5)'});
        1 ->
            << _:16, Rest/bits >> = Buffer,
            parse_request(Rest, State, EmptyLines + 1);
        _ ->
            case Buffer of
                %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
                << "OPTIONS * ", Rest/bits >> ->
                    parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
                <<"CONNECT ", _/bits>> ->
                    error_terminate(501, State, {connection_error, no_error,
                        'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
                <<"TRACE ", _/bits>> ->
                    error_terminate(501, State, {connection_error, no_error,
                        'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
                %% Accept direct HTTP/2 only at the beginning of the connection.
                << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
                    %% @todo Might be worth throwing to get a clean stacktrace.
                    http2_upgrade(State, Buffer);
                _ ->
                    parse_method(Buffer, State, <<>>,
                        maps:get(max_method_length, Opts, 32))
            end
    end.

%% Return the offset of the first LF in the buffer counting from N,
%% or nomatch when the buffer contains none.
match_eol(<< $\n, _/bits >>, N) ->
    N;
match_eol(<< _, Rest/bits >>, N) ->
    match_eol(Rest, N + 1);
match_eol(_, _) ->
    nomatch.

%% Accumulate the method token up to the first space. Remaining is the
%% number of characters still allowed by max_method_length.
parse_method(_, State, _, 0) ->
    error_terminate(501, State, {connection_error, limit_reached,
        'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
    case C of
        $\r -> error_terminate(400, State, {connection_error, protocol_error,
            'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
        $\s -> parse_uri(Rest, State, SoFar);
        _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
        _ -> error_terminate(400, State, {connection_error, protocol_error,
            'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
    end.

%% Parse the start of the request-target.
%%
%% Absolute-form targets are accepted for the http and https schemes
%% only, compared case-insensitively; origin-form targets must start
%% with a slash. Anything else is rejected with a 400.
%%
%% Every letter of the scheme must match: the tests are joined with ","
%% (AND). Joining them with ";" would make each test an alternative and
%% let schemes such as "httx" or "abcp" through. The two middle letters
%% are bound separately so that mixed case ("hTtp") is accepted too.
parse_uri(<< H, T1, T2, P, "://", Rest/bits >>, State, Method)
        when H =:= $h orelse H =:= $H, T1 =:= $t orelse T1 =:= $T,
            T2 =:= $t orelse T2 =:= $T, P =:= $p orelse P =:= $P ->
    parse_uri_authority(Rest, State, Method);
parse_uri(<< H, T1, T2, P, S, "://", Rest/bits >>, State, Method)
        when H =:= $h orelse H =:= $H, T1 =:= $t orelse T1 =:= $T,
            T2 =:= $t orelse T2 =:= $T, P =:= $p orelse P =:= $P,
            S =:= $s orelse S =:= $S ->
    parse_uri_authority(Rest, State, Method);
parse_uri(<< $/, Rest/bits >>, State, Method) ->
    parse_uri_path(Rest, State, Method, undefined, <<$/>>);
parse_uri(_, State, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).

%% @todo We probably want to apply max_authority_length also
%% to the host header and to document this option. It might
%% also be useful for HTTP/2 requests.

%% Start reading the authority of an absolute-form request-target with
%% an empty accumulator and the configured length budget.
parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
    MaxLength = maps:get(max_authority_length, Opts, 255),
    parse_uri_authority(Rest, State, Method, <<>>, MaxLength).

%% Read the authority one byte at a time. Budget is the number of bytes
%% that may still be added to Host.
parse_uri_authority(_, State, _, _, 0) ->
    error_terminate(414, State, {connection_error, limit_reached,
        'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
parse_uri_authority(<<$\r, _/bits>>, State, _, _, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
parse_uri_authority(<<$@, _/bits>>, State, _, _, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
%% A delimiter or a colon before any host byte means the host is empty.
parse_uri_authority(<<Delim, _/bits>>, State, _, <<>>, _)
        when Delim =:= $/; Delim =:= $\s; Delim =:= $?; Delim =:= $#; Delim =:= $: ->
    error_terminate(400, State, {connection_error, protocol_error,
        'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
%% End of the authority. The path defaults to "/" when absent.
parse_uri_authority(<<$/, Rest/bits>>, State, Method, Host, _) ->
    parse_uri_path(Rest, State, Method, Host, <<"/">>);
parse_uri_authority(<<$\s, Rest/bits>>, State, Method, Host, _) ->
    parse_version(Rest, State, Method, Host, <<"/">>, <<>>);
parse_uri_authority(<<$?, Rest/bits>>, State, Method, Host, _) ->
    parse_uri_query(Rest, State, Method, Host, <<"/">>, <<>>);
parse_uri_authority(<<$#, Rest/bits>>, State, Method, Host, _) ->
    skip_uri_fragment(Rest, State, Method, Host, <<"/">>, <<>>);
parse_uri_authority(<<Byte, Rest/bits>>, State, Method, Host, Budget) ->
    parse_uri_authority(Rest, State, Method, <<Host/binary, Byte>>, Budget - 1).

%% Read the path up to the query, the fragment or the separating space.
parse_uri_path(<<$\r, _/bits>>, State, _, _, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
parse_uri_path(<<$\s, Rest/bits>>, State, Method, Authority, Path) ->
    parse_version(Rest, State, Method, Authority, Path, <<>>);
parse_uri_path(<<$?, Rest/bits>>, State, Method, Authority, Path) ->
    parse_uri_query(Rest, State, Method, Authority, Path, <<>>);
parse_uri_path(<<$#, Rest/bits>>, State, Method, Authority, Path) ->
    skip_uri_fragment(Rest, State, Method, Authority, Path, <<>>);
parse_uri_path(<<Byte, Rest/bits>>, State, Method, Authority, Path) ->
    parse_uri_path(Rest, State, Method, Authority, <<Path/binary, Byte>>).

%% Read the query string up to the fragment or the separating space.
parse_uri_query(<<$\r, _/bits>>, State, _, _, _, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
parse_uri_query(<<$\s, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    parse_version(Rest, State, Method, Authority, Path, Qs);
parse_uri_query(<<$#, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    skip_uri_fragment(Rest, State, Method, Authority, Path, Qs);
parse_uri_query(<<Byte, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    parse_uri_query(Rest, State, Method, Authority, Path, <<Qs/binary, Byte>>).

%% Drop the fragment: its bytes are consumed but not kept.
skip_uri_fragment(<<$\r, _/bits>>, State, _, _, _, _) ->
    error_terminate(400, State, {connection_error, protocol_error,
        'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
skip_uri_fragment(<<$\s, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    parse_version(Rest, State, Method, Authority, Path, Qs);
skip_uri_fragment(<<_, Rest/bits>>, State, Method, Authority, Path, Qs) ->
    skip_uri_fragment(Rest, State, Method, Authority, Path, Qs).

%% Match the HTTP version that ends the request-line. Only HTTP/1.1 and
%% HTTP/1.0 immediately followed by CRLF are accepted.
parse_version(Buffer, State, Method, Authority, Path, Qs) ->
    case Buffer of
        <<"HTTP/1.1\r\n", Rest/bits>> ->
            before_parse_headers(Rest, State, Method, Authority, Path, Qs, 'HTTP/1.1');
        <<"HTTP/1.0\r\n", Rest/bits>> ->
            before_parse_headers(Rest, State, Method, Authority, Path, Qs, 'HTTP/1.0');
        <<"HTTP/1.", _, Trailing, _/bits>> when Trailing =:= $\s; Trailing =:= $\t ->
            error_terminate(400, State, {connection_error, protocol_error,
                'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
        <<Leading, _/bits>> when Leading =:= $\s; Leading =:= $\t ->
            error_terminate(400, State, {connection_error, protocol_error,
                'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
        _ ->
            error_terminate(505, State, {connection_error, protocol_error,
                'Unsupported HTTP version. (RFC7230 2.6)'})
    end.

%% Store the request-line components in a fresh header parsing state
%% and start reading headers into an empty map.
before_parse_headers(Rest, State, Method, Authority, Path, Qs, Version) ->
    HeaderState = #ps_header{method=Method, authority=Authority,
        path=Path, qs=Qs, version=Version},
    parse_header(Rest, State#state{in_state=HeaderState}, #{}).

%% Headers.

%% We need two or more bytes in the buffer to continue.
parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
    {more, State#state{buffer=Rest, in_state=PS#ps_header{headers=Headers}}};
%% An empty line ends the header section.
parse_header(<<"\r\n", Rest/bits>>, State, Headers) ->
    request(Rest, State, Headers);
%% Another header line follows: enforce max_headers before reading it.
parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
    Limit = maps:get(max_headers, Opts, 100),
    Count = maps:size(Headers),
    case Count >= Limit of
        true ->
            error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
                {connection_error, limit_reached,
                    'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
        false ->
            parse_header_colon(Buffer, State, Headers)
    end.

%% Make sure the header line has a colon before reading the name.
%% Without one we fail when the buffer already exceeds
%% max_header_name_length or holds a complete line, and wait for more
%% data otherwise.
parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
    NameLimit = maps:get(max_header_name_length, Opts, 64),
    ColonPos = match_colon(Buffer, 0),
    if
        ColonPos =/= nomatch ->
            parse_hd_name(Buffer, State, Headers, <<>>);
        byte_size(Buffer) > NameLimit ->
            error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
                {connection_error, limit_reached,
                    'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
        true ->
            %% We don't have a colon but we might have an invalid header line,
            %% so check if we have an LF and abort with an error if we do.
            case match_eol(Buffer, 0) of
                nomatch ->
                    {more, State#state{buffer=Buffer, in_state=PS#ps_header{headers=Headers}}};
                _ ->
                    error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                        {connection_error, protocol_error,
                            'A header line is missing a colon separator. (RFC7230 3.2.4)'})
            end
    end.

%% Return the offset of the first colon in the buffer counting from
%% Pos, or nomatch when the buffer contains none.
match_colon(Bin, Pos) ->
    case Bin of
        <<$:, _/bits>> -> Pos;
        <<_, Tail/bits>> -> match_colon(Tail, Pos + 1);
        _ -> nomatch
    end.

%% Read the header name up to the colon; the ?LOWER macro appends each
%% byte to the accumulator. Whitespace is rejected, with a message that
%% depends on whether any name byte was read yet.
parse_hd_name(<<$:, Rest/bits>>, State, Headers, Name) ->
    parse_hd_before_value(Rest, State, Headers, Name);
parse_hd_name(<<C, _/bits>>, State=#state{in_state=PS}, Headers, Name) when ?IS_WS(C) ->
    Message = case Name of
        <<>> ->
            'Whitespace is not allowed before the header name. (RFC7230 3.2)';
        _ ->
            'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'
    end,
    error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
        {connection_error, protocol_error, Message});
%% The variable must be called C: the ?LOWER macro refers to it by name.
parse_hd_name(<<C, Rest/bits>>, State, Headers, Name) ->
    ?LOWER(parse_hd_name, Rest, State, Headers, Name).

%% Skip the optional whitespace before the header value, then make sure
%% a complete line is buffered before reading the value.
parse_hd_before_value(<<WS, Rest/bits>>, State, Headers, Name)
        when WS =:= $\s; WS =:= $\t ->
    parse_hd_before_value(Rest, State, Headers, Name);
parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, Headers, Name) ->
    ValueLimit = maps:get(max_header_value_length, Opts, 4096),
    EolPos = match_eol(Buffer, 0),
    if
        EolPos =/= nomatch ->
            parse_hd_value(Buffer, State, Headers, Name, <<>>);
        byte_size(Buffer) > ValueLimit ->
            error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
                {connection_error, limit_reached,
                    'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
        true ->
            %% Remember the header name so that parsing resumes at the value.
            {more, State#state{buffer=Buffer,
                in_state=PS#ps_header{headers=Headers, name=Name}}}
    end.

%% Read a header value up to CRLF and store it in the headers map.
%%
%% Trailing whitespace is removed. A header that appears more than once
%% has its values joined with ", ", except cookie which uses "; ".
parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
    Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
    Headers = case maps:get(Name, Headers0, undefined) of
        undefined -> Headers0#{Name => Value};
        %% The cookie header does not use proper HTTP header lists.
        Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
        Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
    end,
    parse_header(Rest, S, Headers);
parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
    parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).

%% Remove trailing spaces and tabs from Value. N is the index of the
%% last byte still to be examined; -1 means nothing is left.
clean_value_ws_end(_, -1) ->
    <<>>;
clean_value_ws_end(Value, N) ->
    case binary:at(Value, N) of
        $\s -> clean_value_ws_end(Value, N - 1);
        $\t -> clean_value_ws_end(Value, N - 1);
        _ ->
            S = N + 1,
            << Value2:S/binary, _/bits >> = Value,
            Value2
    end.

-ifdef(TEST).
clean_value_ws_end_test_() ->
    Tests = [
        {<<>>, <<>>},
        {<<" ">>, <<>>},
        {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
            "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
            <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                "text/html;level=2;q=0.4, */*;q=0.5">>}
    ],
    [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].

horse_clean_value_ws_end() ->
    horse:repeat(200000,
        clean_value_ws_end(
            <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                "text/html;level=2;q=0.4, */*;q=0.5 ">>,
            byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
                "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
    ).
-endif.

%% All headers have been read: work out the host and port of the request.
%%
%% HTTP/1.1 requests must carry a host header. When the request-target
%% was in absolute-form, its authority must be identical to the host header.
request(Buffer, State=#state{transport=Transport,
        in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
    case maps:get(<<"host">>, Headers, undefined) of
        undefined when Version =:= 'HTTP/1.1' ->
            %% @todo Might want to not close the connection on this and next one.
            error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                {stream_error, protocol_error,
                    'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
        undefined ->
            request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
        %% @todo When CONNECT requests come in we need to ignore the RawHost
        %% and instead use the Authority as the source of host.
        RawHost when Authority =:= undefined; Authority =:= RawHost ->
            request_parse_host(Buffer, State, Headers, RawHost);
        %% RFC7230 does not explicitly ask us to reject requests
        %% that have a different authority component and host header.
        %% However it DOES ask clients to set them to the same value,
        %% so we enforce that.
        _ ->
            error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                {stream_error, protocol_error,
                    'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
    end.

%% Split the raw host header into host and port. The transport's
%% default port is used when the header has none.
%%
%% Both error branches store the parsed headers in the parsing state
%% before terminating, as every other error path of the request
%% functions does.
request_parse_host(Buffer, State=#state{transport=Transport, in_state=PS}, Headers, RawHost) ->
    try cow_http_hd:parse_host(RawHost) of
        {Host, undefined} ->
            request(Buffer, State, Headers, Host, default_port(Transport:secure()));
        {Host, Port} when Port > 0, Port =< 65535 ->
            request(Buffer, State, Headers, Host, Port);
        _ ->
            error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
                {stream_error, protocol_error,
                    'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
    catch _:_ ->
        error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
            {stream_error, protocol_error,
                'The host header is invalid. (RFC7230 5.4)'})
    end.

%% Default port: 443 for a secure transport, 80 otherwise.
-spec default_port(boolean()) -> 80 | 443.
default_port(true) -> 443;
default_port(_) -> 80.

%% End of request parsing.

%% Build the request map and pick the next parsing state.
%%
%% The body is described by transfer-encoding (only chunked is
%% supported, and content-length is then removed) or by content-length.
%% Returns {request, Req, State}, or hands the connection over to the
%% HTTP/2 upgrade when the request asks for one.
request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
        proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
            PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
        Headers0, Host, Port) ->
    Scheme = case Transport:secure() of
        true -> <<"https">>;
        false -> <<"http">>
    end,
    {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
        #{<<"transfer-encoding">> := TransferEncoding0} ->
            try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
                [<<"chunked">>] ->
                    {maps:remove(<<"content-length">>, Headers0),
                        true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
                _ ->
                    error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                        {stream_error, protocol_error,
                            'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
            catch _:_ ->
                error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                    {stream_error, protocol_error,
                        'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
            end;
        #{<<"content-length">> := <<"0">>} ->
            {Headers0, false, 0, undefined, undefined};
        #{<<"content-length">> := BinLength} ->
            Length = try
                cow_http_hd:parse_content_length(BinLength)
            catch _:_ ->
                error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
                    {stream_error, protocol_error,
                        'The content-length header is invalid. (RFC7230 3.3.2)'})
            end,
            {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
        _ ->
            {Headers0, false, 0, undefined, undefined}
    end,
    Req0 = #{
        ref => Ref,
        pid => self(),
        streamid => StreamID,
        peer => Peer,
        sock => Sock,
        cert => Cert,
        method => Method,
        scheme => Scheme,
        host => Host,
        port => Port,
        path => Path,
        qs => Qs,
        version => Version,
        %% We are transparently taking care of transfer-encodings so
        %% the user code has no need to know about it.
        headers => maps:remove(<<"transfer-encoding">>, Headers),
        has_body => HasBody,
        body_length => BodyLength
    },
    %% We add the PROXY header information if any.
    Req = case ProxyHeader of
        undefined -> Req0;
        _ -> Req0#{proxy_header => ProxyHeader}
    end,
    case is_http2_upgrade(Headers, Version) of
        false ->
            %% With a body we switch to body parsing for this stream;
            %% without one the next request-line is expected right away.
            State = case HasBody of
                true ->
                    State0#state{in_state=#ps_body{
                        length = BodyLength,
                        transfer_decode_fun = TDecodeFun,
                        transfer_decode_state = TDecodeState
                    }};
                false ->
                    State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
            end,
            {request, Req, State#state{buffer=Buffer}};
        {true, HTTP2Settings} ->
            %% We save the headers in case the upgrade will fail
            %% and we need to pass them to cowboy_stream:early_error.
            http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
                Buffer, HTTP2Settings, Req)
    end.

%% HTTP/2 upgrade.

%% @todo We must not upgrade to h2c over a TLS connection.
%% Tell whether an HTTP/1.1 request asks for an upgrade to cleartext HTTP/2.
%%
%% Returns {true, HTTP2Settings} only when the request carries connection,
%% upgrade and http2-settings headers, the connection header lists both
%% "upgrade" and "http2-settings", and the upgrade header lists "h2c".
%% Returns false otherwise, including for any version other than HTTP/1.1.
is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
		<<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
	Conns = cow_http_hd:parse_connection(Conn),
	case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
		{true, true} ->
			Protocols = cow_http_hd:parse_upgrade(Upgrade),
			case lists:member(<<"h2c">>, Protocols) of
				true ->
					{true, HTTP2Settings};
				false ->
					false
			end;
		_ ->
			false
	end;
is_http2_upgrade(_, _) ->
	false.

%% Prior knowledge upgrade, without an HTTP/1.1 request.
%%
%% Only allowed on non-secure transports; on a secure transport the
%% connection is terminated with a 400 response.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
		proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
	case Transport:secure() of
		false ->
			_ = cancel_timeout(State),
			cowboy_http2:init(Parent, Ref, Socket, Transport,
				ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
		true ->
			error_terminate(400, State, {connection_error, protocol_error,
				'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
	end.

%% Upgrade via an HTTP/1.1 request.
%%
%% The HTTP2-Settings header value is parsed first; if parsing fails the
%% connection is terminated with a 400 response.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
		proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
		Buffer, HTTP2Settings, Req) ->
	%% @todo
	%% However if the client sent a body, we need to read the body in full
	%% and if we can't do that, return a 413 response. Some options are in order.
	%% Always half-closed stream coming from this side.
	try cow_http_hd:parse_http2_settings(HTTP2Settings) of
		Settings ->
			_ = cancel_timeout(State),
			cowboy_http2:init(Parent, Ref, Socket, Transport,
				ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
	catch _:_ ->
		error_terminate(400, State, {connection_error, protocol_error,
			'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
	end.

%% Request body parsing.

%% Run the transfer decoding function of the current body over Buffer.
%%
%% Returns {more, State} when the decoder needs more data, or
%% {data, StreamID, IsFin, Data, State} when body data was decoded.
%% When the decoder reports that the body is done, the parser is reset to
%% expect the request line of the next stream. A crash in the decoder
%% terminates the stream and then the connection.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
		PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
			transfer_decode_state=TState0}}) ->
	%% @todo Proper trailers.
	try TDecode(Buffer, TState0) of
		more ->
			{more, State#state{buffer=Buffer}};
		{more, Data, TState} ->
			{data, StreamID, nofin, Data, State#state{buffer= <<>>,
				in_state=PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}};
		%% The third element is an integer here, as opposed to the clause
		%% below where it is the unparsed remainder; it is not otherwise used.
		{more, Data, Length, TState} when is_integer(Length) ->
			{data, StreamID, nofin, Data, State#state{buffer= <<>>,
				in_state=PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}};
		{more, Data, Rest, TState} ->
			{data, StreamID, nofin, Data, State#state{buffer=Rest,
				in_state=PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}};
		{done, _HasTrailers, Rest} ->
			{data, StreamID, fin, <<>>,
				State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
		{done, Data, _HasTrailers, Rest} ->
			{data, StreamID, fin, Data,
				State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
	catch _:_ ->
		Reason = {connection_error, protocol_error,
			'Failure to decode the content. (RFC7230 4)'},
		terminate(stream_terminate(State, StreamID, Reason), Reason)
	end.

%% Message handling.

%% Handle the exit message Msg of the process Pid: forget the child and,
%% if its stream is still running, forward the message to that stream.
down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
	case cowboy_children:down(Children0, Pid) of
		%% The stream was terminated already.
		{ok, undefined, Children} ->
			State#state{children=Children};
		%% The stream is still running.
		{ok, StreamID, Children} ->
			info(State#state{children=Children}, StreamID, Msg);
		%% The process was unknown.
		error ->
			cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
				[Msg, Pid], Opts),
			State
	end.

%% Give Msg to the stream handlers of StreamID and execute the commands
%% they return. An exception in the handlers is logged and the stream is
%% terminated with an internal_error. Messages for unknown streams are
%% logged and otherwise ignored.
info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
	case lists:keyfind(StreamID, #stream.id, Streams0) of
		Stream = #stream{state=StreamState0} ->
			try cowboy_stream:info(StreamID, Msg, StreamState0) of
				{Commands, StreamState} ->
					Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
						Stream#stream{state=StreamState}),
					commands(State#state{streams=Streams}, StreamID, Commands)
			catch Class:Exception:Stacktrace ->
				cowboy:log(cowboy_stream:make_error_log(info,
					[StreamID, Msg, StreamState0],
					Class, Exception, Stacktrace), Opts),
				stream_terminate(State, StreamID, {internal_error, {Class, Exception},
					'Unhandled exception in cowboy_stream:info/3.'})
			end;
		false ->
			cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
				[Msg, StreamID], Opts),
			State
	end.

%% Commands.

%% Execute, in order, the commands returned by the stream handlers of
%% StreamID and return the updated state. The clause order matters:
%% spawn and internal_error are handled for any stream, then all remaining
%% commands of a stream that is not the current output stream are queued,
%% and only the clauses after that act on the current output stream.
commands(State, _, []) ->
	State;
%% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
	commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
		StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
	commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
		when Current =/= StreamID ->

	%% @todo We still want to handle some commands...

	Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
		Stream#stream{queue=Queue ++ Commands}),
	State#state{streams=Streams};
%% When we have finished reading the request body, do nothing.
commands(State=#state{flow=infinity}, StreamID, [{flow, _}|Tail]) ->
	commands(State, StreamID, Tail);
%% Read the request body.
commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
	%% We must read *at least* Size of data otherwise functions
	%% like cowboy_req:read_body/1,2 will wait indefinitely.
	Flow = if
		Flow0 < 0 -> Size;
		true -> Flow0 + Size
	end,
	%% Reenable active mode if necessary.
	State = if
		Flow0 =< 0, Flow > 0 ->
			active(State0);
		true ->
			State0
	end,
	commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
		[{error_response, Status, Headers0, Body}|Tail]) ->
	%% We close the connection when the error response is 408, as it
	%% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
	Headers = case Status of
		408 -> Headers0#{<<"connection">> => <<"close">>};
		<<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
		_ -> Headers0
	end,
	commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
	commands(State, StreamID, Tail);
%% Send an informational response.
commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
		StreamID, [{inform, StatusCode, Headers}|Tail]) ->
	%% @todo I'm pretty sure the last stream in the list is the one we want
	%% considering all others are queued.
	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
	_ = case Version of
		'HTTP/1.1' ->
			Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
				headers_to_list(Headers)));
		%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
		'HTTP/1.0' ->
			ok
	end,
	commands(State, StreamID, Tail);
%% Send a full response.
%%
%% @todo Kill the stream if it sent a response when one has already been sent.
%% @todo Keep IsFin in the state.
%% @todo Same two things above apply to DATA, possibly promise too.
commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
		[{response, StatusCode, Headers0, Body}|Tail]) ->
	%% @todo I'm pretty sure the last stream in the list is the one we want
	%% considering all others are queued.
	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
	{State1, Headers} = connection(State0, Headers0, StreamID, Version),
	State = State1#state{out_state=done},
	%% @todo Ensure content-length is set. 204 must never have content-length set.
	Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
	%% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2)
	case Body of
		{sendfile, _, _, _} ->
			Transport:send(Socket, Response),
			sendfile(State, Body);
		_ ->
			Transport:send(Socket, [Response, Body])
	end,
	commands(State, StreamID, Tail);
%% Send response headers and initiate chunked encoding or streaming.
commands(State0=#state{socket=Socket, transport=Transport,
		opts=Opts, overriden_opts=Override, streams=Streams0, out_state=OutState},
		StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
	%% @todo Same as above (about the last stream in the list).
	Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
	Status = cow_http:status_to_integer(StatusCode),
	ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
	%% Chunked transfer-encoding can be disabled on a per-request basis.
	Chunked = case Override of
		#{chunked := Chunked0} -> Chunked0;
		_ -> maps:get(chunked, Opts, true)
	end,
	{State1, Headers1} = case {Status, ContentLength, Version} of
		{204, _, 'HTTP/1.1'} ->
			{State0#state{out_state=done}, Headers0};
		{304, _, 'HTTP/1.1'} ->
			{State0#state{out_state=done}, Headers0};
		{_, undefined, 'HTTP/1.1'} when Chunked ->
			{State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
		%% Close the connection after streaming without content-length
		%% to all HTTP/1.0 clients and to HTTP/1.1 clients when chunked is disabled.
		{_, undefined, _} ->
			{State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
		%% Stream the response body without chunked transfer-encoding.
		_ ->
			ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
				Stream#stream{local_expected_size=ExpectedSize}),
			{State0#state{out_state=streaming, streams=Streams}, Headers0}
	end,
	%% The trailer header is only kept when trailers can be sent.
	Headers2 = case stream_te(OutState, Stream) of
		trailers -> Headers1;
		_ -> maps:remove(<<"trailer">>, Headers1)
	end,
	{State, Headers} = connection(State1, Headers2, StreamID, Version),
	Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
	commands(State, StreamID, Tail);
%% Send a response body chunk.
%% @todo We need to kill the stream if it tries to send data before headers.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
		StreamID, [{data, IsFin, Data}|Tail]) ->
	%% Do not send anything when the user asks to send an empty
	%% data frame, as that would break the protocol.
	Size = case Data of
		{sendfile, _, B, _} -> B;
		_ -> iolist_size(Data)
	end,
	%% Depending on the current state we may need to send nothing,
	%% the last chunk, chunked data with/without the last chunk,
	%% or just the data as-is.
	Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
		Stream0=#stream{method= <<"HEAD">>} ->
			Stream0;
		Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
			Transport:send(Socket, <<"0\r\n\r\n">>),
			Stream0;
		Stream0 when Size =:= 0 ->
			Stream0;
		%% Data is a sendfile tuple: the chunk size line, the file
		%% contents and the chunk terminator are sent separately.
		Stream0 when is_tuple(Data), OutState =:= chunked ->
			Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
			sendfile(State0, Data),
			Transport:send(Socket,
				case IsFin of
					fin -> <<"\r\n0\r\n\r\n">>;
					nofin -> <<"\r\n">>
				end),
			Stream0;
		Stream0 when OutState =:= chunked ->
			Transport:send(Socket, [
				integer_to_binary(Size, 16), <<"\r\n">>, Data,
				case IsFin of
					fin -> <<"\r\n0\r\n\r\n">>;
					nofin -> <<"\r\n">>
				end
			]),
			Stream0;
		Stream0 when OutState =:= streaming ->
			#stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
			SentSize = SentSize0 + Size,
			if
				%% ExpectedSize may be undefined, which is > any integer value.
				SentSize > ExpectedSize ->
					terminate(State0, response_body_too_large);
				is_tuple(Data) ->
					sendfile(State0, Data);
				true ->
					Transport:send(Socket, Data)
			end,
			Stream0#stream{local_sent_size=SentSize}
	end,
	State = case IsFin of
		fin -> State0#state{out_state=done};
		nofin -> State0
	end,
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
	commands(State#state{streams=Streams}, StreamID, Tail);
%% Send the trailers, or only the final chunk when trailers cannot be
%% sent. Nothing is sent when the response is not chunked.
commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
		StreamID, [{trailers, Trailers}|Tail]) ->
	case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
		trailers ->
			Transport:send(Socket, [
				<<"0\r\n">>,
				cow_http:headers(maps:to_list(Trailers)),
				<<"\r\n">>
			]);
		no_trailers ->
			Transport:send(Socket, <<"0\r\n\r\n">>);
		not_chunked ->
			ok
	end,
	commands(State#state{out_state=done}, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
		out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
		[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
	%% @todo If there's streams opened after this one, fail instead of 101.
	State1 = cancel_timeout(State0),
	%% Before we send the 101 response we need to stop receiving data
	%% from the socket, otherwise the data might be received before the
	%% call to flush/0 and we end up inadvertently dropping a packet.
	%%
	%% @todo Handle cases where the request came with a body. We need
	%% to process or skip the body before the upgrade can be completed.
	State = passive(State1),
	%% Send a 101 response if necessary, then terminate the stream.
	#state{streams=Streams} = case OutState of
		wait -> info(State, StreamID, {inform, 101, Headers});
		_ -> State
	end,
	#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
	%% @todo We need to shutdown processes here first.
	stream_call_terminate(StreamID, switch_protocol, StreamState, State),
	%% Terminate children processes and flush any remaining messages from the mailbox.
	cowboy_children:terminate(Children),
	flush(Parent),
	Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
%% Set options dynamically.
commands(State0=#state{overriden_opts=Opts},
		StreamID, [{set_options, SetOpts}|Tail]) ->
	State1 = case SetOpts of
		#{idle_timeout := IdleTimeout} ->
			set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
				idle_timeout);
		_ ->
			State0
	end,
	State = case SetOpts of
		#{chunked := Chunked} ->
			%% Extend the overrides carried by State1 rather than the map
			%% captured in the function head, otherwise an idle_timeout
			%% override set by this same command would be discarded.
			#state{overriden_opts=Opts1} = State1,
			State1#state{overriden_opts=Opts1#{chunked => Chunked}};
		_ ->
			State1
	end,
	commands(State, StreamID, Tail);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
	%% @todo Do we want to run the commands after a stop?
	%% @todo We currently wait for the stop command before we
	%% continue with the next request/response. In theory, if
	%% the request body was read fully and the response body
	%% was sent fully we should be able to start working on
	%% the next request concurrently. This can be done as a
	%% future optimization.
	maybe_terminate(State, StreamID, Tail);
%% Log event.
commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
	cowboy:log(Log, Opts),
	commands(State, StreamID, Tail);
%% HTTP/1.1 does not support push; ignore.
commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
	commands(State, StreamID, Tail).

%% Convert the headers map to a list of {Name, Value} tuples.
%%
%% The set-cookie header is special; we can only send one cookie per header.
%% Its value is expected to be a list and each element becomes a separate
%% set-cookie entry, appended after all the other headers.
headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
	Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
	Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
headers_to_list(Headers) ->
	maps:to_list(Headers).

%% We wrap the sendfile call into a try/catch because on OTP-20
%% and earlier a few different crashes could occur for sockets
%% that were closing or closed. For example a badarg in
%% erlang:port_get_data(#Port<...>) or a badmatch like
%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
%%
%% OTP-21 uses a NIF instead of a port so the implementation
%% and behavior has dramatically changed and it is unclear
%% whether it will be necessary in the future.
%%
%% This try/catch prevents some noisy logs to be written
%% when these errors occur.
%%
%% Returns ok; the return value of the sendfile call itself is ignored.
%% Any exception terminates the connection.
sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
		{sendfile, Offset, Bytes, Path}) ->
	try
		%% When sendfile is disabled we explicitly use the fallback.
		_ = case maps:get(sendfile, Opts, true) of
			true -> Transport:sendfile(Socket, Path, Offset, Bytes);
			false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
		end,
		ok
	catch _:_ ->
		terminate(State, {socket_error, sendfile_crash,
			'An error occurred when using the sendfile function.'})
	end.

%% Flush messages specific to cowboy_http before handing over the
%% connection to another protocol: timer timeouts, messages tagged with
%% our own pid, and exit messages from any process other than the parent.
%% Never blocks: returns ok as soon as no such message is in the mailbox.
flush(Parent) ->
	receive
		{timeout, _, _} ->
			flush(Parent);
		{{Pid, _}, _} when Pid =:= self() ->
			flush(Parent);
		{'EXIT', Pid, _} when Pid =/= Parent ->
			flush(Parent)
	after 0 ->
		ok
	end.

%% @todo In these cases I'm not sure if we should continue processing commands.
%% Terminate the stream after a stop command. When the stream is the last
%% one allowed on this connection (last_streamid), the connection is
%% terminated as well. The remaining commands (_Tail) are dropped.
maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
	terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
maybe_terminate(State, StreamID, _Tail) ->
	stream_terminate(State, StreamID, normal).

%% Terminate the stream StreamID and move on to the next one.
%%
%% In order: complete or send a response depending on the output state,
%% call the terminate callback of the stream handlers, shut down the
%% children of the stream, reset the overriden options, and finally either
%% terminate the connection (request body left unread that is too large or
%% of unknown length) or continue with the next stream via stream_next/1.
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
		out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
		children=Children0}, StreamID, Reason) ->
	#stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
		= lists:keyfind(StreamID, #stream.id, Streams0),
	%% Send a response or terminate chunks depending on the current output state.
	State1 = #state{streams=Streams1} = case OutState of
		wait when element(1, Reason) =:= internal_error ->
			info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
		wait when element(1, Reason) =:= connection_error ->
			info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
		%% No response was sent and there was no error: reply with a 204.
		wait ->
			info(State0, StreamID, {response, 204, #{}, <<>>});
		chunked when Version =:= 'HTTP/1.1' ->
			info(State0, StreamID, {data, fin, <<>>});
		%% NOTE(review): when ExpectedSize is undefined this guard holds for
		%% any integer SentSize (numbers sort before atoms) - confirm that
		%% terminating here is intended for responses without content-length.
		streaming when SentSize < ExpectedSize ->
			terminate(State0, response_body_too_small);
		_ -> %% done or Version =:= 'HTTP/1.0'
			State0
	end,
	%% Stop the stream, shutdown children and reset overriden options.
	{value, #stream{state=StreamState}, Streams}
		= lists:keytake(StreamID, #stream.id, Streams1),
	stream_call_terminate(StreamID, Reason, StreamState, State1),
	Children = cowboy_children:shutdown(Children0, StreamID),
	State = State1#state{overriden_opts=#{}, streams=Streams, children=Children},
	%% We want to drop the connection if the body was not read fully
	%% and we don't know its length or more remains to be read than
	%% configuration allows.
	MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
	case InState of
		#ps_body{length=undefined}
				when InStreamID =:= OutStreamID ->
			terminate(State, skip_body_unknown_length);
		#ps_body{length=Len, received=Received}
				when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
			terminate(State, skip_body_too_large);
		%% The rest of the body is small enough: set the flow to infinity
		%% before moving on to the next stream.
		#ps_body{} when InStreamID =:= OutStreamID ->
			stream_next(State#state{flow=infinity});
		_ ->
			stream_next(State)
	end.

%% Make the next stream the current output stream. If that stream already
%% exists, active mode is enabled if it was not, the flow is reset to the
%% initial flow size and the commands queued for the stream are executed.
stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) ->
	NextOutStreamID = OutStreamID + 1,
	case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
		%% The next request has not been received yet.
		false ->
			State0#state{out_streamid=NextOutStreamID, out_state=wait};
		#stream{queue=Commands} ->
			State = case Active of
				true -> State0;
				false -> active(State0)
			end,
			%% @todo Remove queue from the stream.
			%% We set the flow to the initial flow size even though
			%% we might have sent some data through already due to pipelining.
			Flow = maps:get(initial_stream_flow_size, Opts, 65535),
			commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
				NextOutStreamID, Commands)
	end.

%% Call the terminate callback of the stream handlers. Exceptions are
%% logged and do not propagate to the caller.
stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
	try
		cowboy_stream:terminate(StreamID, Reason, StreamState)
	catch Class:Exception:Stacktrace ->
		cowboy:log(cowboy_stream:make_error_log(terminate,
			[StreamID, Reason, StreamState],
			Class, Exception, Stacktrace), Opts)
	end.
%% Decide from the request whether the connection is kept alive.
%%
%% HTTP/1.0 requests close unless they ask for keep-alive, and always
%% close when the http10_keepalive option is false. HTTP/1.1 requests
%% keep the connection alive unless they ask for close. Any other
%% version results in keepalive.
maybe_req_close(#state{opts=#{http10_keepalive := false}}, _, 'HTTP/1.0') ->
	close;
maybe_req_close(_, #{<<"connection">> := ConnHd}, 'HTTP/1.0') ->
	Tokens = cow_http_hd:parse_connection(ConnHd),
	close_if(not lists:member(<<"keep-alive">>, Tokens));
maybe_req_close(_, _, 'HTTP/1.0') ->
	close;
maybe_req_close(_, #{<<"connection">> := ConnHd}, 'HTTP/1.1') ->
	close_if(connection_hd_is_close(ConnHd));
maybe_req_close(_, _, _) ->
	keepalive.

%% Map a "must close" boolean to the close | keepalive atoms.
close_if(true) -> close;
close_if(false) -> keepalive.

%% Adjust the connection response header and the state for a response
%% on stream StreamID. Returns {State, Headers}.
%%
%% On the last stream of the connection the response must say close.
%% On any other stream, a close set by the user makes this stream the
%% last one, and HTTP/1.0 responses without a connection header get
%% keep-alive added.
connection(State=#state{last_streamid=LastStreamID}, Headers, StreamID, Version) ->
	IsLast = LastStreamID =:= StreamID,
	case Headers of
		#{<<"connection">> := ConnHd} ->
			case {IsLast, connection_hd_is_close(ConnHd)} of
				{true, true} ->
					{State, Headers};
				%% @todo Here we need to remove keep-alive and add close, not just add close.
				{true, false} ->
					{State, Headers#{<<"connection">> => [<<"close, ">>, ConnHd]}};
				{false, true} ->
					{State#state{last_streamid=StreamID}, Headers};
				%% @todo Here we need to set keep-alive only if it wasn't set before.
				{false, false} ->
					{State, Headers}
			end;
		_ when IsLast ->
			{State, Headers#{<<"connection">> => <<"close">>}};
		_ when Version =:= 'HTTP/1.0' ->
			{State, Headers#{<<"connection">> => <<"keep-alive">>}};
		_ ->
			{State, Headers}
	end.

%% Tell whether the connection header value (given as iodata) contains
%% the "close" option.
connection_hd_is_close(ConnHd) ->
	lists:member(<<"close">>,
		cow_http_hd:parse_connection(iolist_to_binary(ConnHd))).

%% Tell whether trailers can be sent on the stream: not_chunked when the
%% response is streamed as-is, otherwise trailers or no_trailers
%% depending on the te header of the request.
stream_te(streaming, _) ->
	not_chunked;
%% The request had no te header.
stream_te(_, #stream{te=undefined}) ->
	no_trailers;
stream_te(_, #stream{te=RawTE}) ->
	try cow_http_hd:parse_te(RawTE) of
		{Trailers, _} -> Trailers
	catch _:_ ->
		%% A te header we cannot parse is treated as not accepting trailers.
		no_trailers
	end.

%% This function is only called when an error occurs on a new stream.
%% It sends an early error response with "connection: close", built from
%% whatever was parsed of the request so far, then terminates the connection.
-spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=InState}, Reason) ->
	BaseReq = #{ref => Ref, peer => Peer},
	PartialReq = case InState of
		#ps_request_line{} ->
			BaseReq;
		#ps_header{method=Method, path=Path, qs=Qs,
				version=Version, headers=ReqHeaders} ->
			BaseReq#{
				method => Method,
				path => Path,
				qs => Qs,
				version => Version,
				headers => if
					ReqHeaders =:= undefined -> #{};
					true -> ReqHeaders
				end
			}
	end,
	early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
	terminate(State, Reason).

%% Same as early_error/5 without any extra response headers.
early_error(StatusCode, State, Reason, PartialReq) ->
	early_error(StatusCode, State, Reason, PartialReq, #{}).

%% Send an error response for a request that did not result in a stream
%% being initialized. The stream handlers can change the response through
%% the early_error callback; if they crash, the crash is logged and the
%% initial response is sent without a body. Always returns ok.
early_error(StatusCode0, #state{socket=Socket, transport=Transport,
		opts=Opts, in_streamid=StreamID}, Reason, PartialReq, ExtraHeaders) ->
	DefaultHeaders = ExtraHeaders#{<<"content-length">> => <<"0">>},
	DefaultResp = {response, StatusCode0, DefaultHeaders, <<>>},
	try cowboy_stream:early_error(StreamID, Reason, PartialReq, DefaultResp, Opts) of
		{response, StatusCode, RespHeaders, RespBody} ->
			RespHead = cow_http:response(StatusCode, 'HTTP/1.1',
				maps:to_list(RespHeaders)),
			%% @todo We shouldn't send the body when the method is HEAD.
			%% @todo Technically we allow the sendfile tuple.
			Transport:send(Socket, [RespHead, RespBody])
	catch Class:Exception:Stacktrace ->
		ErrorLog = cowboy_stream:make_error_log(early_error,
			[StreamID, Reason, PartialReq, DefaultResp, Opts],
			Class, Exception, Stacktrace),
		cowboy:log(ErrorLog, Opts),
		%% An error response is still owed to the client, so the one
		%% we initially wanted to send goes out. It's better than nothing.
		FallbackHead = cow_http:response(StatusCode0, 'HTTP/1.1',
			maps:to_list(DefaultHeaders)),
		Transport:send(Socket, FallbackHead)
	end,
	ok.

%% Start closing the connection. Without streams the connection is
%% terminated immediately. Otherwise the terminate callback is called for
%% every stream but the first one in the list, and the current output
%% stream is marked as the last one.
initiate_closing(State=#state{streams=Streams, out_streamid=OutStreamID}, Reason) ->
	case Streams of
		[] ->
			terminate(State, Reason);
		[_Kept|Others] ->
			terminate_all_streams(State, Others, Reason),
			State#state{last_streamid=OutStreamID}
	end.

%% Terminate all streams and children processes, linger on the socket,
%% then exit with {shutdown, Reason}.
-spec terminate(_, _) -> no_return().
terminate(undefined, Reason) ->
	exit({shutdown, Reason});
terminate(State=#state{streams=AllStreams, children=AllChildren}, Reason) ->
	terminate_all_streams(State, AllStreams, Reason),
	cowboy_children:terminate(AllChildren),
	terminate_linger(State),
	exit({shutdown, Reason}).

%% Call the terminate callback of every stream in the list, in order.
terminate_all_streams(State, Streams, Reason) ->
	lists:foreach(fun(#stream{id=StreamID, state=StreamState}) ->
		stream_call_terminate(StreamID, Reason, StreamState, State)
	end, Streams).

%% Shut down the write side of the socket, then wait for the peer to
%% close the connection or for the linger_timeout option (1000 by
%% default) to expire. A linger_timeout of 0 disables the wait.
terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
	case Transport:shutdown(Socket, write) of
		{error, _} ->
			ok;
		ok ->
			case maps:get(linger_timeout, Opts, 1000) of
				0 ->
					ok;
				Timeout ->
					TimerRef = linger_timer(Timeout),
					terminate_linger_before_loop(State, TimerRef, Transport:messages())
			end
	end.

%% Start the linger timer; there is no timer when the timeout is infinity.
linger_timer(infinity) ->
	undefined;
linger_timer(Timeout) ->
	erlang:start_timer(Timeout, self(), linger_timeout).
%% Switch the socket to active mode before (re)entering the linger loop.
%% Gives up and returns ok when the socket option cannot be set.
terminate_linger_before_loop(State, TimerRef, Messages) ->
	%% The socket may already be in active mode at this point;
	%% that is harmless since the connection is shutting down anyway.
	case setopts_active(State) of
		{error, _} ->
			ok;
		ok ->
			terminate_linger_loop(State, TimerRef, Messages)
	end.

%% Drop everything received until the socket is closed, errors out,
%% or the linger timer fires. Returns ok in all three cases.
terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
	DataTag = element(1, Messages),
	ClosedTag = element(2, Messages),
	ErrorTag = element(3, Messages),
	receive
		%% Data from the socket is discarded.
		{DataTag, Socket, _} ->
			terminate_linger_loop(State, TimerRef, Messages);
		{ClosedTag, Socket} ->
			ok;
		{ErrorTag, Socket, _} ->
			ok;
		%% The socket went passive: make it active again and continue.
		{PassiveTag, Socket} when PassiveTag =:= tcp_passive; PassiveTag =:= ssl_passive ->
			terminate_linger_before_loop(State, TimerRef, Messages);
		{timeout, TimerRef, linger_timeout} ->
			ok;
		%% Any other message is discarded as well.
		_ ->
			terminate_linger_loop(State, TimerRef, Messages)
	end.

%% System callbacks.

%% Resume the connection loop with the given state.
-spec system_continue(_, _, #state{}) -> ok.
system_continue(_Parent, _Debug, State) ->
	loop(State).

%% Start closing the connection and resume the loop with the resulting state.
-spec system_terminate(any(), _, _, #state{}) -> no_return().
system_terminate(Reason0, _Parent, _Debug, State) ->
	ClosingState = initiate_closing(State,
		{stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}),
	loop(ClosingState).

%% Code change is a no-op: the state is returned unchanged.
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
	{ok, Misc}.