cow_http2_machine.erl (68837B)
1 %% Copyright (c) 2018, Loïc Hoguin <essen@ninenines.eu> 2 %% 3 %% Permission to use, copy, modify, and/or distribute this software for any 4 %% purpose with or without fee is hereby granted, provided that the above 5 %% copyright notice and this permission notice appear in all copies. 6 %% 7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 14 15 -module(cow_http2_machine). 16 17 -export([init/2]). 18 -export([init_stream/2]). 19 -export([init_upgrade_stream/2]). 20 -export([frame/2]). 21 -export([ignored_frame/1]). 22 -export([timeout/3]). 23 -export([prepare_headers/5]). 24 -export([prepare_push_promise/4]). 25 -export([prepare_trailers/3]). 26 -export([send_or_queue_data/4]). 27 -export([ensure_window/2]). 28 -export([ensure_window/3]). 29 -export([update_window/2]). 30 -export([update_window/3]). 31 -export([reset_stream/2]). 32 -export([get_connection_local_buffer_size/1]). 33 -export([get_local_setting/2]). 34 -export([get_remote_settings/1]). 35 -export([get_last_streamid/1]). 36 -export([set_last_streamid/1]). 37 -export([get_stream_local_buffer_size/2]). 38 -export([get_stream_local_state/2]). 39 -export([get_stream_remote_state/2]). 40 -export([is_lingering_stream/2]). 
%% Options accepted by init/2. Every key is optional.
%%
%% preface_timeout and settings_timeout fall back to 5000 when absent,
%% and the value 'infinity' disables the corresponding timer.
-type opts() :: #{
	connection_window_margin_size => 0..16#7fffffff,
	connection_window_update_threshold => 0..16#7fffffff,
	enable_connect_protocol => boolean(),
	initial_connection_window_size => 65535..16#7fffffff,
	initial_stream_window_size => 0..16#7fffffff,
	max_connection_window_size => 0..16#7fffffff,
	max_concurrent_streams => non_neg_integer() | infinity,
	max_decode_table_size => non_neg_integer(),
	max_encode_table_size => non_neg_integer(),
	max_frame_size_received => 16384..16777215,
	max_frame_size_sent => 16384..16777215 | infinity,
	max_stream_window_size => 0..16#7fffffff,
	message_tag => any(),
	preface_timeout => timeout(),
	settings_timeout => timeout(),
	stream_window_data_threshold => 0..16#7fffffff,
	stream_window_margin_size => 0..16#7fffffff,
	stream_window_update_threshold => 0..16#7fffffff
}.
-export_type([opts/0]).

%% The order of the fields is significant.
-record(sendfile, {
	offset :: non_neg_integer(),
	bytes :: pos_integer(),
	path :: file:name_all()
}).

%% State kept for a single HTTP/2 stream.
-record(stream, {
	id = undefined :: cow_http2:streamid(),

	%% Request method.
	method = undefined :: binary(),

	%% Whether we finished sending data.
	local = idle :: idle | cow_http2:fin(),

	%% Local flow control window (how much we can send).
	local_window :: integer(),

	%% Buffered data waiting for the flow control window to increase.
	local_buffer = queue:new() ::
		queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}),
	local_buffer_size = 0 :: non_neg_integer(),
	local_trailers = undefined :: undefined | cow_http:headers(),

	%% Whether we finished receiving data.
	remote = idle :: idle | cow_http2:fin(),

	%% Remote flow control window (how much we accept to receive).
	remote_window :: integer(),

	%% Size expected and read from the request body.
	remote_expected_size = undefined :: undefined | non_neg_integer(),
	remote_read_size = 0 :: non_neg_integer(),

	%% Unparsed te header. Used to know if we can send trailers.
	%% Note that we can always send trailers to the server.
	te :: undefined | binary()
}).

-type stream() :: #stream{}.

%% HEADERS or PUSH_PROMISE frame kept around while waiting for
%% the CONTINUATION frames that complete its header block.
-type continued_frame() ::
	{headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} |
	{push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}.

-record(http2_machine, {
	%% Whether the HTTP/2 endpoint is a client or a server.
	mode :: client | server,

	%% HTTP/2 SETTINGS customization.
	opts = #{} :: opts(),

	%% Connection-wide frame processing state.
	state = settings :: settings | normal
		| {continuation, request | response | trailers | push_promise, continued_frame()},

	%% Timer for the connection preface.
	preface_timer = undefined :: undefined | reference(),

	%% Timer for the ack for a SETTINGS frame we sent.
	settings_timer = undefined :: undefined | reference(),

	%% Settings are separate for each endpoint. In addition, settings
	%% must be acknowledged before they can be expected to be applied.
	local_settings = #{
%		header_table_size => 4096,
%		enable_push => true,
%		max_concurrent_streams => infinity,
		initial_window_size => 65535
%		max_frame_size => 16384
%		max_header_list_size => infinity
	} :: map(),
	next_settings = undefined :: undefined | map(),
	remote_settings = #{
		initial_window_size => 65535
	} :: map(),

	%% Connection-wide flow control window.
	local_window = 65535 :: integer(), %% How much we can send.
	remote_window = 65535 :: integer(), %% How much we accept to receive.

	%% Stream identifiers.
	local_streamid :: pos_integer(), %% The next streamid to be used.
	remote_streamid = 0 :: non_neg_integer(), %% The last streamid received.
	last_remote_streamid = 16#7fffffff :: non_neg_integer(), %% Used in GOAWAY.

	%% Currently active HTTP/2 streams. Streams may be initiated either
	%% by the client or by the server through PUSH_PROMISE frames.
	streams = #{} :: #{cow_http2:streamid() => stream()},

	%% HTTP/2 streams that have recently been reset locally.
	%% We are expected to keep receiving additional frames after
	%% sending an RST_STREAM.
	local_lingering_streams = [] :: [cow_http2:streamid()],

	%% HTTP/2 streams that have recently been reset remotely.
	%% We keep a few of these around in order to reject subsequent
	%% frames on these streams.
	remote_lingering_streams = [] :: [cow_http2:streamid()],

	%% HPACK decoding and encoding state.
	decode_state = cow_hpack:init() :: cow_hpack:state(),
	encode_state = cow_hpack:init() :: cow_hpack:state()
}).

-opaque http2_machine() :: #http2_machine{}.
-export_type([http2_machine/0]).

%% Pseudo-headers returned alongside decoded header blocks.
%%
%% The authority key is optional for regular requests: the request
%% validation in this module accepts header blocks that only carry
%% :method, :scheme and :path, so callers must not assume the key
%% is always present.
-type pseudo_headers() :: #{} %% Trailers
	| #{ %% Responses.
		status := cow_http:status()
	} | #{ %% Normal CONNECT requests.
		method := binary(),
		authority := binary()
	} | #{ %% Other requests and extended CONNECT requests.
		method := binary(),
		scheme := binary(),
		authority => binary(),
		path := binary(),
		protocol => binary()
	}.

%% Returns true when the given StreamID is for a local-initiated stream.
-define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)).
-define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)).
-define(IS_LOCAL(Mode, StreamID), (
	((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID))
	orelse
	((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID))
)).

-spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}.
%% Create the state machine for the given endpoint and return the
%% data that must be sent to the peer first. The preface and settings
%% timers are started before the preface is built.
init(Mode, Opts) when Mode =:= client; Mode =:= server ->
	Settings = settings_init(Opts),
	PrefaceTRef = start_timer(preface_timeout, Opts),
	SettingsTRef = start_timer(settings_timeout, Opts),
	Machine = #http2_machine{
		mode=Mode,
		opts=Opts,
		preface_timer=PrefaceTRef,
		settings_timer=SettingsTRef,
		next_settings=Settings,
		%% First local stream identifier: 1 for clients, 2 for servers.
		local_streamid=case Mode of
			client -> 1;
			server -> 2
		end
	},
	case Mode of
		client -> client_preface(Machine);
		server -> common_preface(Machine)
	end.

%% Start the named timer unless it was configured as 'infinity',
%% in which case 'undefined' is returned instead of a reference.
%% The timeout defaults to 5000 when the option is missing.
%%
%% @todo In Cowlib 3.0 we should always include MessageTag in the message.
%% It can be set to 'undefined' if the option is missing.
start_timer(Name, Opts) ->
	case maps:get(Name, Opts, 5000) of
		infinity ->
			undefined;
		Timeout ->
			%% The message only carries the tag when one was configured.
			Msg = case Opts of
				#{message_tag := MessageTag} -> {?MODULE, MessageTag, Name};
				_ -> {?MODULE, Name}
			end,
			erlang:start_timer(Timeout, self(), Msg)
	end.

%% The client preface is the connection preface string followed
%% by the frames that both endpoints send.
client_preface(Machine0) ->
	{ok, Common, Machine} = common_preface(Machine0),
	Magic = <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
	{ok, [Magic, Common], Machine}.

