cowboy_stream_h.erl (13103B)
%% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(cowboy_stream_h).
-behavior(cowboy_stream).

%% cowboy_stream callbacks.
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).

%% Entry points of the request process; exported because they are
%% invoked through proc_lib (spawn_link and hibernate).
-export([request_process/3]).
-export([resume/5]).

-record(state, {
    %% State of the next stream handler, as returned by the
    %% cowboy_stream init/data/info functions.
    next :: any(),
    %% Listener reference taken from the `ref' key of the request;
    %% used in the crash log message.
    ref = undefined :: ranch:ref(),
    %% Pid of the request process spawned by init/3.
    pid = undefined :: pid(),
    %% `continue' while a 100 continue response may still be needed;
    %% reset to `undefined' once body data arrives or a response is sent.
    expect = undefined :: undefined | continue,
    %% Process and reference of the pending read_body request.
    %% read_body_ref =:= undefined means the stream is not waiting for data.
    read_body_pid = undefined :: pid() | undefined,
    read_body_ref = undefined :: reference() | undefined,
    %% Timer that delivers read_body_timeout for the pending request.
    read_body_timer_ref = undefined :: reference() | undefined,
    %% Amount of data to accumulate before replying, or `auto'.
    read_body_length = 0 :: non_neg_integer() | infinity | auto,
    %% IsFin value seen with the most recently buffered data.
    %% Only the bare atoms are ever stored and matched on, so the type
    %% is cowboy_stream:fin() rather than a {fin, Length} tuple.
    read_body_is_fin = nofin :: cowboy_stream:fin(),
    %% Body data received but not yet sent to the request process.
    read_body_buffer = <<>> :: binary(),
    %% Number of body bytes received; sent along with the `fin' message.
    body_length = 0 :: non_neg_integer(),
    %% Process waiting for a data_ack while an alarm is on.
    stream_body_pid = undefined :: pid() | undefined,
    %% normal: data messages are acked immediately.
    %% blocking: an alarm is on; the next data message will not be acked.
    %% blocked: an ack is being withheld from stream_body_pid.
    stream_body_status = normal :: normal | blocking | blocked
}).

%% The returned list is the spawn command followed by whatever commands
%% the next stream handler returned, hence the general commands() type.
-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
    -> {cowboy_stream:commands(), #state{}}.
%% Spawn the request process and build the initial state for this stream.
%%
%% The request process is linked and runs request_process/3 with the
%% request, the `env' option (default #{}) and the `middlewares' option
%% (default [cowboy_router, cowboy_handler]). The returned commands start
%% with {spawn, Pid, Shutdown}, Shutdown being the `shutdown_timeout'
%% option (default 5000), followed by the next handler's commands.
init(StreamID, Req=#{ref := Ref}, Opts) ->
    Env = maps:get(env, Opts, #{}),
    Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
    Shutdown = maps:get(shutdown_timeout, Opts, 5000),
    Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
    Expect = expect(Req),
    {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
    {[{spawn, Pid, Shutdown}|Commands],
        #state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.

%% Return the parsed expect header, or undefined when it cannot be parsed.
%%
%% Ignore the expect header in HTTP/1.0.
expect(#{version := 'HTTP/1.0'}) ->
    undefined;
expect(Req) ->
    try cowboy_req:parse_header(<<"expect">>, Req) of
        Expect ->
            Expect
    catch _:_ ->
        %% Deliberate best effort: a header we fail to parse is
        %% treated as if no expectation had been sent.
        undefined
    end.

%% If we receive data and stream is waiting for data:
%%   If we accumulated enough data or IsFin=fin, send it.
%%   If we are in auto mode, send it and update flow control.
%%   If not, buffer it.
%% If not, buffer it.
%%
%% We always reset the expect field when we receive data,
%% since the client started sending the request body before
%% we could send a 100 continue response.

-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
    -> {cowboy_stream:commands(), State} when State::#state{}.
%% Stream isn't waiting for data.
data(StreamID, IsFin, Data, State=#state{
        read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
    do_data(StreamID, IsFin, Data, [], State#state{
        expect=undefined,
        read_body_is_fin=IsFin,
        read_body_buffer= << Buffer/binary, Data/binary >>,
        body_length=BodyLen + byte_size(Data)
    });
%% Stream is waiting for data using auto mode.
%%
%% There is no buffering done in auto mode.
data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
        read_body_length=auto, body_length=BodyLen0}) ->
    %% Count this chunk before replying so that the length sent along
    %% with `fin' and the stored total both include it, exactly like
    %% the non-auto clauses do.
    BodyLen = BodyLen0 + byte_size(Data),
    send_request_body(Pid, Ref, IsFin, BodyLen, Data),
    do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
        %% The client is already sending the body; a later read
        %% must not trigger a 100 continue response.
        expect=undefined,
        read_body_ref=undefined,
        body_length=BodyLen
    });
