zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_metrics_h.erl (10859B)


      1 %% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cowboy_metrics_h).
     16 -behavior(cowboy_stream).
     17 
     18 -export([init/3]).
     19 -export([data/4]).
     20 -export([info/3]).
     21 -export([terminate/3]).
     22 -export([early_error/5]).
     23 
     24 -type proc_metrics() :: #{pid() => #{
     25 	%% Time at which the process spawned.
     26 	spawn := integer(),
     27 
     28 	%% Time at which the process exited.
     29 	exit => integer(),
     30 
     31 	%% Reason for the process exit.
     32 	reason => any()
     33 }}.
     34 
     35 -type informational_metrics() :: #{
     36 	%% Informational response status.
     37 	status := cowboy:http_status(),
     38 
     39 	%% Headers sent with the informational response.
     40 	headers := cowboy:http_headers(),
     41 
     42 	%% Time when the informational response was sent.
     43 	time := integer()
     44 }.
     45 
     46 -type metrics() :: #{
     47 	%% The identifier for this listener.
     48 	ref := ranch:ref(),
     49 
     50 	%% The pid for this connection.
     51 	pid := pid(),
     52 
     53 	%% The streamid also indicates the total number of requests on
     54 	%% this connection (StreamID div 2 + 1).
     55 	streamid := cowboy_stream:streamid(),
     56 
     57 	%% The terminate reason is always useful.
     58 	reason := cowboy_stream:reason(),
     59 
     60 	%% A filtered Req object or a partial Req object
     61 	%% depending on how far the request got to.
     62 	req => cowboy_req:req(),
     63 	partial_req => cowboy_stream:partial_req(),
     64 
     65 	%% Response status.
     66 	resp_status := cowboy:http_status(),
     67 
     68 	%% Filtered response headers.
     69 	resp_headers := cowboy:http_headers(),
     70 
     71 	%% Start/end of the processing of the request.
     72 	%%
     73 	%% This represents the time from this stream handler's init
     74 	%% to terminate.
     75 	req_start => integer(),
     76 	req_end => integer(),
     77 
     78 	%% Start/end of the receiving of the request body.
     79 	%% Begins when the first packet has been received.
     80 	req_body_start => integer(),
     81 	req_body_end => integer(),
     82 
     83 	%% Start/end of the sending of the response.
     84 	%% Begins when we send the headers and ends on the final
     85 	%% packet of the response body. If everything is sent at
     86 	%% once these values are identical.
     87 	resp_start => integer(),
     88 	resp_end => integer(),
     89 
     90 	%% For early errors all we get is the time we received it.
     91 	early_error_time => integer(),
     92 
     93 	%% Start/end of spawned processes. This is where most of
     94 	%% the user code lies, excluding stream handlers. On a
     95 	%% default Cowboy configuration there should be only one
     96 	%% process: the request process.
     97 	procs => proc_metrics(),
     98 
     99 	%% Informational responses sent before the final response.
    100 	informational => [informational_metrics()],
    101 
    102 	%% Length of the request and response bodies. This does
    103 	%% not include the framing.
    104 	req_body_length => non_neg_integer(),
    105 	resp_body_length => non_neg_integer(),
    106 
    107 	%% Additional metadata set by the user.
    108 	user_data => map()
    109 }.
    110 -export_type([metrics/0]).
    111 
    112 -type metrics_callback() :: fun((metrics()) -> any()).
    113 -export_type([metrics_callback/0]).
    114 
    115 -record(state, {
    116 	next :: any(),
    117 	callback :: fun((metrics()) -> any()),
    118 	resp_headers_filter :: undefined | fun((cowboy:http_headers()) -> cowboy:http_headers()),
    119 	req :: map(),
    120 	resp_status :: undefined | cowboy:http_status(),
    121 	resp_headers :: undefined | cowboy:http_headers(),
    122 	ref :: ranch:ref(),
    123 	req_start :: integer(),
    124 	req_end :: undefined | integer(),
    125 	req_body_start :: undefined | integer(),
    126 	req_body_end :: undefined | integer(),
    127 	resp_start :: undefined | integer(),
    128 	resp_end :: undefined | integer(),
    129 	procs = #{} :: proc_metrics(),
    130 	informational = [] :: [informational_metrics()],
    131 	req_body_length = 0 :: non_neg_integer(),
    132 	resp_body_length = 0 :: non_neg_integer(),
    133 	user_data = #{} :: map()
    134 }).
    135 
    136 -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
    137 	-> {[{spawn, pid(), timeout()}], #state{}}.
    138 init(StreamID, Req=#{ref := Ref}, Opts=#{metrics_callback := Fun}) ->
    139 	ReqStart = erlang:monotonic_time(),
    140 	{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
    141 	FilteredReq = case maps:get(metrics_req_filter, Opts, undefined) of
    142 		undefined -> Req;
    143 		ReqFilter -> ReqFilter(Req)
    144 	end,
    145 	RespHeadersFilter = maps:get(metrics_resp_headers_filter, Opts, undefined),
    146 	{Commands, fold(Commands, #state{
    147 		next=Next,
    148 		callback=Fun,
    149 		resp_headers_filter=RespHeadersFilter,
    150 		req=FilteredReq,
    151 		ref=Ref,
    152 		req_start=ReqStart
    153 	})}.
    154 
    155 -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
    156 	-> {cowboy_stream:commands(), State} when State::#state{}.
    157 data(StreamID, IsFin=fin, Data, State=#state{req_body_start=undefined}) ->
    158 	ReqBody = erlang:monotonic_time(),
    159 	do_data(StreamID, IsFin, Data, State#state{
    160 		req_body_start=ReqBody,
    161 		req_body_end=ReqBody,
    162 		req_body_length=byte_size(Data)
    163 	});
    164 data(StreamID, IsFin=fin, Data, State=#state{req_body_length=ReqBodyLen}) ->
    165 	ReqBodyEnd = erlang:monotonic_time(),
    166 	do_data(StreamID, IsFin, Data, State#state{
    167 		req_body_end=ReqBodyEnd,
    168 		req_body_length=ReqBodyLen + byte_size(Data)
    169 	});
    170 data(StreamID, IsFin, Data, State=#state{req_body_start=undefined}) ->
    171 	ReqBodyStart = erlang:monotonic_time(),
    172 	do_data(StreamID, IsFin, Data, State#state{
    173 		req_body_start=ReqBodyStart,
    174 		req_body_length=byte_size(Data)
    175 	});
    176 data(StreamID, IsFin, Data, State=#state{req_body_length=ReqBodyLen}) ->
    177 	do_data(StreamID, IsFin, Data, State#state{
    178 		req_body_length=ReqBodyLen + byte_size(Data)
    179 	}).
    180 
    181 do_data(StreamID, IsFin, Data, State0=#state{next=Next0}) ->
    182 	{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
    183 	{Commands, fold(Commands, State0#state{next=Next})}.
    184 
    185 -spec info(cowboy_stream:streamid(), any(), State)
    186 	-> {cowboy_stream:commands(), State} when State::#state{}.
    187 info(StreamID, Info={'EXIT', Pid, Reason}, State0=#state{procs=Procs}) ->
    188 	ProcEnd = erlang:monotonic_time(),
    189 	P = maps:get(Pid, Procs),
    190 	State = State0#state{procs=Procs#{Pid => P#{
    191 		exit => ProcEnd,
    192 		reason => Reason
    193 	}}},
    194 	do_info(StreamID, Info, State);
    195 info(StreamID, Info, State) ->
    196 	do_info(StreamID, Info, State).
    197 
    198 do_info(StreamID, Info, State0=#state{next=Next0}) ->
    199 	{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
    200 	{Commands, fold(Commands, State0#state{next=Next})}.
    201 
    202 fold([], State) ->
    203 	State;
    204 fold([{spawn, Pid, _}|Tail], State0=#state{procs=Procs}) ->
    205 	ProcStart = erlang:monotonic_time(),
    206 	State = State0#state{procs=Procs#{Pid => #{spawn => ProcStart}}},
    207 	fold(Tail, State);
    208 fold([{inform, Status, Headers}|Tail],
    209 		State=#state{informational=Infos}) ->
    210 	Time = erlang:monotonic_time(),
    211 	fold(Tail, State#state{informational=[#{
    212 		status => Status,
    213 		headers => Headers,
    214 		time => Time
    215 	}|Infos]});
    216 fold([{response, Status, Headers, Body}|Tail],
    217 		State=#state{resp_headers_filter=RespHeadersFilter}) ->
    218 	Resp = erlang:monotonic_time(),
    219 	fold(Tail, State#state{
    220 		resp_status=Status,
    221 		resp_headers=case RespHeadersFilter of
    222 			undefined -> Headers;
    223 			_ -> RespHeadersFilter(Headers)
    224 		end,
    225 		resp_start=Resp,
    226 		resp_end=Resp,
    227 		resp_body_length=resp_body_length(Body)
    228 	});
    229 fold([{error_response, Status, Headers, Body}|Tail],
    230 		State=#state{resp_status=RespStatus}) ->
    231 	%% The error_response command only results in a response
    232 	%% if no response was sent before.
    233 	case RespStatus of
    234 		undefined ->
    235 			fold([{response, Status, Headers, Body}|Tail], State);
    236 		_ ->
    237 			fold(Tail, State)
    238 	end;
    239 fold([{headers, Status, Headers}|Tail],
    240 		State=#state{resp_headers_filter=RespHeadersFilter}) ->
    241 	RespStart = erlang:monotonic_time(),
    242 	fold(Tail, State#state{
    243 		resp_status=Status,
    244 		resp_headers=case RespHeadersFilter of
    245 			undefined -> Headers;
    246 			_ -> RespHeadersFilter(Headers)
    247 		end,
    248 		resp_start=RespStart
    249 	});
    250 %% @todo It might be worthwhile to keep the sendfile information around,
    251 %% especially if these frames ultimately result in a sendfile syscall.
    252 fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
    253 	fold(Tail, State#state{
    254 		resp_body_length=RespBodyLen + resp_body_length(Data)
    255 	});
    256 fold([{data, fin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
    257 	RespEnd = erlang:monotonic_time(),
    258 	fold(Tail, State#state{
    259 		resp_end=RespEnd,
    260 		resp_body_length=RespBodyLen + resp_body_length(Data)
    261 	});
    262 fold([{set_options, SetOpts}|Tail], State0=#state{user_data=OldUserData}) ->
    263 	State = case SetOpts of
    264 		#{metrics_user_data := NewUserData} ->
    265 			State0#state{user_data=maps:merge(OldUserData, NewUserData)};
    266 		_ ->
    267 			State0
    268 	end,
    269 	fold(Tail, State);
    270 fold([_|Tail], State) ->
    271 	fold(Tail, State).
    272 
    273 -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any().
    274 terminate(StreamID, Reason, #state{next=Next, callback=Fun,
    275 		req=Req, resp_status=RespStatus, resp_headers=RespHeaders, ref=Ref,
    276 		req_start=ReqStart, req_body_start=ReqBodyStart,
    277 		req_body_end=ReqBodyEnd, resp_start=RespStart, resp_end=RespEnd,
    278 		procs=Procs, informational=Infos, user_data=UserData,
    279 		req_body_length=ReqBodyLen, resp_body_length=RespBodyLen}) ->
    280 	Res = cowboy_stream:terminate(StreamID, Reason, Next),
    281 	ReqEnd = erlang:monotonic_time(),
    282 	Metrics = #{
    283 		ref => Ref,
    284 		pid => self(),
    285 		streamid => StreamID,
    286 		reason => Reason,
    287 		req => Req,
    288 		resp_status => RespStatus,
    289 		resp_headers => RespHeaders,
    290 		req_start => ReqStart,
    291 		req_end => ReqEnd,
    292 		req_body_start => ReqBodyStart,
    293 		req_body_end => ReqBodyEnd,
    294 		resp_start => RespStart,
    295 		resp_end => RespEnd,
    296 		procs => Procs,
    297 		informational => lists:reverse(Infos),
    298 		req_body_length => ReqBodyLen,
    299 		resp_body_length => RespBodyLen,
    300 		user_data => UserData
    301 	},
    302 	Fun(Metrics),
    303 	Res.
    304 
    305 -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
    306 	cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
    307 	when Resp::cowboy_stream:resp_command().
    308 early_error(StreamID, Reason, PartialReq=#{ref := Ref}, Resp0, Opts=#{metrics_callback := Fun}) ->
    309 	Time = erlang:monotonic_time(),
    310 	Resp = {response, RespStatus, RespHeaders, RespBody}
    311 		= cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts),
    312 	%% As far as metrics go we are limited in what we can provide
    313 	%% in this case.
    314 	Metrics = #{
    315 		ref => Ref,
    316 		pid => self(),
    317 		streamid => StreamID,
    318 		reason => Reason,
    319 		partial_req => PartialReq,
    320 		resp_status => RespStatus,
    321 		resp_headers => RespHeaders,
    322 		early_error_time => Time,
    323 		resp_body_length => resp_body_length(RespBody)
    324 	},
    325 	Fun(Metrics),
    326 	Resp.
    327 
    328 resp_body_length({sendfile, _, Len, _}) ->
    329 	Len;
    330 resp_body_length(Data) ->
    331 	iolist_size(Data).