%% We send next_settings and use defaults until we get an ack.
%%
%% We also send a WINDOW_UPDATE frame for the connection when
%% the user specified an initial_connection_window_size.
common_preface(Machine=#http2_machine{opts=Opts, next_settings=Settings}) ->
	case maps:get(initial_connection_window_size, Opts, 65535) of
		65535 ->
			{ok, cow_http2:settings(Settings), Machine};
		WindowSize ->
			Increment = WindowSize - 65535,
			Frames = [
				cow_http2:settings(Settings),
				cow_http2:window_update(Increment)
			],
			{ok, Frames, update_window(Increment, Machine)}
	end.

%% Build the map of settings to advertise from the user options.
%% Options left at their default value are not included.
settings_init(Opts) ->
	%% Each entry is {OptionName, SettingName, DefaultValue}.
	%% @todo max_header_list_size
	Table = [
		{max_decode_table_size, header_table_size, 4096},
		{max_concurrent_streams, max_concurrent_streams, infinity},
		{initial_stream_window_size, initial_window_size, 65535},
		{max_frame_size_received, max_frame_size, 16384},
		{enable_connect_protocol, enable_connect_protocol, false}
	],
	lists:foldl(fun({OptName, SettingName, Default}, Acc) ->
		setting_from_opt(Acc, Opts, OptName, SettingName, Default)
	end, #{}, Table).

%% Add the setting only when the option is present
%% with a value that differs from the default.
setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
	case maps:find(OptName, Opts) of
		{ok, Value} when Value =/= Default ->
			Settings#{SettingName => Value};
		_ ->
			Settings
	end.

%% Register a new client-initiated stream using the next local
%% stream identifier. Local identifiers advance by two.
-spec init_stream(binary(), State)
	-> {ok, cow_http2:streamid(), State} when State::http2_machine().
init_stream(Method, Machine0=#http2_machine{mode=client, local_streamid=StreamID,
		local_settings=#{initial_window_size := RecvWindow},
		remote_settings=#{initial_window_size := SendWindow}}) ->
	Machine = Machine0#http2_machine{local_streamid=StreamID + 2},
	NewStream = #stream{
		id=StreamID,
		method=Method,
		local_window=SendWindow,
		remote_window=RecvWindow
	},
	{ok, StreamID, stream_store(NewStream, Machine)}.

%% Register stream 1 on a server that has not received any stream yet.
%% The remote side of the stream is created already finished
%% with an expected body size of 0.
-spec init_upgrade_stream(binary(), State)
	-> {ok, cow_http2:streamid(), State} when State::http2_machine().
init_upgrade_stream(Method, Machine0=#http2_machine{mode=server, remote_streamid=0,
		local_settings=#{initial_window_size := RecvWindow},
		remote_settings=#{initial_window_size := SendWindow}}) ->
	Machine = Machine0#http2_machine{remote_streamid=1},
	UpgradeStream = #stream{
		id=1,
		method=Method,
		remote=fin,
		remote_expected_size=0,
		local_window=SendWindow,
		remote_window=RecvWindow,
		te=undefined
	},
	{ok, 1, stream_store(UpgradeStream, Machine)}.

%% Process a received frame according to the connection-wide state.
-spec frame(cow_http2:frame(), State)
	-> {ok, State}
	| {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State}
	| {ok, {headers, cow_http2:streamid(), cow_http2:fin(),
		cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State}
	| {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State}
	| {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State}
	| {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(),
		cow_http:headers(), pseudo_headers()}, State}
	| {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State}
	| {send, [{cow_http2:streamid(), cow_http2:fin(),
		[{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State}
	| {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State}
	| {error, {connection_error, cow_http2:error(), atom()}, State}
	when State::http2_machine().
%% The first frame received is handled as a SETTINGS frame
%% and stops the preface timer if there is one.
frame(Frame, Machine=#http2_machine{state=settings, preface_timer=PrefaceTRef}) ->
	case PrefaceTRef of
		undefined ->
			ok;
		_ ->
			ok = erlang:cancel_timer(PrefaceTRef, [{async, true}, {info, false}])
	end,
	settings_frame(Frame, Machine#http2_machine{state=normal, preface_timer=undefined});
%% A header block is in progress.
frame(Frame, Machine=#http2_machine{state={continuation, _, _}}) ->
	maybe_discard_result(continuation_frame(Frame, Machine));
frame(settings_ack, Machine=#http2_machine{state=normal}) ->
	settings_ack_frame(Machine);
%% All other frames are dispatched on the first element of the tuple.
frame(Frame, Machine=#http2_machine{state=normal}) ->
	maybe_discard_result(case element(1, Frame) of
		data -> data_frame(Frame, Machine);
		headers -> headers_frame(Frame, Machine);
		priority -> priority_frame(Frame, Machine);
		rst_stream -> rst_stream_frame(Frame, Machine);
		settings -> settings_frame(Frame, Machine);
		push_promise -> push_promise_frame(Frame, Machine);
		ping -> ping_frame(Frame, Machine);
		ping_ack -> ping_ack_frame(Frame, Machine);
		goaway -> goaway_frame(Frame, Machine);
		window_update -> window_update_frame(Frame, Machine);
		continuation -> unexpected_continuation_frame(Frame, Machine);
		_ -> ignored_frame(Machine)
	end).

%% RFC7540 6.9. After sending a GOAWAY frame, the sender can discard frames for
%% streams initiated by the receiver with identifiers higher than the identified
%% last stream. However, any frames that alter connection state cannot be
%% completely ignored. For instance, HEADERS, PUSH_PROMISE, and CONTINUATION
%% frames MUST be minimally processed to ensure the state maintained for header
%% compression is consistent.
%%
%% The frame has been fully processed by the time we get here: only
%% the event is dropped, the updated state is always kept.
maybe_discard_result(Outcome={ok, Event, Machine=#http2_machine{mode=Mode,
		last_remote_streamid=LastStreamID}})
		when element(1, Event) =/= goaway ->
	case element(2, Event) of
		StreamID when StreamID > LastStreamID, not ?IS_LOCAL(Mode, StreamID) ->
			{ok, Machine};
		_ ->
			Outcome
	end;
maybe_discard_result(Outcome) ->
	Outcome.

%% DATA frame.

%% The stream was never opened by either endpoint.
data_frame({data, StreamID, _, _}, Machine=#http2_machine{mode=Mode,
		local_streamid=NextLocalID, remote_streamid=LastRemoteID})
		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= NextLocalID))
		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > LastRemoteID)) ->
	{error, {connection_error, protocol_error,
		'DATA frame received on a stream in idle state. (RFC7540 5.1)'},
		Machine};
%% The payload is larger than what the connection window allows.
data_frame({data, _, _, Data}, Machine=#http2_machine{remote_window=ConnWindow})
		when byte_size(Data) > ConnWindow ->
	{error, {connection_error, flow_control_error,
		'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'},
		Machine};
%% The connection window is consumed regardless of the stream's state.
data_frame(Frame={data, StreamID, _, Data}, Machine0=#http2_machine{
		remote_window=ConnWindow, local_lingering_streams=Lingering}) ->
	Size = byte_size(Data),
	Machine = Machine0#http2_machine{remote_window=ConnWindow - Size},
	case stream_get(StreamID, Machine) of
		%% The stream window check takes priority over the stream state.
		#stream{remote_window=StreamWindow} when StreamWindow < Size ->
			stream_reset(StreamID, Machine, flow_control_error,
				'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)');
		#stream{remote=idle} ->
			stream_reset(StreamID, Machine, protocol_error,
				'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)');
		#stream{remote=fin} ->
			stream_reset(StreamID, Machine, stream_closed,
				'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)');
		Stream=#stream{remote=nofin} ->
			data_frame(Frame, Machine, Stream, Size);
		undefined ->
			%% After we send an RST_STREAM frame and terminate a stream,
			%% the remote endpoint might still be sending us some more
			%% frames until it can process this RST_STREAM.
			case lists:member(StreamID, Lingering) of
				false ->
					{error, {connection_error, stream_closed,
						'DATA frame received for a closed stream. (RFC7540 5.1)'},
						Machine};
				true ->
					{ok, Machine}
			end
	end.

%% Account for the data in the stream and check the amount
%% received so far against the expected body size, if any.
data_frame(Frame={data, _, IsFin, _}, Machine0, Stream0=#stream{id=StreamID,
		remote_window=StreamWindow, remote_read_size=ReadSoFar}, Size) ->
	Stream = Stream0#stream{
		remote=IsFin,
		remote_window=StreamWindow - Size,
		remote_read_size=ReadSoFar + Size
	},
	Machine = stream_store(Stream, Machine0),
	case is_body_size_valid(Stream) of
		false ->
			stream_reset(StreamID, Machine, protocol_error,
				'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)');
		true ->
			{ok, Frame, Machine}
	end.

%% It's always valid when no content-length header was specified.
is_body_size_valid(#stream{remote_expected_size=undefined}) ->
	true;
%% While the body is still being received we must not
%% have read more than the size that was announced.
is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected,
		remote_read_size=Read}) ->
	Read =< Expected;
%% Once the body is complete the size read must be exactly the one expected.
is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected,
		remote_read_size=Read}) ->
	Read =:= Expected;
is_body_size_valid(_) ->
	false.

%% HEADERS frame.
%%
%% We always close the connection when we detect errors before
%% decoding the headers to not waste resources on non-compliant
%% endpoints, making us stricter than the RFC requires.

%% Convenience record to manipulate the tuple.
%% The order of the fields matter.
-record(headers, {
	id :: cow_http2:streamid(),
	fin :: cow_http2:fin(),
	head :: cow_http2:head_fin(),
	data :: binary()
}).

