cowboy_websocket.erl (27186B)
%% Copyright (c) 2011-2017, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

%% Cowboy supports versions 7 through 17 of the Websocket drafts.
%% It also supports RFC6455, the proposed standard for Websocket.
-module(cowboy_websocket).
-behaviour(cowboy_sub_protocol).

%% Upgrade detection and sub-protocol entry points.
-export([is_upgrade_request/1]).
-export([upgrade/4]).
-export([upgrade/5]).
-export([takeover/7]).
%% Exported because before_loop/3 re-enters it through proc_lib:hibernate/3.
-export([loop/3]).

%% sys callbacks, reached through sys:handle_system_msg/6 in loop/3.
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

%% Commands a handler callback may return. They are processed in order
%% by commands/3: frames are encoded and queued for sending,
%% {active, _} switches reading on or off, {deflate, _} switches
%% compression of outgoing frames, {set_options, _} currently only
%% acts on the idle_timeout key, and {shutdown_reason, _} selects the
%% exit reason used by terminate/3.
-type commands() :: [cow_ws:frame()
	| {active, boolean()}
	| {deflate, boolean()}
	| {set_options, map()}
	| {shutdown_reason, any()}
].
-export_type([commands/0]).

%% Current return shape for handler callbacks; the optional third
%% element asks for the process to hibernate before the next receive.
-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.

%% Older return shapes that handler_call/6 still accepts.
-type deprecated_call_result(State) :: {ok, State}
	| {ok, State, hibernate}
	| {reply, cow_ws:frame() | [cow_ws:frame()], State}
	| {reply, cow_ws:frame() | [cow_ws:frame()], State, hibernate}
	| {stop, State}.

%% Reasons handed to handler_terminate/3 when the Websocket process stops.
-type terminate_reason() :: normal | stop | timeout
	| remote | {remote, cow_ws:close_code(), binary()}
	| {error, badencoding | badframe | closed | atom()}
	| {crash, error | exit | throw, any()}.
%% Handler callbacks. websocket_init/1 and terminate/3 are optional;
%% takeover/7 checks for websocket_init/1 with erlang:function_exported/3.
-callback init(Req, any())
	-> {ok | module(), Req, any()}
	| {module(), Req, any(), any()}
	when Req::cowboy_req:req().

-callback websocket_init(State)
	-> call_result(State) | deprecated_call_result(State) when State::any().
-optional_callbacks([websocket_init/1]).

-callback websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State)
	-> call_result(State) | deprecated_call_result(State) when State::any().
-callback websocket_info(any(), State)
	-> call_result(State) | deprecated_call_result(State) when State::any().

-callback terminate(any(), cowboy_req:req(), any()) -> ok.
-optional_callbacks([terminate/3]).

%% Options accepted by upgrade/5. Defaults applied elsewhere in this module:
%%   active_n        100 (setopts_active/1)
%%   compress        false (websocket_extensions/2)
%%   deflate_opts    #{} (extension negotiation)
%%   idle_timeout    60000 (loop_timeout/1)
%%   max_frame_size  infinity (parse_header/3, dispatch_frame/4)
%%   req_filter      keeps method, version, scheme, host, port, path,
%%                   qs and peer from the request (upgrade/5)
%%   validate_utf8   true (upgrade/5)
-type opts() :: #{
	active_n => pos_integer(),
	compress => boolean(),
	deflate_opts => cow_ws:deflate_opts(),
	idle_timeout => timeout(),
	max_frame_size => non_neg_integer() | infinity,
	req_filter => fun((cowboy_req:req()) -> map()),
	validate_utf8 => boolean()
}.
-export_type([opts/0]).

%% Connection/stream state threaded through the receive loop.
-record(state, {
	%% Parent process, set in takeover/7; its 'EXIT' stops the loop.
	parent :: undefined | pid(),
	ref :: ranch:ref(),
	%% A real socket for HTTP/1.1, or {Pid, StreamID} for HTTP/2.
	socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
	%% Transport module; undefined marks an HTTP/2 stream.
	transport = undefined :: module() | undefined,
	opts = #{} :: opts(),
	%% Whether we are currently reading from the socket/stream.
	active = true :: boolean(),
	handler :: module(),
	%% sec-websocket-key for the HTTP/1.1 handshake; cleared in takeover/7.
	key = undefined :: undefined | binary(),
	%% Reference of the running idle timer, if any.
	timeout_ref = undefined :: undefined | reference(),
	%% Result of Transport:messages(); undefined for HTTP/2.
	messages = undefined :: undefined | {atom(), atom(), atom()}
		| {atom(), atom(), atom(), atom()},
	%% Set when the handler asked to hibernate before the next receive.
	hibernate = false :: boolean(),
	frag_state = undefined :: cow_ws:frag_state(),
	%% Payload accumulated from the fragments received so far.
	frag_buffer = <<>> :: binary(),
	%% 0 when UTF-8 validation is enabled, undefined when disabled (upgrade/5).
	utf8_state :: cow_ws:utf8_state(),
	%% When false, frame/2 disables compression for outgoing frames.
	deflate = true :: boolean(),
	%% Negotiated extensions, passed to cow_ws when parsing/building frames.
	extensions = #{} :: map(),
	%% Filtered request kept for the terminate callback.
	req = #{} :: map(),
	%% normal exits with normal; anything else exits with {shutdown, Reason}.
	shutdown_reason = normal :: any()
}).