%% Stream is waiting for data but we didn't receive enough to send yet.
data(StreamID, IsFin=nofin, Data, State=#state{
        read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
        when byte_size(Data) + byte_size(Buffer) < ReadLen ->
    do_data(StreamID, IsFin, Data, [], State#state{
        expect=undefined,
        read_body_buffer= << Buffer/binary, Data/binary >>,
        body_length=BodyLen + byte_size(Data)
    });
%% Stream is waiting for data and we received enough to send.
data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
        read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
    BodyLen = BodyLen0 + byte_size(Data),
    %% With async=true and info=false cancel_timer/2 always returns ok.
    %% A timeout message that was already sent is ignored later because
    %% read_body_ref is reset below.
    ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
    send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
    do_data(StreamID, IsFin, Data, [], State#state{
        expect=undefined,
        read_body_ref=undefined,
        read_body_timer_ref=undefined,
        read_body_buffer= <<>>,
        body_length=BodyLen
    }).

%% Forward the data to the next stream handler. Our own commands come
%% first in the result, followed by the commands the next handler returns.
do_data(StreamID, IsFin, Data, Commands1, State=#state{next=Next0}) ->
    {Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
    {Commands1 ++ Commands2, State#state{next=Next}}.

-spec info(cowboy_stream:streamid(), any(), State)
    -> {cowboy_stream:commands(), State} when State::#state{}.
%% Handle a message sent to this stream.
%%
%% Clause order matters: the specific message shapes are matched first
%% and the final clause forwards anything unknown without any command.
%% Every clause goes through do_info/4 so that the next stream handler
%% also sees the message.

%% The request process exited normally: stop the stream.
info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
    do_info(StreamID, Info, [stop], State);
%% The request process exited with a request_error: reply with a 4xx
%% status derived from the reason, then stop the stream.
info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
        State=#state{pid=Pid}) ->
    Status = case Reason of
        timeout -> 408;
        payload_too_large -> 413;
        _ -> 400
    end,
    %% @todo Headers? Details in body? Log the crash? More stuff in debug only?
    do_info(StreamID, Info, [
        {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
        stop
    ], State);
%% The request process exited with any other {Reason, Stacktrace} pair:
%% reply with a 500 and report an internal error. The crash is logged
%% unless the reason is normal, shutdown or {shutdown, _}.
info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
    Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
    Commands = case Reason of
        normal -> Commands0;
        shutdown -> Commands0;
        {shutdown, _} -> Commands0;
        _ -> [{log, error,
                "Ranch listener ~p, connection process ~p, stream ~p "
                "had its request process ~p exit with reason "
                "~999999p and stacktrace ~999999p~n",
                [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
            |Commands0]
    end,
    do_info(StreamID, Exit, [
        {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
    |Commands], State);
%% Request body, auto mode, no body buffered.
%%
%% Only the request is recorded; the reply is sent when data arrives.
info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
    do_info(StreamID, Info, [], State#state{
        read_body_pid=Pid,
        read_body_ref=Ref,
        read_body_length=auto
    });
%% Request body, auto mode, body buffered or complete.
%%
%% The whole buffer is sent immediately and the flow control window
%% is increased by the amount that was sent.
info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{
        read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
    send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
        State#state{read_body_buffer= <<>>});
%% Request body, body buffered large enough or complete.
%%
%% We do not send a 100 continue response if the client
%% already started sending the body.
info(StreamID, Info={read_body, Pid, Ref, Length, _}, State=#state{
        read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
        when IsFin =:= fin; byte_size(Buffer) >= Length ->
    send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
%% Request body, not enough to send yet.
%%
%% The request is recorded, the flow control window is increased by
%% Length and a timer is started for Period. A 100 continue response
%% is sent first when the expect field is still set to continue.
info(StreamID, Info={read_body, Pid, Ref, Length, Period}, State=#state{expect=Expect}) ->
    Commands = case Expect of
        continue -> [{inform, 100, #{}}, {flow, Length}];
        undefined -> [{flow, Length}]
    end,
    %% NOTE(review): the timer message is wrapped in {{self(), StreamID}, Msg};
    %% presumably the connection process unwraps it before calling info/3
    %% with {read_body_timeout, Ref} - confirm against the caller.
    TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
    do_info(StreamID, Info, Commands, State#state{
        read_body_pid=Pid,
        read_body_ref=Ref,
        read_body_timer_ref=TRef,
        read_body_length=Length
    });
%% Request body reading timeout; send what we got.
%%
%% This clause only matches when Ref is the reference of the
%% request that is currently pending.
info(StreamID, Info={read_body_timeout, Ref}, State=#state{read_body_pid=Pid, read_body_ref=Ref,
        read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
    send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    do_info(StreamID, Info, [], State#state{
        read_body_ref=undefined,
        read_body_timer_ref=undefined,
        read_body_buffer= <<>>
    });
%% Timeout for a request that is no longer pending; nothing to send.
info(StreamID, Info={read_body_timeout, _}, State) ->
    do_info(StreamID, Info, [], State);
%% Response.
%%
%% We reset the expect field when a 100 continue response
%% is sent or when any final response is sent.
info(StreamID, Inform={inform, Status, _}, State0) ->
    State = case cow_http:status_to_integer(Status) of
        100 -> State0#state{expect=undefined};
        _ -> State0
    end,
    do_info(StreamID, Inform, [Inform], State);
info(StreamID, Response={response, _, _, _}, State) ->
    do_info(StreamID, Response, [Response], State#state{expect=undefined});
info(StreamID, Headers={headers, _, _}, State) ->
    do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
%% Sending data involves the data message, the stream_buffer_full alarm
%% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
%%
%% We only apply backpressure when the message includes a pid. Otherwise
%% it is a message from Cowboy, or the user circumventing the backpressure.
%%
%% We currently do not support sending data from multiple processes concurrently.
info(StreamID, Data={data, _, _}, State) ->
    do_info(StreamID, Data, [Data], State);
info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) ->
    State = case Status of
        %% No alarm is on: acknowledge immediately.
        normal ->
            Pid ! {data_ack, self()},
            State0;
        %% An alarm is on: withhold the ack and remember who to ack later.
        blocking ->
            State0#state{stream_body_pid=Pid, stream_body_status=blocked};
        %% An ack is already being withheld; this sender is not recorded.
        blocked ->
            State0
    end,
    %% Drop the pid so that the command has the {data, _, _} shape.
    Data = erlang:delete_element(2, Data0),
    do_info(StreamID, Data, [Data], State);
%% An alarm was turned on: stop acknowledging data messages.
info(StreamID, Alarm={alarm, Name, on}, State0=#state{stream_body_status=Status})
        when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
    State = case Status of
        normal -> State0#state{stream_body_status=blocking};
        _ -> State0
    end,
    do_info(StreamID, Alarm, [], State);
%% An alarm was turned off: send the withheld ack, if any, and go
%% back to the normal status.
info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status})
        when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
    _ = case Status of
        normal -> ok;
        blocking -> ok;
        blocked -> Pid ! {data_ack, self()}
    end,
    do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal});
%% The following messages are turned into commands unchanged.
info(StreamID, Trailers={trailers, _}, State) ->
    do_info(StreamID, Trailers, [Trailers], State);
info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
    do_info(StreamID, Push, [Push], State);
info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) ->
    do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined});
%% Convert the set_options message to a command.
info(StreamID, SetOptions={set_options, _}, State) ->
    do_info(StreamID, SetOptions, [SetOptions], State);
%% Unknown message, either stray or meant for a handler down the line.
info(StreamID, Info, State) ->
    do_info(StreamID, Info, [], State).

%% Forward the message to the next stream handler. Our own commands come
%% first in the result, followed by the commands the next handler returns.
do_info(StreamID, Info, Commands1, State0=#state{next=Next0}) ->
    {Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0),
    {Commands1 ++ Commands2, State0#state{next=Next}}.

%% Nothing to clean up here; only the next stream handler is notified.
-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
terminate(StreamID, Reason, #state{next=Next}) ->
    cowboy_stream:terminate(StreamID, Reason, Next).

%% Early errors are passed on unchanged.
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
    cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
    when Resp::cowboy_stream:resp_command().
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
    cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).

%% Send body data to the process that requested it. The message carries
%% the body length only when this is the final (fin) part of the body.
send_request_body(Pid, Ref, nofin, _, Data) ->
    Pid ! {request_body, Ref, nofin, Data},
    ok;
send_request_body(Pid, Ref, fin, BodyLen, Data) ->
    Pid ! {request_body, Ref, fin, BodyLen, Data},
    ok.

%% Request process.

%% We add the stacktrace to exit exceptions here in order
%% to simplify the debugging of errors. The proc_lib library
%% already adds the stacktrace to other types of exceptions.
%% Entry point of the request process: run the middlewares in order.
%%
%% Exits with reason normal or shutdown are left untouched. An exit
%% with {shutdown, _} is re-raised as is. Any other exit reason is
%% re-raised as {Reason, Stacktrace}.
-spec request_process(cowboy_req:req(), cowboy_middleware:env(), [module()]) -> ok.
request_process(Req, Env, Middlewares) ->
    try
        execute(Req, Env, Middlewares)
    catch
        exit:Reason:Stacktrace when Reason =/= normal, Reason =/= shutdown ->
            ExitReason = case Reason of
                {shutdown, _} -> Reason;
                _ -> {Reason, Stacktrace}
            end,
            erlang:raise(exit, ExitReason, Stacktrace)
    end.

%% Run each middleware in turn until the list is exhausted, one of
%% them stops the chain, or one of them asks to be suspended.
execute(_Req, _Env, []) ->
    ok;
execute(Req0, Env0, [Current|Remaining]) ->
    case Current:execute(Req0, Env0) of
        {stop, _} ->
            ok;
        {suspend, Mod, Fun, FunArgs} ->
            %% Hibernate; the process wakes up in resume/5.
            proc_lib:hibernate(?MODULE, resume, [Env0, Remaining, Mod, Fun, FunArgs]);
        {ok, Req, Env} ->
            execute(Req, Env, Remaining)
    end.

%% Continuation after a hibernation: call the function the middleware
%% asked for and treat its result like the result of a middleware.
-spec resume(cowboy_middleware:env(), [module()], module(), atom(), [any()]) -> ok.
resume(Env, Tail, Module, Function, Args) ->
    case apply(Module, Function, Args) of
        {stop, _} ->
            ok;
        {suspend, NextModule, NextFunction, NextArgs} ->
            proc_lib:hibernate(?MODULE, resume,
                [Env, Tail, NextModule, NextFunction, NextArgs]);
        {ok, NewReq, NewEnv} ->
            execute(NewReq, NewEnv, Tail)
    end.