zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_stream_h.erl (13103B)


      1 %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_stream_h).
     16 -behavior(cowboy_stream).
     17 
     18 -export([init/3]).
     19 -export([data/4]).
     20 -export([info/3]).
     21 -export([terminate/3]).
     22 -export([early_error/5]).
     23 
     24 -export([request_process/3]).
     25 -export([resume/5]).
     26 
     27 -record(state, {
     28 	next :: any(),
     29 	ref = undefined :: ranch:ref(),
     30 	pid = undefined :: pid(),
     31 	expect = undefined :: undefined | continue,
     32 	read_body_pid = undefined :: pid() | undefined,
     33 	read_body_ref = undefined :: reference() | undefined,
     34 	read_body_timer_ref = undefined :: reference() | undefined,
     35 	read_body_length = 0 :: non_neg_integer() | infinity | auto,
     36 	read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
     37 	read_body_buffer = <<>> :: binary(),
     38 	body_length = 0 :: non_neg_integer(),
     39 	stream_body_pid = undefined :: pid() | undefined,
     40 	stream_body_status = normal :: normal | blocking | blocked
     41 }).
     42 
     43 -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
     44 	-> {[{spawn, pid(), timeout()}], #state{}}.
     45 init(StreamID, Req=#{ref := Ref}, Opts) ->
     46 	Env = maps:get(env, Opts, #{}),
     47 	Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
     48 	Shutdown = maps:get(shutdown_timeout, Opts, 5000),
     49 	Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
     50 	Expect = expect(Req),
     51 	{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
     52 	{[{spawn, Pid, Shutdown}|Commands],
     53 		#state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.
     54 
     55 %% Ignore the expect header in HTTP/1.0.
     56 expect(#{version := 'HTTP/1.0'}) ->
     57 	undefined;
     58 expect(Req) ->
     59 	try cowboy_req:parse_header(<<"expect">>, Req) of
     60 		Expect ->
     61 			Expect
     62 	catch _:_ ->
     63 		undefined
     64 	end.
     65 
     66 %% If we receive data and stream is waiting for data:
     67 %%   If we accumulated enough data or IsFin=fin, send it.
     68 %%   If we are in auto mode, send it and update flow control.
     69 %%   If not, buffer it.
     70 %% If not, buffer it.
     71 %%
     72 %% We always reset the expect field when we receive data,
     73 %% since the client started sending the request body before
     74 %% we could send a 100 continue response.
     75 
     76 -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
     77 	-> {cowboy_stream:commands(), State} when State::#state{}.
     78 %% Stream isn't waiting for data.
     79 data(StreamID, IsFin, Data, State=#state{
     80 		read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
     81 	do_data(StreamID, IsFin, Data, [], State#state{
     82 		expect=undefined,
     83 		read_body_is_fin=IsFin,
     84 		read_body_buffer= << Buffer/binary, Data/binary >>,
     85 		body_length=BodyLen + byte_size(Data)
     86 	});
     87 %% Stream is waiting for data using auto mode.
     88 %%
     89 %% There is no buffering done in auto mode.
     90 data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
     91 		read_body_length=auto, body_length=BodyLen}) ->
     92 	send_request_body(Pid, Ref, IsFin, BodyLen, Data),
     93 	do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
     94 		read_body_ref=undefined,
     95 		%% @todo This is wrong, it's missing byte_size(Data).
     96 		body_length=BodyLen
     97 	});
     98 %% Stream is waiting for data but we didn't receive enough to send yet.
     99 data(StreamID, IsFin=nofin, Data, State=#state{
    100 		read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
    101 		when byte_size(Data) + byte_size(Buffer) < ReadLen ->
    102 	do_data(StreamID, IsFin, Data, [], State#state{
    103 		expect=undefined,
    104 		read_body_buffer= << Buffer/binary, Data/binary >>,
    105 		body_length=BodyLen + byte_size(Data)
    106 	});
    107 %% Stream is waiting for data and we received enough to send.
    108 data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref,
    109 		read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
    110 	BodyLen = BodyLen0 + byte_size(Data),
    111 	ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
    112 	send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
    113 	do_data(StreamID, IsFin, Data, [], State#state{
    114 		expect=undefined,
    115 		read_body_ref=undefined,
    116 		read_body_timer_ref=undefined,
    117 		read_body_buffer= <<>>,
    118 		body_length=BodyLen
    119 	}).
    120 
    121 do_data(StreamID, IsFin, Data, Commands1, State=#state{next=Next0}) ->
    122 	{Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
    123 	{Commands1 ++ Commands2, State#state{next=Next}}.
    124 
    125 -spec info(cowboy_stream:streamid(), any(), State)
    126 	-> {cowboy_stream:commands(), State} when State::#state{}.
    127 info(StreamID, Info={'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
    128 	do_info(StreamID, Info, [stop], State);
    129 info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
    130 		State=#state{pid=Pid}) ->
    131 	Status = case Reason of
    132 		timeout -> 408;
    133 		payload_too_large -> 413;
    134 		_ -> 400
    135 	end,
    136 	%% @todo Headers? Details in body? Log the crash? More stuff in debug only?
    137 	do_info(StreamID, Info, [
    138 		{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
    139 		stop
    140 	], State);
    141 info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
    142 	Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
    143 	Commands = case Reason of
    144 		normal -> Commands0;
    145 		shutdown -> Commands0;
    146 		{shutdown, _} -> Commands0;
    147 		_ -> [{log, error,
    148 				"Ranch listener ~p, connection process ~p, stream ~p "
    149 				"had its request process ~p exit with reason "
    150 				"~999999p and stacktrace ~999999p~n",
    151 				[Ref, self(), StreamID, Pid, Reason, Stacktrace]}
    152 			|Commands0]
    153 	end,
    154 	do_info(StreamID, Exit, [
    155 		{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
    156 	|Commands], State);
    157 %% Request body, auto mode, no body buffered.
    158 info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
    159 	do_info(StreamID, Info, [], State#state{
    160 		read_body_pid=Pid,
    161 		read_body_ref=Ref,
    162 		read_body_length=auto
    163 	});
    164 %% Request body, auto mode, body buffered or complete.
    165 info(StreamID, Info={read_body, Pid, Ref, auto, infinity}, State=#state{
    166 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
    167 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    168 	do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
    169 		State#state{read_body_buffer= <<>>});
    170 %% Request body, body buffered large enough or complete.
    171 %%
    172 %% We do not send a 100 continue response if the client
    173 %% already started sending the body.
    174 info(StreamID, Info={read_body, Pid, Ref, Length, _}, State=#state{
    175 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
    176 		when IsFin =:= fin; byte_size(Buffer) >= Length ->
    177 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    178 	do_info(StreamID, Info, [], State#state{read_body_buffer= <<>>});
    179 %% Request body, not enough to send yet.
    180 info(StreamID, Info={read_body, Pid, Ref, Length, Period}, State=#state{expect=Expect}) ->
    181 	Commands = case Expect of
    182 		continue -> [{inform, 100, #{}}, {flow, Length}];
    183 		undefined -> [{flow, Length}]
    184 	end,
    185 	TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
    186 	do_info(StreamID, Info, Commands, State#state{
    187 		read_body_pid=Pid,
    188 		read_body_ref=Ref,
    189 		read_body_timer_ref=TRef,
    190 		read_body_length=Length
    191 	});
    192 %% Request body reading timeout; send what we got.
    193 info(StreamID, Info={read_body_timeout, Ref}, State=#state{read_body_pid=Pid, read_body_ref=Ref,
    194 		read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
    195 	send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
    196 	do_info(StreamID, Info, [], State#state{
    197 		read_body_ref=undefined,
    198 		read_body_timer_ref=undefined,
    199 		read_body_buffer= <<>>
    200 	});
    201 info(StreamID, Info={read_body_timeout, _}, State) ->
    202 	do_info(StreamID, Info, [], State);
    203 %% Response.
    204 %%
    205 %% We reset the expect field when a 100 continue response
    206 %% is sent or when any final response is sent.
    207 info(StreamID, Inform={inform, Status, _}, State0) ->
    208 	State = case cow_http:status_to_integer(Status) of
    209 		100 -> State0#state{expect=undefined};
    210 		_ -> State0
    211 	end,
    212 	do_info(StreamID, Inform, [Inform], State);
    213 info(StreamID, Response={response, _, _, _}, State) ->
    214 	do_info(StreamID, Response, [Response], State#state{expect=undefined});
    215 info(StreamID, Headers={headers, _, _}, State) ->
    216 	do_info(StreamID, Headers, [Headers], State#state{expect=undefined});
    217 %% Sending data involves the data message, the stream_buffer_full alarm
    218 %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
    219 %%
    220 %% We only apply backpressure when the message includes a pid. Otherwise
    221 %% it is a message from Cowboy, or the user circumventing the backpressure.
    222 %%
    223 %% We currently do not support sending data from multiple processes concurrently.
    224 info(StreamID, Data={data, _, _}, State) ->
    225 	do_info(StreamID, Data, [Data], State);
    226 info(StreamID, Data0={data, Pid, _, _}, State0=#state{stream_body_status=Status}) ->
    227 	State = case Status of
    228 		normal ->
    229 			Pid ! {data_ack, self()},
    230 			State0;
    231 		blocking ->
    232 			State0#state{stream_body_pid=Pid, stream_body_status=blocked};
    233 		blocked ->
    234 			State0
    235 	end,
    236 	Data = erlang:delete_element(2, Data0),
    237 	do_info(StreamID, Data, [Data], State);
    238 info(StreamID, Alarm={alarm, Name, on}, State0=#state{stream_body_status=Status})
    239 		when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
    240 	State = case Status of
    241 		normal -> State0#state{stream_body_status=blocking};
    242 		_ -> State0
    243 	end,
    244 	do_info(StreamID, Alarm, [], State);
    245 info(StreamID, Alarm={alarm, Name, off}, State=#state{stream_body_pid=Pid, stream_body_status=Status})
    246 		when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
    247 	_ = case Status of
    248 		normal -> ok;
    249 		blocking -> ok;
    250 		blocked -> Pid ! {data_ack, self()}
    251 	end,
    252 	do_info(StreamID, Alarm, [], State#state{stream_body_pid=undefined, stream_body_status=normal});
    253 info(StreamID, Trailers={trailers, _}, State) ->
    254 	do_info(StreamID, Trailers, [Trailers], State);
    255 info(StreamID, Push={push, _, _, _, _, _, _, _}, State) ->
    256 	do_info(StreamID, Push, [Push], State);
    257 info(StreamID, SwitchProtocol={switch_protocol, _, _, _}, State) ->
    258 	do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect=undefined});
    259 %% Convert the set_options message to a command.
    260 info(StreamID, SetOptions={set_options, _}, State) ->
    261 	do_info(StreamID, SetOptions, [SetOptions], State);
    262 %% Unknown message, either stray or meant for a handler down the line.
    263 info(StreamID, Info, State) ->
    264 	do_info(StreamID, Info, [], State).
    265 
    266 do_info(StreamID, Info, Commands1, State0=#state{next=Next0}) ->
    267 	{Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0),
    268 	{Commands1 ++ Commands2, State0#state{next=Next}}.
    269 
    270 -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
    271 terminate(StreamID, Reason, #state{next=Next}) ->
    272 	cowboy_stream:terminate(StreamID, Reason, Next).
    273 
    274 -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
    275 	cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
    276 	when Resp::cowboy_stream:resp_command().
    277 early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
    278 	cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
    279 
    280 send_request_body(Pid, Ref, nofin, _, Data) ->
    281 	Pid ! {request_body, Ref, nofin, Data},
    282 	ok;
    283 send_request_body(Pid, Ref, fin, BodyLen, Data) ->
    284 	Pid ! {request_body, Ref, fin, BodyLen, Data},
    285 	ok.
    286 
    287 %% Request process.
    288 
    289 %% We add the stacktrace to exit exceptions here in order
    290 %% to simplify the debugging of errors. The proc_lib library
    291 %% already adds the stacktrace to other types of exceptions.
    292 -spec request_process(cowboy_req:req(), cowboy_middleware:env(), [module()]) -> ok.
    293 request_process(Req, Env, Middlewares) ->
    294 	try
    295 		execute(Req, Env, Middlewares)
    296 	catch
    297 		exit:Reason={shutdown, _}:Stacktrace ->
    298 			erlang:raise(exit, Reason, Stacktrace);
    299 		exit:Reason:Stacktrace when Reason =/= normal, Reason =/= shutdown ->
    300 			erlang:raise(exit, {Reason, Stacktrace}, Stacktrace)
    301 	end.
    302 
    303 execute(_, _, []) ->
    304 	ok;
    305 execute(Req, Env, [Middleware|Tail]) ->
    306 	case Middleware:execute(Req, Env) of
    307 		{ok, Req2, Env2} ->
    308 			execute(Req2, Env2, Tail);
    309 		{suspend, Module, Function, Args} ->
    310 			proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
    311 		{stop, _Req2} ->
    312 			ok
    313 	end.
    314 
    315 -spec resume(cowboy_middleware:env(), [module()], module(), atom(), [any()]) -> ok.
    316 resume(Env, Tail, Module, Function, Args) ->
    317 	case apply(Module, Function, Args) of
    318 		{ok, Req2, Env2} ->
    319 			execute(Req2, Env2, Tail);
    320 		{suspend, Module2, Function2, Args2} ->
    321 			proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
    322 		{stop, _Req2} ->
    323 			ok
    324 	end.