%% Because the HTTP/1.1 and HTTP/2 handshakes are so different,
%% this function is necessary to figure out whether a request
%% is trying to upgrade to the Websocket protocol.
%% Returns true when Req asks to switch to Websocket:
%%  * HTTP/2: a CONNECT request whose protocol is "websocket"
%%    (compared after lowercasing);
%%  * HTTP/1.1: a GET request with an "upgrade" token in the
%%    connection header and a "websocket" token in the upgrade header.
%% Any other request, including one where either header is absent,
%% is not an upgrade request.
-spec is_upgrade_request(cowboy_req:req()) -> boolean().
is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) ->
	<<"websocket">> =:= cowboy_bstr:to_lower(Protocol);
is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) ->
	ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
	case lists:member(<<"upgrade">>, ConnTokens) of
		false ->
			false;
		true ->
			%% Default to [] so that a request with "connection: upgrade"
			%% but no upgrade header yields false instead of passing
			%% a non-list to lists:member/2.
			UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req, []),
			lists:member(<<"websocket">>, UpgradeTokens)
	end;
is_upgrade_request(_) ->
	false.

%% Stream process.

%% Same as upgrade/5 with no options.
-spec upgrade(Req, Env, module(), any())
	-> {ok, Req, Env}
	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState) ->
	upgrade(Req, Env, Handler, HandlerState, #{}).

%% Validates the upgrade request and performs the handshake.
%% Replies 426 (HTTP/1.1) or 400 (HTTP/2) when the request is not an
%% upgrade request, and 400 when validation raises an exception.
-spec upgrade(Req, Env, module(), any(), opts())
	-> {ok, Req, Env}
	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Immediately crash if a response has already been sent.
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
	%% Only a subset of the request is kept for the terminate callback.
	FilteredReq = case maps:get(req_filter, Opts, undefined) of
		undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
		FilterFun -> FilterFun(Req0)
	end,
	Utf8State = case maps:get(validate_utf8, Opts, true) of
		true -> 0;
		false -> undefined
	end,
	State0 = #state{opts=Opts, handler=Handler, utf8_state=Utf8State, req=FilteredReq},
	try websocket_upgrade(State0, Req0) of
		{ok, State, Req} ->
			websocket_handshake(State, Req, HandlerState, Env);
		%% The status code 426 is specific to HTTP/1.1 connections.
		{error, upgrade_required} when Version =:= 'HTTP/1.1' ->
			{ok, cowboy_req:reply(426, #{
				<<"connection">> => <<"upgrade">>,
				<<"upgrade">> => <<"websocket">>
			}, Req0), Env};
		%% Use a generic 400 error for HTTP/2.
		{error, upgrade_required} ->
			{ok, cowboy_req:reply(400, Req0), Env}
	catch _:_ ->
		%% @todo Probably log something here?
		%% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
		%% @todo Does this even work?
		{ok, cowboy_req:reply(400, Req0), Env}
	end.