%% HEADERS frames that include priority fields are reduced to the
%% plain form before being handled. The priority fields are dropped.
%%
%% @todo Handle the PRIORITY data, but only if this returns an ok tuple.
%% @todo Do not lose the PRIORITY information if CONTINUATION frames follow.
headers_frame({headers, StreamID, IsFin, IsHeadFin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlock},
		Machine=#http2_machine{}) ->
	Plain = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderBlock},
	headers_frame(Plain, Machine);
%% Servers and clients apply different rules to incoming HEADERS frames.
headers_frame(Frame=#headers{}, Machine=#http2_machine{mode=Mode}) ->
	case Mode of
		client -> client_headers_frame(Frame, Machine);
		server -> server_headers_frame(Frame, Machine)
	end.

%% Reject HEADERS frames with even-numbered streamid.
server_headers_frame(#headers{id=StreamID}, Machine)
		when ?IS_SERVER_LOCAL(StreamID) ->
	{error, {connection_error, protocol_error,
		'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'},
		Machine};
%% HEADERS frame on an idle stream: new request.
%% An incomplete header block switches the machine to the continuation state.
server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin},
		Machine=#http2_machine{mode=server, remote_streamid=LastRemoteID})
		when StreamID > LastRemoteID ->
	case IsHeadFin of
		head_nofin ->
			{ok, Machine#http2_machine{state={continuation, request, Frame}}};
		head_fin ->
			headers_decode(Frame, Machine, request, undefined)
	end;
%% Either a HEADERS frame received on (half-)closed stream,
%% or a HEADERS frame containing the trailers.
server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, Machine) ->
	case {stream_get(StreamID, Machine), IsFin} of
		%% Trailers.
		{Stream=#stream{remote=nofin}, fin} ->
			case IsHeadFin of
				head_nofin ->
					{ok, Machine#http2_machine{state={continuation, trailers, Frame}}};
				head_fin ->
					headers_decode(Frame, Machine, trailers, Stream)
			end;
		{#stream{remote=nofin}, _} ->
			{error, {connection_error, protocol_error,
				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
				Machine};
		_ ->
			{error, {connection_error, stream_closed,
				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
				Machine}
	end.

%% Either a HEADERS frame received on an (half-)closed stream,
%% or a HEADERS frame containing the response or the trailers.
client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin},
		Machine=#http2_machine{local_streamid=NextLocalID, remote_streamid=LastRemoteID})
		when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < NextLocalID))
		orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< LastRemoteID)) ->
	%% First decide what the header block is, based on the stream state.
	Decision = case stream_get(StreamID, Machine) of
		Stream0=#stream{remote=idle} ->
			{decode, response, Stream0};
		Stream0=#stream{remote=nofin} when IsFin =:= fin ->
			{decode, trailers, Stream0};
		#stream{remote=nofin} ->
			{reject, protocol_error,
				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'};
		_ ->
			{reject, stream_closed,
				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'}
	end,
	%% Then either decode the complete header block, wait for
	%% the CONTINUATION frames, or fail the connection.
	case Decision of
		{decode, Type, Stream} when IsHeadFin =:= head_fin ->
			headers_decode(Frame, Machine, Type, Stream);
		{decode, Type, _} when IsHeadFin =:= head_nofin ->
			{ok, Machine#http2_machine{state={continuation, Type, Frame}}};
		{reject, Reason, HumanReadable} ->
			{error, {connection_error, Reason, HumanReadable}, Machine}
	end;
%% Reject HEADERS frames received on idle streams.
client_headers_frame(_, Machine) ->
	{error, {connection_error, protocol_error,
		'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'},
		Machine}.

%% Decode a complete header block. Any exception raised by the
%% HPACK decoder results in a connection error. Only requests
%% go through the concurrency limit check.
headers_decode(Frame=#headers{head=head_fin, data=HeaderBlock},
		Machine0=#http2_machine{decode_state=Decoder0}, Type, Stream) ->
	try cow_hpack:decode(HeaderBlock, Decoder0) of
		{Headers, Decoder} ->
			Machine = Machine0#http2_machine{decode_state=Decoder},
			case Type of
				request ->
					headers_enforce_concurrency_limit(Frame, Machine, Type, Stream, Headers);
				_ ->
					headers_pseudo_headers(Frame, Machine, Type, Stream, Headers)
			end
	catch _:_ ->
		{error, {connection_error, compression_error,
			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
			Machine0}
	end.

%% Refuse the stream when accepting it would exceed the
%% max_concurrent_streams value from our local settings.
headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
		Machine=#http2_machine{local_settings=LocalSettings, streams=Streams},
		Type, Stream, Headers) ->
	Limit = maps:get(max_concurrent_streams, LocalSettings, infinity),
	%% The new stream is not included in the Streams variable yet,
	%% so the limit is reached as soon as the count equals it.
	case map_size(Streams) >= Limit of
		true ->
			{error, {stream_error, StreamID, refused_stream,
				'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'},
				Machine};
		false ->
			headers_pseudo_headers(Frame, Machine, Type, Stream, Headers)
	end.

%% Extract and validate the pseudo-headers found at the start of
%% the header block. The clause order below is significant.
headers_pseudo_headers(Frame, Machine=#http2_machine{local_settings=LocalSettings},
		Type, Stream, Headers0) when Type =:= request; Type =:= push_promise ->
	ExtConnect = maps:get(enable_connect_protocol, LocalSettings, false),
	Accept = fun(Pseudo, Rest) ->
		headers_regular_headers(Frame, Machine, Type, Stream, Pseudo, Rest)
	end,
	Reject = fun(HumanReadable) ->
		headers_malformed(Frame, Machine, HumanReadable)
	end,
	case request_pseudo_headers(Headers0, #{}) of
		%% Extended CONNECT method (RFC8441).
		{ok, Pseudo=#{method := <<"CONNECT">>, scheme := _,
				authority := _, path := _, protocol := _}, Rest}
				when ExtConnect ->
			Accept(Pseudo, Rest);
		{ok, #{method := <<"CONNECT">>, scheme := _,
				authority := _, path := _}, _}
				when ExtConnect ->
			Reject('The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
		{ok, #{protocol := _}, _} ->
			Reject('The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
		%% Normal CONNECT (no scheme/path).
		{ok, Pseudo=#{method := <<"CONNECT">>, authority := _}, Rest}
				when map_size(Pseudo) =:= 2 ->
			Accept(Pseudo, Rest);
		{ok, #{method := <<"CONNECT">>}, _} ->
			Reject('CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
		%% Other requests.
		{ok, Pseudo=#{method := _, scheme := _, path := _}, Rest} ->
			Accept(Pseudo, Rest);
		{ok, _, _} ->
			Reject('A required pseudo-header was not found. (RFC7540 8.1.2.3)');
		{error, HumanReadable} ->
			Reject(HumanReadable)
	end;
%% Responses must have a :status. Errors reset the stream.
headers_pseudo_headers(Frame=#headers{id=StreamID},
		Machine, Type=response, Stream, Headers0) ->
	case response_pseudo_headers(Headers0, #{}) of
		{error, HumanReadable} ->
			stream_reset(StreamID, Machine, protocol_error, HumanReadable);
		{ok, Pseudo=#{status := _}, Rest} ->
			headers_regular_headers(Frame, Machine, Type, Stream, Pseudo, Rest);
		{ok, _, _} ->
			stream_reset(StreamID, Machine, protocol_error,
				'A required pseudo-header was not found. (RFC7540 8.1.2.4)')
	end;
%% Trailers have no pseudo-headers at all.
headers_pseudo_headers(Frame=#headers{id=StreamID},
		Machine, Type=trailers, Stream, Headers) ->
	case trailers_contain_pseudo_headers(Headers) of
		true ->
			stream_reset(StreamID, Machine, protocol_error,
				'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)');
		false ->
			headers_regular_headers(Frame, Machine, Type, Stream, #{}, Headers)
	end.

%% A malformed request results in a stream error of type protocol_error.
headers_malformed(#headers{id=StreamID}, Machine, HumanReadable) ->
	Reason = {stream_error, StreamID, protocol_error, HumanReadable},
	{error, Reason, Machine}.

%% Collect the leading request pseudo-headers into a map. Returns
%% the map along with the remaining headers, or an error when
%% a pseudo-header is repeated or unknown.
request_pseudo_headers([{Name = <<":", _/bits>>, Value}|Tail], PseudoHeaders) ->
	Known = case Name of
		<<":method">> ->
			{method, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
		<<":scheme">> ->
			{scheme, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
		<<":authority">> ->
			{authority, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
		<<":path">> ->
			{path, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
		<<":protocol">> ->
			{protocol, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
		_ ->
			unknown
	end,
	case Known of
		unknown ->
			{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
		{Key, Duplicate} ->
			case maps:is_key(Key, PseudoHeaders) of
				true -> {error, Duplicate};
				false -> request_pseudo_headers(Tail, PseudoHeaders#{Key => Value})
			end
	end;
request_pseudo_headers(Headers, PseudoHeaders) ->
	{ok, PseudoHeaders, Headers}.

%% Collect the leading response pseudo-headers into a map.
%% The status is stored as an integer.
response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
	case maps:is_key(status, PseudoHeaders) of
		true ->
			{error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
		false ->
			try cow_http:status_to_integer(Status) of
				IntStatus ->
					response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
			catch _:_ ->
				{error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
			end
	end;
response_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
response_pseudo_headers(Headers, PseudoHeaders) ->
	{ok, PseudoHeaders, Headers}.

%% Whether any header name in the list starts with a colon.
trailers_contain_pseudo_headers(Headers) ->
	lists:any(fun
		({<<":", _/bits>>, _}) -> true;
		(_) -> false
	end, Headers).

%% Rejecting invalid regular headers might be a bit too strong for clients.
headers_regular_headers(Frame=#headers{id=StreamID},
		Machine, Type, Stream, PseudoHeaders, Headers) ->
	case {regular_headers(Headers, Type), Type} of
		{ok, request} ->
			request_expected_size(Frame, Machine, Type, Stream, PseudoHeaders, Headers);
		{ok, push_promise} ->
			push_promise_frame(Frame, Machine, Stream, PseudoHeaders, Headers);
		{ok, response} ->
			response_expected_size(Frame, Machine, Type, Stream, PseudoHeaders, Headers);
		{ok, trailers} ->
			trailers_frame(Frame, Machine, Stream, Headers);
		%% Invalid request headers make the request malformed;
		%% for all other types the stream is reset.
		{{error, HumanReadable}, request} ->
			headers_malformed(Frame, Machine, HumanReadable);
		{{error, HumanReadable}, _} ->
			stream_reset(StreamID, Machine, protocol_error, HumanReadable)
	end.

%% Validate the names of the regular headers one by one.
%% Returns ok, or an error for the first invalid header found.
regular_headers([], _) ->
	ok;
regular_headers([{Name, Value}|Tail], Type) ->
	case Name of
		<<>> ->
			{error, 'Empty header names are not valid regular headers. (CVE-2019-9516)'};
		<<":", _/bits>> ->
			{error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
		<<"connection">> ->
			{error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
		<<"keep-alive">> ->
			{error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
		<<"proxy-authenticate">> ->
			{error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
		<<"proxy-authorization">> ->
			{error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
		<<"transfer-encoding">> ->
			{error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
		<<"upgrade">> ->
			{error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
		<<"te">> when Type =/= request ->
			{error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
		<<"te">> when Value =/= <<"trailers">> ->
			{error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
		_ ->
			%% Any other name is accepted as long as it has no uppercase ASCII letter.
			Uppercase = [
				<<"A">>, <<"B">>, <<"C">>, <<"D">>, <<"E">>, <<"F">>, <<"G">>,
				<<"H">>, <<"I">>, <<"J">>, <<"K">>, <<"L">>, <<"M">>, <<"N">>,
				<<"O">>, <<"P">>, <<"Q">>, <<"R">>, <<"S">>, <<"T">>, <<"U">>,
				<<"V">>, <<"W">>, <<"X">>, <<"Y">>, <<"Z">>
			],
			case binary:match(Name, Uppercase) of
				nomatch -> regular_headers(Tail, Type);
				_ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
			end
	end.

%% Determine the expected request body size from the content-length
%% header and the END_STREAM flag. The size is 'undefined' when the
%% header is missing and more frames are expected.
request_expected_size(Frame=#headers{fin=IsFin}, Machine, Type, Stream, PseudoHeaders, Headers) ->
	Lengths = [Value || {<<"content-length">>, Value} <- Headers],
	case {Lengths, IsFin} of
		{[], fin} ->
			headers_frame(Frame, Machine, Type, Stream, PseudoHeaders, Headers, 0);
		{[], _} ->
			headers_frame(Frame, Machine, Type, Stream, PseudoHeaders, Headers, undefined);
		{[<<"0">>], fin} ->
			headers_frame(Frame, Machine, Type, Stream, PseudoHeaders, Headers, 0);
		{[_], fin} ->
			headers_malformed(Frame, Machine,
				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
		{[BinLen], _} ->
			headers_parse_expected_size(Frame, Machine, Type, Stream,
				PseudoHeaders, Headers, BinLen);
		_ ->
			headers_malformed(Frame, Machine,
				'Multiple content-length headers were received. (RFC7230 3.3.2)')
	end.
%% Work out the body size announced by a response from its content-length
%% header(s), status code, the method of the request it answers and the
%% END_STREAM flag. Invalid combinations reset the stream.
response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type,
        Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) ->
    case [Value || {<<"content-length">>, Value} <- Headers] of
        [] when IsFin =:= fin ->
            headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
        [] ->
            headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
        [BinLen] ->
            %% A single content-length value. The rules below are ordered:
            %% the first condition that holds decides the outcome.
            if
                Status >= 100, Status =< 199 ->
                    stream_reset(StreamID, State, protocol_error,
                        'Content-length header received in a 1xx response. (RFC7230 3.3.2)');
                Status =:= 204 ->
                    stream_reset(StreamID, State, protocol_error,
                        'Content-length header received in a 204 response. (RFC7230 3.3.2)');
                Status >= 200, Status =< 299, Method =:= <<"CONNECT">> ->
                    stream_reset(StreamID, State, protocol_error,
                        'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).');
                %% Responses to HEAD requests, and 304 responses may contain
                %% a content-length header that must be ignored. (RFC7230 3.3.2)
                Method =:= <<"HEAD">> ->
                    headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
                Status =:= 304 ->
                    headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
                BinLen =:= <<"0">>, IsFin =:= fin ->
                    headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
                IsFin =:= fin ->
                    stream_reset(StreamID, State, protocol_error,
                        'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
                true ->
                    headers_parse_expected_size(Frame, State, Type, Stream,
                        PseudoHeaders, Headers, BinLen)
            end;
        _ ->
            stream_reset(StreamID, State, protocol_error,
                'Multiple content-length headers were received. (RFC7230 3.3.2)')
    end.
%% Parse the content-length value and continue with headers_frame/7.
%% Only the parsing itself is protected: a value that cannot be parsed
%% is reported as malformed for requests and resets the stream for
%% responses.
headers_parse_expected_size(Frame=#headers{id=StreamID},
        State, Type, Stream, PseudoHeaders, Headers, BinLen) ->
    Parsed = try
        {ok, cow_http_hd:parse_content_length(BinLen)}
    catch _:_ ->
        invalid
    end,
    Invalid = 'The content-length header is invalid. (RFC7230 3.3.2)',
    case {Parsed, Type} of
        {{ok, Len}, _} ->
            headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len);
        {invalid, request} ->
            headers_malformed(Frame, State, Invalid);
        {invalid, response} ->
            stream_reset(StreamID, State, protocol_error, Invalid)
    end.

%% Store the stream resulting from a valid request or response header
%% block and return the headers event.
%%
%% A request creates a new stream record and records its ID as the
%% last remote stream ID. A response updates the existing stream,
%% except for 1xx responses which leave it untouched.
headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{
        local_settings=#{initial_window_size := RemoteWindow},
        remote_settings=#{initial_window_size := LocalWindow}},
        Type, Stream0, PseudoHeaders, Headers, Len) ->
    State = case Type of
        request ->
            %% The te header value is kept in the stream record.
            TE = case lists:keyfind(<<"te">>, 1, Headers) of
                false -> undefined;
                {_, Value} -> Value
            end,
            NewStream = #stream{id=StreamID, method=maps:get(method, PseudoHeaders),
                remote=IsFin, remote_expected_size=Len,
                local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
            stream_store(NewStream, State0#http2_machine{remote_streamid=StreamID});
        response ->
            Updated = case PseudoHeaders of
                #{status := Status} when Status >= 100, Status =< 199 -> Stream0;
                _ -> Stream0#stream{remote=IsFin, remote_expected_size=Len}
            end,
            stream_store(Updated, State0)
    end,
    {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}.

%% Trailers end the remote side of the stream. The stream is stored
%% first, then the received body size is checked with is_body_size_valid/1.
trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) ->
    Finished = Stream0#stream{remote=fin},
    State = stream_store(Finished, State0),
    case is_body_size_valid(Finished) of
        false ->
            stream_reset(StreamID, State, protocol_error,
                'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)');
        true ->
            {ok, {trailers, StreamID, Headers}, State}
    end.

%% PRIORITY frame.
%%
%% @todo Handle PRIORITY frames.

%% PRIORITY frames are accepted and otherwise ignored.
priority_frame(_Frame, State) ->
    {ok, State}.

%% RST_STREAM frame.

%% An RST_STREAM frame for a stream that was never opened is a
%% connection error. Otherwise the stream is dropped and its ID
%% is remembered in the list of lingering streams.
rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
        local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
        when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
        orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
    {error, {connection_error, protocol_error,
        'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'},
        State};
rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
        streams=Streams0, remote_lingering_streams=Lingering0}) ->
    %% We only keep up to 10 streams in this state. @todo Make it configurable?
    MaxLingering = 10,
    Lingering = [StreamID|lists:sublist(Lingering0, MaxLingering - 1)],
    NewState = State#http2_machine{
        streams=maps:remove(StreamID, Streams0),
        remote_lingering_streams=Lingering},
    {ok, {rst_stream, StreamID, Reason}, NewState}.

%% SETTINGS frame.

%% Merge the received settings into the remote settings, then apply
%% the ones that have an immediate effect: the HPACK encoder table
%% size (capped by the max_encode_table_size option) and the initial
%% window size (applied as a delta to all streams). When the initial
%% window size was part of the frame we try to send buffered data.
settings_frame({settings, Settings}, State0=#http2_machine{
        opts=Opts, remote_settings=PrevSettings}) ->
    ApplySetting = fun
        (header_table_size, NewSize, Acc=#http2_machine{encode_state=Encode0}) ->
            Limit = maps:get(max_encode_table_size, Opts, 4096),
            Encode = cow_hpack:set_max_size(min(NewSize, Limit), Encode0),
            Acc#http2_machine{encode_state=Encode};
        (initial_window_size, NewWindowSize, Acc) ->
            PrevWindowSize = maps:get(initial_window_size, PrevSettings, 65535),
            streams_update_local_window(Acc, NewWindowSize - PrevWindowSize);
        (_, _, Acc) ->
            Acc
    end,
    Merged = State0#http2_machine{remote_settings=maps:merge(PrevSettings, Settings)},
    State = maps:fold(ApplySetting, Merged, Settings),
    case maps:is_key(initial_window_size, Settings) of
        true -> send_data(State);
        false -> {ok, State}
    end;
%% We expect to receive a SETTINGS frame as part of the preface.
settings_frame(_F, State=#http2_machine{mode=server}) ->
    {error, {connection_error, protocol_error,
        'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'},
        State};
settings_frame(_F, State) ->
    {error, {connection_error, protocol_error,
        'The preface must begin with a SETTINGS frame. (RFC7540 3.5)'},
        State}.

%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
%% the local stream windows for all active streams and perhaps
%% resume sending data.
streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
    Adjust = fun(_StreamID, Stream=#stream{local_window=Window}) ->
        Stream#stream{local_window=Window + Increment}
    end,
    State#http2_machine{streams=maps:map(Adjust, Streams0)}.

%% Ack for a previously sent SETTINGS frame.

%% The settings timer is cancelled if there is one, the pending
%% settings become the local settings, and the settings with an
%% immediate effect are applied: the HPACK decoder table size and
%% the initial window size (as a delta on all remote stream windows).
settings_ack_frame(State0=#http2_machine{settings_timer=TRef,
        local_settings=Local0, next_settings=NextSettings}) ->
    case TRef of
        undefined -> ok;
        _ -> ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}])
    end,
    ApplySetting = fun
        (header_table_size, MaxSize, Acc=#http2_machine{decode_state=Decode0}) ->
            Acc#http2_machine{decode_state=cow_hpack:set_max_size(MaxSize, Decode0)};
        (initial_window_size, NewWindowSize, Acc) ->
            PrevWindowSize = maps:get(initial_window_size, Local0, 65535),
            streams_update_remote_window(Acc, NewWindowSize - PrevWindowSize);
        (_, _, Acc) ->
            Acc
    end,
    Acked = State0#http2_machine{settings_timer=undefined,
        local_settings=maps:merge(Local0, NextSettings), next_settings=#{}},
    {ok, maps:fold(ApplySetting, Acked, NextSettings)}.

%% When we receive an ack to a SETTINGS frame we sent we need to update
%% the remote stream windows for all active streams.
streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
    Adjust = fun(_StreamID, Stream=#stream{remote_window=Window}) ->
        Stream#stream{remote_window=Window + Increment}
    end,
    State#http2_machine{streams=maps:map(Adjust, Streams0)}.

%% PUSH_PROMISE frame.

%% Record giving names to the elements of the PUSH_PROMISE frame tuple.
%% The fields must stay in this order.
-record(push_promise, {
    id :: cow_http2:streamid(),
    head :: cow_http2:head_fin(),
    promised_id :: cow_http2:streamid(),
    data :: binary()
}).

%% Validate a received PUSH_PROMISE frame. When the header block is
%% complete it is decoded immediately; otherwise we switch to the
%% continuation state and wait for CONTINUATION frames.
push_promise_frame(_, State=#http2_machine{mode=server}) ->
    {error, {connection_error, protocol_error,
        'PUSH_PROMISE frames MUST NOT be sent by the client. (RFC7540 6.6)'},
        State};
push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) ->
    {error, {connection_error, protocol_error,
        'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'},
        State};
push_promise_frame(#push_promise{promised_id=PromisedStreamID},
        State=#http2_machine{remote_streamid=RemoteStreamID})
        when PromisedStreamID =< RemoteStreamID ->
    {error, {connection_error, protocol_error,
        'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
        State};
push_promise_frame(#push_promise{id=StreamID}, State)
        when not ?IS_CLIENT_LOCAL(StreamID) ->
    {error, {connection_error, protocol_error,
        'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'},
        State};
push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin,
        promised_id=PromisedStreamID, data=HeaderData}, State) ->
    case stream_get(StreamID, State) of
        ParentStream=#stream{remote=idle} ->
            case IsHeadFin of
                head_nofin ->
                    {ok, State#http2_machine{state={continuation, push_promise, Frame}}};
                head_fin ->
                    Promised = #headers{id=PromisedStreamID,
                        fin=fin, head=IsHeadFin, data=HeaderData},
                    headers_decode(Promised, State, push_promise, ParentStream)
            end;
        _ ->
            %% @todo Check if the stream is lingering. If it is, decode the frame
            %% and do what? That's the big question and why it's not implemented yet.
            %%
            %% RFC7540 6.6 says that an endpoint that has sent RST_STREAM on
            %% the associated stream MUST handle PUSH_PROMISE frames that
            %% might have been created before the RST_STREAM frame is
            %% received and processed.
            {error, {connection_error, stream_closed,
                'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
                State}
    end.

%% Store the promised stream once its headers have been validated.
%% The promised stream is created with the local side already finished,
%% and its ID becomes the last remote stream ID.
push_promise_frame(#headers{id=PromisedStreamID},
        State0=#http2_machine{
            local_settings=#{initial_window_size := RemoteWindow},
            remote_settings=#{initial_window_size := LocalWindow}},
        #stream{id=StreamID}, PseudoHeaders=#{method := Method}, Headers) ->
    TE = case lists:keyfind(<<"te">>, 1, Headers) of
        false -> undefined;
        {_, Value} -> Value
    end,
    Promised = #stream{id=PromisedStreamID, method=Method,
        local=fin, local_window=LocalWindow,
        remote_window=RemoteWindow, te=TE},
    State1 = State0#http2_machine{remote_streamid=PromisedStreamID},
    State = stream_store(Promised, State1),
    {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, State}.

%% PING frame.

%% Nothing to do in the state machine for a PING frame.
ping_frame({ping, _}, State) ->
    {ok, State}.

%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
%% The payload of the PING ack is not inspected; the state is unchanged.
ping_ack_frame({ping_ack, _}, State) ->
    {ok, State}.

%% GOAWAY frame.

%% The frame is handed back to the caller as-is; the state is unchanged.
goaway_frame(Frame={goaway, _, _, _}, State) ->
    {ok, Frame, State}.

%% WINDOW_UPDATE frame.

%% Connection-wide WINDOW_UPDATE frame.
%%
%% The connection window must never exceed 2^31-1. When the increment
%% is acceptable we try to send buffered data for all streams.
window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow})
        when ConnWindow + Increment > 16#7fffffff ->
    {error, {connection_error, flow_control_error,
        'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'},
        State};
window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) ->
    send_data(State#http2_machine{local_window=ConnWindow + Increment});
%% Stream-specific WINDOW_UPDATE frame.
%%
%% A frame for a stream that was never opened is a connection error.
window_update_frame({window_update, StreamID, _}, State=#http2_machine{mode=Mode,
        local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
        when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
        orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
    {error, {connection_error, protocol_error,
        'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'},
        State};
window_update_frame({window_update, StreamID, Increment},
        State0=#http2_machine{remote_lingering_streams=Lingering}) ->
    case stream_get(StreamID, State0) of
        %% The stream window must never exceed 2^31-1 either.
        #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
            stream_reset(StreamID, State0, flow_control_error,
                'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)');
        Stream0 = #stream{local_window=StreamWindow} ->
            send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0);
        undefined ->
            %% WINDOW_UPDATE frames may be received for a short period of time
            %% after a stream is closed. They must be ignored, unless the
            %% stream is in the list of streams that the remote endpoint reset.
            case lists:member(StreamID, Lingering) of
                false -> {ok, State0};
                true -> stream_reset(StreamID, State0, stream_closed,
                    'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)')
            end
    end.

%% CONTINUATION frame.

%% Convenience record to manipulate the tuple.
%% The order of the fields matters.
-record(continuation, {
    id :: cow_http2:streamid(),
    head :: cow_http2:head_fin(),
    data :: binary()
}).

%% A CONTINUATION frame received outside of a header block.
unexpected_continuation_frame(#continuation{}, State) ->
    {error, {connection_error, protocol_error,
        'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'},
        State}.

%% Append the header block fragment to the HEADERS or PUSH_PROMISE frame
%% being continued. The frame must be for the same stream as the frame
%% that started the header block. When the fragment is the last one
%% (head_fin) the complete header block is decoded.
continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
        State=#http2_machine{state={continuation, Type,
            Frame=#headers{id=StreamID, data=HeaderFragment0}}}) ->
    HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
    headers_decode(Frame#headers{head=head_fin, data=HeaderData},
        State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
        State=#http2_machine{state={continuation, Type, #push_promise{
            id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) ->
    HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
    %% The stream given to headers_decode/4 is the stream the PUSH_PROMISE
    %% was received on, exactly like when the PUSH_PROMISE header block
    %% fits in a single frame. push_promise_frame/5 matches on this stream
    %% record to obtain the parent stream ID, so it must not be undefined.
    headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData},
        State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
continuation_frame(#continuation{id=StreamID, data=HeaderFragment1},
        State=#http2_machine{state={continuation, Type, ContinuedFrame0}})
        %% The stream ID is the second element of both frame tuples.
        when element(2, ContinuedFrame0) =:= StreamID ->
    ContinuedFrame = case ContinuedFrame0 of
        #headers{data=HeaderFragment0} ->
            HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
            ContinuedFrame0#headers{data=HeaderData};
        #push_promise{data=HeaderFragment0} ->
            HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
            ContinuedFrame0#push_promise{data=HeaderData}
    end,
    {ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}};
continuation_frame(_F, State) ->
    {error, {connection_error, protocol_error,
        'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
        State}.

%% Ignored frames.

%% Frames that are otherwise ignored are still an error when they
%% are received in the middle of a header block.
-spec ignored_frame(State)
    -> {ok, State}
    | {error, {connection_error, protocol_error, atom()}, State}
    when State::http2_machine().
ignored_frame(State=#http2_machine{state={continuation, _, _}}) ->
    {error, {connection_error, protocol_error,
        'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
        State};
%% @todo It might be useful to error out when we receive
%% too many unknown frames. (RFC7540 10.5)
ignored_frame(State) ->
    {ok, State}.

%% Timeouts.

%% A timeout only results in an error when the timer reference is the
%% one currently stored in the state. Any other timeout is ignored.
-spec timeout(preface_timeout | settings_timeout, reference(), State)
    -> {ok, State}
    | {error, {connection_error, cow_http2:error(), atom()}, State}
    when State::http2_machine().
timeout(preface_timeout, TRef, State=#http2_machine{preface_timer=TRef}) ->
    {error, {connection_error, protocol_error,
        'The preface was not received in a reasonable amount of time.'},
        State};
timeout(settings_timeout, TRef, State=#http2_machine{settings_timer=TRef}) ->
    {error, {connection_error, settings_timeout,
        'The SETTINGS ack was not received within the configured time. (RFC7540 6.5.3)'},
        State};
timeout(_, _, State) ->
    {ok, State}.

%% Functions for sending a message header or body. Note that
%% this module does not send data directly, instead it returns
%% a value that can then be used to send the frames.
%% Encode the headers of a message we are about to send on a stream
%% whose local side is still idle, and record the local state.
%%
%% The returned fin flag is what goes on the wire: nofin when the caller
%% passed idle, fin when the stream's request method is HEAD, and the
%% value given by the caller otherwise. The stream itself always
%% records the value given by the caller.
-spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(),
    pseudo_headers(), cow_http:headers())
    -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0},
        IsFin0, PseudoHeaders, Headers0) ->
    Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
    IsFin = if
        IsFin0 =:= idle -> nofin;
        Method =:= <<"HEAD">> -> fin;
        true -> IsFin0
    end,
    Filtered = remove_http11_headers(Headers0),
    {HeaderBlock, EncodeState} = cow_hpack:encode(
        merge_pseudo_headers(PseudoHeaders, Filtered), EncodeState0),
    NewState = stream_store(Stream#stream{local=IsFin0},
        State#http2_machine{encode_state=EncodeState}),
    {ok, IsFin, HeaderBlock, NewState}.

-spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers())
    -> {ok, cow_http2:streamid(), iodata(), State}
    | {error, no_push} when State::http2_machine().
%% Refuse to push when the remote endpoint disabled it. Otherwise encode
%% the promised request headers, create the promised stream under the
%% next local stream ID (its remote side is already finished and no body
%% is expected) and advance the local stream ID by 2.
prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) ->
    {error, no_push};
prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0,
        local_settings=#{initial_window_size := RemoteWindow},
        remote_settings=#{initial_window_size := LocalWindow},
        local_streamid=LocalStreamID}, PseudoHeaders, Headers0) ->
    %% The stream the promise is sent on must not have started a response.
    #stream{local=idle} = stream_get(StreamID, State),
    TE = case lists:keyfind(<<"te">>, 1, Headers0) of
        false -> undefined;
        {_, Value} -> Value
    end,
    Headers = merge_pseudo_headers(PseudoHeaders, remove_http11_headers(Headers0)),
    {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
    Promised = #stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders),
        remote=fin, remote_expected_size=0,
        local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
    NextState = State#http2_machine{encode_state=EncodeState,
        local_streamid=LocalStreamID + 2},
    {ok, LocalStreamID, HeaderBlock, stream_store(Promised, NextState)}.

%% Drop the headers listed below, plus the connection header and
%% every header it names when a connection header is present.
remove_http11_headers(Headers) ->
    AlwaysRemoved = [
        <<"keep-alive">>,
        <<"proxy-connection">>,
        <<"transfer-encoding">>,
        <<"upgrade">>
    ],
    Removed = case lists:keyfind(<<"connection">>, 1, Headers) of
        {_, ConnHd} ->
            %% We do not need to worry about any "close" header because
            %% that header name is reserved.
            cow_http_hd:parse_connection(ConnHd) ++ [<<"connection">>|AlwaysRemoved];
        false ->
            AlwaysRemoved
    end,
    Keep = fun({Name, _}) -> not lists:member(Name, Removed) end,
    lists:filter(Keep, Headers).
%% Prepend the pseudo-headers to the header list. An integer status
%% is converted to a binary; any other pseudo-header keeps its value
%% and gets its name prefixed with a colon.
merge_pseudo_headers(PseudoHeaders, Headers0) ->
    lists:foldl(fun
        ({status, Status}, Acc) when is_integer(Status) ->
            [{<<":status">>, integer_to_binary(Status)}|Acc];
        ({Name, Value}, Acc) ->
            [{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc]
    end, Headers0, maps:to_list(PseudoHeaders)).

%% Encode trailers for a stream that has sent headers but has not
%% finished sending, and mark the local side of the stream as finished.
-spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers())
    -> {ok, iodata(), State} when State::http2_machine().
prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) ->
    Stream = #stream{local=nofin} = stream_get(StreamID, State),
    {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
    {ok, HeaderBlock, stream_store(Stream#stream{local=fin},
        State#http2_machine{encode_state=EncodeState})}.

%% Send data, a file or trailers on a stream, or queue them when the
%% flow control windows do not allow sending right away.
%%
%% Returns {ok, State} when nothing can be sent now, or
%% {send, [{StreamID, IsFin, Items}], State} with the items to send.
%% The stream must have sent its headers and must not be finished.
-spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers)
    -> {ok, State}
    | {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
    when State::http2_machine(), DataOrFileOrTrailers::
        {data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnWindow},
        IsFin0, DataOrFileOrTrailers0) ->
    %% @todo Probably just ignore if the method was HEAD.
    Stream0 = #stream{
        local=nofin,
        local_window=StreamWindow,
        local_buffer_size=BufferSize,
        te=TE0
    } = stream_get(StreamID, State0),
    DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
        {trailers, _} ->
            %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
            TE = try cow_http_hd:parse_te(TE0) of
                {trailers, []} -> trailers;
                _ -> no_trailers
            catch _:_ ->
                %% If we can't parse the TE header, assume we can't send trailers.
                no_trailers
            end,
            case TE of
                trailers ->
                    DataOrFileOrTrailers0;
                no_trailers ->
                    %% Trailers are replaced with an empty data frame.
                    {data, <<>>}
            end;
        _ ->
            DataOrFileOrTrailers0
    end,
    {IsTrailers, SendSize} = case DataOrFileOrTrailers of
        {data, D} -> {false, BufferSize + iolist_size(D)};
        #sendfile{bytes=B} -> {false, BufferSize + B};
        {trailers, _} -> {true, 0}
    end,
    MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
    if
        %% If we cannot send the data all at once and the window
        %% is smaller than we are willing to send at a minimum,
        %% we queue the data directly.
        %%
        %% Trailers never take this path: queue_data/4 only knows how
        %% to size data and files, and a negative window would otherwise
        %% send trailers there. Trailers are always handled by
        %% send_or_queue_data/6, which either sends them immediately
        %% or keeps them in the stream until the buffer is flushed.
        (not IsTrailers) andalso (StreamWindow < MinSendSize)
                andalso ((StreamWindow < SendSize) orelse (ConnWindow < SendSize)) ->
            {ok, stream_store(queue_data(Stream0, IsFin0, DataOrFileOrTrailers, in), State0)};
        true ->
            case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
                {ok, Stream, State, []} ->
                    {ok, stream_store(Stream, State)};
                {ok, Stream=#stream{local=IsFin}, State, SendData} ->
                    %% Items were accumulated in reverse order.
                    {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
            end
    end.

%% Internal data sending/queuing functions.

%% @todo Should we ever want to implement the PRIORITY mechanism,
%% this would be the place to do it. Right now, we just go over
%% all streams and send what we can until either everything is
%% sent or we run out of space in the window.
send_data(State0=#http2_machine{streams=Streams0}) ->
    Iterator = maps:iterator(Streams0),
    case send_data_for_all_streams(maps:next(Iterator), Streams0, State0, []) of
        {ok, Streams, State, []} ->
            {ok, State#http2_machine{streams=Streams}};
        {ok, Streams, State, Send} ->
            {send, Send, State#http2_machine{streams=Streams}}
    end.

%% Go over the streams one at a time, updating the streams map and
%% accumulating what must be sent for each of them.
send_data_for_all_streams(none, Streams, State, Send) ->
    {ok, Streams, State, Send};
%% While technically we should never get < 0 here, let's be on the safe side.
send_data_for_all_streams(_, Streams, State=#http2_machine{local_window=ConnWindow}, Send)
        when ConnWindow =< 0 ->
    {ok, Streams, State, Send};
%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
send_data_for_all_streams({StreamID, Stream0, Iterator}, Streams, State0, Send) ->
    case send_data_for_one_stream(Stream0, State0, []) of
        {ok, Stream, State, []} ->
            send_data_for_all_streams(maps:next(Iterator),
                Streams#{StreamID => Stream}, State, Send);
        %% We need to remove the stream here because we do not use stream_store/2.
        {ok, #stream{local=fin, remote=fin}, State, SendData} ->
            send_data_for_all_streams(maps:next(Iterator),
                maps:remove(StreamID, Streams), State, [{StreamID, fin, SendData}|Send]);
        {ok, Stream=#stream{local=IsFin}, State, SendData} ->
            send_data_for_all_streams(maps:next(Iterator),
                Streams#{StreamID => Stream}, State, [{StreamID, IsFin, SendData}|Send])
    end.

%% Send what we can for a single stream and store the stream.
send_data(Stream0, State0) ->
    case send_data_for_one_stream(Stream0, State0, []) of
        {ok, Stream, State, []} ->
            {ok, stream_store(Stream, State)};
        {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
            {send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)}
    end.

%% Take items out of the stream's buffer for as long as the windows
%% allow. The returned list of items to send is in sending order.
%%
%% The buffer is empty and trailers are pending: send the trailers.
send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0,
        local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined ->
    {ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])};
send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer=Q0, local_buffer_size=0},
        State, SendAcc) ->
    case queue:len(Q0) of
        0 ->
            {ok, Stream, State, lists:reverse(SendAcc)};
        1 ->
            %% We know there is a final empty data frame in the queue.
            %% We need to mark the stream as complete.
            {{value, {fin, 0, _}}, Q} = queue:out(Q0),
            {ok, Stream#stream{local=fin, local_buffer=Q}, State, lists:reverse(SendAcc)}
    end;
%% Nothing more can be sent: a window is exhausted, the stream is
%% finished or the buffer is empty.
send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
        local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
        when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
    {ok, Stream, State, lists:reverse(SendAcc)};
send_data_for_one_stream(Stream0=#stream{local_window=StreamWindow,
        local_buffer=Q0, local_buffer_size=BufferSize},
        State0=#http2_machine{opts=Opts, local_window=ConnWindow}, SendAcc0) ->
    MinSendSize = maps:get(stream_window_data_threshold, Opts, 16384),
    if
        %% If we cannot send the entire buffer at once and the window
        %% is smaller than we are willing to send at a minimum, do nothing.
        %%
        %% We only do this check the first time we go through this function;
        %% we want to send as much data as possible IF we send some.
        (SendAcc0 =:= []) andalso (StreamWindow < MinSendSize)
                andalso ((StreamWindow < BufferSize) orelse (ConnWindow < BufferSize)) ->
            {ok, Stream0, State0, []};
        true ->
            %% We know there is an item in the queue.
            {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
            Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
            %% Whatever cannot be sent goes back to the front of the queue (in_r).
            {ok, Stream, State, SendAcc}
                = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
            send_data_for_one_stream(Stream, State, SendAcc)
    end.

%% We can send trailers immediately if the queue is empty, otherwise we queue.
%% We always send trailer frames even if the window is empty.
send_or_queue_data(Stream=#stream{local_buffer_size=0},
        State, SendAcc, fin, Trailers={trailers, _}, _) ->
    {ok, Stream, State, [Trailers|SendAcc]};
send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) ->
    {ok, Stream#stream{local_trailers=Trailers}, State, SendAcc};
%% Nothing can be sent when either window is exhausted: buffer everything.
send_or_queue_data(Stream=#stream{local_window=StreamWindow},
        State=#http2_machine{local_window=ConnWindow},
        SendAcc, IsFin, Data, In)
        when ConnWindow =< 0; StreamWindow =< 0 ->
    {ok, queue_data(Stream, IsFin, Data, In), State, SendAcc};
%% Otherwise send as much as the windows and the maximum frame size allow,
%% one frame-sized piece at a time, and loop with what remains.
send_or_queue_data(Stream=#stream{local_window=StreamWindow},
        State=#http2_machine{opts=Opts, remote_settings=RemoteSettings,
        local_window=ConnWindow}, SendAcc, IsFin, Data, In) ->
    %% The atom infinity compares greater than any integer.
    MaxSendSize = lists:min([
        ConnWindow,
        StreamWindow,
        maps:get(max_frame_size, RemoteSettings, 16384),
        maps:get(max_frame_size_sent, Opts, infinity)
    ]),
    case Data of
        %% The file fits entirely: this is the last piece.
        #sendfile{bytes=Bytes} when Bytes =< MaxSendSize ->
            {ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes},
                State#http2_machine{local_window=ConnWindow - Bytes},
                [Data|SendAcc]};
        #sendfile{offset=Offset, bytes=Bytes} ->
            FilePiece = Data#sendfile{bytes=MaxSendSize},
            FileRest = Data#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize},
            send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
                State#http2_machine{local_window=ConnWindow - MaxSendSize},
                [FilePiece|SendAcc], IsFin, FileRest, In);
        {data, Iolist} ->
            case iolist_size(Iolist) of
                %% The data fits entirely: this is the last piece.
                Size when Size =< MaxSendSize ->
                    {ok, Stream#stream{local=IsFin, local_window=StreamWindow - Size},
                        State#http2_machine{local_window=ConnWindow - Size},
                        [{data, Iolist}|SendAcc]};
                _ ->
                    {DataPiece, DataRest} = cow_iolists:split(MaxSendSize, Iolist),
                    send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
                        State#http2_machine{local_window=ConnWindow - MaxSendSize},
                        [{data, DataPiece}|SendAcc], IsFin, {data, DataRest}, In)
            end
    end.

%% Add data or a file to the stream's buffer and update the buffer size.
%% In is the name of the queue function to call: in or in_r.
queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
    DataSize = case Data of
        #sendfile{bytes=Bytes} -> Bytes;
        {data, Iolist} -> iolist_size(Iolist)
    end,
    if
        %% Never queue non-final empty data frames.
        DataSize =:= 0, IsFin =:= nofin ->
            Stream;
        true ->
            Stream#stream{
                local_buffer=queue:In({IsFin, DataSize, Data}, Q0),
                local_buffer_size=Size0 + DataSize}
    end.

%% Public interface to update the flow control window.
%%
%% The ensure_window function applies heuristics to avoid updating the
%% window when it is not necessary. The update_window function updates
%% the window unconditionally.
%%
%% The ensure_window function should be called when requesting more
%% data (for example when reading a request or response body) as well
%% as when receiving new data. Failure to do so may result in the
%% window being depleted.
%%
%% The heuristics dictating whether the window must be updated and
%% what the window size is depend on three options (margin, max
%% and threshold) along with the Size argument. The window increment
%% returned by this function may therefore be smaller than the Size
%% argument. On the other hand the total window allocated over many
%% calls may end up being larger than the initial Size argument. As
%% a result, it is the responsibility of the caller to ensure that
%% the Size argument is never lower than 0.

-spec ensure_window(non_neg_integer(), State)
    -> ok | {ok, pos_integer(), State} when State::http2_machine().
%% Ensure the connection window is large enough to receive Size bytes.
%% Returns ok when no update is needed, or the increment to announce
%% to the peer along with the state holding the enlarged window.
ensure_window(Size, State=#http2_machine{opts=Opts, remote_window=RemoteWindow}) ->
    case ensure_window(Size, RemoteWindow, connection, Opts) of
        ok ->
            ok;
        {ok, Increment} ->
            {ok, Increment, State#http2_machine{remote_window=RemoteWindow + Increment}}
    end.

%% Same as ensure_window/2 but for the window of a single stream.
-spec ensure_window(cow_http2:streamid(), non_neg_integer(), State)
    -> ok | {ok, pos_integer(), State} when State::http2_machine().
ensure_window(StreamID, Size, State=#http2_machine{opts=Opts}) ->
    case stream_get(StreamID, State) of
        %% For simplicity's sake, we do not consider attempts to ensure the window
        %% of a terminated stream to be errors. We simply act as if the stream
        %% window is large enough.
        undefined ->
            ok;
        Stream = #stream{remote_window=RemoteWindow} ->
            case ensure_window(Size, RemoteWindow, stream, Opts) of
                ok ->
                    ok;
                {ok, Increment} ->
                    {ok, Increment, stream_store(Stream#stream{remote_window=RemoteWindow + Increment}, State)}
            end
    end.

%% Compute the window increment, if any. Returns ok when the window
%% must not be updated and {ok, Increment} with a strictly positive
%% increment otherwise.
%%
%% No need to update the window when we are not expecting data.
ensure_window(0, _, _, _) ->
    ok;
%% No need to update the window when it is already high enough.
ensure_window(Size, Window, _, _) when Size =< Window ->
    ok;
ensure_window(Size0, Window, Type, Opts) ->
    Threshold = ensure_window_threshold(Type, Opts),
    if
        %% We do not update the window when it is higher than the threshold.
        Window > Threshold ->
            ok;
        true ->
            Margin = ensure_window_margin(Type, Opts),
            Size = Size0 + Margin,
            MaxWindow = ensure_window_max(Type, Opts),
            Increment = if
                %% We cannot go above the maximum window size.
                Size > MaxWindow -> MaxWindow - Window;
                true -> Size - Window
            end,
            if
                %% The window is already at the maximum, or even above it
                %% (for example when the configured maximum is lower than
                %% the current window). A window cannot be shrunk with a
                %% window update so there is nothing to announce.
                Increment =< 0 -> ok;
                true -> {ok, Increment}
            end
    end.

%% Margin defaults to the default initial window size.
ensure_window_margin(stream, Opts) ->
	Default = 65535,
	maps:get(stream_window_margin_size, Opts, Default);
ensure_window_margin(connection, Opts) ->
	Default = 65535,
	maps:get(connection_window_margin_size, Opts, Default).

%% The maximum window, unless configured, is the largest value
%% the protocol allows.
ensure_window_max(stream, Opts) ->
	Default = 16#7fffffff,
	maps:get(max_stream_window_size, Opts, Default);
ensure_window_max(connection, Opts) ->
	Default = 16#7fffffff,
	maps:get(max_connection_window_size, Opts, Default).

%% The threshold, unless configured, is 10 times the default frame size.
ensure_window_threshold(stream, Opts) ->
	Default = 163840,
	maps:get(stream_window_update_threshold, Opts, Default);
ensure_window_threshold(connection, Opts) ->
	Default = 163840,
	maps:get(connection_window_update_threshold, Opts, Default).

%% Unconditionally grow the connection window by Size.
-spec update_window(1..16#7fffffff, State)
	-> State when State::http2_machine().
update_window(Size, State=#http2_machine{remote_window=Window0}) when Size > 0 ->
	Window = Window0 + Size,
	State#http2_machine{remote_window=Window}.

%% Unconditionally grow the window of the given stream by Size.
%% The stream must exist: an unknown StreamID fails with a badmatch.
-spec update_window(cow_http2:streamid(), 1..16#7fffffff, State)
	-> State when State::http2_machine().
update_window(StreamID, Size, State) when Size > 0 ->
	Stream = stream_get(StreamID, State),
	#stream{remote_window=Window0} = Stream,
	stream_store(Stream#stream{remote_window=Window0 + Size}, State).

%% Public interface to reset streams.
%%
%% The stream is dropped from the streams map and recorded as lingering.

-spec reset_stream(cow_http2:streamid(), State)
	-> {ok, State} | {error, not_found} when State::http2_machine().
reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
	case maps:take(StreamID, Streams0) of
		error ->
			{error, not_found};
		{_Stream, Streams} ->
			{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})}
	end.

%% Retrieve the buffer size for all streams.

-spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer().
get_connection_local_buffer_size(#http2_machine{streams=Streams}) ->
	%% Sum the local_buffer_size field of every stream in the map.
	AddSize = fun(_StreamID, #stream{local_buffer_size=Size}, Total) ->
		Total + Size
	end,
	maps:fold(AddSize, 0, Streams).

%% Retrieve a local setting, falling back to its default value
%% when it has not been set.

-spec get_local_setting(atom(), http2_machine()) -> atom() | integer().
get_local_setting(Key, #http2_machine{local_settings=Settings}) ->
	Default = default_setting_value(Key),
	maps:get(Key, Settings, Default).

%% Retrieve all the remote settings. Settings that were not received
%% are given their default value. The enable_connect_protocol default
%% is only included in server mode.

-spec get_remote_settings(http2_machine()) -> map().
get_remote_settings(#http2_machine{mode=Mode, remote_settings=Settings}) ->
	BaseKeys = [
		header_table_size,
		enable_push,
		max_concurrent_streams,
		initial_window_size,
		max_frame_size,
		max_header_list_size
	],
	Keys = case Mode of
		client -> BaseKeys;
		server -> [enable_connect_protocol|BaseKeys]
	end,
	Defaults = maps:from_list([{Key, default_setting_value(Key)} || Key <- Keys]),
	%% Values found in Settings take precedence over the defaults.
	maps:merge(Defaults, Settings).

%% Default value of each known setting.
default_setting_value(header_table_size) -> 4096;
default_setting_value(enable_push) -> true;
default_setting_value(max_concurrent_streams) -> infinity;
default_setting_value(initial_window_size) -> 65535;
default_setting_value(max_frame_size) -> 16384;
default_setting_value(max_header_list_size) -> infinity;
default_setting_value(enable_connect_protocol) -> false.

%% Function to obtain the last known streamid received
%% for the purposes of sending a GOAWAY frame and closing the connection.

-spec get_last_streamid(http2_machine()) -> cow_http2:streamid().
get_last_streamid(#http2_machine{remote_streamid=LastReceived}) ->
	LastReceived.

%% Set the last accepted streamid to the last known streamid, for the purpose
%% of ignoring frames for remote streams created after sending GOAWAY.
%%
%% Returns the streamid that was recorded along with the updated state.
%% The only clause requires the last known streamid to be lower than or
%% equal to the currently accepted one; any other state fails with
%% function_clause.

-spec set_last_streamid(http2_machine()) -> {cow_http2:streamid(), http2_machine()}.
set_last_streamid(State=#http2_machine{remote_streamid=StreamID,
		last_remote_streamid=LastStreamID}) when StreamID =< LastStreamID ->
	{StreamID, State#http2_machine{last_remote_streamid=StreamID}}.

%% Retrieve the local buffer size for a stream.

-spec get_stream_local_buffer_size(cow_http2:streamid(), http2_machine())
	-> {ok, non_neg_integer()} | {error, not_found | closed}.
get_stream_local_buffer_size(StreamID, State=#http2_machine{}) ->
	case stream_get(StreamID, State) of
		#stream{local_buffer_size=Size} ->
			{ok, Size};
		undefined ->
			missing_stream_error(StreamID, State)
	end.

%% Retrieve the local state for a stream, including the state in the queue.
%%
%% The third element is the fin flag of the most recently queued item,
%% or empty when nothing is queued.

-spec get_stream_local_state(cow_http2:streamid(), http2_machine())
	-> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}.
get_stream_local_state(StreamID, State=#http2_machine{}) ->
	case stream_get(StreamID, State) of
		#stream{local=IsFin, local_buffer=Q, local_trailers=undefined} ->
			IsQueueFin = case queue:peek_r(Q) of
				empty -> empty;
				{value, {IsQueueFin0, _, _}} -> IsQueueFin0
			end,
			{ok, IsFin, IsQueueFin};
		%% Trailers are queued so the local state is fin after the queue is drained.
		#stream{local=IsFin} ->
			{ok, IsFin, fin};
		undefined ->
			missing_stream_error(StreamID, State)
	end.

%% Retrieve the remote state for a stream.

-spec get_stream_remote_state(cow_http2:streamid(), http2_machine())
	-> {ok, idle | cow_http2:fin()} | {error, not_found | closed}.
get_stream_remote_state(StreamID, State=#http2_machine{}) ->
	case stream_get(StreamID, State) of
		#stream{remote=IsFin} ->
			{ok, IsFin};
		undefined ->
			missing_stream_error(StreamID, State)
	end.

%% Error returned by the stream getters for a stream that is not in the
%% streams map. The stream is reported as closed when it is a local stream
%% with an identifier lower than local_streamid, or a non-local stream with
%% an identifier lower than or equal to remote_streamid. Otherwise it is
%% reported as not_found.
%%
%% The test is kept in a guard so that a failing guard expression falls
%% through to not_found instead of raising an exception.
missing_stream_error(StreamID, #http2_machine{mode=Mode,
		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID))
		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) ->
	{error, closed};
missing_stream_error(_StreamID, _State) ->
	{error, not_found}.

%% Query whether the stream is currently recorded as lingering,
%% in either the local or the remote list of lingering streams.

-spec is_lingering_stream(cow_http2:streamid(), http2_machine()) -> boolean().
is_lingering_stream(StreamID, #http2_machine{
		local_lingering_streams=Local, remote_lingering_streams=Remote}) ->
	lists:member(StreamID, Local) orelse lists:member(StreamID, Remote).

%% Stream-related functions.

%% Returns the stream record, or undefined when the stream is not known.
stream_get(StreamID, #http2_machine{streams=Streams}) ->
	maps:get(StreamID, Streams, undefined).

%% A stream that is fin in both directions is removed from the map
%% rather than stored. Any other stream is inserted or replaced.
stream_store(#stream{id=StreamID, local=fin, remote=fin},
		State=#http2_machine{streams=Streams0}) ->
	Streams = maps:remove(StreamID, Streams0),
	State#http2_machine{streams=Streams};
stream_store(Stream=#stream{id=StreamID},
		State=#http2_machine{streams=Streams}) ->
	State#http2_machine{streams=Streams#{StreamID => Stream}}.

%% @todo Don't send an RST_STREAM if one was already sent.
stream_reset(StreamID, State0, Reason, HumanReadable) ->
	%% The stream is recorded as lingering and the stream error is
	%% returned along with the updated state.
	State = stream_linger(StreamID, State0),
	{error, {stream_error, StreamID, Reason, HumanReadable}, State}.

%% Record StreamID at the front of the list of local lingering streams.
stream_linger(StreamID, State=#http2_machine{local_lingering_streams=Lingering0}) ->
	%% We only keep up to 100 streams in this state. @todo Make it configurable?
	MaxLingering = 100,
	Kept = lists:sublist(Lingering0, MaxLingering - 1),
	State#http2_machine{local_lingering_streams=[StreamID|Kept]}.