zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

cowboy_websocket.erl (27186B)


      1 %% Copyright (c) 2011-2017, Loïc Hoguin <essen@ninenines.eu>
      2 %%
      3 %% Permission to use, copy, modify, and/or distribute this software for any
      4 %% purpose with or without fee is hereby granted, provided that the above
      5 %% copyright notice and this permission notice appear in all copies.
      6 %%
      7 %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      8 %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
      9 %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     10 %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     11 %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     12 %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     13 %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     14 
     15 %% Cowboy supports versions 7 through 17 of the Websocket drafts.
     16 %% It also supports RFC6455, the proposed standard for Websocket.
     17 -module(cowboy_websocket).
     18 -behaviour(cowboy_sub_protocol).
     19 
     20 -export([is_upgrade_request/1]).
     21 -export([upgrade/4]).
     22 -export([upgrade/5]).
     23 -export([takeover/7]).
     24 -export([loop/3]).
     25 
     26 -export([system_continue/3]).
     27 -export([system_terminate/4]).
     28 -export([system_code_change/4]).
     29 
     30 -type commands() :: [cow_ws:frame()
     31 	| {active, boolean()}
     32 	| {deflate, boolean()}
     33 	| {set_options, map()}
     34 	| {shutdown_reason, any()}
     35 ].
     36 -export_type([commands/0]).
     37 
     38 -type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
     39 
     40 -type deprecated_call_result(State) :: {ok, State}
     41 	| {ok, State, hibernate}
     42 	| {reply, cow_ws:frame() | [cow_ws:frame()], State}
     43 	| {reply, cow_ws:frame() | [cow_ws:frame()], State, hibernate}
     44 	| {stop, State}.
     45 
     46 -type terminate_reason() :: normal | stop | timeout
     47 	| remote | {remote, cow_ws:close_code(), binary()}
     48 	| {error, badencoding | badframe | closed | atom()}
     49 	| {crash, error | exit | throw, any()}.
     50 
     51 -callback init(Req, any())
     52 	-> {ok | module(), Req, any()}
     53 	| {module(), Req, any(), any()}
     54 	when Req::cowboy_req:req().
     55 
     56 -callback websocket_init(State)
     57 	-> call_result(State) | deprecated_call_result(State) when State::any().
     58 -optional_callbacks([websocket_init/1]).
     59 
     60 -callback websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State)
     61 	-> call_result(State) | deprecated_call_result(State) when State::any().
     62 -callback websocket_info(any(), State)
     63 	-> call_result(State) | deprecated_call_result(State) when State::any().
     64 
     65 -callback terminate(any(), cowboy_req:req(), any()) -> ok.
     66 -optional_callbacks([terminate/3]).
     67 
     68 -type opts() :: #{
     69 	active_n => pos_integer(),
     70 	compress => boolean(),
     71 	deflate_opts => cow_ws:deflate_opts(),
     72 	idle_timeout => timeout(),
     73 	max_frame_size => non_neg_integer() | infinity,
     74 	req_filter => fun((cowboy_req:req()) -> map()),
     75 	validate_utf8 => boolean()
     76 }.
     77 -export_type([opts/0]).
     78 
     79 -record(state, {
     80 	parent :: undefined | pid(),
     81 	ref :: ranch:ref(),
     82 	socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
     83 	transport = undefined :: module() | undefined,
     84 	opts = #{} :: opts(),
     85 	active = true :: boolean(),
     86 	handler :: module(),
     87 	key = undefined :: undefined | binary(),
     88 	timeout_ref = undefined :: undefined | reference(),
     89 	messages = undefined :: undefined | {atom(), atom(), atom()}
     90 		| {atom(), atom(), atom(), atom()},
     91 	hibernate = false :: boolean(),
     92 	frag_state = undefined :: cow_ws:frag_state(),
     93 	frag_buffer = <<>> :: binary(),
     94 	utf8_state :: cow_ws:utf8_state(),
     95 	deflate = true :: boolean(),
     96 	extensions = #{} :: map(),
     97 	req = #{} :: map(),
     98 	shutdown_reason = normal :: any()
     99 }).
    100 
    101 %% Because the HTTP/1.1 and HTTP/2 handshakes are so different,
    102 %% this function is necessary to figure out whether a request
    103 %% is trying to upgrade to the Websocket protocol.
    104 
    105 -spec is_upgrade_request(cowboy_req:req()) -> boolean().
    106 is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) ->
    107 	<<"websocket">> =:= cowboy_bstr:to_lower(Protocol);
    108 is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) ->
    109 	ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
    110 	case lists:member(<<"upgrade">>, ConnTokens) of
    111 		false ->
    112 			false;
    113 		true ->
    114 			UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req),
    115 			lists:member(<<"websocket">>, UpgradeTokens)
    116 	end;
    117 is_upgrade_request(_) ->
    118 	false.
    119 
    120 %% Stream process.
    121 
    122 -spec upgrade(Req, Env, module(), any())
    123 	-> {ok, Req, Env}
    124 	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
    125 upgrade(Req, Env, Handler, HandlerState) ->
    126 	upgrade(Req, Env, Handler, HandlerState, #{}).
    127 
    128 -spec upgrade(Req, Env, module(), any(), opts())
    129 	-> {ok, Req, Env}
    130 	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
    131 %% @todo Immediately crash if a response has already been sent.
    132 upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
    133 	FilteredReq = case maps:get(req_filter, Opts, undefined) of
    134 		undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
    135 		FilterFun -> FilterFun(Req0)
    136 	end,
    137 	Utf8State = case maps:get(validate_utf8, Opts, true) of
    138 		true -> 0;
    139 		false -> undefined
    140 	end,
    141 	State0 = #state{opts=Opts, handler=Handler, utf8_state=Utf8State, req=FilteredReq},
    142 	try websocket_upgrade(State0, Req0) of
    143 		{ok, State, Req} ->
    144 			websocket_handshake(State, Req, HandlerState, Env);
    145 		%% The status code 426 is specific to HTTP/1.1 connections.
    146 		{error, upgrade_required} when Version =:= 'HTTP/1.1' ->
    147 			{ok, cowboy_req:reply(426, #{
    148 				<<"connection">> => <<"upgrade">>,
    149 				<<"upgrade">> => <<"websocket">>
    150 			}, Req0), Env};
    151 		%% Use a generic 400 error for HTTP/2.
    152 		{error, upgrade_required} ->
    153 			{ok, cowboy_req:reply(400, Req0), Env}
    154 	catch _:_ ->
    155 		%% @todo Probably log something here?
    156 		%% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
    157 		%% @todo Does this even work?
    158 		{ok, cowboy_req:reply(400, Req0), Env}
    159 	end.
    160 
    161 websocket_upgrade(State, Req=#{version := Version}) ->
    162 	case is_upgrade_request(Req) of
    163 		false ->
    164 			{error, upgrade_required};
    165 		true when Version =:= 'HTTP/1.1' ->
    166 			Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
    167 			false = Key =:= undefined,
    168 			websocket_version(State#state{key=Key}, Req);
    169 		true ->
    170 			websocket_version(State, Req)
    171 	end.
    172 
    173 websocket_version(State, Req) ->
    174 	WsVersion = cowboy_req:parse_header(<<"sec-websocket-version">>, Req),
    175 	case WsVersion of
    176 		7 -> ok;
    177 		8 -> ok;
    178 		13 -> ok
    179 	end,
    180 	websocket_extensions(State, Req#{websocket_version => WsVersion}).
    181 
    182 websocket_extensions(State=#state{opts=Opts}, Req) ->
    183 	%% @todo We want different options for this. For example
    184 	%% * compress everything auto
    185 	%% * compress only text auto
    186 	%% * compress only binary auto
    187 	%% * compress nothing auto (but still enabled it)
    188 	%% * disable compression
    189 	Compress = maps:get(compress, Opts, false),
    190 	case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of
    191 		{true, Extensions} when Extensions =/= undefined ->
    192 			websocket_extensions(State, Req, Extensions, []);
    193 		_ ->
    194 			{ok, State, Req}
    195 	end.
    196 
    197 websocket_extensions(State, Req, [], []) ->
    198 	{ok, State, Req};
    199 websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) ->
    200 	{ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)};
    201 %% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner.
    202 websocket_extensions(State=#state{opts=Opts, extensions=Extensions},
    203 		Req=#{pid := Pid, version := Version},
    204 		[{<<"permessage-deflate">>, Params}|Tail], RespHeader) ->
    205 	DeflateOpts0 = maps:get(deflate_opts, Opts, #{}),
    206 	DeflateOpts = case Version of
    207 		'HTTP/1.1' -> DeflateOpts0#{owner => Pid};
    208 		_ -> DeflateOpts0
    209 	end,
    210 	try cow_ws:negotiate_permessage_deflate(Params, Extensions, DeflateOpts) of
    211 		{ok, RespExt, Extensions2} ->
    212 			websocket_extensions(State#state{extensions=Extensions2},
    213 				Req, Tail, [<<", ">>, RespExt|RespHeader]);
    214 		ignore ->
    215 			websocket_extensions(State, Req, Tail, RespHeader)
    216 	catch exit:{error, incompatible_zlib_version, _} ->
    217 		websocket_extensions(State, Req, Tail, RespHeader)
    218 	end;
    219 websocket_extensions(State=#state{opts=Opts, extensions=Extensions},
    220 		Req=#{pid := Pid, version := Version},
    221 		[{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) ->
    222 	DeflateOpts0 = maps:get(deflate_opts, Opts, #{}),
    223 	DeflateOpts = case Version of
    224 		'HTTP/1.1' -> DeflateOpts0#{owner => Pid};
    225 		_ -> DeflateOpts0
    226 	end,
    227 	try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, DeflateOpts) of
    228 		{ok, RespExt, Extensions2} ->
    229 			websocket_extensions(State#state{extensions=Extensions2},
    230 				Req, Tail, [<<", ">>, RespExt|RespHeader]);
    231 		ignore ->
    232 			websocket_extensions(State, Req, Tail, RespHeader)
    233 	catch exit:{error, incompatible_zlib_version, _} ->
    234 		websocket_extensions(State, Req, Tail, RespHeader)
    235 	end;
    236 websocket_extensions(State, Req, [_|Tail], RespHeader) ->
    237 	websocket_extensions(State, Req, Tail, RespHeader).
    238 
    239 -spec websocket_handshake(#state{}, Req, any(), Env)
    240 	-> {ok, Req, Env}
    241 	when Req::cowboy_req:req(), Env::cowboy_middleware:env().
    242 websocket_handshake(State=#state{key=Key},
    243 		Req=#{version := 'HTTP/1.1', pid := Pid, streamid := StreamID},
    244 		HandlerState, Env) ->
    245 	Challenge = base64:encode(crypto:hash(sha,
    246 		<< Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
    247 	%% @todo We don't want date and server headers.
    248 	Headers = cowboy_req:response_headers(#{
    249 		<<"connection">> => <<"Upgrade">>,
    250 		<<"upgrade">> => <<"websocket">>,
    251 		<<"sec-websocket-accept">> => Challenge
    252 	}, Req),
    253 	Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
    254 	{ok, Req, Env};
    255 %% For HTTP/2 we do not let the process die, we instead keep it
    256 %% for the Websocket stream. This is because in HTTP/2 we only
    257 %% have a stream, it doesn't take over the whole connection.
    258 websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
    259 		HandlerState, _Env) ->
    260 	%% @todo We don't want date and server headers.
    261 	Headers = cowboy_req:response_headers(#{}, Req),
    262 	Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
    263 	takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
    264 		{State, HandlerState}).
    265 
    266 %% Connection process.
    267 
    268 -record(ps_header, {
    269 	buffer = <<>> :: binary()
    270 }).
    271 
    272 -record(ps_payload, {
    273 	type :: cow_ws:frame_type(),
    274 	len :: non_neg_integer(),
    275 	mask_key :: cow_ws:mask_key(),
    276 	rsv :: cow_ws:rsv(),
    277 	close_code = undefined :: undefined | cow_ws:close_code(),
    278 	unmasked = <<>> :: binary(),
    279 	unmasked_len = 0 :: non_neg_integer(),
    280 	buffer = <<>> :: binary()
    281 }).
    282 
    283 -type parse_state() :: #ps_header{} | #ps_payload{}.
    284 
    285 -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
    286 	module() | undefined, any(), binary(),
    287 	{#state{}, any()}) -> no_return().
    288 takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
    289 		{State0=#state{handler=Handler}, HandlerState}) ->
    290 	%% @todo We should have an option to disable this behavior.
    291 	ranch:remove_connection(Ref),
    292 	Messages = case Transport of
    293 		undefined -> undefined;
    294 		_ -> Transport:messages()
    295 	end,
    296 	State = loop_timeout(State0#state{parent=Parent,
    297 		ref=Ref, socket=Socket, transport=Transport,
    298 		key=undefined, messages=Messages}),
    299 	%% We call parse_header/3 immediately because there might be
    300 	%% some data in the buffer that was sent along with the handshake.
    301 	%% While it is not allowed by the protocol to send frames immediately,
    302 	%% we still want to process that data if any.
    303 	case erlang:function_exported(Handler, websocket_init, 1) of
    304 		true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
    305 			websocket_init, undefined, fun after_init/3);
    306 		false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
    307 	end.
    308 
    309 after_init(State=#state{active=true}, HandlerState, ParseState) ->
    310 	%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
    311 	%% We must do this only after calling websocket_init/1 (if any)
    312 	%% to give the handler a chance to disable active mode immediately.
    313 	setopts_active(State),
    314 	maybe_read_body(State),
    315 	parse_header(State, HandlerState, ParseState);
    316 after_init(State, HandlerState, ParseState) ->
    317 	parse_header(State, HandlerState, ParseState).
    318 
    319 %% We have two ways of reading the body for Websocket. For HTTP/1.1
    320 %% we have full control of the socket and can therefore use active,N.
    321 %% For HTTP/2 we are just a stream, and are instead using read_body
    322 %% (automatic mode). Technically HTTP/2 will only go passive after
    323 %% receiving the next data message, while HTTP/1.1 goes passive
    324 %% immediately but there might still be data to be processed in
    325 %% the message queue.
    326 
    327 setopts_active(#state{transport=undefined}) ->
    328 	ok;
    329 setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
    330 	N = maps:get(active_n, Opts, 100),
    331 	Transport:setopts(Socket, [{active, N}]).
    332 
    333 maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
    334 	%% @todo Keep Ref around.
    335 	ReadBodyRef = make_ref(),
    336 	Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
    337 	ok;
    338 maybe_read_body(_) ->
    339 	ok.
    340 
    341 active(State) ->
    342 	setopts_active(State),
    343 	maybe_read_body(State),
    344 	State#state{active=true}.
    345 
    346 passive(State=#state{transport=undefined}) ->
    347 	%% Unfortunately we cannot currently cancel read_body.
    348 	%% But that's OK, we will just stop reading the body
    349 	%% after the next message.
    350 	State#state{active=false};
    351 passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
    352 	Transport:setopts(Socket, [{active, false}]),
    353 	flush_passive(Socket, Messages),
    354 	State#state{active=false}.
    355 
    356 flush_passive(Socket, Messages) ->
    357 	receive
    358 		{Passive, Socket} when Passive =:= element(4, Messages);
    359 				%% Hardcoded for compatibility with Ranch 1.x.
    360 				Passive =:= tcp_passive; Passive =:= ssl_passive ->
    361 			flush_passive(Socket, Messages)
    362 	after 0 ->
    363 		ok
    364 	end.
    365 
    366 before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
    367 	proc_lib:hibernate(?MODULE, loop,
    368 		[State#state{hibernate=false}, HandlerState, ParseState]);
    369 before_loop(State, HandlerState, ParseState) ->
    370 	loop(State, HandlerState, ParseState).
    371 
    372 -spec loop_timeout(#state{}) -> #state{}.
    373 loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
    374 	_ = case PrevRef of
    375 		undefined -> ignore;
    376 		PrevRef -> erlang:cancel_timer(PrevRef)
    377 	end,
    378 	case maps:get(idle_timeout, Opts, 60000) of
    379 		infinity ->
    380 			State#state{timeout_ref=undefined};
    381 		Timeout ->
    382 			TRef = erlang:start_timer(Timeout, self(), ?MODULE),
    383 			State#state{timeout_ref=TRef}
    384 	end.
    385 
    386 -spec loop(#state{}, any(), parse_state()) -> no_return().
    387 loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
    388 		timeout_ref=TRef}, HandlerState, ParseState) ->
    389 	receive
    390 		%% Socket messages. (HTTP/1.1)
    391 		{OK, Socket, Data} when OK =:= element(1, Messages) ->
    392 			State2 = loop_timeout(State),
    393 			parse(State2, HandlerState, ParseState, Data);
    394 		{Closed, Socket} when Closed =:= element(2, Messages) ->
    395 			terminate(State, HandlerState, {error, closed});
    396 		{Error, Socket, Reason} when Error =:= element(3, Messages) ->
    397 			terminate(State, HandlerState, {error, Reason});
    398 		{Passive, Socket} when Passive =:= element(4, Messages);
    399 				%% Hardcoded for compatibility with Ranch 1.x.
    400 				Passive =:= tcp_passive; Passive =:= ssl_passive ->
    401 			setopts_active(State),
    402 			loop(State, HandlerState, ParseState);
    403 		%% Body reading messages. (HTTP/2)
    404 		{request_body, _Ref, nofin, Data} ->
    405 			maybe_read_body(State),
    406 			State2 = loop_timeout(State),
    407 			parse(State2, HandlerState, ParseState, Data);
    408 		%% @todo We need to handle this case as if it was an {error, closed}
    409 		%% but not before we finish processing frames. We probably should have
    410 		%% a check in before_loop to let us stop looping if a flag is set.
    411 		{request_body, _Ref, fin, _, Data} ->
    412 			maybe_read_body(State),
    413 			State2 = loop_timeout(State),
    414 			parse(State2, HandlerState, ParseState, Data);
    415 		%% Timeouts.
    416 		{timeout, TRef, ?MODULE} ->
    417 			websocket_close(State, HandlerState, timeout);
    418 		{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
    419 			before_loop(State, HandlerState, ParseState);
    420 		%% System messages.
    421 		{'EXIT', Parent, Reason} ->
    422 			%% @todo We should exit gracefully.
    423 			exit(Reason);
    424 		{system, From, Request} ->
    425 			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
    426 				{State, HandlerState, ParseState});
    427 		%% Calls from supervisor module.
    428 		{'$gen_call', From, Call} ->
    429 			cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
    430 			before_loop(State, HandlerState, ParseState);
    431 		Message ->
    432 			handler_call(State, HandlerState, ParseState,
    433 				websocket_info, Message, fun before_loop/3)
    434 	end.
    435 
    436 parse(State, HandlerState, PS=#ps_header{buffer=Buffer}, Data) ->
    437 	parse_header(State, HandlerState, PS#ps_header{
    438 		buffer= <<Buffer/binary, Data/binary>>});
    439 parse(State, HandlerState, PS=#ps_payload{buffer=Buffer}, Data) ->
    440 	parse_payload(State, HandlerState, PS#ps_payload{buffer= <<>>},
    441 		<<Buffer/binary, Data/binary>>).
    442 
    443 parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions},
    444 		HandlerState, ParseState=#ps_header{buffer=Data}) ->
    445 	MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
    446 	case cow_ws:parse_header(Data, Extensions, FragState) of
    447 		%% All frames sent from the client to the server are masked.
    448 		{_, _, _, _, undefined, _} ->
    449 			websocket_close(State, HandlerState, {error, badframe});
    450 		{_, _, _, Len, _, _} when Len > MaxFrameSize ->
    451 			websocket_close(State, HandlerState, {error, badsize});
    452 		{Type, FragState2, Rsv, Len, MaskKey, Rest} ->
    453 			parse_payload(State#state{frag_state=FragState2}, HandlerState,
    454 				#ps_payload{type=Type, len=Len, mask_key=MaskKey, rsv=Rsv}, Rest);
    455 		more ->
    456 			before_loop(State, HandlerState, ParseState);
    457 		error ->
    458 			websocket_close(State, HandlerState, {error, badframe})
    459 	end.
    460 
    461 parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
    462 		HandlerState, ParseState=#ps_payload{
    463 			type=Type, len=Len, mask_key=MaskKey, rsv=Rsv,
    464 			unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) ->
    465 	case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen,
    466 			Type, Len, FragState, Extensions, Rsv) of
    467 		{ok, CloseCode, Payload, Utf8State, Rest} ->
    468 			dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
    469 				ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>,
    470 					close_code=CloseCode}, Rest);
    471 		{ok, Payload, Utf8State, Rest} ->
    472 			dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
    473 				ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>},
    474 				Rest);
    475 		{more, CloseCode, Payload, Utf8State} ->
    476 			before_loop(State#state{utf8_state=Utf8State}, HandlerState,
    477 				ParseState#ps_payload{len=Len - byte_size(Data), close_code=CloseCode,
    478 					unmasked= <<Unmasked/binary, Payload/binary>>,
    479 					unmasked_len=UnmaskedLen + byte_size(Data)});
    480 		{more, Payload, Utf8State} ->
    481 			before_loop(State#state{utf8_state=Utf8State}, HandlerState,
    482 				ParseState#ps_payload{len=Len - byte_size(Data),
    483 					unmasked= <<Unmasked/binary, Payload/binary>>,
    484 					unmasked_len=UnmaskedLen + byte_size(Data)});
    485 		Error = {error, _Reason} ->
    486 			websocket_close(State, HandlerState, Error)
    487 	end.
    488 
    489 dispatch_frame(State=#state{opts=Opts, frag_state=FragState, frag_buffer=SoFar}, HandlerState,
    490 		#ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0}, RemainingData) ->
    491 	MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
    492 	case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
    493 		%% @todo Allow receiving fragments.
    494 		{fragment, _, _, Payload} when byte_size(Payload) + byte_size(SoFar) > MaxFrameSize ->
    495 			websocket_close(State, HandlerState, {error, badsize});
    496 		{fragment, nofin, _, Payload} ->
    497 			parse_header(State#state{frag_buffer= << SoFar/binary, Payload/binary >>},
    498 				HandlerState, #ps_header{buffer=RemainingData});
    499 		{fragment, fin, Type, Payload} ->
    500 			handler_call(State#state{frag_state=undefined, frag_buffer= <<>>}, HandlerState,
    501 				#ps_header{buffer=RemainingData},
    502 				websocket_handle, {Type, << SoFar/binary, Payload/binary >>},
    503 				fun parse_header/3);
    504 		close ->
    505 			websocket_close(State, HandlerState, remote);
    506 		{close, CloseCode, Payload} ->
    507 			websocket_close(State, HandlerState, {remote, CloseCode, Payload});
    508 		Frame = ping ->
    509 			transport_send(State, nofin, frame(pong, State)),
    510 			handler_call(State, HandlerState,
    511 				#ps_header{buffer=RemainingData},
    512 				websocket_handle, Frame, fun parse_header/3);
    513 		Frame = {ping, Payload} ->
    514 			transport_send(State, nofin, frame({pong, Payload}, State)),
    515 			handler_call(State, HandlerState,
    516 				#ps_header{buffer=RemainingData},
    517 				websocket_handle, Frame, fun parse_header/3);
    518 		Frame ->
    519 			handler_call(State, HandlerState,
    520 				#ps_header{buffer=RemainingData},
    521 				websocket_handle, Frame, fun parse_header/3)
    522 	end.
    523 
    524 handler_call(State=#state{handler=Handler}, HandlerState,
    525 		ParseState, Callback, Message, NextState) ->
    526 	try case Callback of
    527 		websocket_init -> Handler:websocket_init(HandlerState);
    528 		_ -> Handler:Callback(Message, HandlerState)
    529 	end of
    530 		{Commands, HandlerState2} when is_list(Commands) ->
    531 			handler_call_result(State,
    532 				HandlerState2, ParseState, NextState, Commands);
    533 		{Commands, HandlerState2, hibernate} when is_list(Commands) ->
    534 			handler_call_result(State#state{hibernate=true},
    535 				HandlerState2, ParseState, NextState, Commands);
    536 		%% The following call results are deprecated.
    537 		{ok, HandlerState2} ->
    538 			NextState(State, HandlerState2, ParseState);
    539 		{ok, HandlerState2, hibernate} ->
    540 			NextState(State#state{hibernate=true}, HandlerState2, ParseState);
    541 		{reply, Payload, HandlerState2} ->
    542 			case websocket_send(Payload, State) of
    543 				ok ->
    544 					NextState(State, HandlerState2, ParseState);
    545 				stop ->
    546 					terminate(State, HandlerState2, stop);
    547 				Error = {error, _} ->
    548 					terminate(State, HandlerState2, Error)
    549 			end;
    550 		{reply, Payload, HandlerState2, hibernate} ->
    551 			case websocket_send(Payload, State) of
    552 				ok ->
    553 					NextState(State#state{hibernate=true},
    554 						HandlerState2, ParseState);
    555 				stop ->
    556 					terminate(State, HandlerState2, stop);
    557 				Error = {error, _} ->
    558 					terminate(State, HandlerState2, Error)
    559 			end;
    560 		{stop, HandlerState2} ->
    561 			websocket_close(State, HandlerState2, stop)
    562 	catch Class:Reason:Stacktrace ->
    563 		websocket_send_close(State, {crash, Class, Reason}),
    564 		handler_terminate(State, HandlerState, {crash, Class, Reason}),
    565 		erlang:raise(Class, Reason, Stacktrace)
    566 	end.
    567 
    568 -spec handler_call_result(#state{}, any(), parse_state(), fun(), commands()) -> no_return().
    569 handler_call_result(State0, HandlerState, ParseState, NextState, Commands) ->
    570 	case commands(Commands, State0, []) of
    571 		{ok, State} ->
    572 			NextState(State, HandlerState, ParseState);
    573 		{stop, State} ->
    574 			terminate(State, HandlerState, stop);
    575 		{Error = {error, _}, State} ->
    576 			terminate(State, HandlerState, Error)
    577 	end.
    578 
    579 commands([], State, []) ->
    580 	{ok, State};
    581 commands([], State, Data) ->
    582 	Result = transport_send(State, nofin, lists:reverse(Data)),
    583 	{Result, State};
    584 commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) ->
    585 	State = if
    586 		Active, not Active0 ->
    587 			active(State0);
    588 		Active0, not Active ->
    589 			passive(State0);
    590 		true ->
    591 			State0
    592 	end,
    593 	commands(Tail, State#state{active=Active}, Data);
    594 commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
    595 	commands(Tail, State#state{deflate=Deflate}, Data);
    596 commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
    597 	State = case SetOpts of
    598 		#{idle_timeout := IdleTimeout} ->
    599 			loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}});
    600 		_ ->
    601 			State0
    602 	end,
    603 	commands(Tail, State, Data);
    604 commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) ->
    605 	commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data);
    606 commands([Frame|Tail], State, Data0) ->
    607 	Data = [frame(Frame, State)|Data0],
    608 	case is_close_frame(Frame) of
    609 		true ->
    610 			_ = transport_send(State, fin, lists:reverse(Data)),
    611 			{stop, State};
    612 		false ->
    613 			commands(Tail, State, Data)
    614 	end.
    615 
    616 transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
    617 	Pid ! {Stream, {data, IsFin, Data}},
    618 	ok;
    619 transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
    620 	Transport:send(Socket, Data).
    621 
    622 -spec websocket_send(cow_ws:frame(), #state{}) -> ok | stop | {error, atom()}.
    623 websocket_send(Frames, State) when is_list(Frames) ->
    624 	websocket_send_many(Frames, State, []);
    625 websocket_send(Frame, State) ->
    626 	Data = frame(Frame, State),
    627 	case is_close_frame(Frame) of
    628 		true ->
    629 			_ = transport_send(State, fin, Data),
    630 			stop;
    631 		false ->
    632 			transport_send(State, nofin, Data)
    633 	end.
    634 
    635 websocket_send_many([], State, Acc) ->
    636 	transport_send(State, nofin, lists:reverse(Acc));
    637 websocket_send_many([Frame|Tail], State, Acc0) ->
    638 	Acc = [frame(Frame, State)|Acc0],
    639 	case is_close_frame(Frame) of
    640 		true ->
    641 			_ = transport_send(State, fin, lists:reverse(Acc)),
    642 			stop;
    643 		false ->
    644 			websocket_send_many(Tail, State, Acc)
    645 	end.
    646 
    647 is_close_frame(close) -> true;
    648 is_close_frame({close, _}) -> true;
    649 is_close_frame({close, _, _}) -> true;
    650 is_close_frame(_) -> false.
    651 
    652 -spec websocket_close(#state{}, any(), terminate_reason()) -> no_return().
    653 websocket_close(State, HandlerState, Reason) ->
    654 	websocket_send_close(State, Reason),
    655 	terminate(State, HandlerState, Reason).
    656 
    657 websocket_send_close(State, Reason) ->
    658 	_ = case Reason of
    659 		Normal when Normal =:= stop; Normal =:= timeout ->
    660 			transport_send(State, fin, frame({close, 1000, <<>>}, State));
    661 		{error, badframe} ->
    662 			transport_send(State, fin, frame({close, 1002, <<>>}, State));
    663 		{error, badencoding} ->
    664 			transport_send(State, fin, frame({close, 1007, <<>>}, State));
    665 		{error, badsize} ->
    666 			transport_send(State, fin, frame({close, 1009, <<>>}, State));
    667 		{crash, _, _} ->
    668 			transport_send(State, fin, frame({close, 1011, <<>>}, State));
    669 		remote ->
    670 			transport_send(State, fin, frame(close, State));
    671 		{remote, Code, _} ->
    672 			transport_send(State, fin, frame({close, Code, <<>>}, State))
    673 	end,
    674 	ok.
    675 
    676 %% Don't compress frames while deflate is disabled.
    677 frame(Frame, #state{deflate=false, extensions=Extensions}) ->
    678 	cow_ws:frame(Frame, Extensions#{deflate => false});
    679 frame(Frame, #state{extensions=Extensions}) ->
    680 	cow_ws:frame(Frame, Extensions).
    681 
    682 -spec terminate(#state{}, any(), terminate_reason()) -> no_return().
    683 terminate(State=#state{shutdown_reason=Shutdown}, HandlerState, Reason) ->
    684 	handler_terminate(State, HandlerState, Reason),
    685 	case Shutdown of
    686 		normal -> exit(normal);
    687 		_ -> exit({shutdown, Shutdown})
    688 	end.
    689 
    690 handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
    691 	cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
    692 
    693 %% System callbacks.
    694 
    695 -spec system_continue(_, _, {#state{}, any(), parse_state()}) -> no_return().
    696 system_continue(_, _, {State, HandlerState, ParseState}) ->
    697 	loop(State, HandlerState, ParseState).
    698 
    699 -spec system_terminate(any(), _, _, {#state{}, any(), parse_state()}) -> no_return().
    700 system_terminate(Reason, _, _, {State, HandlerState, _}) ->
    701 	%% @todo We should exit gracefully, if possible.
    702 	terminate(State, HandlerState, Reason).
    703 
    704 -spec system_code_change(Misc, _, _, _)
    705 	-> {ok, Misc} when Misc::{#state{}, any(), parse_state()}.
    706 system_code_change(Misc, _, _, _) ->
    707 	{ok, Misc}.