%% First validation step: the request must be an upgrade request and,
%% for HTTP/1.1, must carry a sec-websocket-key header.
websocket_upgrade(State, Req=#{version := Version}) ->
	case is_upgrade_request(Req) of
		false ->
			{error, upgrade_required};
		true when Version =:= 'HTTP/1.1' ->
			Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
			%% Assertive match: a missing key raises and upgrade/5 replies 400.
			false = Key =:= undefined,
			websocket_version(State#state{key=Key}, Req);
		true ->
			websocket_version(State, Req)
	end.

%% Only versions 7, 8 and 13 are accepted; any other value raises a
%% case_clause error that upgrade/5 turns into a 400 response.
websocket_version(State, Req) ->
	WsVersion = cowboy_req:parse_header(<<"sec-websocket-version">>, Req),
	case WsVersion of
		7 -> ok;
		8 -> ok;
		13 -> ok
	end,
	websocket_extensions(State, Req#{websocket_version => WsVersion}).

%% Extensions are only negotiated when the compress option is enabled
%% and the client sent a sec-websocket-extensions header.
websocket_extensions(State=#state{opts=Opts}, Req) ->
	%% @todo We want different options for this. For example
	%% * compress everything auto
	%% * compress only text auto
	%% * compress only binary auto
	%% * compress nothing auto (but still enable it)
	%% * disable compression
	Compress = maps:get(compress, Opts, false),
	case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of
		{true, Extensions} when Extensions =/= undefined ->
			websocket_extensions(State, Req, Extensions, []);
		_ ->
			{ok, State, Req}
	end.

%% Walks the requested extensions. RespHeader accumulates the accepted
%% extensions in reverse order, each preceded by a <<", ">> separator;
%% the leading separator is dropped before the header is set.
websocket_extensions(State, Req, [], []) ->
	{ok, State, Req};
websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) ->
	{ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)};
websocket_extensions(State, Req=#{pid := _, version := _},
		[{<<"permessage-deflate">>, Params}|Tail], RespHeader) ->
	negotiate_deflate(fun cow_ws:negotiate_permessage_deflate/3,
		Params, State, Req, Tail, RespHeader);
websocket_extensions(State, Req=#{pid := _, version := _},
		[{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) ->
	negotiate_deflate(fun cow_ws:negotiate_x_webkit_deflate_frame/3,
		Params, State, Req, Tail, RespHeader);
%% Unknown extensions are skipped.
websocket_extensions(State, Req, [_|Tail], RespHeader) ->
	websocket_extensions(State, Req, Tail, RespHeader).

%% Negotiation shared by both deflate extensions. Negotiate is the
%% cow_ws negotiation function for the extension being processed.
%% The extension is skipped when negotiation returns ignore or exits
%% with an incompatible_zlib_version error.
negotiate_deflate(Negotiate, Params, State=#state{opts=Opts, extensions=Extensions},
		Req=#{pid := Pid, version := Version}, Tail, RespHeader) ->
	DeflateOpts0 = maps:get(deflate_opts, Opts, #{}),
	%% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner.
	DeflateOpts = case Version of
		'HTTP/1.1' -> DeflateOpts0#{owner => Pid};
		_ -> DeflateOpts0
	end,
	try Negotiate(Params, Extensions, DeflateOpts) of
		{ok, RespExt, Extensions2} ->
			websocket_extensions(State#state{extensions=Extensions2},
				Req, Tail, [<<", ">>, RespExt|RespHeader]);
		ignore ->
			websocket_extensions(State, Req, Tail, RespHeader)
	catch exit:{error, incompatible_zlib_version, _} ->
		websocket_extensions(State, Req, Tail, RespHeader)
	end.

-spec websocket_handshake(#state{}, Req, any(), Env)
	-> {ok, Req, Env}
	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% HTTP/1.1: compute the sec-websocket-accept value from the client key,
%% ask the connection process to switch protocols and return to the
%% middleware chain. The connection process then calls takeover/7.
websocket_handshake(State=#state{key=ClientKey},
		Req=#{version := 'HTTP/1.1', pid := ConnPid, streamid := StreamID},
		HandlerState, Env) ->
	Salted = << ClientKey/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>,
	Accept = base64:encode(crypto:hash(sha, Salted)),
	%% @todo We don't want date and server headers.
	UpgradeHeaders = #{
		<<"connection">> => <<"Upgrade">>,
		<<"upgrade">> => <<"websocket">>,
		<<"sec-websocket-accept">> => Accept
	},
	Headers = cowboy_req:response_headers(UpgradeHeaders, Req),
	Stream = {ConnPid, StreamID},
	ConnPid ! {Stream, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
	{ok, Req, Env};
%% For HTTP/2 we do not let the process die, we instead keep it
%% for the Websocket stream. This is because in HTTP/2 we only
%% have a stream, it doesn't take over the whole connection.
websocket_handshake(State, Req=#{ref := Ref, pid := ConnPid, streamid := StreamID},
		HandlerState, _Env) ->
	%% @todo We don't want date and server headers.
	Headers = cowboy_req:response_headers(#{}, Req),
	Stream = {ConnPid, StreamID},
	Takeover = {State, HandlerState},
	ConnPid ! {Stream, {switch_protocol, Headers, ?MODULE, Takeover}},
	%% No transport and no buffered data: the stream process becomes
	%% the Websocket process directly.
	takeover(ConnPid, Ref, Stream, undefined, undefined, <<>>, Takeover).

%% Connection process.

%% Parser state while waiting for a complete frame header.
-record(ps_header, {
	buffer = <<>> :: binary()
}).

%% Parser state while a frame payload is being received.
-record(ps_payload, {
	type :: cow_ws:frame_type(),
	len :: non_neg_integer(),
	mask_key :: cow_ws:mask_key(),
	rsv :: cow_ws:rsv(),
	close_code = undefined :: undefined | cow_ws:close_code(),
	unmasked = <<>> :: binary(),
	unmasked_len = 0 :: non_neg_integer(),
	buffer = <<>> :: binary()
}).

-type parse_state() :: #ps_header{} | #ps_payload{}.

-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
	module() | undefined, any(), binary(),
	{#state{}, any()}) -> no_return().
%% Entry point of the Websocket process: completes the state, starts
%% the idle timer, runs websocket_init/1 when the handler exports it,
%% then starts parsing whatever data came along with the handshake.
takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
		{State0=#state{handler=Handler}, HandlerState}) ->
	%% @todo We should have an option to disable this behavior.
	ranch:remove_connection(Ref),
	%% There is no transport (and so no socket messages) for HTTP/2.
	Messages = if
		Transport =:= undefined -> undefined;
		true -> Transport:messages()
	end,
	State1 = State0#state{
		parent=Parent,
		ref=Ref,
		socket=Socket,
		transport=Transport,
		key=undefined,
		messages=Messages
	},
	State = loop_timeout(State1),
	%% We call parse_header/3 immediately because there might be
	%% some data in the buffer that was sent along with the handshake.
	%% While it is not allowed by the protocol to send frames immediately,
	%% we still want to process that data if any.
	ParseState = #ps_header{buffer=Buffer},
	HasInit = erlang:function_exported(Handler, websocket_init, 1),
	if
		HasInit ->
			handler_call(State, HandlerState, ParseState,
				websocket_init, undefined, fun after_init/3);
		true ->
			after_init(State, HandlerState, ParseState)
	end.

%% Runs once initialization is done. Reading is only switched on when
%% the state is still active at this point.
after_init(State, HandlerState, ParseState) ->
	case State of
		#state{active=true} ->
			%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
			%% We must do this only after calling websocket_init/1 (if any)
			%% to give the handler a chance to disable active mode immediately.
			setopts_active(State),
			maybe_read_body(State);
		_ ->
			ok
	end,
	parse_header(State, HandlerState, ParseState).

%% We have two ways of reading the body for Websocket. For HTTP/1.1
%% we have full control of the socket and can therefore use active,N.
%% For HTTP/2 we are just a stream, and are instead using read_body
%% (automatic mode). Technically HTTP/2 will only go passive after
%% receiving the next data message, while HTTP/1.1 goes passive
%% immediately but there might still be data to be processed in
%% the message queue.
%% Puts the socket in {active, N} mode (HTTP/1.1 only). N comes from
%% the active_n option and defaults to 100. No-op for HTTP/2 streams,
%% which have no transport.
setopts_active(#state{transport=undefined}) ->
	ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
	N = maps:get(active_n, Opts, 100),
	Transport:setopts(Socket, [{active, N}]).

%% Requests more body data from the stream (HTTP/2 only), and only
%% while the state is active. No-op in every other case.
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
	%% @todo Keep Ref around.
	ReadBodyRef = make_ref(),
	Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
	ok;
maybe_read_body(_) ->
	ok.

%% Switches reading back on and returns the updated state.
%% The active flag must be set BEFORE calling maybe_read_body/1:
%% that function only sends read_body when the state is active, and
%% commands/3 only calls us while the flag is still false. Calling it
%% with the old state would never resume reading on an HTTP/2 stream.
active(State0) ->
	State = State0#state{active=true},
	setopts_active(State),
	maybe_read_body(State),
	State.

%% Switches reading off and returns the updated state.
passive(State=#state{transport=undefined}) ->
	%% Unfortunately we cannot currently cancel read_body.
	%% But that's OK, we will just stop reading the body
	%% after the next message.
	State#state{active=false};
passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
	Transport:setopts(Socket, [{active, false}]),
	%% Drop passive notifications already sitting in the mailbox so
	%% that loop/3 does not re-enable active mode because of them.
	flush_passive(Socket, Messages),
	State#state{active=false}.

%% Removes every pending passive notification for Socket from the
%% mailbox without blocking.
flush_passive(Socket, Messages) ->
	receive
		{Passive, Socket} when Passive =:= element(4, Messages);
				%% Hardcoded for compatibility with Ranch 1.x.
				Passive =:= tcp_passive; Passive =:= ssl_passive ->
			flush_passive(Socket, Messages)
	after 0 ->
		ok
	end.

%% Enters the receive loop, hibernating first when the handler asked
%% for it. The hibernate flag is cleared so that it applies only once.
before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
	proc_lib:hibernate(?MODULE, loop,
		[State#state{hibernate=false}, HandlerState, ParseState]);
before_loop(State, HandlerState, ParseState) ->
	loop(State, HandlerState, ParseState).

-spec loop_timeout(#state{}) -> #state{}.
%% Restarts the idle timer: cancels the previous timer, if any, then
%% starts a new one unless idle_timeout (default 60000) is infinity.
%% The timer message is {timeout, TRef, ?MODULE}.
loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
	_ = case PrevRef of
		undefined -> ignore;
		PrevRef -> erlang:cancel_timer(PrevRef)
	end,
	case maps:get(idle_timeout, Opts, 60000) of
		infinity ->
			State#state{timeout_ref=undefined};
		Timeout ->
			TRef = erlang:start_timer(Timeout, self(), ?MODULE),
			State#state{timeout_ref=TRef}
	end.

%% Main receive loop. Incoming data resets the idle timer and is fed
%% to the parser; any message not recognized here is passed to the
%% handler's websocket_info/2. Clause order matters: the current timer
%% must be matched before stale timers, and every specific message
%% before the final catch-all.
-spec loop(#state{}, any(), parse_state()) -> no_return().
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
		timeout_ref=TRef}, HandlerState, ParseState) ->
	receive
		%% Socket messages. (HTTP/1.1)
		{OK, Socket, Data} when OK =:= element(1, Messages) ->
			State2 = loop_timeout(State),
			parse(State2, HandlerState, ParseState, Data);
		{Closed, Socket} when Closed =:= element(2, Messages) ->
			terminate(State, HandlerState, {error, closed});
		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
			terminate(State, HandlerState, {error, Reason});
		%% The socket left {active, N} mode: re-arm it.
		{Passive, Socket} when Passive =:= element(4, Messages);
				%% Hardcoded for compatibility with Ranch 1.x.
				Passive =:= tcp_passive; Passive =:= ssl_passive ->
			setopts_active(State),
			loop(State, HandlerState, ParseState);
		%% Body reading messages. (HTTP/2)
		{request_body, _Ref, nofin, Data} ->
			maybe_read_body(State),
			State2 = loop_timeout(State),
			parse(State2, HandlerState, ParseState, Data);
		%% @todo We need to handle this case as if it was an {error, closed}
		%% but not before we finish processing frames. We probably should have
		%% a check in before_loop to let us stop looping if a flag is set.
		{request_body, _Ref, fin, _, Data} ->
			maybe_read_body(State),
			State2 = loop_timeout(State),
			parse(State2, HandlerState, ParseState, Data);
		%% Timeouts.
		%% The current idle timer fired: close the connection.
		{timeout, TRef, ?MODULE} ->
			websocket_close(State, HandlerState, timeout);
		%% A timer that is no longer the current one fired: ignore it.
		{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
			before_loop(State, HandlerState, ParseState);
		%% System messages.
		{'EXIT', Parent, Reason} ->
			%% @todo We should exit gracefully.
			exit(Reason);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{State, HandlerState, ParseState});
		%% Calls from supervisor module.
		{'$gen_call', From, Call} ->
			cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
			before_loop(State, HandlerState, ParseState);
		Message ->
			handler_call(State, HandlerState, ParseState,
				websocket_info, Message, fun before_loop/3)
	end.

%% Appends newly received data to whatever the parser had buffered and
%% resumes parsing at the stage the parser was in.
parse(State, HandlerState, PS=#ps_header{buffer=Buffer}, Data) ->
	parse_header(State, HandlerState, PS#ps_header{
		buffer= <<Buffer/binary, Data/binary>>});
parse(State, HandlerState, PS=#ps_payload{buffer=Buffer}, Data) ->
	parse_payload(State, HandlerState, PS#ps_payload{buffer= <<>>},
		<<Buffer/binary, Data/binary>>).

%% Parses a frame header from the buffered data. Goes back to the
%% receive loop when the header is incomplete, and closes the
%% connection for unmasked frames, frames larger than max_frame_size
%% (default infinity) and malformed headers.
parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions},
		HandlerState, ParseState=#ps_header{buffer=Data}) ->
	MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
	case cow_ws:parse_header(Data, Extensions, FragState) of
		%% All frames sent from the client to the server are masked.
		{_, _, _, _, undefined, _} ->
			websocket_close(State, HandlerState, {error, badframe});
		%% With the default of infinity this guard never holds,
		%% as integers compare lower than atoms.
		{_, _, _, Len, _, _} when Len > MaxFrameSize ->
			websocket_close(State, HandlerState, {error, badsize});
		{Type, FragState2, Rsv, Len, MaskKey, Rest} ->
			parse_payload(State#state{frag_state=FragState2}, HandlerState,
				#ps_payload{type=Type, len=Len, mask_key=MaskKey, rsv=Rsv}, Rest);
		more ->
			before_loop(State, HandlerState, ParseState);
		error ->
			websocket_close(State, HandlerState, {error, badframe})
	end.
%% Parses (part of) a frame payload. A complete payload is dispatched;
%% an incomplete one is accumulated in the parse state, with len and
%% unmasked_len adjusted by the amount of data consumed, before going
%% back to the receive loop. Parse errors close the connection.
parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
		HandlerState, ParseState=#ps_payload{
			type=Type, len=Len, mask_key=MaskKey, rsv=Rsv,
			unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) ->
	case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen,
			Type, Len, FragState, Extensions, Rsv) of
		%% Complete payload that carries a close code.
		{ok, CloseCode, Payload, Utf8State, Rest} ->
			dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
				ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>,
					close_code=CloseCode}, Rest);
		{ok, Payload, Utf8State, Rest} ->
			dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
				ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>},
				Rest);
		%% Incomplete payload that carries a close code.
		{more, CloseCode, Payload, Utf8State} ->
			before_loop(State#state{utf8_state=Utf8State}, HandlerState,
				ParseState#ps_payload{len=Len - byte_size(Data), close_code=CloseCode,
					unmasked= <<Unmasked/binary, Payload/binary>>,
					unmasked_len=UnmaskedLen + byte_size(Data)});
		{more, Payload, Utf8State} ->
			before_loop(State#state{utf8_state=Utf8State}, HandlerState,
				ParseState#ps_payload{len=Len - byte_size(Data),
					unmasked= <<Unmasked/binary, Payload/binary>>,
					unmasked_len=UnmaskedLen + byte_size(Data)});
		Error = {error, _Reason} ->
			websocket_close(State, HandlerState, Error)
	end.

%% Acts on a fully received frame. Fragments are accumulated until the
%% final one arrives, close frames terminate the connection, pings are
%% answered with a pong before being handed to the handler, and all
%% other frames go straight to websocket_handle/2. Parsing then resumes
%% with the remaining data. The size guard must stay first so that it
%% applies to both intermediate and final fragments.
dispatch_frame(State=#state{opts=Opts, frag_state=FragState, frag_buffer=SoFar}, HandlerState,
		#ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0}, RemainingData) ->
	MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
	case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
		%% @todo Allow receiving fragments.
		{fragment, _, _, Payload} when byte_size(Payload) + byte_size(SoFar) > MaxFrameSize ->
			websocket_close(State, HandlerState, {error, badsize});
		{fragment, nofin, _, Payload} ->
			parse_header(State#state{frag_buffer= << SoFar/binary, Payload/binary >>},
				HandlerState, #ps_header{buffer=RemainingData});
		%% Last fragment: hand the reassembled message to the handler.
		{fragment, fin, Type, Payload} ->
			handler_call(State#state{frag_state=undefined, frag_buffer= <<>>}, HandlerState,
				#ps_header{buffer=RemainingData},
				websocket_handle, {Type, << SoFar/binary, Payload/binary >>},
				fun parse_header/3);
		close ->
			websocket_close(State, HandlerState, remote);
		{close, CloseCode, Payload} ->
			websocket_close(State, HandlerState, {remote, CloseCode, Payload});
		Frame = ping ->
			transport_send(State, nofin, frame(pong, State)),
			handler_call(State, HandlerState,
				#ps_header{buffer=RemainingData},
				websocket_handle, Frame, fun parse_header/3);
		%% The pong echoes the payload of the ping.
		Frame = {ping, Payload} ->
			transport_send(State, nofin, frame({pong, Payload}, State)),
			handler_call(State, HandlerState,
				#ps_header{buffer=RemainingData},
				websocket_handle, Frame, fun parse_header/3);
		Frame ->
			handler_call(State, HandlerState,
				#ps_header{buffer=RemainingData},
				websocket_handle, Frame, fun parse_header/3)
	end.

%% Calls a handler callback and acts on its result. websocket_init is
%% called with the handler state only; the other callbacks also receive
%% Message. NextState is the fun/3 to continue with when the connection
%% stays open. If the callback raises, a close frame is sent, the
%% handler is terminated with a crash reason and the exception is
%% re-raised with its original stacktrace.
handler_call(State=#state{handler=Handler}, HandlerState,
		ParseState, Callback, Message, NextState) ->
	try case Callback of
		websocket_init -> Handler:websocket_init(HandlerState);
		_ -> Handler:Callback(Message, HandlerState)
	end of
		{Commands, HandlerState2} when is_list(Commands) ->
			handler_call_result(State,
				HandlerState2, ParseState, NextState, Commands);
		{Commands, HandlerState2, hibernate} when is_list(Commands) ->
			handler_call_result(State#state{hibernate=true},
				HandlerState2, ParseState, NextState, Commands);
		%% The following call results are deprecated.
		{ok, HandlerState2} ->
			NextState(State, HandlerState2, ParseState);
		{ok, HandlerState2, hibernate} ->
			NextState(State#state{hibernate=true}, HandlerState2, ParseState);
		{reply, Payload, HandlerState2} ->
			case websocket_send(Payload, State) of
				ok ->
					NextState(State, HandlerState2, ParseState);
				%% A close frame was sent.
				stop ->
					terminate(State, HandlerState2, stop);
				Error = {error, _} ->
					terminate(State, HandlerState2, Error)
			end;
		{reply, Payload, HandlerState2, hibernate} ->
			case websocket_send(Payload, State) of
				ok ->
					NextState(State#state{hibernate=true},
						HandlerState2, ParseState);
				stop ->
					terminate(State, HandlerState2, stop);
				Error = {error, _} ->
					terminate(State, HandlerState2, Error)
			end;
		{stop, HandlerState2} ->
			websocket_close(State, HandlerState2, stop)
	catch Class:Reason:Stacktrace ->
		websocket_send_close(State, {crash, Class, Reason}),
		handler_terminate(State, HandlerState, {crash, Class, Reason}),
		erlang:raise(Class, Reason, Stacktrace)
	end.

%% Executes the commands returned by a handler callback, then either
%% continues with NextState or terminates when a close frame was sent
%% or sending failed.
-spec handler_call_result(#state{}, any(), parse_state(), fun(), commands()) -> no_return().
handler_call_result(State0, HandlerState, ParseState, NextState, Commands) ->
	case commands(Commands, State0, []) of
		{ok, State} ->
			NextState(State, HandlerState, ParseState);
		{stop, State} ->
			terminate(State, HandlerState, stop);
		{Error = {error, _}, State} ->
			terminate(State, HandlerState, Error)
	end.
%% Executes handler commands in order. Data accumulates the encoded
%% frames in reverse order; they are sent in a single call once the
%% list is exhausted or a close frame is reached.
%% Returns {ok, State}, {stop, State} after a close frame was sent, or
%% {{error, Reason}, State} when sending failed.
commands([], State, []) ->
	{ok, State};
commands([], State, Data) ->
	Result = transport_send(State, nofin, lists:reverse(Data)),
	{Result, State};
commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
	%% Only touch the socket/stream when the mode actually changes.
	State = if
		Active, not Active0 ->
			active(State0);
		Active0, not Active ->
			passive(State0);
		true ->
			State0
	end,
	commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
	commands(Tail, State#state{deflate=Deflate}, Data);
commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
	%% Only idle_timeout can be changed; the timer is restarted with
	%% the new value. Other keys are ignored.
	State = case SetOpts of
		#{idle_timeout := IdleTimeout} ->
			loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}});
		_ ->
			State0
	end,
	commands(Tail, State, Data);
commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) ->
	commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data);
%% Anything else is a frame to send.
commands([Frame|Tail], State, Data0) ->
	Data = [frame(Frame, State)|Data0],
	case is_close_frame(Frame) of
		true ->
			%% Commands following a close frame are not executed.
			_ = transport_send(State, fin, lists:reverse(Data)),
			{stop, State};
		false ->
			commands(Tail, State, Data)
	end.

%% Sends encoded frames. For HTTP/2 the data is passed to the stream's
%% connection process as a message and the result is always ok;
%% otherwise it is written with Transport:send/2 and IsFin is unused.
transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
	Pid ! {Stream, {data, IsFin, Data}},
	ok;
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
	Transport:send(Socket, Data).

%% Sends one frame or a list of frames (deprecated reply results).
%% Returns stop when a close frame was sent; frames following a close
%% frame in a list are not sent.
-spec websocket_send(cow_ws:frame() | [cow_ws:frame()], #state{})
	-> ok | stop | {error, atom()}.
websocket_send(Frames, State) when is_list(Frames) ->
	websocket_send_many(Frames, State, []);
websocket_send(Frame, State) ->
	Data = frame(Frame, State),
	case is_close_frame(Frame) of
		true ->
			_ = transport_send(State, fin, Data),
			stop;
		false ->
			transport_send(State, nofin, Data)
	end.
%% Encodes the frames one at a time and sends them in a single call.
%% Stops at the first close frame, which is sent along with everything
%% encoded before it; the result is then stop.
websocket_send_many([], State, Encoded) ->
	transport_send(State, nofin, lists:reverse(Encoded));
websocket_send_many([Next|Rest], State, Encoded0) ->
	Encoded = [frame(Next, State)|Encoded0],
	IsClose = is_close_frame(Next),
	if
		IsClose ->
			_ = transport_send(State, fin, lists:reverse(Encoded)),
			stop;
		true ->
			websocket_send_many(Rest, State, Encoded)
	end.

%% Whether the frame is one of the three close frame forms.
is_close_frame(Frame) ->
	case Frame of
		close -> true;
		{close, _} -> true;
		{close, _, _} -> true;
		_ -> false
	end.

%% Sends the close frame matching Reason, then terminates.
-spec websocket_close(#state{}, any(), terminate_reason()) -> no_return().
websocket_close(State, HandlerState, Reason) ->
	ok = websocket_send_close(State, Reason),
	terminate(State, HandlerState, Reason).

%% Picks the close frame for the given reason and sends it, ignoring
%% the result of the send:
%%   stop | timeout        -> 1000
%%   {error, badframe}     -> 1002
%%   {error, badencoding}  -> 1007
%%   {error, badsize}      -> 1009
%%   {crash, _, _}         -> 1011
%%   remote                -> close frame without a code
%%   {remote, Code, _}     -> the code received from the peer
websocket_send_close(State, Reason) ->
	CloseFrame = case Reason of
		stop -> {close, 1000, <<>>};
		timeout -> {close, 1000, <<>>};
		{error, badframe} -> {close, 1002, <<>>};
		{error, badencoding} -> {close, 1007, <<>>};
		{error, badsize} -> {close, 1009, <<>>};
		{crash, _, _} -> {close, 1011, <<>>};
		remote -> close;
		{remote, Code, _} -> {close, Code, <<>>}
	end,
	_ = transport_send(State, fin, frame(CloseFrame, State)),
	ok.

%% Encodes a frame using the negotiated extensions.
%% Don't compress frames while deflate is disabled.
frame(Frame, #state{deflate=Deflate, extensions=Negotiated}) ->
	Extensions = case Deflate of
		false -> Negotiated#{deflate => false};
		_ -> Negotiated
	end,
	cow_ws:frame(Frame, Extensions).

-spec terminate(#state{}, any(), terminate_reason()) -> no_return().
%% Runs the handler's terminate logic, then exits the process. The
%% exit reason is normal unless a shutdown reason was set, in which
%% case it is {shutdown, ShutdownReason}.
terminate(State=#state{shutdown_reason=ShutdownReason}, HandlerState, Reason) ->
	handler_terminate(State, HandlerState, Reason),
	exit(case ShutdownReason of
		normal -> normal;
		_ -> {shutdown, ShutdownReason}
	end).

%% Delegates to cowboy_handler:terminate/4 with the filtered request
%% that was stored at upgrade time.
handler_terminate(#state{handler=Module, req=StoredReq}, HandlerState, Reason) ->
	cowboy_handler:terminate(Reason, StoredReq, HandlerState, Module).

%% System callbacks.

%% Resumes the receive loop after a system message was handled.
-spec system_continue(_, _, {#state{}, any(), parse_state()}) -> no_return().
system_continue(_Parent, _Debug, {State, HandlerState, ParseState}) ->
	loop(State, HandlerState, ParseState).

%% Terminates on request of the system; the parse state is dropped.
-spec system_terminate(any(), _, _, {#state{}, any(), parse_state()}) -> no_return().
system_terminate(Reason, _Parent, _Debug, {State, HandlerState, _ParseState}) ->
	%% @todo We should exit gracefully, if possible.
	terminate(State, HandlerState, Reason).

%% Code change leaves the loop data untouched.
-spec system_code_change(Misc, _, _, _)
	-> {ok, Misc} when Misc::{#state{}, any(), parse_state()}.
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
	{ok, Misc}.