protocol.ex (111061B)
1 defmodule Postgrex.Protocol do 2 @moduledoc false 3 4 alias Postgrex.{Types, TypeServer, Query, Cursor, Copy} 5 import Postgrex.{Messages, BinaryUtils} 6 require Logger 7 use DBConnection 8 9 @timeout 15_000 10 @sock_opts [packet: :raw, mode: :binary, active: false] 11 @max_packet 64 * 1024 * 1024 12 @nonposix_errors [:closed, :timeout] 13 @max_rows 500 14 @text_type_oid 25 15 16 defstruct sock: nil, 17 connection_id: nil, 18 connection_key: nil, 19 peer: nil, 20 types: nil, 21 null: nil, 22 timeout: nil, 23 ping_timeout: nil, 24 parameters: %{}, 25 queries: nil, 26 postgres: :idle, 27 transactions: :strict, 28 buffer: nil, 29 disconnect_on_error_codes: [] 30 31 @type state :: %__MODULE__{ 32 sock: {module, any}, 33 connection_id: nil | pos_integer, 34 connection_key: nil | pos_integer, 35 peer: nil | {:inet.ip_address(), :inet.port_number()}, 36 types: nil | module, 37 null: atom, 38 timeout: timeout, 39 ping_timeout: timeout, 40 parameters: %{binary => binary} | reference, 41 queries: nil | :ets.tid(), 42 postgres: DBConnection.status() | {DBConnection.status(), reference}, 43 transactions: :strict | :naive, 44 buffer: nil | binary | :active_once, 45 disconnect_on_error_codes: [atom()] 46 } 47 48 @type notify :: (binary, binary -> any) 49 50 defmacrop new_status(opts, fields \\ []) do 51 defaults = 52 quote( 53 do: [ 54 notify: notify(unquote(opts)), 55 mode: mode(unquote(opts)), 56 messages: [], 57 prepare: false 58 ] 59 ) 60 61 {:%{}, [], Keyword.merge(defaults, fields)} 62 end 63 64 @impl true 65 @spec connect(Keyword.t()) :: 66 {:ok, state} 67 | {:error, Postgrex.Error.t() | %DBConnection.ConnectionError{}} 68 def connect(opts) do 69 endpoints = endpoints(opts) 70 71 timeout = opts[:timeout] || @timeout 72 ping_timeout = Keyword.get(opts, :ping_timeout, timeout) 73 sock_opts = [send_timeout: timeout] ++ (opts[:socket_options] || []) 74 ssl? = opts[:ssl] || false 75 types_mod = Keyword.fetch!(opts, :types) 76 disconnect_on_error_codes = opts[:disconnect_on_error_codes] || [] 77 target_server_type = opts[:target_server_type] || :any 78 79 transactions = 80 case opts[:transactions] || :naive do 81 :naive -> :naive 82 :strict -> :strict 83 end 84 85 prepare = 86 case opts[:prepare] || :named do 87 :named -> :named 88 :unnamed -> :unnamed 89 end 90 91 s = %__MODULE__{ 92 timeout: timeout, 93 ping_timeout: ping_timeout, 94 postgres: :idle, 95 transactions: transactions, 96 disconnect_on_error_codes: disconnect_on_error_codes 97 } 98 99 connect_timeout = Keyword.get(opts, :connect_timeout, timeout) 100 101 status = %{ 102 opts: opts, 103 types_mod: types_mod, 104 types_key: nil, 105 types_lock: nil, 106 prepare: prepare, 107 messages: [], 108 ssl: ssl?, 109 target_server_type: target_server_type, 110 search_path: opts[:search_path] 111 } 112 113 connect_endpoints(endpoints, sock_opts ++ @sock_opts, connect_timeout, s, status, []) 114 end 115 116 defp endpoints(opts) do 117 port = opts[:port] || 5432 118 119 case Keyword.fetch(opts, :socket) do 120 {:ok, file} -> 121 [{{:local, file}, 0, []}] 122 123 :error -> 124 case Keyword.fetch(opts, :socket_dir) do 125 {:ok, dir} -> 126 [{{:local, "#{dir}/.s.PGSQL.#{port}"}, 0, []}] 127 128 :error -> 129 case Keyword.fetch(opts, :endpoints) do 130 {:ok, endpoints} when is_list(endpoints) -> 131 Enum.map(endpoints, fn 132 {hostname, port} -> {to_charlist(hostname), port, []} 133 {hostname, port, extra_opts} -> {to_charlist(hostname), port, extra_opts} 134 end) 135 136 {:ok, _} -> 137 raise ArgumentError, "expected :endpoints to be a list of tuples" 138 139 :error -> 140 case Keyword.fetch(opts, :hostname) do 141 {:ok, hostname} -> 142 [{to_charlist(hostname), port, []}] 143 144 :error -> 145 raise ArgumentError, 146 "expected :hostname, endpoints, :socket_dir, or :socket to be given" 147 end 148 end 149 end 150 end 151 end 152 153 defp connect_endpoints( 154 [{host, port, extra_opts} | remaining_endpoints], 155 sock_opts, 156 timeout, 157 s, 158 %{opts: opts, types_mod: types_mod} = status, 159 previous_errors 160 ) do 161 types_key = if types_mod, do: {host, port, Keyword.fetch!(opts, :database)} 162 opts = Config.Reader.merge(opts, extra_opts) 163 164 status = %{status | types_key: types_key, opts: opts} 165 166 case connect_and_handshake(host, port, sock_opts, timeout, s, status) do 167 {:ok, _} = ret -> 168 ret 169 170 {:error, err} -> 171 connect_endpoints( 172 remaining_endpoints, 173 sock_opts, 174 timeout, 175 s, 176 status, 177 [{host, port, err} | previous_errors] 178 ) 179 end 180 end 181 182 defp connect_endpoints([], _, _, _, _, [{_, _, error}]), do: {:error, error} 183 184 defp connect_endpoints([], _, _, _, _, errors) when is_list(errors) do 185 concat_messages = 186 errors 187 |> Enum.reverse() 188 |> Enum.map_join("\n", fn {host, port, %error_module{} = error} -> 189 " * #{host}:#{port}: (#{inspect(error_module)}) #{Exception.message(error)}" 190 end) 191 192 message = "failed to establish connection to multiple endpoints:\n\n#{concat_messages}" 193 {:error, %Postgrex.Error{message: message}} 194 end 195 196 defp connect_and_handshake(host, port, sock_opts, timeout, s, status) do 197 case connect(host, port, sock_opts, timeout, s) do 198 {:ok, s} -> 199 handshake(s, status) 200 201 {:error, _} = error -> 202 error 203 end 204 end 205 206 @impl true 207 @spec disconnect(Exception.t(), state) :: :ok 208 def disconnect(_, s) do 209 # cancel the request first otherwise PostgreSQL will log 210 # every time the connection is explicitly disconnected 211 # because the associated PID will no longer exist. 212 cancel_request(s) 213 sock_close(s) 214 _ = recv_buffer(s) 215 delete_parameters(s) 216 queries_delete(s) 217 :ok 218 end 219 220 @impl true 221 @spec ping(state) :: 222 {:ok, state} 223 | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state} 224 def ping(%{postgres: :transaction, transactions: :strict} = s) do 225 sync_error(s, :transaction) 226 end 227 228 def ping(%{buffer: buffer} = s) do 229 status = new_status([], mode: :transaction) 230 s = %{s | buffer: nil} 231 232 case msg_send(s, msg_sync(), buffer) do 233 :ok when buffer == :active_once -> 234 ping_recv(s, status, :active_once, buffer) 235 236 :ok when is_binary(buffer) -> 237 ping_recv(s, status, nil, buffer) 238 239 {:disconnect, _, _} = dis -> 240 dis 241 end 242 end 243 244 @impl true 245 @spec checkout(state) :: 246 {:ok, state} 247 | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state} 248 def checkout(%{postgres: :transaction, transactions: :strict} = s) do 249 sync_error(s, :transaction) 250 end 251 252 def checkout(%{buffer: :active_once} = s) do 253 case setopts(s, [active: false], :active_once) do 254 :ok -> recv_buffer(s) 255 {:disconnect, _, _} = dis -> dis 256 end 257 end 258 259 @spec checkin(state) :: 260 {:ok, state} 261 | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state} 262 def checkin(%{postgres: :transaction, transactions: :strict} = s) do 263 sync_error(s, :transaction) 264 end 265 266 def checkin(%{buffer: buffer} = s) when is_binary(buffer) do 267 activate(s, buffer) 268 end 269 270 @impl true 271 @spec handle_prepare(Postgrex.Query.t(), Keyword.t(), state) :: 272 {:ok, Postgrex.Query.t(), state} 273 | {:error, %ArgumentError{} | Postgrex.Error.t(), state} 274 | {:error, %DBConnection.TransactionError{}, state} 275 | {:disconnect, %RuntimeError{}, state} 276 | {:disconnect, %DBConnection.ConnectionError{}, state} 277 def handle_prepare(%Query{} = query, _, %{postgres: {_, _}} = s) do 278 lock_error(s, :prepare, query) 279 end 280 281 def handle_prepare(%Query{ref: ref} = query, opts, s) when is_reference(ref) do 282 # If the query already has a reference, then it means DBConnection rescued 283 # a DBConnection.EncodeError and wants us to reprepare a query 284 %{name: name, statement: statement} = query 285 handle_prepare(%Query{name: name, statement: statement}, opts, s) 286 end 287 288 def handle_prepare(%Query{name: ""} = query, opts, s) do 289 prepare = Keyword.get(opts, :postgrex_prepare, false) 290 status = new_status(opts, prepare: prepare) 291 292 case prepare do 293 true -> parse_describe_close(s, status, query) 294 false -> parse_describe_flush(s, status, query) 295 end 296 end 297 298 def handle_prepare(%Query{} = query, opts, %{queries: nil} = s) do 299 # always use unnamed if no cache 300 handle_prepare(%Query{query | name: ""}, opts, s) 301 end 302 303 def handle_prepare(%Query{} = query, opts, s) do 304 if new_query = cached_query(s, query) do 305 {:ok, new_query, s} 306 else 307 prepare = Keyword.get(opts, :postgrex_prepare, false) 308 status = new_status(opts, prepare: prepare) 309 310 case prepare do 311 true -> close_parse_describe(s, status, query) 312 false -> close_parse_describe_flush(s, status, query) 313 end 314 end 315 end 316 317 @impl true 318 @spec handle_execute(Postgrex.Parameters.t(), nil, Keyword.t(), state) :: 319 {:ok, Postgrex.Parameters.t(), %{binary => binary}, state} 320 | {:error, Postgrex.Error.t(), state} 321 def handle_execute(%Postgrex.Parameters{} = p, nil, _, s) do 322 %{parameters: parameters} = s 323 324 case Postgrex.Parameters.fetch(parameters) do 325 {:ok, parameters} -> 326 {:ok, p, parameters, s} 327 328 :error -> 329 {:error, %Postgrex.Error{message: "parameters not available"}, s} 330 end 331 end 332 333 @spec handle_execute(Postgrex.Query.t(), list, Keyword.t(), state) :: 334 {:ok, Postgrex.Query.t(), Postgrex.Result.t() | Postgrex.Copy.t(), state} 335 | {:error, %ArgumentError{} | Postgrex.Error.t(), state} 336 | {:error, %DBConnection.TransactionError{}, state} 337 | {:disconnect, %RuntimeError{}, state} 338 | {:disconnect, %DBConnection.ConnectionError{}, state} 339 def handle_execute(%Query{} = query, params, opts, s) do 340 case Keyword.get(opts, :postgrex_copy, false) do 341 true -> handle_execute_copy(query, params, opts, s) 342 false -> handle_execute_result(query, params, opts, s) 343 end 344 end 345 346 @spec handle_execute(Postgrex.Copy.t(), {:copy_data, iodata} | :copy_done, Keyword.t(), state) :: 347 {:ok, Postgrex.Query.t(), Postgrex.Result.t(), state} 348 | {:error, %ArgumentError{} | Postgrex.Error.t(), state} 349 | {:disconnect, %RuntimeError{}, state} 350 | {:disconnect, %DBConnection.ConnectionError{}, state} 351 def handle_execute(%Copy{ref: ref} = copy, {:copy_data, iodata}, opts, s) do 352 case s do 353 %{postgres: {_, ^ref}} -> 354 copy_in_data(s, copy, iodata) 355 356 %{postgres: {_, _}} -> 357 lock_error(s, :execute, copy) 358 359 _ -> 360 copy_in_data(s, new_status(opts), copy, iodata) 361 end 362 end 363 364 def handle_execute(%Copy{ref: ref, query: query} = copy, :copy_done, opts, s) do 365 case s do 366 %{postgres: {_, ^ref}} -> 367 copy_in_done(s, new_status(opts), copy) 368 369 %{postgres: {_, _}} -> 370 lock_error(s, :execute, copy) 371 372 _ -> 373 with {:ok, result, s} <- close(s, new_status(opts), copy) do 374 {:ok, query, result, s} 375 end 376 end 377 end 378 379 defp handle_execute_result(%{ref: ref} = query, params, opts, %{postgres: {_, ref}} = s) do 380 # ref in lock so query is prepared 381 status = new_status(opts) 382 383 case query do 384 %{name: ""} -> bind_execute_close(s, status, query, params) 385 _ -> bind_execute(s, status, query, params) 386 end 387 end 388 389 defp handle_execute_result(%{} = query, _, _, %{postgres: {_, _ref}} = s) do 390 lock_error(s, :execute, query) 391 end 392 393 defp handle_execute_result(query, params, opts, s) do 394 if query_member?(s, query) do 395 rebind_execute(s, new_status(opts), query, params) 396 else 397 handle_prepare_execute(query, params, opts, s) 398 end 399 end 400 401 defp handle_execute_copy(query, params, opts, s) do 402 %{connection_id: connection_id} = s 403 404 copy = %Copy{ 405 portal: make_portal(), 406 ref: make_ref(), 407 query: query, 408 connection_id: connection_id 409 } 410 411 handle_bind(query, params, copy, opts, s) 412 end 413 414 @impl true 415 @spec handle_close(Postgrex.Query.t(), Keyword.t(), state) :: 416 {:ok, Postgrex.Result.t(), state} 417 | {:error, %ArgumentError{} | Postgrex.Error.t(), state} 418 | {:disconnect, %RuntimeError{}, state} 419 | {:disconnect, %DBConnection.ConnectionError{}, state} 420 def handle_close(%Query{ref: ref} = query, opts, %{postgres: {_, ref}} = s) do 421 flushed_close(s, new_status(opts), query) 422 end 423 424 def handle_close(%Query{} = query, _, %{postgres: {_, _}} = s) do 425 lock_error(s, :close, query) 426 end 427 428 def handle_close(%Query{} = query, opts, s) do 429 close(s, new_status(opts), query) 430 end 431 432 @impl true 433 @spec handle_declare(Postgrex.Query.t(), list, Keyword.t(), state) :: 434 {:ok, Postgrex.Query.t(), Postgrex.Cursor.t(), state} 435 | {:error, %ArgumentError{} | Postgrex.Error.t(), state} 436 | {:disconnect, %RuntimeError{}, state} 437 | {:disconnect, %DBConnection.ConnectionError{}, state} 438 def handle_declare(query, params, opts, s) do 439 %{connection_id: connection_id} = s 440 441 cursor = %Cursor{ 442 portal: make_portal(), 443 ref: make_ref(), 444 connection_id: connection_id, 445 mode: mode(opts) 446 } 447 448 handle_bind(query, params, cursor, opts, s) 449 end 450 451 @impl true 452 @spec handle_fetch(Postgrex.Query.t(), Postgrex.Cursor.t(), Keyword.t(), state) :: 453 {:cont | :halt, Postgrex.Result.t(), state} 454 | {:error, Postgrex.Error.t(), state} 455 | {:disconnect, %RuntimeError{}, state} 456 | {:disconnect, %DBConnection.ConnectionError{}, state} 457 def handle_fetch(query, cursor, opts, %{postgres: {_, ref}} = s) do 458 case cursor do 459 %Cursor{ref: ^ref, mode: mode} -> 460 status = new_status(opts, mode: mode) 461 max_rows = Keyword.get(opts, :max_rows, @max_rows) 462 fetch_copy_out(s, status, query, max_rows) 463 464 _ -> 465 lock_error(s, "fetch", cursor) 466 end 467 end 468 469 def handle_fetch(query, cursor, opts, s) do 470 max_rows = Keyword.get(opts, :max_rows, @max_rows) 471 execute(s, new_status(opts), query, cursor, max_rows) 472 end 473 474 @impl true 475 @spec handle_deallocate(Postgrex.Query.t(), Postgrex.Cursor.t(), Keyword.t(), state) :: 476 {:ok, Postgrex.Result.t(), state} 477 | {:error, Postgrex.Error.t(), state} 478 | {:disconnect, %RuntimeError{}, state} 479 | {:disconnect, %DBConnection.ConnectionError{}, state} 480 def handle_deallocate(query, %Cursor{ref: ref}, opts, %{postgres: {_, ref}} = s) do 481 copy_out_done(s, new_status(opts), query) 482 end 483 484 def handle_deallocate(_, %Cursor{} = cursor, _, %{postgres: {_, _}} = s) do 485 lock_error(s, :deallocate, cursor) 486 end 487 488 def handle_deallocate(_, %Cursor{} = cursor, opts, s) do 489 status = new_status(opts, mode: :transaction) 490 close(s, status, cursor) 491 end 492 493 @impl true 494 @spec handle_begin(Keyword.t(), state) :: 495 {:ok, Postgrex.Result.t(), state} 496 | {DBConnection.status(), state} 497 | {:disconnect, %RuntimeError{}, state} 498 | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state} 499 def handle_begin(_, %{postgres: {_, _}} = s) do 500 lock_error(s, :begin) 501 end 502 503 def handle_begin(opts, %{postgres: postgres} = s) do 504 case Keyword.get(opts, :mode, :transaction) do 505 :transaction when postgres == :idle -> 506 statement = "BEGIN" 507 handle_transaction(statement, opts, s) 508 509 :savepoint when postgres == :transaction -> 510 statement = "SAVEPOINT postgrex_savepoint" 511 handle_transaction(statement, opts, s) 512 513 mode when mode in [:transaction, :savepoint] -> 514 {postgres, s} 515 end 516 end 517 518 @impl true 519 @spec handle_commit(Keyword.t(), state) :: 520 {:ok, Postgrex.Result.t(), state} 521 | {DBConnection.status(), state} 522 | {:disconnect, %RuntimeError{}, state} 523 | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state} 524 def handle_commit(_, %{postgres: {_, _}} = s) do 525 lock_error(s, :commit) 526 end 527 528 def handle_commit(opts, %{postgres: postgres} = s) do 529 case Keyword.get(opts, :mode, :transaction) do 530 :transaction when postgres == :transaction -> 531 statement = "COMMIT" 532 handle_transaction(statement, opts, s) 533 534 :savepoint when postgres == :transaction -> 535 statement = "RELEASE SAVEPOINT postgrex_savepoint" 536 handle_transaction(statement, opts, s) 537 538 mode when mode in [:transaction, :savepoint] -> 539 {postgres, s} 540 end 541 end 542 543 @impl true 544 @spec handle_rollback(Keyword.t(), state) :: 545 {:ok, Postgrex.Result.t(), state} 546 | {DBConnection.status(), state} 547 | {:disconnect, %RuntimeError{}, state} 548 | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state} 549 def handle_rollback(_, %{postgres: {_, _}} = s) do 550 lock_error(s, :rollback) 551 end 552 553 def handle_rollback(opts, %{postgres: postgres} = s) do 554 case Keyword.get(opts, :mode, :transaction) do 555 :transaction when postgres in [:transaction, :error] -> 556 statement = "ROLLBACK" 557 handle_transaction(statement, opts, s) 558 559 :savepoint when postgres in [:transaction, :error] -> 560 stmt = "ROLLBACK TO SAVEPOINT postgrex_savepoint;RELEASE SAVEPOINT postgrex_savepoint" 561 handle_transaction(stmt, opts, s) 562 563 mode when mode in [:transaction, :savepoint] -> 564 {postgres, s} 565 end 566 end 567 568 @impl true 569 @spec handle_status(Keyword.t(), state) :: {DBConnection.status(), state} 570 def handle_status(_, %{postgres: {postgres, _}} = s), do: {postgres, s} 571 def handle_status(_, %{postgres: postgres} = s), do: {postgres, s} 572 573 @spec handle_info(any, Keyword.t(), state) :: 574 {:ok, state} 575 | {:unknown, state} 576 | {:error, Postgrex.Error.t(), state} 577 | {:disconnect, %DBConnection.ConnectionError{}, state} 578 def handle_info(msg, opts \\ [], s) do 579 case handle_socket(msg, s) do 580 {:data, data} -> handle_data(s, opts, data) 581 :ignore -> {:ok, s} 582 :unknown -> {:unknown, s} 583 disconnect -> disconnect 584 end 585 end 586 587 defp handle_socket({:tcp, sock, data}, %{sock: {:gen_tcp, sock}}) do 588 {:data, data} 589 end 590 591 defp handle_socket({:tcp_closed, sock}, %{sock: {:gen_tcp, sock}} = s) do 592 disconnect(s, :tcp, "async recv", :closed) 593 end 594 595 defp handle_socket({:tcp_error, sock, reason}, %{sock: {:gen_tcp, sock}} = s) do 596 disconnect(s, :tcp, "async recv", reason) 597 end 598 599 defp handle_socket({:ssl, sock, data}, %{sock: {:ssl, sock}}) do 600 {:data, data} 601 end 602 603 defp handle_socket({:ssl_closed, sock}, %{sock: {:ssl, sock}} = s) do 604 disconnect(s, :ssl, "async recv", :closed) 605 end 606 607 defp handle_socket({:ssl_error, sock, reason}, %{sock: {:ssl, sock}} = s) do 608 disconnect(s, :ssl, "async recv", reason) 609 end 610 611 defp handle_socket({closed, _sock}, _) when closed in [:tcp_closed, :ssl_closed] do 612 :ignore 613 end 614 615 defp handle_socket({error, _sock, _reason}, _) when error in [:tcp_error, :ssl_error] do 616 :ignore 617 end 618 619 defp handle_socket(_, _) do 620 :unknown 621 end 622 623 ## connect 624 625 defp connect(host, port, sock_opts, timeout, s) do 626 buffer? = Keyword.has_key?(sock_opts, :buffer) 627 628 case :gen_tcp.connect(host, port, sock_opts ++ @sock_opts, timeout) do 629 {:ok, sock} when buffer? -> 630 {:ok, %{s | sock: {:gen_tcp, sock}}} 631 632 {:ok, sock} -> 633 # A suitable :buffer is only set if :recbuf is included in 634 # :socket_options. 635 {:ok, [sndbuf: sndbuf, recbuf: recbuf, buffer: buffer]} = 636 :inet.getopts(sock, [:sndbuf, :recbuf, :buffer]) 637 638 buffer = buffer |> max(sndbuf) |> max(recbuf) 639 :ok = :inet.setopts(sock, buffer: buffer) 640 {:ok, %{s | sock: {:gen_tcp, sock}}} 641 642 {:error, reason} -> 643 case host do 644 {:local, socket_addr} -> 645 {:error, conn_error(:tcp, "connect (#{socket_addr})", reason)} 646 647 host -> 648 {:error, conn_error(:tcp, "connect (#{host}:#{port})", reason)} 649 end 650 end 651 end 652 653 ## handshake 654 655 defp handshake(%{sock: {:gen_tcp, sock}, timeout: timeout} = s, status) do 656 {:ok, peer} = :inet.peername(sock) 657 %{opts: opts} = status 658 handshake_timeout = Keyword.get(opts, :handshake_timeout, timeout) 659 timer = start_handshake_timer(handshake_timeout, sock) 660 661 case do_handshake(%{s | peer: peer}, status) do 662 {:ok, %{parameters: parameters} = s} -> 663 cancel_handshake_timer(timer) 664 ref = Postgrex.Parameters.insert(parameters) 665 {:ok, %{s | parameters: ref}} 666 667 {:disconnect, err, s} -> 668 cancel_handshake_timer(timer) 669 disconnect(err, s) 670 {:error, err} 671 end 672 end 673 674 defp start_handshake_timer(:infinity, _), do: :infinity 675 676 defp start_handshake_timer(timeout, sock) do 677 args = [timeout, self(), sock] 678 {:ok, tref} = :timer.apply_after(timeout, __MODULE__, :handshake_shutdown, args) 679 {:timer, tref} 680 end 681 682 @doc false 683 def handshake_shutdown(timeout, pid, sock) do 684 if Process.alive?(pid) do 685 Logger.error(fn -> 686 [ 687 inspect(__MODULE__), 688 " (", 689 inspect(pid), 690 ") timed out because it was handshaking for longer than ", 691 to_string(timeout) | "ms" 692 ] 693 end) 694 695 :gen_tcp.shutdown(sock, :read_write) 696 end 697 end 698 699 def cancel_handshake_timer(:infinity), do: :ok 700 701 def cancel_handshake_timer({:timer, tref}) do 702 {:ok, _} = :timer.cancel(tref) 703 :ok 704 end 705 706 defp do_handshake(s, %{ssl: true} = status), do: ssl(s, status) 707 defp do_handshake(s, %{ssl: false} = status), do: startup(s, status) 708 709 ## ssl 710 711 defp ssl(s, status) do 712 case msg_send(s, msg_ssl_request(), "") do 713 :ok -> ssl_recv(s, status) 714 {:disconnect, _, _} = dis -> dis 715 end 716 end 717 718 defp ssl_recv(%{sock: {:gen_tcp, sock}} = s, status) do 719 case :gen_tcp.recv(sock, 1, :infinity) do 720 {:ok, <<?S>>} -> 721 ssl_connect(s, status) 722 723 {:ok, <<?N>>} -> 724 disconnect(s, %Postgrex.Error{message: "ssl not available"}, "") 725 726 {:ok, <<?E>> = buffer} -> 727 # This can happen for "very ancient servers" according to docs, 728 # it shouldn't happen in regular operation 729 # See: https://www.postgresql.org/docs/10/static/protocol-flow.html#idm46428663878176 730 case msg_recv(s, :infinity, buffer) do 731 {:ok, msg_error(fields: fields), buffer} -> 732 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 733 734 {:disconnect, _, _} = dis -> 735 dis 736 end 737 738 {:error, reason} -> 739 disconnect(s, :tcp, "recv", reason) 740 end 741 end 742 743 defp ssl_connect(%{sock: {:gen_tcp, sock}, timeout: timeout} = s, status) do 744 case :ssl.connect(sock, status.opts[:ssl_opts] || [], timeout) do 745 {:ok, ssl_sock} -> 746 startup(%{s | sock: {:ssl, ssl_sock}}, status) 747 748 {:error, reason} -> 749 disconnect(s, :ssl, "connect", reason) 750 end 751 end 752 753 ## startup 754 755 defp startup(s, %{opts: opts} = status) do 756 params = opts[:parameters] || [] 757 user = Keyword.fetch!(opts, :username) 758 database = Keyword.fetch!(opts, :database) 759 msg = msg_startup(params: [user: user, database: database] ++ params) 760 761 case msg_send(s, msg, "") do 762 :ok -> auth_recv(s, status, <<>>) 763 {:disconnect, _, _} = dis -> dis 764 end 765 end 766 767 ## auth 768 769 defp auth_recv(s, status, buffer) do 770 case msg_recv(s, :infinity, buffer) do 771 {:ok, msg_auth(type: :ok), buffer} -> 772 init_recv(s, status, buffer) 773 774 {:ok, msg_auth(type: :cleartext), buffer} -> 775 auth_cleartext(s, status, buffer) 776 777 {:ok, msg_auth(type: :md5, data: salt), buffer} -> 778 auth_md5(s, status, salt, buffer) 779 780 {:ok, msg_auth(type: :sasl, data: _), buffer} -> 781 auth_sasl(s, status, buffer) 782 783 {:ok, msg_auth(type: :sasl_cont, data: data), buffer} -> 784 auth_cont(s, status, data, buffer) 785 786 {:ok, msg_auth(type: :sasl_fin, data: _), buffer} -> 787 auth_recv(s, status, buffer) 788 789 {:ok, msg_error(fields: fields), buffer} -> 790 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 791 792 {:disconnect, _, _} = dis -> 793 dis 794 end 795 end 796 797 defp auth_cleartext(s, %{opts: opts} = status, buffer) do 798 pass = Keyword.fetch!(opts, :password) 799 auth_send(s, msg_password(pass: [pass, 0]), status, buffer) 800 end 801 802 defp auth_md5(s, %{opts: opts} = status, salt, buffer) do 803 user = Keyword.fetch!(opts, :username) 804 pass = Keyword.fetch!(opts, :password) 805 806 digest = :erlang.md5([pass, user]) |> Base.encode16(case: :lower) 807 digest = :erlang.md5([digest, salt]) |> Base.encode16(case: :lower) 808 auth_send(s, msg_password(pass: ["md5", digest, 0]), status, buffer) 809 end 810 811 defp auth_sasl(s, status = _, buffer) do 812 auth_send(s, msg_password(pass: Postgrex.SCRAM.challenge()), status, buffer) 813 end 814 815 defp auth_cont(s, %{opts: opts} = status, data, buffer) do 816 auth_send(s, msg_password(pass: Postgrex.SCRAM.verify(data, opts)), status, buffer) 817 end 818 819 defp auth_send(s, msg, status, buffer) do 820 case msg_send(s, msg, buffer) do 821 :ok -> auth_recv(s, status, buffer) 822 {:disconnect, _, _} = dis -> dis 823 end 824 end 825 826 ## init 827 828 defp init_recv(s, status, buffer) do 829 case msg_recv(s, :infinity, buffer) do 830 {:ok, msg_backend_key(pid: pid, key: key), buffer} -> 831 init_recv(%{s | connection_id: pid, connection_key: key}, status, buffer) 832 833 {:ok, msg_ready(), buffer} -> 834 set_search_path(s, status, buffer) 835 836 {:ok, msg_error(fields: fields), buffer} -> 837 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 838 839 {:ok, msg, buffer} -> 840 {s, status} = handle_msg(s, status, msg) 841 init_recv(s, status, buffer) 842 843 {:disconnect, _, _} = dis -> 844 dis 845 end 846 end 847 848 ## set search path on connection startup 849 850 defp set_search_path(s, %{search_path: nil} = status, buffer), 851 do: set_search_path_done(s, status, buffer) 852 853 defp set_search_path(s, %{search_path: search_path} = status, buffer) 854 when is_list(search_path), 855 do: set_search_path_send(s, status, buffer) 856 857 defp set_search_path(_, %{search_path: search_path}, _) do 858 raise ArgumentError, 859 "expected :search_path to be a list of strings, got: #{inspect(search_path)}" 860 end 861 862 defp set_search_path_send(s, status, buffer) do 863 search_path = Enum.intersperse(status.search_path, ",") 864 msg = msg_query(statement: ["set search_path to " | search_path]) 865 866 case msg_send(s, msg, buffer) do 867 :ok -> 868 set_search_path_recv(s, status, buffer) 869 870 {:disconnect, _, _} = dis -> 871 dis 872 end 873 end 874 875 defp set_search_path_recv(s, status, buffer) do 876 case msg_recv(s, :infinity, buffer) do 877 {:ok, msg_row_desc(fields: fields), buffer} -> 878 {[@text_type_oid], ["search_path"]} = columns(fields) 879 set_search_path_recv(s, status, buffer) 880 881 {:ok, msg_data_row(), buffer} -> 882 set_search_path_recv(s, status, buffer) 883 884 {:ok, msg_command_complete(), buffer} -> 885 set_search_path_recv(s, status, buffer) 886 887 {:ok, msg_ready(status: :idle), buffer} -> 888 set_search_path_done(s, status, buffer) 889 890 {:ok, msg_ready(status: postgres), _buffer} -> 891 err = %Postgrex.Error{message: "unexpected postgres status: #{postgres}"} 892 {:disconnect, err, s} 893 894 {:ok, msg_error(fields: fields), buffer} -> 895 err = Postgrex.Error.exception(postgres: fields) 896 {:disconnect, err, %{s | buffer: buffer}} 897 898 {:ok, msg, buffer} -> 899 {s, status} = handle_msg(s, status, msg) 900 set_search_path_recv(s, status, buffer) 901 902 {:disconnect, _, _} = dis -> 903 dis 904 end 905 end 906 907 defp set_search_path_done(s, status, buffer), 908 do: check_target_server_type(s, status, buffer) 909 910 ## check_target_server_type 911 912 defp check_target_server_type(s, %{target_server_type: :any} = status, buffer), 913 do: check_target_server_type_done(s, status, buffer) 914 915 defp check_target_server_type(s, status, buffer), 916 do: check_target_server_type_send(s, status, buffer) 917 918 defp check_target_server_type_send(s, status, buffer) do 919 msg = msg_query(statement: "show transaction_read_only") 920 921 case msg_send(s, msg, buffer) do 922 :ok -> 923 check_target_server_type_recv(s, status, buffer) 924 925 {:disconnect, err, s} -> 926 check_target_server_type_fail(s, err, status) 927 end 928 end 929 930 defp check_target_server_type_recv( 931 s, 932 %{target_server_type: expected_server_type} = status, 933 buffer 934 ) do 935 case msg_recv(s, :infinity, buffer) do 936 {:ok, msg_row_desc(fields: fields), buffer} -> 937 {[@text_type_oid], ["transaction_read_only"]} = columns(fields) 938 check_target_server_type_recv(s, status, buffer) 939 940 {:ok, msg_data_row(values: values), buffer} -> 941 <<len::uint32(), read_only_value::binary(len)>> = values 942 943 actual_server_type = 944 case read_only_value do 945 "off" -> :primary 946 "on" -> :secondary 947 end 948 949 case {expected_server_type, actual_server_type} do 950 {:any, _} -> check_target_server_type_recv(s, status, buffer) 951 {type, type} -> check_target_server_type_recv(s, status, buffer) 952 _ -> check_target_server_type_fail(s, expected_server_type, actual_server_type) 953 end 954 955 {:ok, msg_command_complete(), buffer} -> 956 check_target_server_type_recv(s, status, buffer) 957 958 {:ok, msg_ready(status: :idle), buffer} -> 959 check_target_server_type_done(s, status, buffer) 960 961 {:ok, msg_ready(status: postgres), _buffer} -> 962 err = %Postgrex.Error{message: "unexpected postgres status: #{postgres}"} 963 check_target_server_type_error(s, err, status) 964 965 {:ok, msg_error(fields: fields), buffer} -> 966 err = Postgrex.Error.exception(postgres: fields) 967 check_target_server_type_error(s, err, status, buffer) 968 969 {:ok, msg, buffer} -> 970 {s, status} = handle_msg(s, status, msg) 971 check_target_server_type_recv(s, status, buffer) 972 973 {:disconnect, err, s} -> 974 check_target_server_type_error(s, err, status) 975 end 976 end 977 978 defp check_target_server_type_done(s, status, buffer), do: bootstrap(s, status, buffer) 979 980 defp check_target_server_type_fail(s, expected, actual) do 981 msg = "the server type is not as expected. expected: #{expected}. actual: #{actual}" 982 err = %Postgrex.Error{message: msg} 983 {:disconnect, err, s} 984 end 985 986 defp check_target_server_type_error(s, err, _status) do 987 {:disconnect, err, s} 988 end 989 990 defp check_target_server_type_error(s, err, status, buffer) do 991 check_target_server_type_error(%{s | buffer: buffer}, err, status) 992 end 993 994 ## bootstrap 995 996 defp bootstrap(s, %{types_key: nil}, buffer) do 997 activate(s, buffer) 998 end 999 1000 defp bootstrap(s, status, buffer) do 1001 %{types_mod: types_mod, types_key: types_key} = status 1002 server = Postgrex.TypeSupervisor.locate(types_mod, types_key) 1003 1004 case TypeServer.fetch(server) do 1005 {:lock, ref, types} -> 1006 status = %{status | types_lock: {server, ref}} 1007 bootstrap_send(%{s | types: types}, status, buffer) 1008 1009 :noproc -> 1010 bootstrap(s, status, buffer) 1011 1012 :error -> 1013 {:disconnect, type_fetch_error(), %{s | buffer: buffer}} 1014 end 1015 end 1016 1017 defp bootstrap_send(%{types: types} = s, status, buffer) do 1018 %{parameters: parameters} = s 1019 version = Postgrex.Utils.parse_version(parameters["server_version"]) 1020 statement = Types.bootstrap_query(version, types) 1021 1022 if statement do 1023 bootstrap_send(s, status, statement, buffer) 1024 else 1025 %{types_lock: {server, ref}} = status 1026 TypeServer.done(server, ref) 1027 bootstrap_done(s, status, buffer) 1028 end 1029 end 1030 1031 defp bootstrap_send(s, status, statement, buffer) do 1032 msg = msg_query(statement: statement) 1033 1034 case msg_send(s, msg, buffer) do 1035 :ok -> 1036 bootstrap_recv(s, status, [], buffer) 1037 1038 {:disconnect, err, s} -> 1039 bootstrap_fail(s, err, status) 1040 end 1041 end 1042 1043 defp bootstrap_recv(s, status, type_infos, buffer) do 1044 case msg_recv(s, :infinity, buffer) do 1045 {:ok, msg_row_desc(), buffer} -> 1046 bootstrap_recv(s, status, type_infos, buffer) 1047 1048 {:ok, msg_data_row(values: values), buffer} -> 1049 type_infos = [Types.build_type_info(values) | type_infos] 1050 bootstrap_recv(s, status, type_infos, buffer) 1051 1052 {:ok, msg_command_complete(), buffer} -> 1053 bootstrap_types(s, status, Enum.reverse(type_infos), buffer) 1054 1055 {:ok, msg_error(fields: fields), buffer} -> 1056 err = Postgrex.Error.exception(postgres: fields) 1057 bootstrap_fail(s, err, status, buffer) 1058 1059 {:ok, msg, buffer} -> 1060 {s, status} = handle_msg(s, status, msg) 1061 bootstrap_recv(s, status, type_infos, buffer) 1062 1063 {:disconnect, err, s} -> 1064 bootstrap_fail(s, err, status) 1065 end 1066 end 1067 1068 defp bootstrap_types(s, status, type_infos, buffer) do 1069 %{types_lock: {server, ref}} = status 1070 TypeServer.update(server, ref, type_infos) 1071 bootstrap_sync_recv(s, status, buffer) 1072 end 1073 1074 defp bootstrap_sync_recv(s, status, buffer) do 1075 case msg_recv(s, :infinity, buffer) do 1076 {:ok, msg_ready(status: :idle), buffer} -> 1077 bootstrap_done(s, status, buffer) 1078 1079 {:ok, msg_ready(status: postgres), buffer} -> 1080 sync_error(s, postgres, buffer) 1081 1082 {:ok, msg, buffer} -> 1083 {s, status} = handle_msg(s, status, msg) 1084 bootstrap_sync_recv(s, status, buffer) 1085 1086 {:disconnect, _, _} = dis -> 1087 dis 1088 end 1089 end 1090 1091 defp bootstrap_done(s, %{prepare: :unnamed}, buffer), 1092 do: activate(s, buffer) 1093 1094 defp bootstrap_done(s, %{prepare: :named}, buffer), 1095 do: activate(%{s | queries: queries_new()}, buffer) 1096 1097 defp bootstrap_fail(s, err, %{types_lock: {server, ref}}) do 1098 TypeServer.done(server, ref) 1099 {:disconnect, err, s} 1100 end 1101 1102 defp bootstrap_fail(s, err, status, buffer) do 1103 bootstrap_fail(%{s | buffer: buffer}, err, status) 1104 end 1105 1106 defp type_fetch_error() do 1107 msg = "awaited on another connection that failed to bootstrap types" 1108 DBConnection.ConnectionError.exception(msg) 1109 end 1110 1111 ## replication/notifications 1112 1113 @spec handle_simple(String.t() | iolist(), state) :: 1114 {:ok, [Postgrex.Result.t()], state} 1115 | {:error, Postgrex.Error.t(), state} 1116 | {:disconnect, %DBConnection.ConnectionError{}, state} 1117 def handle_simple(statement, opts \\ [], %{buffer: buffer} = s) do 1118 status = new_status(opts, mode: :transaction) 1119 msgs = [msg_query(statement: statement)] 1120 1121 case msg_send(%{s | buffer: nil}, msgs, buffer) do 1122 :ok -> 1123 recv_simple(s, status, [], [], [], buffer) 1124 1125 {:disconnect, err, s} -> 1126 {:disconnect, err, s} 1127 1128 {:error, %Postgrex.Error{} = err, s, buffer} -> 1129 error_ready(s, status, err, buffer) 1130 end 1131 end 1132 1133 defp recv_simple(s, status, results, columns, rows, buffer) do 1134 case msg_recv(s, :infinity, buffer) do 1135 {:ok, msg_row_desc(fields: fields), buffer} -> 1136 columns = column_names(fields) 1137 recv_simple(s, status, results, columns, rows, buffer) 1138 1139 {:ok, msg_data_row(values: values), buffer} -> 1140 row = Types.decode_simple(values, s.types) 1141 recv_simple(s, status, results, columns, [row | rows], buffer) 1142 1143 {:ok, msg_command_complete(tag: tag), buffer} -> 1144 result = done(s, status, columns, Enum.reverse(rows), [tag]) 1145 recv_simple(s, status, [result | results], [], [], buffer) 1146 1147 {:ok, msg_error(fields: fields), buffer} -> 1148 err = Postgrex.Error.exception(postgres: fields) 1149 error_ready(s, status, err, buffer) 1150 1151 {:ok, msg_ready(status: postgres), buffer} -> 1152 s = %{s | postgres: postgres, buffer: buffer} 1153 {:ok, Enum.reverse(results), s} 1154 1155 {:ok, msg, buffer} -> 1156 {s, status} = handle_msg(s, status, msg) 1157 recv_simple(s, status, results, columns, rows, buffer) 1158 1159 {:disconnect, _, _} = dis -> 1160 dis 1161 end 1162 end 1163 1164 @spec handle_copy_send([binary], state) :: 1165 :ok 1166 | {:error, Postgrex.Error.t(), state} 1167 | {:disconnect, %DBConnection.ConnectionError{}, state} 1168 def handle_copy_send(binaries, %{buffer: buffer} = s) do 1169 msgs = Enum.map(binaries, &msg_copy_data(data: &1)) 1170 msg_send(s, msgs, buffer) 1171 end 1172 1173 @spec handle_copy_recv(any, Keyword.t(), state) :: 1174 {:ok, [binary | atom], state} 1175 | :unknown 1176 | {:error, Postgrex.Error.t(), state} 1177 | {:disconnect, %DBConnection.ConnectionError{}, state} 1178 def handle_copy_recv(msg, max_copies, s) do 1179 case handle_socket(msg, s) do 1180 {:data, data} -> handle_copy_recv(s, max_copies, [], 0, data) 1181 :ignore -> {:ok, [], s} 1182 :unknown -> :unknown 1183 disconnect -> disconnect 1184 end 1185 end 1186 1187 defp handle_copy_recv(s, max_copies, copies, max_copies, buffer) do 1188 with {:ok, s} <- activate(s, buffer) do 1189 {:ok, Enum.reverse(copies), s} 1190 end 1191 end 1192 1193 defp handle_copy_recv(%{timeout: timeout} = s, max_copies, copies, ncopies, buffer) do 1194 case msg_recv(s, timeout, buffer) do 1195 {:ok, msg_error(fields: fields), buffer} -> 1196 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 1197 1198 {:ok, msg_copy_data(data: data), <<>>} -> 1199 with {:ok, s} <- activate(s, <<>>) do 1200 {:ok, Enum.reverse([data | copies]), s} 1201 end 1202 1203 {:ok, msg_copy_data(data: data), buffer} -> 1204 handle_copy_recv(s, max_copies, [data | copies], ncopies + 1, buffer) 1205 1206 {:ok, msg_copy_done(), buffer} -> 1207 handle_copy_recv(s, max_copies, copies, ncopies, buffer) 1208 1209 {:ok, msg_command_complete(), buffer} -> 1210 handle_copy_recv(s, max_copies, copies, ncopies, buffer) 1211 1212 {:ok, msg_ready(status: postgres), buffer} -> 1213 s = %{s | postgres: postgres, buffer: buffer} 1214 {:ok, Enum.reverse([:copy_done | copies]), s} 1215 1216 {:ok, _msg, buffer} -> 1217 handle_copy_recv(s, max_copies, copies, ncopies, buffer) 1218 1219 {:disconnect, _, _} = dis -> 1220 dis 1221 end 1222 end 1223 1224 @spec handle_streaming(String.t() | iolist(), state) :: 1225 {:ok, state} 1226 | {:error, Postgrex.Error.t(), state} 1227 | {:disconnect, %DBConnection.ConnectionError{}, state} 1228 def handle_streaming(statement, %{buffer: buffer} = s) do 1229 msgs = [msg_query(statement: statement)] 1230 1231 case msg_send(%{s | buffer: nil}, msgs, buffer) do 1232 :ok -> 1233 recv_streaming(s, buffer) 1234 1235 {:disconnect, err, s} -> 1236 {:disconnect, err, s} 1237 1238 {:error, %Postgrex.Error{} = err, s, buffer} -> 1239 status = new_status([], mode: :transaction) 1240 error_ready(s, status, err, buffer) 1241 end 1242 end 1243 1244 defp recv_streaming(s, buffer) do 1245 case msg_recv(s, :infinity, buffer) do 1246 {:ok, msg_copy_both_response(), buffer} -> 1247 {:ok, %{s | buffer: buffer}} 1248 1249 {:ok, msg_copy_out_response(), buffer} -> 1250 {:ok, %{s | buffer: buffer}} 1251 1252 {:ok, msg_error(fields: fields), buffer} -> 1253 status = new_status([], mode: :transaction) 1254 err = Postgrex.Error.exception(postgres: fields) 1255 error_ready(s, status, err, buffer) 1256 1257 {:disconnect, _, _} = dis -> 1258 dis 1259 end 1260 end 1261 1262 ## prepare 1263 1264 defp parse_describe(s, %{mode: :transaction} = status, query) do 1265 msgs = parse_describe_msgs(query, [msg_sync()]) 1266 %{buffer: buffer} = s 1267 1268 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1269 {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer), 1270 {:ok, s} <- recv_ready(s, status, buffer) do 1271 {:ok, query, s} 1272 else 1273 {:reload, oids, s, buffer} -> 1274 reload_ready(s, status, query, oids, buffer) 1275 1276 {:disconnect, err, s} -> 1277 {:disconnect, err, s} 1278 1279 {:error, %Postgrex.Error{} = err, s, buffer} -> 1280 error_ready(s, status, err, buffer) 1281 end 1282 end 1283 1284 defp parse_describe(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query) do 1285 %{buffer: buffer} = s 1286 1287 msgs = 1288 [msg_query(statement: "SAVEPOINT postgrex_query")] ++ 1289 parse_describe_msgs(query, [msg_query(statement: "RELEASE SAVEPOINT postgrex_query")]) 1290 1291 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1292 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 1293 {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer), 1294 {:ok, _, s} <- recv_transaction(s, status, buffer) do 1295 {:ok, query, s} 1296 else 1297 {:reload, oids, s, buffer} -> 1298 reload_transaction(s, status, query, oids, buffer) 1299 1300 {:disconnect, err, s} -> 1301 {:disconnect, err, s} 1302 1303 {:error, %Postgrex.Error{} = err, s, buffer} -> 1304 rollback_flushed(s, status, err, buffer) 1305 end 1306 end 1307 1308 defp parse_describe(%{postgres: postgres} = s, %{mode: :savepoint}, _) 1309 when postgres in [:idle, :error] do 1310 transaction_error(s, postgres) 1311 end 1312 1313 defp parse_describe_close(s, %{mode: :transaction} = status, query) do 1314 %Query{name: name} = query 1315 %{buffer: buffer} = s 1316 msgs = parse_describe_msgs(query, [msg_close(type: :statement, name: name), msg_sync()]) 1317 1318 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1319 {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer), 1320 {:ok, s, buffer} <- recv_close(s, status, buffer), 1321 _ = query_delete(s, query), 1322 {:ok, s} <- recv_ready(s, status, buffer) do 1323 {:ok, query, s} 1324 else 1325 {:reload, oids, s, buffer} -> 1326 reload_closed(s, status, query, oids, buffer) 1327 1328 {:disconnect, err, s} -> 1329 {:disconnect, err, s} 1330 1331 {:error, %Postgrex.Error{} = err, s, buffer} -> 1332 error_ready(s, status, err, buffer) 1333 end 1334 end 1335 1336 defp parse_describe_close(s, %{mode: :savepoint} = status, query) do 1337 # only used for unnamed queries and the savepoint release will close the query 1338 parse_describe(s, status, query) 1339 end 1340 1341 defp parse_describe_flush(s, %{mode: :transaction} = status, query) do 1342 %{buffer: buffer} = s 1343 msgs = parse_describe_msgs(query, [msg_flush()]) 1344 1345 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1346 {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <- 1347 recv_parse_describe(s, status, query, buffer) do 1348 # lock state with unique query reference as not synced 1349 {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}} 1350 else 1351 {:error, err, s, buffer} -> 1352 error_flushed(s, status, err, buffer) 1353 1354 {:reload, oids, s, buffer} -> 1355 reload_flushed(s, status, query, oids, buffer) 1356 1357 {:disconnect, _err, _s} = disconnect -> 1358 disconnect 1359 end 1360 end 1361 1362 defp parse_describe_flush( 1363 %{postgres: :transaction, buffer: buffer} = s, 1364 %{mode: :savepoint} = status, 1365 query 1366 ) do 1367 msgs = 1368 [msg_query(statement: "SAVEPOINT postgrex_query")] ++ 1369 parse_describe_msgs(query, [msg_flush()]) 1370 1371 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1372 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 1373 {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <- 1374 recv_parse_describe(s, status, query, buffer) do 1375 # lock state with unique query reference as not synced 1376 {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}} 1377 else 1378 {:error, err, s, buffer} -> 1379 rollback_flushed(s, status, err, buffer) 1380 1381 {:reload, oids, s, buffer} -> 1382 reload_flushed(s, status, query, oids, buffer) 1383 1384 {:disconnect, _err, _s} = disconnect -> 1385 disconnect 1386 end 1387 end 1388 1389 defp parse_describe_flush(%{postgres: postgres} = s, %{mode: :savepoint}, _) 1390 when postgres in [:idle, :error] do 1391 transaction_error(s, postgres) 1392 end 1393 1394 defp close_parse_describe(s, %{mode: :transaction} = status, query) do 1395 %Query{name: name} = query 1396 %{buffer: buffer} = s 1397 1398 msgs = [msg_close(type: :statement, name: name)] ++ parse_describe_msgs(query, [msg_sync()]) 1399 1400 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1401 {:ok, s, buffer} <- recv_close(s, status, buffer), 1402 _ = query_delete(s, query), 1403 {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer), 1404 {:ok, s} <- recv_ready(s, status, buffer) do 1405 {:ok, query, s} 1406 else 1407 {:reload, oids, s, buffer} -> 1408 reload_ready(s, status, query, oids, buffer) 1409 1410 {:disconnect, err, s} -> 1411 {:disconnect, err, s} 1412 1413 {:error, %Postgrex.Error{} = err, s, buffer} -> 1414 error_ready(s, status, err, buffer) 1415 end 1416 end 1417 1418 defp close_parse_describe(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query) do 1419 %Query{name: name} = query 1420 %{buffer: buffer} = s 1421 1422 msgs = 1423 [ 1424 msg_query(statement: "SAVEPOINT postgrex_query"), 1425 msg_close(type: :statement, name: name) 1426 ] ++ 1427 parse_describe_msgs(query, [msg_query(statement: "RELEASE SAVEPOINT postgrex_query")]) 1428 1429 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1430 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 1431 {:ok, s, buffer} <- recv_close(s, status, buffer), 1432 _ = query_delete(s, query), 1433 {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer), 1434 {:ok, _, s} <- recv_transaction(s, status, buffer) do 1435 {:ok, query, s} 1436 else 1437 {:reload, oids, s, buffer} -> 1438 reload_transaction(s, status, query, oids, buffer) 1439 1440 {:disconnect, err, s} -> 1441 {:disconnect, err, s} 1442 1443 {:error, %Postgrex.Error{} = err, s, buffer} -> 1444 rollback_flushed(s, status, err, buffer) 1445 end 1446 end 1447 1448 defp close_parse_describe(%{postgres: postgres} = s, %{mode: :savepoint}, _) 1449 when postgres in [:idle, :error] do 1450 transaction_error(s, postgres) 1451 end 1452 1453 defp close_parse_describe_flush(s, %{mode: :transaction} = status, query) do 1454 %Query{name: name} = query 1455 %{buffer: buffer} = s 1456 1457 msgs = [msg_close(type: :statement, name: name)] ++ parse_describe_msgs(query, [msg_flush()]) 1458 1459 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1460 {:ok, s, buffer} <- recv_close(s, status, buffer), 1461 _ = query_delete(s, query), 1462 {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <- 1463 recv_parse_describe(s, status, query, buffer) do 1464 # lock state with unique query reference as not synced 1465 {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}} 1466 else 1467 {:error, err, s, buffer} -> 1468 error_flushed(s, status, err, buffer) 1469 1470 {:reload, oids, s, buffer} -> 1471 reload_flushed(s, status, query, oids, buffer) 1472 1473 {:disconnect, _err, _s} = disconnect -> 1474 disconnect 1475 end 1476 end 1477 1478 defp close_parse_describe_flush( 1479 %{postgres: :transaction, buffer: buffer} = s, 1480 %{mode: :savepoint} = status, 1481 query 1482 ) do 1483 %Query{name: name} = query 1484 1485 msgs = 1486 [ 1487 msg_query(statement: "SAVEPOINT postgrex_query"), 1488 msg_close(type: :statement, name: name) 1489 ] ++ parse_describe_msgs(query, [msg_flush()]) 1490 1491 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 1492 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 1493 {:ok, s, buffer} <- recv_close(s, status, buffer), 1494 _ = query_delete(s, query), 1495 {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <- 1496 recv_parse_describe(s, status, query, buffer) do 1497 # lock state with unique query reference as not synced 1498 {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}} 1499 else 1500 {:error, err, s, buffer} -> 1501 rollback_flushed(s, status, err, buffer) 1502 1503 {:reload, oids, s, buffer} -> 1504 reload_flushed(s, status, query, oids, buffer) 1505 1506 {:disconnect, _err, _s} = disconnect -> 1507 disconnect 1508 end 1509 end 1510 1511 defp close_parse_describe_flush(%{postgres: postgres} = s, %{mode: :savepoint}, _) 1512 when postgres in [:idle, :error] do 1513 transaction_error(s, postgres) 1514 end 1515 1516 defp parse_describe_msgs(query, tail) do 1517 %Query{name: name, statement: statement, param_oids: param_oids} = query 1518 type_oids = param_oids || [] 1519 1520 [ 1521 msg_parse(name: name, statement: statement, type_oids: type_oids), 1522 msg_describe(type: :statement, name: name) | tail 1523 ] 1524 end 1525 1526 defp recv_parse_describe( 1527 %{types: protocol_types} = s, 1528 status, 1529 %Query{ref: ref, types: query_types} = query, 1530 buffer 1531 ) 1532 when ref == nil or protocol_types != query_types do 1533 with {:ok, s, buffer} <- recv_parse(s, status, buffer), 1534 {:ok, param_oids, result_oids, columns, s, buffer} <- recv_describe(s, status, buffer) do 1535 describe(s, query, param_oids, result_oids, columns, buffer) 1536 else 1537 {:error, %Postgrex.Error{} = error, s, buffer} -> 1538 {:error, %{error | query: query.statement}, s, buffer} 1539 1540 {:disconnect, _, _} = disconnect -> 1541 disconnect 1542 end 1543 end 1544 1545 defp recv_parse_describe(s, status, query, buffer) do 1546 %Query{param_oids: param_oids, result_oids: result_oids, columns: columns} = query 1547 1548 with {:ok, s, buffer} <- recv_parse(s, status, buffer), 1549 {:ok, ^param_oids, ^result_oids, ^columns, s, buffer} <- 1550 recv_describe(s, status, param_oids, buffer) do 1551 query_put(s, query) 1552 {:ok, query, s, buffer} 1553 else 1554 {:ok, ^param_oids, new_result_oids, new_columns, s, buffer} -> 1555 redescribe(s, query, new_result_oids, new_columns, buffer) 1556 1557 {:error, %Postgrex.Error{}, _, _} = error -> 1558 error 1559 1560 {:disconnect, _, _} = disconnect -> 1561 disconnect 1562 end 1563 end 1564 1565 defp recv_parse(s, status, buffer) do 1566 case msg_recv(s, :infinity, buffer) do 1567 {:ok, msg_parse_complete(), buffer} -> 1568 {:ok, s, buffer} 1569 1570 {:ok, msg_error(fields: fields), buffer} -> 1571 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 1572 1573 {:ok, msg, buffer} -> 1574 {s, status} = handle_msg(s, status, msg) 1575 recv_parse(s, status, buffer) 1576 1577 {:disconnect, _, _} = dis -> 1578 dis 1579 end 1580 end 1581 1582 defp recv_describe(s, status, param_oids \\ [], buffer) do 1583 case msg_recv(s, :infinity, buffer) do 1584 {:ok, msg_no_data(), buffer} -> 1585 {:ok, param_oids, nil, nil, s, buffer} 1586 1587 {:ok, msg_parameter_desc(type_oids: param_oids), buffer} -> 1588 recv_describe(s, status, param_oids, buffer) 1589 1590 {:ok, msg_row_desc(fields: fields), buffer} -> 1591 {result_oids, columns} = columns(fields) 1592 {:ok, param_oids, result_oids, columns, s, buffer} 1593 1594 {:ok, msg_too_many_parameters(len: len, max_len: max), buffer} -> 1595 msg = "postgresql protocol can not handle #{len} parameters, the maximum is #{max}" 1596 err = Postgrex.QueryError.exception(msg) 1597 {:disconnect, err, %{s | buffer: buffer}} 1598 1599 {:ok, msg_error(fields: fields), buffer} -> 1600 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 1601 1602 {:ok, msg, buffer} -> 1603 {s, status} = handle_msg(s, status, msg) 1604 recv_describe(s, status, param_oids, buffer) 1605 1606 {:disconnect, _, _} = dis -> 1607 dis 1608 end 1609 end 1610 1611 defp describe(s, query, param_oids, result_oids, columns, buffer) do 1612 case describe_params(s, query, param_oids) do 1613 {:ok, query} -> 1614 redescribe(s, query, result_oids, columns, buffer) 1615 1616 {:reload, oids} -> 1617 reload_describe_result(s, oids, result_oids, buffer) 1618 1619 {:error, err} -> 1620 {:disconnect, err, %{s | buffer: buffer}} 1621 end 1622 end 1623 1624 defp redescribe(s, query, result_oids, columns, buffer) do 1625 with {:ok, query} <- describe_result(s, query, result_oids, columns) do 1626 query_put(s, query) 1627 {:ok, query, s, buffer} 1628 else 1629 {:reload, oids} -> 1630 {:reload, oids, s, buffer} 1631 1632 {:error, err} -> 1633 {:disconnect, err, %{s | buffer: buffer}} 1634 end 1635 end 1636 1637 defp describe_params(%{types: types}, query, param_oids) do 1638 with {:ok, param_info} <- fetch_type_info(param_oids, types), 1639 {param_formats, param_types} = Enum.unzip(param_info) do 1640 query = %Query{ 1641 query 1642 | param_oids: param_oids, 1643 param_formats: param_formats, 1644 param_types: param_types 1645 } 1646 1647 {:ok, query} 1648 end 1649 end 1650 1651 defp reload_describe_result(s, param_oids, nil, buffer) do 1652 {:reload, param_oids, s, buffer} 1653 end 1654 1655 defp reload_describe_result(%{types: types} = s, param_oids, result_oids, buffer) do 1656 case fetch_type_info(result_oids, types) do 1657 {:ok, _} -> 1658 {:reload, param_oids, s, buffer} 1659 1660 {:reload, reload_oids} -> 1661 {:reload, MapSet.union(param_oids, reload_oids), s, buffer} 1662 1663 {:error, err} -> 1664 {:disconnect, err, %{s | buffer: buffer}} 1665 end 1666 end 1667 1668 defp describe_result(%{types: types}, query, nil, nil) do 1669 query = %Query{ 1670 query 1671 | ref: make_ref(), 1672 types: types, 1673 columns: nil, 1674 result_oids: nil, 1675 result_formats: [], 1676 result_types: nil 1677 } 1678 1679 {:ok, query} 1680 end 1681 1682 defp describe_result(%{types: types}, query, result_oids, columns) do 1683 with {:ok, result_info} <- fetch_type_info(result_oids, types), 1684 {result_formats, result_types} = Enum.unzip(result_info) do 1685 query = %Query{ 1686 query 1687 | ref: make_ref(), 1688 types: types, 1689 columns: columns, 1690 result_oids: result_oids, 1691 result_formats: result_formats, 1692 result_types: result_types 1693 } 1694 1695 {:ok, query} 1696 end 1697 end 1698 1699 defp error_flushed(s, %{mode: :transaction} = status, err, buffer) do 1700 with :ok <- msg_send(s, [msg_sync()], buffer) do 1701 error_ready(s, status, err, buffer) 1702 else 1703 {:disconnect, _err, _s} = disconnect -> 1704 disconnect 1705 end 1706 end 1707 1708 defp rollback_flushed(s, %{mode: :savepoint} = status, err, buffer) do 1709 stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query" 1710 msgs = [msg_sync(), msg_query(statement: stmt)] 1711 1712 with :ok <- msg_send(s, msgs, buffer), 1713 {:error, err, %{buffer: buffer} = s} <- error_ready(s, status, err, buffer), 1714 {:ok, _, s} <- recv_transaction(s, status, buffer) do 1715 {:error, err, s} 1716 else 1717 {:disconnect, _err, _s} = disconnect -> 1718 disconnect 1719 end 1720 end 1721 1722 defp reload_transaction(s, status, query, oids, buffer) do 1723 %Query{name: name} = query 1724 msgs = [msg_close(type: :statement, name: name), msg_sync()] 1725 1726 with {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 1727 :ok <- msg_send(s, msgs, buffer) do 1728 reload_closed(s, status, query, oids, buffer) 1729 else 1730 {:disconnect, _err, _s} = disconnect -> 1731 disconnect 1732 end 1733 end 1734 1735 defp reload_flushed(s, %{mode: :transaction} = status, query, oids, buffer) do 1736 %Query{name: name} = query 1737 msgs = [msg_close(type: :statement, name: name), msg_sync()] 1738 1739 with :ok <- msg_send(s, msgs, buffer) do 1740 reload_closed(s, status, query, oids, buffer) 1741 else 1742 {:disconnect, _err, _s} = disconnect -> 1743 disconnect 1744 end 1745 end 1746 1747 defp reload_flushed(s, %{mode: :savepoint} = status, query, oids, buffer) do 1748 %Query{name: name} = query 1749 stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query" 1750 msgs = [msg_close(type: :statement, name: name), msg_query(statement: stmt)] 1751 1752 with :ok <- msg_send(s, msgs, buffer), 1753 {:ok, s, buffer} <- recv_close(s, status, buffer), 1754 {:ok, _, %{buffer: buffer} = s} <- 1755 recv_transaction(s, status, buffer) do 1756 reload_spawn(%{s | buffer: nil}, status, query, oids, buffer) 1757 else 1758 {:disconnect, _err, _s} = disconnect -> 1759 disconnect 1760 end 1761 end 1762 1763 defp reload_ready(s, status, query, oids, buffer) do 1764 %Query{name: name} = query 1765 msgs = [msg_close(type: :statement, name: name), msg_sync()] 1766 1767 with {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer), 1768 :ok <- msg_send(s, msgs, buffer) do 1769 reload_closed(s, status, query, oids, buffer) 1770 else 1771 {:disconnect, _err, _s} = disconnect -> 1772 disconnect 1773 end 1774 end 1775 1776 defp reload_closed(s, status, query, oids, buffer) do 1777 with {:ok, s, buffer} <- recv_close(s, status, buffer), 1778 {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer) do 1779 reload_spawn(%{s | buffer: nil}, status, query, oids, buffer) 1780 else 1781 {:disconnect, _err, _s} = disconnect -> 1782 disconnect 1783 end 1784 end 1785 1786 defp fetch_type_info(oids, types, infos \\ [], reloads \\ MapSet.new()) 1787 1788 defp fetch_type_info([], _, infos, reloads) do 1789 case MapSet.size(reloads) do 1790 0 -> 1791 {:ok, Enum.reverse(infos)} 1792 1793 _ -> 1794 {:reload, reloads} 1795 end 1796 end 1797 1798 defp fetch_type_info([oid | oids], types, infos, reloads) do 1799 case Postgrex.Types.fetch(oid, types) do 1800 {:ok, info} -> 1801 fetch_type_info(oids, types, [info | infos], reloads) 1802 1803 {:error, %Postgrex.TypeInfo{} = info, mod} -> 1804 msg = Postgrex.Utils.type_msg(info, mod) 1805 {:error, Postgrex.QueryError.exception(msg)} 1806 1807 {:error, nil, _} -> 1808 fetch_type_info(oids, types, infos, MapSet.put(reloads, oid)) 1809 end 1810 end 1811 1812 defp reload_spawn(s, status, query, oids, buffer) do 1813 Logger.debug(fn -> 1814 [ 1815 inspect(query), 1816 " uses unknown oid(s) ", 1817 Enum.join(oids, ", ") 1818 | "forcing us to reload type information from the database. " <> 1819 "This is expected behaviour whenever you migrate your database." 1820 ] 1821 end) 1822 1823 ref = make_ref() 1824 {_, mon} = spawn_monitor(fn -> reload_init(s, status, oids, ref, buffer) end) 1825 1826 receive do 1827 {:DOWN, ^mon, _, _, {^ref, s, buffer}} -> 1828 reload_fetch(s, status, query, oids, buffer) 1829 1830 {:DOWN, ^mon, _, _, _} -> 1831 {:disconnect, type_fetch_error(), %{s | buffer: buffer}} 1832 end 1833 end 1834 1835 defp reload_init(%{types: types} = s, status, oids, exit_ref, buffer) do 1836 with {:ok, server} <- Postgrex.Types.owner(types), 1837 {:lock, lock_ref, ^types} <- TypeServer.fetch(server), 1838 status = Map.put(status, :types_lock, {server, lock_ref}), 1839 acc = {[], MapSet.new(), MapSet.new(), MapSet.new()}, 1840 {:ok, s} <- reload(s, status, oids, acc, buffer) do 1841 %{buffer: buffer} = s 1842 exit({exit_ref, %{s | buffer: nil}, buffer}) 1843 else 1844 :noproc -> 1845 exit(:normal) 1846 1847 :error -> 1848 exit(:normal) 1849 1850 {error, err, _} when error in [:error, :disconnect] -> 1851 raise err 1852 end 1853 end 1854 1855 defp reload(%{types: types} = s, status, oids, acc, buffer) do 1856 %{parameters: parameters} = s 1857 1858 with {:ok, parameters} <- Postgrex.Parameters.fetch(parameters) do 1859 version = Postgrex.Utils.parse_version(parameters["server_version"]) 1860 statement = Types.reload_query(version, Enum.to_list(oids), types) 1861 1862 if statement do 1863 reload_send(s, status, statement, acc, buffer) 1864 else 1865 %{types_lock: {server, ref}} = status 1866 {type_infos, _, _, _} = acc 1867 sorted_infos = Enum.sort_by(type_infos, & &1.oid) 1868 TypeServer.update(server, ref, sorted_infos) 1869 {:ok, %{s | buffer: buffer}} 1870 end 1871 else 1872 :error -> 1873 s = %{s | buffer: buffer} 1874 {:error, %Postgrex.Error{message: "parameters not available"}, s} 1875 end 1876 end 1877 1878 defp reload_send(s, status, statement, acc, buffer) do 1879 msg = msg_query(statement: statement) 1880 1881 case msg_send(s, msg, buffer) do 1882 :ok -> 1883 reload_recv(s, status, acc, buffer) 1884 1885 {:disconnect, err, s} -> 1886 bootstrap_fail(s, err, status) 1887 end 1888 end 1889 1890 defp reload_recv(%{types: types} = s, status, acc, buffer) do 1891 case msg_recv(s, :infinity, buffer) do 1892 {:ok, msg_row_desc(), buffer} -> 1893 reload_recv(s, status, acc, buffer) 1894 1895 {:ok, msg_data_row(values: values), buffer} -> 1896 reload_recv(s, status, reload_row(acc, values, types), buffer) 1897 1898 {:ok, msg_command_complete(), buffer} -> 1899 reload_complete(s, status, acc, buffer) 1900 1901 {:ok, msg_error(fields: fields), buffer} -> 1902 err = Postgrex.Error.exception(postgres: fields) 1903 bootstrap_fail(s, err, status, buffer) 1904 1905 {:ok, msg, buffer} -> 1906 {s, status} = handle_msg(s, status, msg) 1907 reload_recv(s, status, acc, buffer) 1908 1909 {:disconnect, err, s} -> 1910 bootstrap_fail(s, err, status) 1911 end 1912 end 1913 1914 defp reload_row({type_infos, oids, missing, current}, values, types) do 1915 %Postgrex.TypeInfo{oid: oid} = type_info = Types.build_type_info(values) 1916 1917 missing = 1918 missing 1919 |> put_missing_oids(type_info, oids, types) 1920 |> MapSet.delete(oid) 1921 1922 {[type_info | type_infos], MapSet.put(oids, oid), missing, current} 1923 end 1924 1925 defp put_missing_oids(missing, type_info, new, types) do 1926 %Postgrex.TypeInfo{array_elem: array_elem, base_type: base_type, comp_elems: comp_elems} = 1927 type_info 1928 1929 for oid <- [array_elem, base_type | comp_elems], 1930 oid !== 0, 1931 not MapSet.member?(new, oid), 1932 not bootstrapped?(types, oid), 1933 do: oid, 1934 into: missing 1935 end 1936 1937 defp bootstrapped?(types, oid) do 1938 case Postgrex.Types.fetch(oid, types) do 1939 {:ok, _} -> 1940 true 1941 1942 {:error, %Postgrex.TypeInfo{}, _} -> 1943 true 1944 1945 {:error, nil, _} -> 1946 false 1947 end 1948 end 1949 1950 defp reload_complete(s, status, {type_infos, new, missing, prev}, buffer) do 1951 case sync_recv(s, status, buffer) do 1952 {:ok, %{buffer: buffer} = s} -> 1953 s = %{s | buffer: nil} 1954 next = MapSet.delete(missing, prev) 1955 current = MapSet.union(next, prev) 1956 reload(s, status, Enum.to_list(next), {type_infos, new, MapSet.new(), current}, buffer) 1957 1958 {:disconnect, _, _} = error -> 1959 error 1960 end 1961 end 1962 1963 defp reload_fetch(%{types: types} = s, status, query, oids, buffer) do 1964 case oids |> Enum.to_list() |> fetch_type_info(types) do 1965 {:ok, _} -> 1966 reload_prepare(%{s | buffer: buffer}, status, query) 1967 1968 {:error, err} -> 1969 disconnect(s, err, buffer) 1970 1971 {:reload, oids} -> 1972 msg = "oid(s) #{Enum.join(oids, ", ")} lack type information after bootstrap" 1973 disconnect(s, RuntimeError.exception(message: msg), buffer) 1974 end 1975 end 1976 1977 defp reload_prepare(s, %{prepare: prepare} = status, query) do 1978 %Query{name: name} = query 1979 1980 case prepare do 1981 true when name == "" -> 1982 # unnamed queries closed on prepare when not re-using 1983 parse_describe_close(s, status, query) 1984 1985 true -> 1986 # named queries closed when oid not found 1987 parse_describe(s, status, query) 1988 1989 _ -> 1990 # flush awaiting execute or declare 1991 parse_describe_flush(s, status, query) 1992 end 1993 end 1994 1995 ## execute 1996 1997 defp lock_error(s, fun) do 1998 msg = "connection is locked copying to or from the database and can not #{fun} transaction" 1999 2000 {:disconnect, RuntimeError.exception(msg), s} 2001 end 2002 2003 defp lock_error(s, fun, query) do 2004 msg = 2005 "connection is locked copying to or from the database and can not #{fun} #{inspect(query)}" 2006 2007 {:disconnect, RuntimeError.exception(msg), s} 2008 end 2009 2010 defp transaction_error(s, status) do 2011 {:error, DBConnection.TransactionError.exception(status), s} 2012 end 2013 2014 defp handle_prepare_execute(%Query{name: ""} = query, params, opts, s) do 2015 status = new_status(opts) 2016 2017 case parse_describe_flush(s, status, query) do 2018 {:ok, query, s} -> 2019 bind_execute_close(s, status, query, params) 2020 2021 {error, _, _} = other when error in [:error, :disconnect] -> 2022 other 2023 end 2024 end 2025 2026 defp handle_prepare_execute(%Query{} = query, params, opts, s) do 2027 status = new_status(opts) 2028 2029 case close_parse_describe_flush(s, status, query) do 2030 {:ok, query, s} -> 2031 bind_execute(s, status, query, params) 2032 2033 {error, _, _} = other when error in [:error, :disconnect] -> 2034 other 2035 end 2036 end 2037 2038 defp bind_execute_close(s, %{mode: :transaction} = status, query, params) do 2039 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2040 %{buffer: buffer} = s 2041 2042 msgs = [ 2043 msg_bind( 2044 name_port: "", 2045 name_stat: name, 2046 param_formats: pfs, 2047 params: params, 2048 result_formats: rfs 2049 ), 2050 msg_execute(name_port: "", max_rows: 0), 2051 msg_close(type: :statement, name: name), 2052 msg_sync() 2053 ] 2054 2055 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2056 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2057 {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer), 2058 {:ok, s, buffer} <- recv_close(s, status, buffer), 2059 {:ok, s} <- recv_ready(s, status, buffer) do 2060 {:ok, query, result, s} 2061 else 2062 {:error, %Postgrex.Error{} = err, s, buffer} -> 2063 error_ready(s, status, err, buffer) 2064 |> maybe_disconnect() 2065 2066 {:disconnect, _err, _s} = disconnect -> 2067 disconnect 2068 end 2069 end 2070 2071 defp bind_execute_close(s, %{mode: :savepoint} = status, query, params) do 2072 # only used for un-named and query will always get closed by release 2073 bind_execute(s, status, query, params) 2074 end 2075 2076 defp bind_execute(s, %{mode: :transaction} = status, query, params) do 2077 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2078 %{buffer: buffer} = s 2079 2080 msgs = [ 2081 msg_bind( 2082 name_port: "", 2083 name_stat: name, 2084 param_formats: pfs, 2085 params: params, 2086 result_formats: rfs 2087 ), 2088 msg_execute(name_port: "", max_rows: 0), 2089 msg_sync() 2090 ] 2091 2092 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2093 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2094 {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer), 2095 {:ok, s} <- recv_ready(s, status, buffer) do 2096 {:ok, query, result, s} 2097 else 2098 {:error, %Postgrex.Error{} = err, s, buffer} -> 2099 query_delete_on_error(s, err, query) 2100 2101 error_ready(s, status, err, buffer) 2102 |> maybe_disconnect() 2103 2104 {:disconnect, _err, _s} = disconnect -> 2105 disconnect 2106 end 2107 end 2108 2109 defp bind_execute(s, %{mode: :savepoint} = status, query, params) do 2110 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2111 %{buffer: buffer} = s 2112 2113 msgs = [ 2114 msg_bind( 2115 name_port: "", 2116 name_stat: name, 2117 param_formats: pfs, 2118 params: params, 2119 result_formats: rfs 2120 ), 2121 msg_execute(name_port: "", max_rows: 0), 2122 msg_query(statement: "RELEASE SAVEPOINT postgrex_query") 2123 ] 2124 2125 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2126 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2127 {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer), 2128 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2129 {:ok, query, result, s} 2130 else 2131 {:error, %Postgrex.Error{} = err, s, buffer} -> 2132 query_delete_on_error(s, err, query) 2133 rollback_flushed(s, status, err, buffer) 2134 2135 {:disconnect, _err, _s} = disconnect -> 2136 disconnect 2137 end 2138 end 2139 2140 defp maybe_disconnect({:error, _, %{disconnect_on_error_codes: []}} = result), do: result 2141 2142 defp maybe_disconnect( 2143 {:error, %Postgrex.Error{postgres: %{code: code}} = error, 2144 %{disconnect_on_error_codes: codes} = state} = result 2145 ) do 2146 if code in codes do 2147 {:disconnect, error, state} 2148 else 2149 result 2150 end 2151 end 2152 2153 defp maybe_disconnect(other), do: other 2154 2155 defp rebind_execute(s, %{mode: :transaction} = status, query, params) do 2156 # using a cached query is same as using it for the first time when don't 2157 # need to setup savepoints 2158 bind_execute(s, status, query, params) 2159 end 2160 2161 defp rebind_execute(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query, params) do 2162 # using a named cache query so savepoint/simple query does not unprepare 2163 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2164 %{buffer: buffer} = s 2165 2166 msgs = [ 2167 msg_query(statement: "SAVEPOINT postgrex_query"), 2168 msg_bind( 2169 name_port: "", 2170 name_stat: name, 2171 param_formats: pfs, 2172 params: params, 2173 result_formats: rfs 2174 ), 2175 msg_execute(name_port: "", max_rows: 0), 2176 msg_query(statement: "RELEASE SAVEPOINT postgrex_query") 2177 ] 2178 2179 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2180 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 2181 {:ok, s, buffer} <- recv_bind(%{s | buffer: nil}, status, buffer), 2182 {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer), 2183 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2184 {:ok, query, result, s} 2185 else 2186 {:error, %Postgrex.Error{} = err, s, buffer} -> 2187 query_delete_on_error(s, err, query) 2188 rollback_flushed(s, status, err, buffer) 2189 2190 {:disconnect, _err, _s} = disconnect -> 2191 disconnect 2192 end 2193 end 2194 2195 defp rebind_execute(%{postgres: postgres} = s, %{mode: :savepoint}, _, _) 2196 when postgres in [:idle, :error] do 2197 transaction_error(s, postgres) 2198 end 2199 2200 defp recv_bind(s, status, buffer) do 2201 case msg_recv(s, :infinity, buffer) do 2202 {:ok, msg_bind_complete(), buffer} -> 2203 {:ok, s, buffer} 2204 2205 {:ok, msg_error(fields: fields), buffer} -> 2206 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2207 2208 {:ok, msg, buffer} -> 2209 {s, status} = handle_msg(s, status, msg) 2210 recv_bind(s, status, buffer) 2211 2212 {:disconnect, _, _} = dis -> 2213 dis 2214 end 2215 end 2216 2217 defp recv_execute(s, status, query, rows \\ [], buffer) do 2218 %Query{result_types: types} = query 2219 2220 case rows_recv(s, types, rows, buffer) do 2221 {:ok, msg_command_complete(tag: tag), rows, buffer} -> 2222 {:ok, done(s, status, query, rows, tag), s, buffer} 2223 2224 {:ok, msg_error(fields: fields), _, buffer} -> 2225 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2226 2227 {:ok, msg_empty_query(), [], buffer} -> 2228 {:ok, done(s, status, query, nil, nil), s, buffer} 2229 2230 {:ok, msg_copy_in_response(), [], buffer} -> 2231 copy_in_disconnect(s, query, buffer) 2232 2233 {:ok, msg_copy_out_response(), [], buffer} -> 2234 recv_copy_out(s, status, query, buffer) 2235 2236 {:ok, msg_copy_both_response(), [], buffer} -> 2237 copy_both_disconnect(s, query, buffer) 2238 2239 {:ok, msg, rows, buffer} -> 2240 {s, status} = handle_msg(s, status, msg) 2241 recv_execute(s, status, query, rows, buffer) 2242 2243 {:disconnect, _, _} = dis -> 2244 dis 2245 end 2246 end 2247 2248 defp copy_in_disconnect(s, query, buffer) do 2249 msg = "query #{inspect(query)} is trying to copy in but no copy data to send" 2250 {:disconnect, RuntimeError.exception(msg), %{s | buffer: buffer}} 2251 end 2252 2253 defp copy_both_disconnect(s, query, buffer) do 2254 msg = "query #{inspect(query)} is trying to copy both ways but it is not supported" 2255 {:disconnect, RuntimeError.exception(msg), %{s | buffer: buffer}} 2256 end 2257 2258 defp recv_copy_out(s, status, query, acc \\ [], buffer) do 2259 case msg_recv(s, :infinity, buffer) do 2260 {:ok, msg_copy_data(data: data), buffer} -> 2261 recv_copy_out(s, status, query, [data | acc], buffer) 2262 2263 {:ok, msg_copy_done(), buffer} -> 2264 recv_copy_out(s, status, query, acc, buffer) 2265 2266 {:ok, msg_command_complete(tag: tag), buffer} -> 2267 {:ok, done(s, status, query, acc, tag), s, buffer} 2268 2269 {:ok, msg_error(fields: fields), buffer} -> 2270 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2271 2272 {:ok, msg, buffer} -> 2273 {s, status} = handle_msg(s, status, msg) 2274 recv_copy_out(s, status, query, acc, buffer) 2275 2276 {:disconnect, _, _} = dis -> 2277 dis 2278 end 2279 end 2280 2281 defp make_portal() do 2282 System.unique_integer([:positive]) 2283 |> Integer.to_string(36) 2284 end 2285 2286 defp handle_bind(%Query{ref: ref} = query, params, res, opts, %{postgres: {_, ref}} = s) do 2287 bind(s, new_status(opts), query, params, res) 2288 end 2289 2290 defp handle_bind(query, _, _, _, %{postgres: {_, _}} = s) do 2291 lock_error(s, :bind, query) 2292 end 2293 2294 defp handle_bind(query, params, res, opts, s) do 2295 if query_member?(s, query) do 2296 rebind(s, new_status(opts), query, params, res) 2297 else 2298 handle_prepare_bind(query, params, res, opts, s) 2299 end 2300 end 2301 2302 defp handle_prepare_bind(%Query{name: ""} = query, params, res, opts, s) do 2303 status = new_status(opts) 2304 2305 case parse_describe_flush(s, status, query) do 2306 {:ok, query, s} -> 2307 bind(s, status, query, params, res) 2308 2309 {error, _, _} = other when error in [:error, :disconnect] -> 2310 other 2311 end 2312 end 2313 2314 defp handle_prepare_bind(%Query{} = query, params, res, opts, s) do 2315 status = new_status(opts) 2316 2317 case close_parse_describe_flush(s, status, query) do 2318 {:ok, query, s} -> 2319 bind(s, status, query, params, res) 2320 2321 {error, _, _} = other when error in [:error, :disconnect] -> 2322 other 2323 end 2324 end 2325 2326 defp bind(s, %{mode: :transaction} = status, query, params, cursor) do 2327 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2328 %{portal: portal} = cursor 2329 %{buffer: buffer} = s 2330 2331 msgs = [ 2332 msg_bind( 2333 name_port: portal, 2334 name_stat: name, 2335 param_formats: pfs, 2336 params: params, 2337 result_formats: rfs 2338 ), 2339 msg_sync() 2340 ] 2341 2342 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2343 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2344 {:ok, s} <- recv_ready(s, status, buffer) do 2345 {:ok, query, cursor, s} 2346 else 2347 {:error, %Postgrex.Error{} = err, s, buffer} -> 2348 error_ready(s, status, err, buffer) 2349 2350 {:disconnect, _err, _s} = disconnect -> 2351 disconnect 2352 end 2353 end 2354 2355 defp bind(s, %{mode: :savepoint} = status, query, params, cursor) do 2356 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2357 %{portal: portal} = cursor 2358 %{buffer: buffer} = s 2359 2360 msgs = [ 2361 msg_bind( 2362 name_port: portal, 2363 name_stat: name, 2364 param_formats: pfs, 2365 params: params, 2366 result_formats: rfs 2367 ), 2368 msg_query(statement: "RELEASE SAVEPOINT postgrex_query") 2369 ] 2370 2371 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2372 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2373 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2374 {:ok, query, cursor, s} 2375 else 2376 {:error, %Postgrex.Error{} = err, s, buffer} -> 2377 rollback_flushed(s, status, err, buffer) 2378 2379 {:disconnect, _err, _s} = disconnect -> 2380 disconnect 2381 end 2382 end 2383 2384 defp rebind(s, %{mode: :transaction} = status, query, params, cursor) do 2385 # using a cached query is same as using it for the first time when don't 2386 # need to setup savepoints 2387 bind(s, status, query, params, cursor) 2388 end 2389 2390 defp rebind(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query, params, cursor) do 2391 %Query{param_formats: pfs, result_formats: rfs, name: name} = query 2392 %{portal: portal} = cursor 2393 %{buffer: buffer} = s 2394 2395 msgs = [ 2396 msg_query(statement: "SAVEPOINT postgrex_query"), 2397 msg_bind( 2398 name_port: portal, 2399 name_stat: name, 2400 param_formats: pfs, 2401 params: params, 2402 result_formats: rfs 2403 ), 2404 msg_query(statement: "RELEASE SAVEPOINT postgrex_query") 2405 ] 2406 2407 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2408 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 2409 {:ok, s, buffer} <- recv_bind(s, status, buffer), 2410 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2411 {:ok, query, cursor, s} 2412 else 2413 {:error, %Postgrex.Error{} = err, s, buffer} -> 2414 rollback_flushed(s, status, err, buffer) 2415 2416 {:disconnect, _err, _s} = disconnect -> 2417 disconnect 2418 end 2419 end 2420 2421 defp rebind(%{postgres: postgres} = s, %{mode: :savepoint}, _, _, _) 2422 when postgres in [:idle, :error] do 2423 transaction_error(s, postgres) 2424 end 2425 2426 defp execute(s, %{mode: :transaction} = status, query, cursor, max_rows) do 2427 %Cursor{portal: portal} = cursor 2428 msgs = [msg_execute(name_port: portal, max_rows: max_rows), msg_sync()] 2429 %{buffer: buffer} = s 2430 2431 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2432 {ok, result, s, buffer} when ok in [:cont, :halt] <- 2433 recv_execute(s, status, query, cursor, max_rows, [], buffer), 2434 {:ok, s} <- recv_ready(s, status, buffer) do 2435 {ok, result, s} 2436 else 2437 {:copy_out, result, s} -> 2438 {:cont, result, s} 2439 2440 {:error, %Postgrex.Error{} = err, s, buffer} -> 2441 error_ready(s, status, err, buffer) 2442 2443 {:disconnect, _err, _s} = disconnect -> 2444 disconnect 2445 end 2446 end 2447 2448 defp execute( 2449 %{postgres: :transaction} = s, 2450 %{mode: :savepoint} = status, 2451 query, 2452 cursor, 2453 max_rows 2454 ) do 2455 %Cursor{portal: portal} = cursor 2456 %{buffer: buffer} = s 2457 2458 msgs = [ 2459 msg_query(statement: "SAVEPOINT postgrex_query"), 2460 msg_execute(name_port: portal, max_rows: max_rows), 2461 msg_query(statement: "RELEASE SAVEPOINT postgrex_query") 2462 ] 2463 2464 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2465 {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), 2466 {ok, result, s, buffer} when ok in [:cont, :halt] <- 2467 recv_execute(s, status, query, cursor, max_rows, [], buffer), 2468 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2469 {ok, result, s} 2470 else 2471 {:copy_out, result, s} -> 2472 {:cont, result, s} 2473 2474 {:error, %Postgrex.Error{} = err, s, buffer} -> 2475 rollback_flushed(s, status, err, buffer) 2476 2477 {:disconnect, _err, _s} = disconnect -> 2478 disconnect 2479 end 2480 end 2481 2482 defp recv_execute(s, status, query, cursor, max_rows, rows, buffer) do 2483 %Query{result_types: types} = query 2484 2485 case rows_recv(s, types, rows, buffer) do 2486 {:ok, msg_command_complete(tag: tag), rows, buffer} -> 2487 {:halt, halt(s, status, query, rows, tag), s, buffer} 2488 2489 {:ok, msg_portal_suspend(), rows, buffer} -> 2490 {:cont, done(s, status, query, rows, :stream, max_rows), s, buffer} 2491 2492 {:ok, msg_error(fields: fields), _, buffer} -> 2493 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2494 2495 {:ok, msg_empty_query(), [], buffer} -> 2496 {:halt, done(s, status, query, nil, nil), s, buffer} 2497 2498 {:ok, msg_copy_in_response(), [], buffer} -> 2499 copy_in_disconnect(s, query, buffer) 2500 2501 {:ok, msg_copy_out_response(), [], buffer} -> 2502 %{postgres: postgres} = s 2503 %Cursor{ref: ref} = cursor 2504 s = %{s | postgres: {postgres, ref}} 2505 recv_copy_out(s, status, query, max_rows, [], buffer) 2506 2507 {:ok, msg_copy_both_response(), [], buffer} -> 2508 copy_both_disconnect(s, query, buffer) 2509 2510 {:ok, msg, rows, buffer} -> 2511 {s, status} = handle_msg(s, status, msg) 2512 recv_execute(s, status, query, cursor, max_rows, rows, buffer) 2513 2514 {:disconnect, _, _} = dis -> 2515 dis 2516 end 2517 end 2518 2519 defp fetch_copy_out(%{buffer: buffer} = s, %{mode: :transaction} = status, query, max_rows) do 2520 s = %{s | buffer: nil} 2521 2522 with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, max_rows, [], buffer), 2523 {:ok, s} <- recv_ready(s, status, buffer) do 2524 {:halt, result, s} 2525 else 2526 {:copy_out, result, s} -> 2527 {:cont, result, s} 2528 2529 {:error, err, s, buffer} -> 2530 error_ready(s, status, err, buffer) 2531 2532 {:disconnect, _err, _s} = disconnect -> 2533 disconnect 2534 end 2535 end 2536 2537 defp fetch_copy_out(%{buffer: buffer} = s, %{mode: :savepoint} = status, query, max_rows) do 2538 s = %{s | buffer: nil} 2539 2540 with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, max_rows, [], buffer), 2541 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2542 {:halt, result, s} 2543 else 2544 {:copy_out, result, s} -> 2545 {:cont, result, s} 2546 2547 {:error, err, s, buffer} -> 2548 rollback_flushed(s, status, err, buffer) 2549 2550 {:disconnect, _err, _s} = disconnect -> 2551 disconnect 2552 end 2553 end 2554 2555 defp recv_copy_out(s, status, query, max_rows, [], buffer) do 2556 max_rows = if max_rows == 0, do: :infinity, else: max_rows 2557 recv_copy_out(s, status, query, max_rows, [], 0, buffer) 2558 end 2559 2560 defp recv_copy_out(s, status, query, max_rows, acc, max_rows, buffer) do 2561 s = %{s | buffer: buffer} 2562 {:copy_out, done(s, status, query, acc, :copy_stream, max_rows), s} 2563 end 2564 2565 defp recv_copy_out(s, status, query, max_rows, acc, nrows, buffer) do 2566 case msg_recv(s, :infinity, buffer) do 2567 {:ok, msg_copy_data(data: data), buffer} -> 2568 recv_copy_out(s, status, query, max_rows, [data | acc], nrows + 1, buffer) 2569 2570 {:ok, msg_copy_done(), buffer} -> 2571 recv_copy_out(s, status, query, max_rows, acc, nrows, buffer) 2572 2573 {:ok, msg_command_complete(tag: tag), buffer} -> 2574 {:halt, halt(s, status, query, acc, tag), s, buffer} 2575 2576 {:ok, msg_error(fields: fields), buffer} -> 2577 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2578 2579 {:ok, msg, buffer} -> 2580 {s, status} = handle_msg(s, status, msg) 2581 recv_copy_out(s, status, query, max_rows, acc, nrows, buffer) 2582 2583 {:disconnect, _, _} = dis -> 2584 dis 2585 end 2586 end 2587 2588 defp copy_in_data(s, %{mode: :transaction}, copy, data) do 2589 %Copy{portal: portal, ref: ref, query: query} = copy 2590 %{postgres: postgres, buffer: buffer} = s 2591 msgs = [msg_execute(name_port: portal, max_rows: 0), data] 2592 2593 case msg_send(s, msgs, buffer) do 2594 :ok -> 2595 {:ok, query, copied(s), %{s | postgres: {postgres, ref}}} 2596 2597 {:disconnect, _err, _s} = disconnect -> 2598 disconnect 2599 end 2600 end 2601 2602 defp copy_in_data(s, %{mode: :savepoint} = status, copy, data) do 2603 %Copy{portal: portal, ref: ref, query: query} = copy 2604 %{buffer: buffer} = s 2605 2606 msgs = [ 2607 msg_query(statement: "SAVEPOINT postgrex_query"), 2608 msg_execute(name_port: portal, max_rows: 0), 2609 data 2610 ] 2611 2612 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2613 {:ok, _, %{postgres: postgres} = s} <- recv_transaction(s, status, buffer) do 2614 {:ok, query, copied(s), %{s | postgres: {postgres, ref}}} 2615 else 2616 {:disconnect, _err, _s} = disconnect -> 2617 disconnect 2618 end 2619 end 2620 2621 defp copy_in_data(%{sock: {mod, sock}} = s, %{query: query}, data) do 2622 case mod.send(sock, data) do 2623 :ok -> 2624 {:ok, query, copied(s), s} 2625 2626 {:error, reason} -> 2627 disconnect(s, tag(mod), "send", reason) 2628 end 2629 end 2630 2631 defp copied(%{connection_id: connection_id}) do 2632 %Postgrex.Result{ 2633 command: :copy_stream, 2634 num_rows: :copy_stream, 2635 rows: nil, 2636 columns: nil, 2637 connection_id: connection_id 2638 } 2639 end 2640 2641 defp copy_in_done(s, %{mode: :transaction} = status, %Copy{query: query}) do 2642 %{buffer: buffer} = s 2643 msgs = [msg_copy_done(), msg_sync()] 2644 2645 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2646 {:ok, result, s, buffer} <- recv_copy_in(s, status, query, buffer), 2647 {:ok, s} <- recv_ready(s, status, buffer) do 2648 {:ok, query, result, s} 2649 else 2650 {:error, %Postgrex.Error{} = err, s, buffer} -> 2651 error_ready(s, status, err, buffer) 2652 2653 {:disconnect, _err, _s} = disconnect -> 2654 disconnect 2655 end 2656 end 2657 2658 defp copy_in_done(s, %{mode: :savepoint} = status, %Copy{query: query}) do 2659 %{buffer: buffer} = s 2660 msgs = [msg_copy_done(), msg_query(statement: "RELEASE SAVEPOINT postgrex_query")] 2661 2662 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2663 {:ok, result, s, buffer} <- recv_copy_in(s, status, query, buffer), 2664 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2665 {:ok, query, result, s} 2666 else 2667 {:error, %Postgrex.Error{} = err, s, buffer} -> 2668 rollback_flushed(s, status, err, buffer) 2669 2670 {:disconnect, _err, _s} = disconnect -> 2671 disconnect 2672 end 2673 end 2674 2675 defp recv_copy_in(s, status, query, buffer) do 2676 %Query{result_types: types} = query 2677 2678 case rows_recv(s, types, [], buffer) do 2679 {:ok, msg_copy_in_response(), [], buffer} -> 2680 recv_copy_in_done(s, status, query, buffer) 2681 2682 {:ok, msg_command_complete(tag: tag), rows, buffer} -> 2683 {:ok, done(s, status, query, rows, tag), s, buffer} 2684 2685 {:ok, msg_empty_query(), [], buffer} -> 2686 {:ok, done(s, status, query, nil, nil), s, buffer} 2687 2688 {:ok, msg_error(fields: fields), _, buffer} -> 2689 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2690 2691 {:ok, msg_copy_out_response(), [], buffer} -> 2692 recv_copy_out(s, status, query, buffer) 2693 2694 {:ok, msg_copy_both_response(), [], buffer} -> 2695 copy_both_disconnect(s, query, buffer) 2696 2697 {:ok, msg, [], buffer} -> 2698 {s, status} = handle_msg(s, status, msg) 2699 recv_copy_in(s, status, query, buffer) 2700 2701 {:ok, msg, [_ | _] = rows, buffer} -> 2702 {s, status} = handle_msg(s, status, msg) 2703 recv_execute(s, status, query, rows, buffer) 2704 2705 {:disconnect, _, _} = dis -> 2706 dis 2707 end 2708 end 2709 2710 defp recv_copy_in_done(s, status, query, buffer) do 2711 case msg_recv(s, :infinity, buffer) do 2712 {:ok, msg_command_complete(tag: tag), buffer} -> 2713 {:ok, done(s, status, query, nil, tag), s, buffer} 2714 2715 {:ok, msg_error(fields: fields), buffer} -> 2716 {:error, Postgrex.Error.exception(postgres: fields), s, buffer} 2717 2718 {:ok, msg, buffer} -> 2719 {s, status} = handle_msg(s, status, msg) 2720 recv_copy_in_done(s, status, query, buffer) 2721 2722 {:disconnect, _, _} = dis -> 2723 dis 2724 end 2725 end 2726 2727 ## close 2728 2729 defp copy_out_done(s, %{mode: :transaction} = status, query) do 2730 %{buffer: buffer} = s 2731 s = %{s | buffer: nil} 2732 2733 with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, :infinity, [], buffer), 2734 {:ok, s} <- recv_ready(s, status, buffer) do 2735 {:ok, result, s} 2736 else 2737 {:error, %Postgrex.Error{} = err, s, buffer} -> 2738 error_ready(s, status, err, buffer) 2739 2740 {:disconnect, _err, _s} = disconnect -> 2741 disconnect 2742 end 2743 end 2744 2745 defp copy_out_done(s, %{mode: :savepoint} = status, query) do 2746 %{buffer: buffer} = s 2747 s = %{s | buffer: nil} 2748 2749 with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, :infinity, [], buffer), 2750 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2751 {:ok, result, s} 2752 else 2753 {:error, %Postgrex.Error{} = err, s, buffer} -> 2754 rollback_flushed(s, status, err, buffer) 2755 2756 {:disconnect, _err, _s} = disconnect -> 2757 disconnect 2758 end 2759 end 2760 2761 defp flushed_close(s, %{mode: :transaction} = status, query) do 2762 # closing query without transaction if not flushed is the same as if doing 2763 # with preparing immediately before. 2764 close(s, status, query) 2765 end 2766 2767 defp flushed_close(s, %{mode: :savepoint} = status, query) do 2768 %Query{name: name} = query 2769 %{buffer: buffer} = s 2770 stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query" 2771 msgs = [msg_close(type: :statement, name: name), msg_query(statement: stmt)] 2772 2773 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2774 {:ok, s, buffer} <- recv_close(s, status, buffer), 2775 _ = query_delete(s, query), 2776 {:ok, _, s} <- recv_transaction(s, status, buffer) do 2777 %{connection_id: connection_id} = s 2778 {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s} 2779 else 2780 {:disconnect, _err, _s} = disconnect -> 2781 disconnect 2782 end 2783 end 2784 2785 defp close(s, status, %Query{name: name} = query) do 2786 %{buffer: buffer} = s 2787 msgs = [msg_close(type: :statement, name: name), msg_sync()] 2788 2789 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2790 {:ok, s, buffer} <- recv_close(s, status, buffer), 2791 _ = query_delete(s, query), 2792 {:ok, s} <- recv_ready(s, status, buffer) do 2793 %{connection_id: connection_id} = s 2794 {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s} 2795 else 2796 {:disconnect, _err, _s} = disconnect -> 2797 disconnect 2798 end 2799 end 2800 2801 defp close(s, status, %{portal: portal}) do 2802 %{buffer: buffer} = s 2803 msgs = [msg_close(type: :portal, name: portal), msg_sync()] 2804 2805 with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer), 2806 {:ok, s, buffer} <- recv_close(s, status, buffer), 2807 {:ok, s} <- recv_ready(s, status, buffer) do 2808 %{connection_id: connection_id} = s 2809 {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s} 2810 else 2811 {:disconnect, _err, _s} = disconnect -> 2812 disconnect 2813 end 2814 end 2815 2816 ## ping 2817 2818 defp ping_recv(s, status, old_buffer, buffer) do 2819 %{ping_timeout: timeout, postgres: postgres, transactions: transactions} = s 2820 2821 case msg_recv(s, timeout, buffer) do 2822 {:ok, msg_ready(status: :idle), buffer} 2823 when postgres == :transaction and transactions == :strict -> 2824 sync_error(s, :idle, buffer) 2825 2826 {:ok, msg_ready(status: :transaction), buffer} 2827 when postgres == :idle and transactions == :strict -> 2828 sync_error(s, :transaction, buffer) 2829 2830 {:ok, msg_ready(status: :error), buffer} 2831 when postgres == :idle and transactions == :strict -> 2832 sync_error(s, :error, buffer) 2833 2834 {:ok, msg_ready(status: postgres), buffer} when old_buffer == :active_once -> 2835 activate(%{s | postgres: postgres}, buffer) 2836 2837 {:ok, msg_ready(status: postgres), buffer} when is_nil(old_buffer) -> 2838 {:ok, %{s | postgres: postgres, buffer: buffer}} 2839 2840 {:ok, msg_error(fields: fields), buffer} -> 2841 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 2842 2843 {:ok, msg, buffer} -> 2844 {s, status} = handle_msg(s, status, msg) 2845 ping_recv(s, status, old_buffer, buffer) 2846 2847 {:disconnect, _, _} = dis -> 2848 dis 2849 end 2850 end 2851 2852 ## transaction 2853 2854 defp handle_transaction(statement, opts, %{buffer: buffer} = s) do 2855 status = new_status(opts, mode: :transaction) 2856 msgs = [msg_query(statement: statement)] 2857 2858 case msg_send(%{s | buffer: nil}, msgs, buffer) do 2859 :ok -> 2860 recv_transaction(s, status, buffer) 2861 2862 {:disconnect, err, s} -> 2863 {:disconnect, err, s} 2864 2865 {:error, %Postgrex.Error{} = err, s, buffer} -> 2866 error_ready(s, status, err, buffer) 2867 end 2868 end 2869 2870 defp recv_transaction(s, status, tags \\ [], buffer) do 2871 case msg_recv(s, :infinity, buffer) do 2872 {:ok, msg_command_complete(tag: tag), buffer} -> 2873 recv_transaction(s, status, [tag | tags], buffer) 2874 2875 {:ok, msg_error(fields: fields), buffer} -> 2876 err = Postgrex.Error.exception(postgres: fields) 2877 {:disconnect, err, %{s | buffer: buffer}} 2878 2879 {:ok, msg_ready(status: postgres), buffer} -> 2880 s = %{s | postgres: postgres, buffer: buffer} 2881 {:ok, done(s, status, Enum.reverse(tags)), s} 2882 2883 {:ok, msg, buffer} -> 2884 {s, status} = handle_msg(s, status, msg) 2885 recv_transaction(s, status, tags, buffer) 2886 2887 {:disconnect, _, _} = dis -> 2888 dis 2889 end 2890 end 2891 2892 defp recv_close(s, status, buffer) do 2893 case msg_recv(s, :infinity, buffer) do 2894 {:ok, msg_close_complete(), buffer} -> 2895 {:ok, s, buffer} 2896 2897 {:ok, msg_error(fields: fields), buffer} -> 2898 err = Postgrex.Error.exception(postgres: fields) 2899 {:disconnect, err, %{s | buffer: buffer}} 2900 2901 {:ok, msg, buffer} -> 2902 {s, status} = handle_msg(s, status, msg) 2903 recv_close(s, status, buffer) 2904 2905 {:disconnect, _, _} = dis -> 2906 dis 2907 end 2908 end 2909 2910 defp recv_ready(%{transactions: :naive} = s, status, buffer) do 2911 case msg_recv(s, :infinity, buffer) do 2912 {:ok, msg_ready(status: postgres), buffer} -> 2913 {:ok, %{s | postgres: postgres, buffer: buffer}} 2914 2915 {:ok, msg_error(fields: fields), buffer} -> 2916 err = Postgrex.Error.exception(postgres: fields) 2917 {:disconnect, err, %{s | buffer: buffer}} 2918 2919 {:ok, msg, buffer} -> 2920 {s, status} = handle_msg(s, status, msg) 2921 recv_ready(s, status, buffer) 2922 2923 {:disconnect, _, _} = dis -> 2924 dis 2925 end 2926 end 2927 2928 defp recv_ready(%{transactions: :strict, postgres: {postgres, _}} = s, status, buffer) do 2929 recv_strict_ready(s, status, postgres, buffer) 2930 end 2931 2932 defp recv_ready(%{transactions: :strict, postgres: postgres} = s, status, buffer) do 2933 recv_strict_ready(s, status, postgres, buffer) 2934 end 2935 2936 defp recv_strict_ready(s, status, expected, buffer) do 2937 case msg_recv(s, :infinity, buffer) do 2938 {:ok, msg_ready(status: ^expected), buffer} -> 2939 {:ok, %{s | postgres: expected, buffer: buffer}} 2940 2941 {:ok, msg_ready(status: :error), buffer} when expected == :transaction -> 2942 {:ok, %{s | postgres: :error, buffer: buffer}} 2943 2944 {:ok, msg_ready(status: unexpected), buffer} -> 2945 sync_error(s, unexpected, buffer) 2946 2947 {:ok, msg, buffer} -> 2948 {s, status} = handle_msg(s, status, msg) 2949 recv_strict_ready(s, status, expected, buffer) 2950 2951 {:disconnect, _, _} = dis -> 2952 dis 2953 end 2954 end 2955 2956 defp error_ready(s, status, %Postgrex.Error{} = err, buffer) do 2957 case recv_ready(s, status, buffer) do 2958 {:ok, s} -> 2959 %{connection_id: connection_id} = s 2960 {:error, %Postgrex.Error{err | connection_id: connection_id}, s} 2961 2962 {:disconnect, _, _} = disconnect -> 2963 disconnect 2964 end 2965 end 2966 2967 defp error_ready(s, status, err, buffer) do 2968 case recv_ready(s, status, buffer) do 2969 {:ok, s} -> {:error, err, s} 2970 {:disconnect, _, _} = disconnect -> disconnect 2971 end 2972 end 2973 2974 defp done(%{connection_id: connection_id}, %{messages: messages}, tags) do 2975 {command, _} = decode_tags(tags) 2976 2977 %Postgrex.Result{ 2978 command: command, 2979 num_rows: nil, 2980 rows: nil, 2981 columns: nil, 2982 connection_id: connection_id, 2983 messages: messages 2984 } 2985 end 2986 2987 defp done(s, status, %Query{} = query, rows, tag) do 2988 {command, nrows} = if tag, do: decode_tag(tag), else: {nil, nil} 2989 done(s, status, query, rows, command, nrows) 2990 end 2991 2992 defp done(%{connection_id: connection_id}, %{messages: messages}, columns, rows, tags) do 2993 {command, _} = decode_tags(tags) 2994 2995 %Postgrex.Result{ 2996 command: command, 2997 num_rows: length(rows), 2998 rows: rows, 2999 columns: columns, 3000 connection_id: connection_id, 3001 messages: messages 3002 } 3003 end 3004 3005 defp done(s, status, query, rows, command, nrows) do 3006 %{connection_id: connection_id} = s 3007 %{messages: messages} = status 3008 %Query{columns: cols} = query 3009 3010 # Fix for PostgreSQL 8.4 (doesn't include number of selected rows in tag) 3011 nrows = if is_nil(nrows) and command == :select, do: length(rows), else: nrows 3012 rows = if is_nil(cols) and rows == [] and command != :copy, do: nil, else: rows 3013 3014 %Postgrex.Result{ 3015 command: command, 3016 num_rows: nrows || 0, 3017 rows: rows, 3018 columns: cols, 3019 connection_id: connection_id, 3020 messages: messages 3021 } 3022 end 3023 3024 defp halt(s, status, query, rows, tag) do 3025 case done(s, status, query, rows, tag) do 3026 %Postgrex.Result{rows: rows} = result when is_list(rows) -> 3027 # shows rows for all streamed results but we only want for last chunk. 3028 %Postgrex.Result{result | num_rows: length(rows)} 3029 3030 result -> 3031 result 3032 end 3033 end 3034 3035 ## data 3036 3037 defp handle_data(s, opts, buffer) do 3038 data(s, new_status(opts, mode: :transaction), buffer) 3039 end 3040 3041 defp data(%{timeout: timeout} = s, status, buffer) do 3042 case msg_recv(s, timeout, buffer) do 3043 {:ok, msg_error(fields: fields), buffer} -> 3044 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 3045 3046 {:ok, msg, <<>>} -> 3047 {s, _} = handle_msg(s, status, msg) 3048 activate(s, <<>>) 3049 3050 {:ok, msg, buffer} -> 3051 {s, status} = handle_msg(s, status, msg) 3052 data(s, status, buffer) 3053 3054 {:disconnect, _, _} = dis -> 3055 dis 3056 end 3057 end 3058 3059 ## helpers 3060 defp notify(opts) do 3061 opts[:notify] || fn _, _ -> :ok end 3062 end 3063 3064 defp mode(opts) do 3065 case opts[:mode] || :transaction do 3066 :transaction -> :transaction 3067 :savepoint -> :savepoint 3068 end 3069 end 3070 3071 defp columns(fields) do 3072 fields 3073 |> Enum.map(fn row_field(type_oid: oid, name: name) -> {oid, name} end) 3074 |> :lists.unzip() 3075 end 3076 3077 defp column_names(fields) do 3078 Enum.map(fields, fn row_field(name: name) -> name end) 3079 end 3080 3081 defp tag(:gen_tcp), do: :tcp 3082 defp tag(:ssl), do: :ssl 3083 3084 defp decode_tags([tag]), do: decode_tag(tag) 3085 defp decode_tags(tags), do: Enum.map_reduce(tags, nil, &decode_tags/2) 3086 3087 defp decode_tags(tag, acc) do 3088 case decode_tag(tag) do 3089 {command, nil} -> {command, acc} 3090 {command, nrows} -> {command, nrows + (acc || 0)} 3091 end 3092 end 3093 3094 defp decode_tag("INSERT " <> rest) do 3095 [_oid, nrows] = :binary.split(rest, " ") 3096 {:insert, String.to_integer(nrows)} 3097 end 3098 3099 defp decode_tag("SELECT " <> int), do: {:select, String.to_integer(int)} 3100 defp decode_tag("UPDATE " <> int), do: {:update, String.to_integer(int)} 3101 defp decode_tag("DELETE " <> int), do: {:delete, String.to_integer(int)} 3102 defp decode_tag("FETCH " <> int), do: {:fetch, String.to_integer(int)} 3103 defp decode_tag("MOVE " <> int), do: {:move, String.to_integer(int)} 3104 defp decode_tag("COPY " <> int), do: {:copy, String.to_integer(int)} 3105 defp decode_tag("BEGIN"), do: {:begin, nil} 3106 defp decode_tag("COMMIT"), do: {:commit, nil} 3107 defp decode_tag("ROLLBACK"), do: {:rollback, nil} 3108 defp decode_tag(tag), do: decode_tag(tag, "") 3109 defp decode_tag(<<>>, acc), do: {String.to_atom(acc), nil} 3110 defp decode_tag(<<?\s, t::binary>>, acc), do: decode_tag(t, <<acc::binary, ?_>>) 3111 3112 defp decode_tag(<<h, t::binary>>, acc) when h in ?A..?Z, 3113 do: decode_tag(t, <<acc::binary, h + 32>>) 3114 3115 # Valid SQL statements in PostgreSQL are only 3116 # uppercase A..Z and space. Therefore any other 3117 # character prompts a return of the accumulator 3118 # ignoring anything from the invalid character 3119 # and any trailing space. 3120 defp decode_tag(<<_h, _t::binary>>, acc) do 3121 tag = 3122 acc 3123 |> String.trim_trailing("_") 3124 |> String.to_atom() 3125 3126 {tag, nil} 3127 end 3128 3129 # It is ok to use infinity timeout here if in client process as timer is 3130 # running. 3131 defp msg_recv(%{sock: {:gen_tcp, sock}} = s, timeout, :active_once) do 3132 receive do 3133 {:tcp, ^sock, buffer} -> 3134 msg_recv(s, timeout, buffer) 3135 3136 {:tcp_closed, ^sock} -> 3137 disconnect(s, :tcp, "async_recv", :closed, :active_once) 3138 3139 {:tcp_error, ^sock, reason} -> 3140 disconnect(s, :tcp, "async_recv", reason, :active_once) 3141 after 3142 timeout -> 3143 disconnect(s, :tcp, "async_recv", :timeout, :active_one) 3144 end 3145 end 3146 3147 defp msg_recv(%{sock: {:ssl, sock}} = s, timeout, :active_once) do 3148 receive do 3149 {:ssl, ^sock, buffer} -> 3150 msg_recv(s, timeout, buffer) 3151 3152 {:ssl_closed, ^sock} -> 3153 disconnect(s, :ssl, "async_recv", :closed, :active_once) 3154 3155 {:ssl_error, ^sock, reason} -> 3156 disconnect(s, :ssl, "async_recv", reason, :active_once) 3157 after 3158 timeout -> 3159 disconnect(s, :ssl, "async_recv", :timeout, :active_once) 3160 end 3161 end 3162 3163 defp msg_recv(s, timeout, buffer) do 3164 case msg_decode(buffer) do 3165 {:ok, _, _} = ok -> ok 3166 {:more, more} -> msg_recv(s, timeout, buffer, more) 3167 end 3168 end 3169 3170 defp msg_recv(%{sock: {mod, sock}} = s, timeout, buffer, more) do 3171 case mod.recv(sock, min(more, @max_packet), timeout) do 3172 {:ok, data} when byte_size(data) < more -> 3173 msg_recv(s, timeout, [buffer | data], more - byte_size(data)) 3174 3175 {:ok, data} when is_binary(buffer) -> 3176 msg_recv(s, timeout, buffer <> data) 3177 3178 {:ok, data} when is_list(buffer) -> 3179 msg_recv(s, timeout, IO.iodata_to_binary([buffer | data])) 3180 3181 {:error, reason} -> 3182 action = 3183 if s.postgres == :idle do 3184 "recv (idle)" 3185 else 3186 "recv" 3187 end 3188 3189 disconnect(s, tag(mod), action, reason, IO.iodata_to_binary(buffer)) 3190 end 3191 end 3192 3193 defp msg_decode(bin) when byte_size(bin) < 5 do 3194 {:more, 0} 3195 end 3196 3197 defp msg_decode(<<type::int8(), size::int32(), rest::binary>>) do 3198 size = size - 4 3199 3200 case rest do 3201 <<body::binary(size), rest::binary>> -> 3202 {:ok, parse(body, type, size), rest} 3203 3204 _ -> 3205 {:more, size - byte_size(rest)} 3206 end 3207 end 3208 3209 defp rows_recv(%{types: types} = s, result_types, rows, buffer) do 3210 case Types.decode_rows(buffer, result_types, rows, types) do 3211 {:ok, rows, buffer} -> 3212 rows_msg(s, rows, buffer) 3213 3214 {:more, buffer, rows, more} -> 3215 rows_recv(s, result_types, rows, buffer, more) 3216 end 3217 end 3218 3219 defp rows_recv(%{sock: {mod, sock}} = s, result_types, rows, buffer, more) do 3220 case mod.recv(sock, 0, :infinity) do 3221 {:ok, data} when byte_size(data) < more -> 3222 rows_recv(s, result_types, rows, [buffer | data], more - byte_size(data)) 3223 3224 {:ok, data} when is_binary(buffer) -> 3225 rows_recv(s, result_types, rows, buffer <> data) 3226 3227 {:ok, data} when is_list(buffer) -> 3228 rows_recv(s, result_types, rows, IO.iodata_to_binary([buffer | data])) 3229 3230 {:error, reason} -> 3231 disconnect(s, tag(mod), "recv", reason, IO.iodata_to_binary(buffer)) 3232 end 3233 end 3234 3235 defp rows_msg(s, rows, buffer) do 3236 case msg_recv(s, :infinity, buffer) do 3237 {:ok, msg, buffer} -> 3238 {:ok, msg, rows, buffer} 3239 3240 {:disconnect, _, _} = dis -> 3241 dis 3242 end 3243 end 3244 3245 defp msg_send(s, msgs, buffer) when is_list(msgs) do 3246 binaries = Enum.reduce(msgs, [], &[&2 | maybe_encode_msg(&1)]) 3247 do_send(s, binaries, buffer) 3248 end 3249 3250 defp msg_send(s, msg, buffer) do 3251 do_send(s, encode_msg(msg), buffer) 3252 end 3253 3254 defp maybe_encode_msg(msg) when is_tuple(msg), do: encode_msg(msg) 3255 defp maybe_encode_msg(msg) when is_binary(msg) or is_list(msg), do: msg 3256 3257 defp do_send(%{sock: {mod, sock}} = s, data, buffer) do 3258 case mod.send(sock, data) do 3259 :ok -> 3260 :ok 3261 3262 {:error, reason} -> 3263 disconnect(s, tag(mod), "send", reason, buffer) 3264 end 3265 end 3266 3267 defp handle_msg(s, status, msg_parameter(name: name, value: value)) do 3268 %{parameters: parameters} = s 3269 3270 # Binaries likely part of much larger binary and 3271 # only keeping name/value over long term 3272 name = :binary.copy(name) 3273 value = :binary.copy(value) 3274 3275 cond do 3276 is_reference(parameters) -> 3277 _ = Postgrex.Parameters.put(parameters, name, value) 3278 {s, status} 3279 3280 is_map(parameters) -> 3281 {%{s | parameters: Map.put(parameters, name, value)}, status} 3282 end 3283 end 3284 3285 defp handle_msg(s, status, msg_notify(channel: channel, payload: payload)) do 3286 %{notify: notify} = status 3287 notify.(channel, payload) 3288 {s, status} 3289 end 3290 3291 defp handle_msg(s, status, msg_notice(fields: fields)) do 3292 {s, update_in(status.messages, &[fields | &1])} 3293 end 3294 3295 defp disconnect(s, tag, action, reason, buffer) do 3296 disconnect(%{s | buffer: buffer}, tag, action, reason) 3297 end 3298 3299 defp disconnect(s, tag, action, reason) do 3300 {:disconnect, conn_error(tag, action, reason), s} 3301 end 3302 3303 defp conn_error(mod, action, reason) when reason in @nonposix_errors do 3304 conn_error("#{mod} #{action}: #{reason}") 3305 end 3306 3307 defp conn_error(:tcp, action, reason) do 3308 formatted_reason = :inet.format_error(reason) 3309 conn_error("tcp #{action}: #{formatted_reason} - #{inspect(reason)}") 3310 end 3311 3312 defp conn_error(:ssl, action, reason) do 3313 formatted_reason = :ssl.format_error(reason) 3314 conn_error("ssl #{action}: #{formatted_reason} - #{inspect(reason)}") 3315 end 3316 3317 defp conn_error(message) do 3318 DBConnection.ConnectionError.exception(message) 3319 end 3320 3321 defp disconnect(%{connection_id: connection_id} = s, %Postgrex.Error{} = err, buffer) do 3322 {:disconnect, %{err | connection_id: connection_id}, %{s | buffer: buffer}} 3323 end 3324 3325 defp disconnect(s, %RuntimeError{} = err, buffer) do 3326 {:disconnect, err, %{s | buffer: buffer}} 3327 end 3328 3329 defp sync_recv(s, status, buffer) do 3330 %{postgres: postgres, transactions: transactions} = s 3331 3332 case msg_recv(s, :infinity, buffer) do 3333 {:ok, msg_ready(status: :idle), buffer} 3334 when postgres == :transaction and transactions == :strict -> 3335 sync_error(s, :idle, buffer) 3336 3337 {:ok, msg_ready(status: :transaction), buffer} 3338 when postgres == :idle and transactions == :strict -> 3339 sync_error(s, :transaction, buffer) 3340 3341 {:ok, msg_ready(status: :error), buffer} 3342 when postgres == :idle and transactions == :strict -> 3343 sync_error(s, :error, buffer) 3344 3345 {:ok, msg_ready(status: postgres), buffer} -> 3346 {:ok, %{s | postgres: postgres, buffer: buffer}} 3347 3348 {:ok, msg_error(fields: fields), buffer} -> 3349 disconnect(s, Postgrex.Error.exception(postgres: fields), buffer) 3350 3351 {:ok, msg, buffer} -> 3352 {s, status} = handle_msg(s, status, msg) 3353 sync_recv(s, status, buffer) 3354 3355 {:disconnect, _, _} = dis -> 3356 dis 3357 end 3358 end 3359 3360 defp sync_error(s, postgres, buffer) do 3361 sync_error(%{s | buffer: buffer}, postgres) 3362 end 3363 3364 defp sync_error(s, postgres) do 3365 err = %Postgrex.Error{message: "unexpected postgres status: #{postgres}"} 3366 {:disconnect, err, s} 3367 end 3368 3369 defp recv_buffer(%{sock: {:gen_tcp, sock}} = s) do 3370 receive do 3371 {:tcp, ^sock, buffer} -> 3372 {:ok, %{s | buffer: buffer}} 3373 3374 {:tcp_closed, ^sock} -> 3375 disconnect(s, :tcp, "async recv", :closed, "") 3376 3377 {:tcp_error, ^sock, reason} -> 3378 disconnect(s, :tcp, "async_recv", reason, "") 3379 after 3380 0 -> 3381 {:ok, %{s | buffer: <<>>}} 3382 end 3383 end 3384 3385 defp recv_buffer(%{sock: {:ssl, sock}} = s) do 3386 receive do 3387 {:ssl, ^sock, buffer} -> 3388 {:ok, %{s | buffer: buffer}} 3389 3390 {:ssl_closed, ^sock} -> 3391 disconnect(s, :ssl, "async recv", :closed, "") 3392 3393 {:ssl_error, ^sock, reason} -> 3394 disconnect(s, :ssl, "async recv", reason, "") 3395 after 3396 0 -> 3397 {:ok, %{s | buffer: <<>>}} 3398 end 3399 end 3400 3401 ## Fake [active: once] if buffer not empty 3402 defp activate(s, <<>>) do 3403 case setopts(s, [active: :once], <<>>) do 3404 :ok -> {:ok, %{s | buffer: :active_once}} 3405 other -> other 3406 end 3407 end 3408 3409 defp activate(%{sock: {mod, sock}} = s, buffer) do 3410 _ = send(self(), {tag(mod), sock, buffer}) 3411 {:ok, s} 3412 end 3413 3414 defp setopts(%{sock: {mod, sock}} = s, opts, buffer) do 3415 case setopts(mod, sock, opts) do 3416 :ok -> 3417 :ok 3418 3419 {:error, reason} -> 3420 disconnect(s, tag(mod), "setopts", reason, buffer) 3421 end 3422 end 3423 3424 defp setopts(:gen_tcp, sock, opts), do: :inet.setopts(sock, opts) 3425 defp setopts(:ssl, sock, opts), do: :ssl.setopts(sock, opts) 3426 3427 defp cancel_request(%{connection_key: nil}), do: :ok 3428 3429 defp cancel_request(s) do 3430 case do_cancel_request(s) do 3431 :ok -> 3432 :ok 3433 3434 {:error, action, reason} -> 3435 err = conn_error(:tcp, action, reason) 3436 3437 Logger.error(fn -> 3438 [ 3439 "#{inspect(__MODULE__)} #{inspect(self())} could not cancel backend: " 3440 | Exception.message(err) 3441 ] 3442 end) 3443 end 3444 end 3445 3446 defp do_cancel_request(%{peer: {:local, _} = peer} = s), do: do_cancel_request(peer, 0, s) 3447 defp do_cancel_request(%{peer: {ip, port}} = s), do: do_cancel_request(ip, port, s) 3448 3449 defp do_cancel_request(ip, port, %{timeout: timeout} = s) do 3450 case :gen_tcp.connect(ip, port, [mode: :binary, active: false], timeout) do 3451 {:ok, sock} -> cancel_send_recv(s, sock) 3452 {:error, reason} -> {:error, :connect, reason} 3453 end 3454 end 3455 3456 defp cancel_send_recv(%{connection_id: pid, connection_key: key} = s, sock) do 3457 msg = msg_cancel_request(pid: pid, key: key) 3458 3459 case :gen_tcp.send(sock, encode_msg(msg)) do 3460 :ok -> cancel_recv(s, sock) 3461 {:error, reason} -> {:error, :send, reason} 3462 end 3463 end 3464 3465 defp cancel_recv(%{timeout: timeout}, sock) do 3466 # ignore result as socket will close, else can do nothing 3467 _ = :gen_tcp.recv(sock, 0, timeout) 3468 :gen_tcp.close(sock) 3469 end 3470 3471 defp sock_close(%{sock: {mod, sock}}), do: mod.close(sock) 3472 3473 defp delete_parameters(%{parameters: ref}) when is_reference(ref) do 3474 Postgrex.Parameters.delete(ref) 3475 end 3476 3477 defp delete_parameters(_), do: :ok 3478 3479 defp queries_new(), do: :ets.new(__MODULE__, [:set, :public]) 3480 3481 defp queries_delete(%{queries: nil}), do: true 3482 defp queries_delete(%{queries: queries}), do: :ets.delete(queries) 3483 3484 defp query_put(%{queries: nil}, _), do: :ok 3485 defp query_put(_, %Query{ref: nil}), do: :ok 3486 defp query_put(_, %Query{name: ""}), do: :ok 3487 3488 defp query_put(%{queries: queries}, %Query{name: name, cache: :statement, ref: ref} = query) do 3489 try do 3490 :ets.insert(queries, {name, ref, query}) 3491 rescue 3492 ArgumentError -> 3493 # ets table deleted, socket will be closed, rescue here and get nice 3494 # error when trying to recv on socket. 3495 :ok 3496 else 3497 true -> :ok 3498 end 3499 end 3500 3501 defp query_put(%{queries: queries}, %Query{name: name, cache: :reference, ref: ref}) do 3502 try do 3503 :ets.insert(queries, {name, ref}) 3504 rescue 3505 ArgumentError -> 3506 # ets table deleted, socket will be closed, rescue here and get nice 3507 # error when trying to recv on socket. 3508 :ok 3509 else 3510 true -> :ok 3511 end 3512 end 3513 3514 defp query_delete(%{queries: nil}, _), do: :ok 3515 defp query_delete(_, %Query{name: ""}), do: :ok 3516 3517 defp query_delete(%{queries: queries}, %Query{name: name}) do 3518 try do 3519 :ets.delete(queries, name) 3520 rescue 3521 ArgumentError -> :ok 3522 else 3523 true -> :ok 3524 end 3525 end 3526 3527 defp query_delete_on_error( 3528 %{queries: queries}, 3529 %{postgres: %{code: code}}, 3530 %Query{name: name} 3531 ) 3532 when queries != nil and code in [:feature_not_supported, :invalid_sql_statement_name] do 3533 try do 3534 :ets.delete(queries, name) 3535 rescue 3536 ArgumentError -> :ok 3537 else 3538 true -> :ok 3539 end 3540 end 3541 3542 defp query_delete_on_error(_, _, _), do: :ok 3543 3544 defp query_member?(%{queries: nil}, _), do: false 3545 defp query_member?(_, %{name: ""}), do: false 3546 3547 defp query_member?(%{types: protocol_types}, %Query{types: query_types}) 3548 when protocol_types != query_types, 3549 do: false 3550 3551 defp query_member?(%{queries: queries}, %Query{name: name, ref: ref}) do 3552 try do 3553 :ets.lookup_element(queries, name, 2) 3554 rescue 3555 ArgumentError -> false 3556 else 3557 ^ref -> true 3558 _ -> false 3559 end 3560 end 3561 3562 defp cached_query(%{queries: queries}, %Query{cache: :statement} = query) do 3563 %{name: name, statement: statement} = query 3564 3565 try do 3566 :ets.lookup_element(queries, name, 3) 3567 rescue 3568 ArgumentError -> nil 3569 else 3570 %{statement: ^statement} = query -> query 3571 _ -> nil 3572 end 3573 end 3574 3575 defp cached_query(_, _) do 3576 nil 3577 end 3578 end