cow_sse.erl (9115B)
%% Copyright (c) 2017-2018, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

%% Parser and builder for Server-Sent Events (text/event-stream).
-module(cow_sse).

-export([init/0]).
-export([parse/2]).
-export([events/1]).
-export([event/1]).

-record(state, {
	%% bom until the start of the stream has been inspected, then events.
	state_name = bom :: bom | events,
	%% Input that has been received but not yet consumed.
	buffer = <<>> :: binary(),
	last_event_id = <<>> :: binary(),
	last_event_id_set = false :: boolean(),
	event_type = <<>> :: binary(),
	%% Data lines of the event being built, most recent first,
	%% each one preceded by its trailing line break.
	data = [] :: iolist(),
	retry = undefined :: undefined | non_neg_integer(),
	%% True when the last consumed line ended with a CR that was the very
	%% last byte of the input. If the next input starts with LF, that LF
	%% is the second half of a CRLF pair and must not count as a new line.
	skip_lf = false :: boolean()
}).
-type state() :: #state{}.
-export_type([state/0]).

%% Events that only propagate an updated last event ID carry
%% neither the event_type nor the data key.
-type parsed_event() :: #{
	last_event_id := binary(),
	event_type => binary(),
	data => iolist()
}.

-type event() :: #{
	comment => iodata(),
	data => iodata(),
	event => iodata() | atom(),
	id => iodata(),
	retry => non_neg_integer()
}.
-export_type([event/0]).

%% Return the initial parser state.
-spec init() -> state().
init() ->
	#state{}.

%% @todo Add a function to retrieve the retry value from the state.

%% Feed data to the parser.
%%
%% Returns {event, Event, State} for the first complete event found;
%% the unconsumed input is kept in the state and can be processed
%% by calling parse(<<>>, State) again. Returns {more, State} when
%% no complete event is available.
-spec parse(binary(), State)
	-> {event, parsed_event(), State} | {more, State}
	when State::state().
