zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_stream.erl (7812B)


      1 %% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_stream).
     16 
     17 -type state() :: any().
     18 -type human_reason() :: atom().
     19 
     20 -type streamid() :: any().
     21 -export_type([streamid/0]).
     22 
     23 -type fin() :: fin | nofin.
     24 -export_type([fin/0]).
     25 
     26 %% @todo Perhaps it makes more sense to have resp_body in this module?
     27 
     28 -type resp_command()
     29 	:: {response, cowboy:http_status(), cowboy:http_headers(), cowboy_req:resp_body()}.
     30 -export_type([resp_command/0]).
     31 
     32 -type commands() :: [{inform, cowboy:http_status(), cowboy:http_headers()}
     33 	| resp_command()
     34 	| {headers, cowboy:http_status(), cowboy:http_headers()}
     35 	| {data, fin(), cowboy_req:resp_body()}
     36 	| {trailers, cowboy:http_headers()}
     37 	| {push, binary(), binary(), binary(), inet:port_number(),
     38 		binary(), binary(), cowboy:http_headers()}
     39 	| {flow, pos_integer()}
     40 	| {spawn, pid(), timeout()}
     41 	| {error_response, cowboy:http_status(), cowboy:http_headers(), iodata()}
     42 	| {switch_protocol, cowboy:http_headers(), module(), state()}
     43 	| {internal_error, any(), human_reason()}
     44 	| {set_options, map()}
     45 	| {log, logger:level(), io:format(), list()}
     46 	| stop].
     47 -export_type([commands/0]).
     48 
     49 -type reason() :: normal | switch_protocol
     50 	| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
     51 	| {socket_error, closed | atom(), human_reason()}
     52 	| {stream_error, cow_http2:error(), human_reason()}
     53 	| {connection_error, cow_http2:error(), human_reason()}
     54 	| {stop, cow_http2:frame() | {exit, any()}, human_reason()}.
     55 -export_type([reason/0]).
     56 
     57 -type partial_req() :: map(). %% @todo Take what's in cowboy_req with everything? optional.
     58 -export_type([partial_req/0]).
     59 
     60 -callback init(streamid(), cowboy_req:req(), cowboy:opts()) -> {commands(), state()}.
     61 -callback data(streamid(), fin(), binary(), State) -> {commands(), State} when State::state().
     62 -callback info(streamid(), any(), State) -> {commands(), State} when State::state().
     63 -callback terminate(streamid(), reason(), state()) -> any().
     64 -callback early_error(streamid(), reason(), partial_req(), Resp, cowboy:opts())
     65 	-> Resp when Resp::resp_command().
     66 
     67 %% @todo To optimize the number of active timers we could have a command
     68 %% that enables a timeout that is called in the absence of any other call,
     69 %% similar to what gen_server does. However the nice thing about this is
     70 %% that the connection process can keep a single timer around (the same
     71 %% one that would be used to detect half-closed sockets) and use this
     72 %% timer and other events to trigger the timeout in streams at their
     73 %% intended time.
     74 %%
     75 %% This same timer can be used to try and send PING frames to help detect
     76 %% that the connection is indeed unresponsive.
     77 
     78 -export([init/3]).
     79 -export([data/4]).
     80 -export([info/3]).
     81 -export([terminate/3]).
     82 -export([early_error/5]).
     83 -export([make_error_log/5]).
     84 
     85 %% Note that this and other functions in this module do NOT catch
     86 %% exceptions. We want the exception to go all the way down to the
     87 %% protocol code.
     88 %%
     89 %% OK the failure scenario is not so clear. The problem is
     90 %% that the failure at any point in init/3 will result in the
     91 %% corresponding state being lost. I am unfortunately not
     92 %% confident we can do anything about this. If the crashing
     93 %% handler just created a process, we'll never know about it.
     94 %% Therefore at this time I choose to leave all failure handling
     95 %% to the protocol process.
     96 %%
     97 %% Note that a failure in init/3 will result in terminate/3
     98 %% NOT being called. This is because the state is not available.
     99 
    100 -spec init(streamid(), cowboy_req:req(), cowboy:opts())
    101 	-> {commands(), {module(), state()} | undefined}.
    102 init(StreamID, Req, Opts) ->
    103 	case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of
    104 		[] ->
    105 			{[], undefined};
    106 		[Handler|Tail] ->
    107 			%% We call the next handler and remove it from the list of
    108 			%% stream handlers. This means that handlers that run after
    109 			%% it have no knowledge it exists. Should user require this
    110 			%% knowledge they can just define a separate option that will
    111 			%% be left untouched.
    112 			{Commands, State} = Handler:init(StreamID, Req, Opts#{stream_handlers => Tail}),
    113 			{Commands, {Handler, State}}
    114 	end.
    115 
    116 -spec data(streamid(), fin(), binary(), {Handler, State} | undefined)
    117 	-> {commands(), {Handler, State} | undefined}
    118 	when Handler::module(), State::state().
    119 data(_, _, _, undefined) ->
    120 	{[], undefined};
    121 data(StreamID, IsFin, Data, {Handler, State0}) ->
    122 	{Commands, State} = Handler:data(StreamID, IsFin, Data, State0),
    123 	{Commands, {Handler, State}}.
    124 
    125 -spec info(streamid(), any(), {Handler, State} | undefined)
    126 	-> {commands(), {Handler, State} | undefined}
    127 	when Handler::module(), State::state().
    128 info(_, _, undefined) ->
    129 	{[], undefined};
    130 info(StreamID, Info, {Handler, State0}) ->
    131 	{Commands, State} = Handler:info(StreamID, Info, State0),
    132 	{Commands, {Handler, State}}.
    133 
    134 -spec terminate(streamid(), reason(), {module(), state()} | undefined) -> ok.
    135 terminate(_, _, undefined) ->
    136 	ok;
    137 terminate(StreamID, Reason, {Handler, State}) ->
    138 	_ = Handler:terminate(StreamID, Reason, State),
    139 	ok.
    140 
    141 -spec early_error(streamid(), reason(), partial_req(), Resp, cowboy:opts())
    142 	-> Resp when Resp::resp_command().
    143 early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
    144 	case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of
    145 		[] ->
    146 			Resp;
    147 		[Handler|Tail] ->
    148 			%% This is the same behavior as in init/3.
    149 			Handler:early_error(StreamID, Reason,
    150 				PartialReq, Resp, Opts#{stream_handlers => Tail})
    151 	end.
    152 
    153 -spec make_error_log(init | data | info | terminate | early_error,
    154 	list(), error | exit | throw, any(), list())
    155 	-> {log, error, string(), list()}.
    156 make_error_log(init, [StreamID, Req, Opts], Class, Exception, Stacktrace) ->
    157 	{log, error,
    158 		"Unhandled exception ~p:~p in cowboy_stream:init(~p, Req, Opts)~n"
    159 		"Stacktrace: ~p~n"
    160 		"Req: ~p~n"
    161 		"Opts: ~p~n",
    162 		[Class, Exception, StreamID, Stacktrace, Req, Opts]};
    163 make_error_log(data, [StreamID, IsFin, Data, State], Class, Exception, Stacktrace) ->
    164 	{log, error,
    165 		"Unhandled exception ~p:~p in cowboy_stream:data(~p, ~p, Data, State)~n"
    166 		"Stacktrace: ~p~n"
    167 		"Data: ~p~n"
    168 		"State: ~p~n",
    169 		[Class, Exception, StreamID, IsFin, Stacktrace, Data, State]};
    170 make_error_log(info, [StreamID, Msg, State], Class, Exception, Stacktrace) ->
    171 	{log, error,
    172 		"Unhandled exception ~p:~p in cowboy_stream:info(~p, Msg, State)~n"
    173 		"Stacktrace: ~p~n"
    174 		"Msg: ~p~n"
    175 		"State: ~p~n",
    176 		[Class, Exception, StreamID, Stacktrace, Msg, State]};
    177 make_error_log(terminate, [StreamID, Reason, State], Class, Exception, Stacktrace) ->
    178 	{log, error,
    179 		"Unhandled exception ~p:~p in cowboy_stream:terminate(~p, Reason, State)~n"
    180 		"Stacktrace: ~p~n"
    181 		"Reason: ~p~n"
    182 		"State: ~p~n",
    183 		[Class, Exception, StreamID, Stacktrace, Reason, State]};
    184 make_error_log(early_error, [StreamID, Reason, PartialReq, Resp, Opts],
    185 		Class, Exception, Stacktrace) ->
    186 	{log, error,
    187 		"Unhandled exception ~p:~p in cowboy_stream:early_error(~p, Reason, PartialReq, Resp, Opts)~n"
    188 		"Stacktrace: ~p~n"
    189 		"Reason: ~p~n"
    190 		"PartialReq: ~p~n"
    191 		"Resp: ~p~n"
    192 		"Opts: ~p~n",
    193 		[Class, Exception, StreamID, Stacktrace, Reason, PartialReq, Resp, Opts]}.