zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cow_sse.erl (9115B)


      1 %% Copyright (c) 2017-2018, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 -module(cow_sse).
     16 
     17 -export([init/0]).
     18 -export([parse/2]).
     19 -export([events/1]).
     20 -export([event/1]).
     21 
     22 -record(state, {
     23 	state_name = bom :: bom | events,
     24 	buffer = <<>> :: binary(),
     25 	last_event_id = <<>> :: binary(),
     26 	last_event_id_set = false :: boolean(),
     27 	event_type = <<>> :: binary(),
     28 	data = [] :: iolist(),
     29 	retry = undefined :: undefined | non_neg_integer()
     30 }).
     31 -type state() :: #state{}.
     32 -export_type([state/0]).
     33 
     34 -type parsed_event() :: #{
     35 	last_event_id := binary(),
     36 	event_type := binary(),
     37 	data := iolist()
     38 }.
     39 
     40 -type event() :: #{
     41 	comment => iodata(),
     42 	data => iodata(),
     43 	event => iodata() | atom(),
     44 	id => iodata(),
     45 	retry => non_neg_integer()
     46 }.
     47 -export_type([event/0]).
     48 
     49 -spec init() -> state().
     50 init() ->
     51 	#state{}.
     52 
     53 %% @todo Add a function to retrieve the retry value from the state.
     54 
     55 -spec parse(binary(), state())
     56 	-> {event, parsed_event(), State} | {more, State}.
     57 parse(Data0, State=#state{state_name=bom, buffer=Buffer}) ->
     58 	Data1 = case Buffer of
     59 		<<>> -> Data0;
     60 		_ -> << Buffer/binary, Data0/binary >>
     61 	end,
     62 	case Data1 of
     63 		%% Skip the BOM.
     64 		<< 16#fe, 16#ff, Data/bits >> ->
     65 			parse_event(Data, State#state{state_name=events, buffer= <<>>});
     66 		%% Not enough data to know wether we have a BOM.
     67 		<< 16#fe >> ->
     68 			{more, State#state{buffer=Data1}};
     69 		<<>> ->
     70 			{more, State};
     71 		%% No BOM.
     72 		_ ->
     73 			parse_event(Data1, State#state{state_name=events, buffer= <<>>})
     74 	end;
     75 %% Try to process data from the buffer if there is no new input.
     76 parse(<<>>, State=#state{buffer=Buffer}) ->
     77 	parse_event(Buffer, State#state{buffer= <<>>});
     78 %% Otherwise process the input data as-is.
     79 parse(Data0, State=#state{buffer=Buffer}) ->
     80 	Data = case Buffer of
     81 		<<>> -> Data0;
     82 		_ -> << Buffer/binary, Data0/binary >>
     83 	end,
     84 	parse_event(Data, State).
     85 
     86 parse_event(Data, State0) ->
     87 	case binary:split(Data, [<<"\r\n">>, <<"\r">>, <<"\n">>]) of
     88 		[Line, Rest] ->
     89 			case parse_line(Line, State0) of
     90 				{ok, State} ->
     91 					parse_event(Rest, State);
     92 				{event, Event, State} ->
     93 					{event, Event, State#state{buffer=Rest}}
     94 			end;
     95 		[_] ->
     96 			{more, State0#state{buffer=Data}}
     97 	end.
     98 
     99 %% Dispatch events on empty line.
    100 parse_line(<<>>, State) ->
    101 	dispatch_event(State);
    102 %% Ignore comments.
    103 parse_line(<< $:, _/bits >>, State) ->
    104 	{ok, State};
    105 %% Normal line.
    106 parse_line(Line, State) ->
    107 	case binary:split(Line, [<<":\s">>, <<":">>]) of
    108 		[Field, Value] ->
    109 			process_field(Field, Value, State);
    110 		[Field] ->
    111 			process_field(Field, <<>>, State)
    112 	end.
    113 
    114 process_field(<<"event">>, Value, State) ->
    115 	{ok, State#state{event_type=Value}};
    116 process_field(<<"data">>, Value, State=#state{data=Data}) ->
    117 	{ok, State#state{data=[<<$\n>>, Value|Data]}};
    118 process_field(<<"id">>, Value, State) ->
    119 	{ok, State#state{last_event_id=Value, last_event_id_set=true}};
    120 process_field(<<"retry">>, Value, State) ->
    121 	try
    122 		{ok, State#state{retry=binary_to_integer(Value)}}
    123 	catch _:_ ->
    124 		{ok, State}
    125 	end;
    126 process_field(_, _, State) ->
    127 	{ok, State}.
    128 
    129 %% Data is an empty string; abort.
    130 dispatch_event(State=#state{last_event_id_set=false, data=[]}) ->
    131 	{ok, State#state{event_type= <<>>}};
    132 %% Data is an empty string but we have a last_event_id:
    133 %% propagate it on its own so that the caller knows the
    134 %% most recent ID.
    135 dispatch_event(State=#state{last_event_id=LastEventID, data=[]}) ->
    136 	{event, #{
    137 		last_event_id => LastEventID
    138 	}, State#state{last_event_id_set=false, event_type= <<>>}};
    139 %% Dispatch the event.
    140 %%
    141 %% Always remove the last linebreak from the data.
    142 dispatch_event(State=#state{last_event_id=LastEventID,
    143 		event_type=EventType, data=[_|Data]}) ->
    144 	{event, #{
    145 		last_event_id => LastEventID,
    146 		event_type => case EventType of
    147 			<<>> -> <<"message">>;
    148 			_ -> EventType
    149 		end,
    150 		data => lists:reverse(Data)
    151 	}, State#state{last_event_id_set=false, event_type= <<>>, data=[]}}.
    152 
    153 -ifdef(TEST).
    154 parse_example1_test() ->
    155 	{event, #{
    156 		event_type := <<"message">>,
    157 		last_event_id := <<>>,
    158 		data := Data
    159 	}, State} = parse(<<
    160 		"data: YHOO\n"
    161 		"data: +2\n"
    162 		"data: 10\n"
    163 		"\n">>, init()),
    164 	<<"YHOO\n+2\n10">> = iolist_to_binary(Data),
    165 	{more, _} = parse(<<>>, State),
    166 	ok.
    167 
    168 parse_example2_test() ->
    169 	{event, #{
    170 		event_type := <<"message">>,
    171 		last_event_id := <<"1">>,
    172 		data := Data1
    173 	}, State0} = parse(<<
    174 		": test stream\n"
    175 		"\n"
    176 		"data: first event\n"
    177 		"id: 1\n"
    178 		"\n"
    179 		"data:second event\n"
    180 		"id\n"
    181 		"\n"
    182 		"data:  third event\n"
    183 		"\n">>, init()),
    184 	<<"first event">> = iolist_to_binary(Data1),
    185 	{event, #{
    186 		event_type := <<"message">>,
    187 		last_event_id := <<>>,
    188 		data := Data2
    189 	}, State1} = parse(<<>>, State0),
    190 	<<"second event">> = iolist_to_binary(Data2),
    191 	{event, #{
    192 		event_type := <<"message">>,
    193 		last_event_id := <<>>,
    194 		data := Data3
    195 	}, State} = parse(<<>>, State1),
    196 	<<" third event">> = iolist_to_binary(Data3),
    197 	{more, _} = parse(<<>>, State),
    198 	ok.
    199 
    200 parse_example3_test() ->
    201 	{event, #{
    202 		event_type := <<"message">>,
    203 		last_event_id := <<>>,
    204 		data := Data1
    205 	}, State0} = parse(<<
    206 		"data\n"
    207 		"\n"
    208 		"data\n"
    209 		"data\n"
    210 		"\n"
    211 		"data:\n">>, init()),
    212 	<<>> = iolist_to_binary(Data1),
    213 	{event, #{
    214 		event_type := <<"message">>,
    215 		last_event_id := <<>>,
    216 		data := Data2
    217 	}, State} = parse(<<>>, State0),
    218 	<<"\n">> = iolist_to_binary(Data2),
    219 	{more, _} = parse(<<>>, State),
    220 	ok.
    221 
    222 parse_example4_test() ->
    223 	{event, Event, State0} = parse(<<
    224 		"data:test\n"
    225 		"\n"
    226 		"data: test\n"
    227 		"\n">>, init()),
    228 	{event, Event, State} = parse(<<>>, State0),
    229 	{more, _} = parse(<<>>, State),
    230 	ok.
    231 
    232 parse_id_without_data_test() ->
    233 	{event, Event1, State0} = parse(<<
    234 		"id: 1\n"
    235 		"\n"
    236 		"data: data\n"
    237 		"\n"
    238 		"id: 2\n"
    239 		"\n">>, init()),
    240 	1 = maps:size(Event1),
    241 	#{last_event_id := <<"1">>} = Event1,
    242 	{event, #{
    243 		event_type := <<"message">>,
    244 		last_event_id := <<"1">>,
    245 		data := Data
    246 	}, State1} = parse(<<>>, State0),
    247 	<<"data">> = iolist_to_binary(Data),
    248 	{event, Event2, State} = parse(<<>>, State1),
    249 	1 = maps:size(Event2),
    250 	#{last_event_id := <<"2">>} = Event2,
    251 	{more, _} = parse(<<>>, State),
    252 	ok.
    253 
    254 parse_repeated_id_without_data_test() ->
    255 	{event, Event1, State0} = parse(<<
    256 		"id: 1\n"
    257 		"\n"
    258 		"event: message\n" %% This will be ignored since there's no data.
    259 		"\n"
    260 		"id: 1\n"
    261 		"\n"
    262 		"id: 2\n"
    263 		"\n">>, init()),
    264 	{event, Event1, State1} = parse(<<>>, State0),
    265 	1 = maps:size(Event1),
    266 	#{last_event_id := <<"1">>} = Event1,
    267 	{event, Event2, State} = parse(<<>>, State1),
    268 	1 = maps:size(Event2),
    269 	#{last_event_id := <<"2">>} = Event2,
    270 	{more, _} = parse(<<>>, State),
    271 	ok.
    272 
    273 parse_split_event_test() ->
    274 	{more, State} = parse(<<
    275 		"data: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
    276 		"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
    277 		"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA">>, init()),
    278 	{event, _, _} = parse(<<"==\n\n">>, State),
    279 	ok.
    280 -endif.
    281 
    282 -spec events([event()]) -> iolist().
    283 events(Events) ->
    284 	[event(Event) || Event <- Events].
    285 
    286 -spec event(event()) -> iolist().
    287 event(Event) ->
    288 	[
    289 		event_comment(Event),
    290 		event_id(Event),
    291 		event_name(Event),
    292 		event_data(Event),
    293 		event_retry(Event),
    294 		$\n
    295 	].
    296 
    297 event_comment(#{comment := Comment}) ->
    298 	prefix_lines(Comment, <<>>);
    299 event_comment(_) ->
    300 	[].
    301 
    302 event_id(#{id := ID}) ->
    303 	nomatch = binary:match(iolist_to_binary(ID), <<"\n">>),
    304 	[<<"id: ">>, ID, $\n];
    305 event_id(_) ->
    306 	[].
    307 
    308 event_name(#{event := Name0}) ->
    309 	Name = if
    310 		is_atom(Name0) -> atom_to_binary(Name0, utf8);
    311 		true -> iolist_to_binary(Name0)
    312 	end,
    313 	nomatch = binary:match(Name, <<"\n">>),
    314 	[<<"event: ">>, Name, $\n];
    315 event_name(_) ->
    316 	[].
    317 
    318 event_data(#{data := Data}) ->
    319 	prefix_lines(Data, <<"data">>);
    320 event_data(_) ->
    321 	[].
    322 
    323 event_retry(#{retry := Retry}) ->
    324 	[<<"retry: ">>, integer_to_binary(Retry), $\n];
    325 event_retry(_) ->
    326 	[].
    327 
    328 prefix_lines(IoData, Prefix) ->
    329 	Lines = binary:split(iolist_to_binary(IoData), <<"\n">>, [global]),
    330 	[[Prefix, <<": ">>, Line, $\n] || Line <- Lines].
    331 
    332 -ifdef(TEST).
    333 event_test() ->
    334 	_ = event(#{}),
    335 	_ = event(#{comment => "test"}),
    336 	_ = event(#{data => "test"}),
    337 	_ = event(#{data => "test\ntest\ntest"}),
    338 	_ = event(#{data => "test\ntest\ntest\n"}),
    339 	_ = event(#{data => <<"test\ntest\ntest">>}),
    340 	_ = event(#{data => [<<"test">>, $\n, <<"test">>, [$\n, "test"]]}),
    341 	_ = event(#{event => test}),
    342 	_ = event(#{event => "test"}),
    343 	_ = event(#{id => "test"}),
    344 	_ = event(#{retry => 5000}),
    345 	_ = event(#{event => "test", data => "test"}),
    346 	_ = event(#{id => "test", event => "test", data => "test"}),
    347 	ok.
    348 -endif.