parse(Data0, State=#state{state_name=bom, buffer=Buffer}) ->
	Data1 = case Buffer of
		<<>> -> Data0;
		_ -> << Buffer/binary, Data0/binary >>
	end,
	case Data1 of
		%% Skip the BOM. Event streams are always UTF-8, so U+FEFF
		%% appears on the wire as the bytes EF BB BF.
		<< 16#ef, 16#bb, 16#bf, Data/bits >> ->
			parse_event(Data, State#state{state_name=events, buffer= <<>>});
		%% Earlier versions skipped the bytes FE FF instead; this is
		%% still accepted so that existing peers keep working.
		<< 16#fe, 16#ff, Data/bits >> ->
			parse_event(Data, State#state{state_name=events, buffer= <<>>});
		%% Not enough data to know whether we have a BOM.
		<< 16#ef >> ->
			{more, State#state{buffer=Data1}};
		<< 16#ef, 16#bb >> ->
			{more, State#state{buffer=Data1}};
		<< 16#fe >> ->
			{more, State#state{buffer=Data1}};
		<<>> ->
			{more, State};
		%% No BOM.
		_ ->
			parse_event(Data1, State#state{state_name=events, buffer= <<>>})
	end;
%% Try to process data from the buffer if there is no new input.
parse(<<>>, State=#state{buffer=Buffer}) ->
	parse_event(Buffer, State#state{buffer= <<>>});
%% The previous input ended with a CR line ending. Drop the LF that
%% completes the CRLF pair, if any, before resuming normal processing.
%% The buffer is always empty when skip_lf is set.
parse(Data0, State=#state{skip_lf=true}) ->
	Data = case Data0 of
		<< $\n, Rest/bits >> -> Rest;
		_ -> Data0
	end,
	parse(Data, State#state{skip_lf=false});
%% Otherwise process the input data as-is.
parse(Data0, State=#state{buffer=Buffer}) ->
	Data = case Buffer of
		<<>> -> Data0;
		_ -> << Buffer/binary, Data0/binary >>
	end,
	parse_event(Data, State).

%% Consume complete lines one at a time until an event is dispatched
%% or the remaining input contains no line ending, in which case it
%% is buffered until more data arrives.
parse_event(Data, State0) ->
	case binary:split(Data, [<<"\r\n">>, <<"\r">>, <<"\n">>]) of
		[Line, Rest] ->
			State1 = State0#state{skip_lf=ends_with_cr(Data, Rest)},
			case parse_line(Line, State1) of
				{ok, State} ->
					parse_event(Rest, State);
				{event, Event, State} ->
					{event, Event, State#state{buffer=Rest}}
			end;
		[_] ->
			{more, State0#state{buffer=Data}}
	end.

%% Whether the line ending that was just consumed is a CR sitting at the
%% very end of the input. Nothing follows the line ending only when the
%% remainder is empty; the last byte then tells CR apart from LF and CRLF.
ends_with_cr(Data, <<>>) ->
	binary:last(Data) =:= $\r;
ends_with_cr(_, _) ->
	false.

%% Dispatch events on empty line.
parse_line(<<>>, State) ->
	dispatch_event(State);
%% Ignore comments.
parse_line(<< $:, _/bits >>, State) ->
	{ok, State};
%% Normal line. The field name ends at the first colon; a single
%% space following that colon is not part of the value. A line
%% without a colon is a field with an empty value.
parse_line(Line, State) ->
	case binary:split(Line, [<<":\s">>, <<":">>]) of
		[Field, Value] ->
			process_field(Field, Value, State);
		[Field] ->
			process_field(Field, <<>>, State)
	end.

%% Apply one field to the state. Always returns {ok, State}.
%%
%% Unknown fields and invalid values are ignored.
process_field(<<"event">>, Value, State) ->
	{ok, State#state{event_type=Value}};
%% Data lines are accumulated in reverse order, each one
%% preceded by the line break that follows it in the result.
process_field(<<"data">>, Value, State=#state{data=Data}) ->
	{ok, State#state{data=[<<$\n>>, Value|Data]}};
%% An ID containing a NUL character must be ignored.
process_field(<<"id">>, Value, State) ->
	case binary:match(Value, <<0>>) of
		nomatch ->
			{ok, State#state{last_event_id=Value, last_event_id_set=true}};
		_ ->
			{ok, State}
	end;
%% The retry value is only valid when made of ASCII digits exclusively.
%% Signed values such as "-5" or "+5" are therefore ignored, even
%% though binary_to_integer/1 would accept them.
process_field(<<"retry">>, Value, State) ->
	case is_ascii_digits(Value) of
		true ->
			{ok, State#state{retry=binary_to_integer(Value)}};
		false ->
			{ok, State}
	end;
process_field(_, _, State) ->
	{ok, State}.

%% True when the binary is not empty and contains only the bytes $0 to $9.
is_ascii_digits(<< C, Rest/binary >>) when C >= $0, C =< $9 ->
	Rest =:= <<>> orelse is_ascii_digits(Rest);
is_ascii_digits(_) ->
	false.

%% Data is an empty string; abort.
dispatch_event(State=#state{last_event_id_set=false, data=[]}) ->
	{ok, State#state{event_type= <<>>}};
%% Data is an empty string but we have a last_event_id:
%% propagate it on its own so that the caller knows the
%% most recent ID.
dispatch_event(State=#state{last_event_id=LastEventID, data=[]}) ->
	{event, #{
		last_event_id => LastEventID
	}, State#state{last_event_id_set=false, event_type= <<>>}};
%% Dispatch the event.
%%
%% Always remove the last linebreak from the data. It is the head
%% of the list because the data is accumulated in reverse order.
%% An empty event type means the default type, "message".
dispatch_event(State=#state{last_event_id=LastEventID,
		event_type=EventType, data=[_|Data]}) ->
	{event, #{
		last_event_id => LastEventID,
		event_type => case EventType of
			<<>> -> <<"message">>;
			_ -> EventType
		end,
		data => lists:reverse(Data)
	}, State#state{last_event_id_set=false, event_type= <<>>, data=[]}}.

-ifdef(TEST).
%% Several data lines are joined with a line break into a single event.
parse_example1_test() ->
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<>>,
		data := Data
	}, State} = parse(<<
		"data: YHOO\n"
		"data: +2\n"
		"data: 10\n"
		"\n">>, init()),
	<<"YHOO\n+2\n10">> = iolist_to_binary(Data),
	{more, _} = parse(<<>>, State),
	ok.

%% A comment, an event with an ID, an event resetting the ID with
%% a valueless id field, and an event whose data starts with a space.
parse_example2_test() ->
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<"1">>,
		data := Data1
	}, State0} = parse(<<
		": test stream\n"
		"\n"
		"data: first event\n"
		"id: 1\n"
		"\n"
		"data:second event\n"
		"id\n"
		"\n"
		%% Two spaces after the colon: only the first one is removed,
		%% which is what gives the data its leading space below.
		"data:  third event\n"
		"\n">>, init()),
	<<"first event">> = iolist_to_binary(Data1),
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<>>,
		data := Data2
	}, State1} = parse(<<>>, State0),
	<<"second event">> = iolist_to_binary(Data2),
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<>>,
		data := Data3
	}, State} = parse(<<>>, State1),
	<<" third event">> = iolist_to_binary(Data3),
	{more, _} = parse(<<>>, State),
	ok.

%% Data fields without a value produce empty data lines. The last
%% data field is not followed by an empty line and is not dispatched.
parse_example3_test() ->
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<>>,
		data := Data1
	}, State0} = parse(<<
		"data\n"
		"\n"
		"data\n"
		"data\n"
		"\n"
		"data:\n">>, init()),
	<<>> = iolist_to_binary(Data1),
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<>>,
		data := Data2
	}, State} = parse(<<>>, State0),
	<<"\n">> = iolist_to_binary(Data2),
	{more, _} = parse(<<>>, State),
	ok.

%% The space following the colon is optional: both events are identical.
parse_example4_test() ->
	{event, Event, State0} = parse(<<
		"data:test\n"
		"\n"
		"data: test\n"
		"\n">>, init()),
	{event, Event, State} = parse(<<>>, State0),
	{more, _} = parse(<<>>, State),
	ok.

%% An ID without data is propagated on its own, in a map
%% containing only the last_event_id key.
parse_id_without_data_test() ->
	{event, Event1, State0} = parse(<<
		"id: 1\n"
		"\n"
		"data: data\n"
		"\n"
		"id: 2\n"
		"\n">>, init()),
	1 = maps:size(Event1),
	#{last_event_id := <<"1">>} = Event1,
	{event, #{
		event_type := <<"message">>,
		last_event_id := <<"1">>,
		data := Data
	}, State1} = parse(<<>>, State0),
	<<"data">> = iolist_to_binary(Data),
	{event, Event2, State} = parse(<<>>, State1),
	1 = maps:size(Event2),
	#{last_event_id := <<"2">>} = Event2,
	{more, _} = parse(<<>>, State),
	ok.

%% An ID is propagated every time it is set, even to the same value.
parse_repeated_id_without_data_test() ->
	{event, Event1, State0} = parse(<<
		"id: 1\n"
		"\n"
		"event: message\n" %% This will be ignored since there's no data.
		"\n"
		"id: 1\n"
		"\n"
		"id: 2\n"
		"\n">>, init()),
	{event, Event1, State1} = parse(<<>>, State0),
	1 = maps:size(Event1),
	#{last_event_id := <<"1">>} = Event1,
	{event, Event2, State} = parse(<<>>, State1),
	1 = maps:size(Event2),
	#{last_event_id := <<"2">>} = Event2,
	{more, _} = parse(<<>>, State),
	ok.

%% An event received in several parts is buffered until complete.
parse_split_event_test() ->
	{more, State} = parse(<<
		"data: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
		"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
		"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA">>, init()),
	{event, _, _} = parse(<<"==\n\n">>, State),
	ok.
-endif.

%% Build a list of events, in the order given.
-spec events([event()]) -> iolist().
events(Events) ->
	[event(Event) || Event <- Events].

%% Build a single event. Keys that are absent from the map produce
%% no output. The final line break is the empty line ending the event.
-spec event(event()) -> iolist().
event(Event) ->
	[
		event_comment(Event),
		event_id(Event),
		event_name(Event),
		event_data(Event),
		event_retry(Event),
		$\n
	].

%% Comment lines have an empty field name.
event_comment(#{comment := Comment}) ->
	prefix_lines(Comment, <<>>);
event_comment(_) ->
	[].

%% The ID must fit on a single line. Both CR and LF end a line in
%% an event stream, so either one in the ID is rejected with a
%% badmatch error rather than silently corrupting the stream.
event_id(#{id := ID}) ->
	nomatch = binary:match(iolist_to_binary(ID), [<<"\r">>, <<"\n">>]),
	[<<"id: ">>, ID, $\n];
event_id(_) ->
	[].

%% The event name may be given as an atom or as iodata. It must fit
%% on a single line. Both CR and LF end a line in an event stream,
%% so either one in the name is rejected with a badmatch error.
event_name(#{event := Name0}) ->
	Name = if
		is_atom(Name0) -> atom_to_binary(Name0, utf8);
		true -> iolist_to_binary(Name0)
	end,
	nomatch = binary:match(Name, [<<"\r">>, <<"\n">>]),
	[<<"event: ">>, Name, $\n];
event_name(_) ->
	[].

%% Each line of the data is sent as its own data field.
event_data(#{data := Data}) ->
	prefix_lines(Data, <<"data">>);
event_data(_) ->
	[].

event_retry(#{retry := Retry}) ->
	[<<"retry: ">>, integer_to_binary(Retry), $\n];
event_retry(_) ->
	[].

%% Split the data into lines and emit one "Prefix: Line" field per line.
%%
%% CRLF, CR and LF are all line endings in an event stream. The data is
%% split on all three so that a CR can never reach the output, where the
%% receiver would treat what follows it as the start of a new field.
prefix_lines(IoData, Prefix) ->
	Lines = binary:split(iolist_to_binary(IoData),
		[<<"\r\n">>, <<"\r">>, <<"\n">>], [global]),
	[[Prefix, <<": ">>, Line, $\n] || Line <- Lines].

-ifdef(TEST).
%% Building events from valid input of every accepted shape must not crash.
event_test() ->
	_ = event(#{}),
	_ = event(#{comment => "test"}),
	_ = event(#{data => "test"}),
	_ = event(#{data => "test\ntest\ntest"}),
	_ = event(#{data => "test\ntest\ntest\n"}),
	_ = event(#{data => "test\r\ntest\rtest"}),
	_ = event(#{data => <<"test\ntest\ntest">>}),
	_ = event(#{data => [<<"test">>, $\n, <<"test">>, [$\n, "test"]]}),
	_ = event(#{event => test}),
	_ = event(#{event => "test"}),
	_ = event(#{id => "test"}),
	_ = event(#{retry => 5000}),
	_ = event(#{event => "test", data => "test"}),
	_ = event(#{id => "test", event => "test", data => "test"}),
	ok.
-endif.