zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

protocol.ex (111061B)


      1 defmodule Postgrex.Protocol do
      2   @moduledoc false
      3 
      4   alias Postgrex.{Types, TypeServer, Query, Cursor, Copy}
      5   import Postgrex.{Messages, BinaryUtils}
      6   require Logger
      7   use DBConnection
      8 
          # Fallback for the :timeout option in connect/1, which in turn is the
          # default for :ping_timeout and :connect_timeout.
      9   @timeout 15_000
          # Always appended to the caller's :socket_options: raw framing, binary
          # data, passive (active: false) socket.
     10   @sock_opts [packet: :raw, mode: :binary, active: false]
          # 64 MiB. NOTE(review): not referenced in this part of the file; presumably
          # a cap on receive size - confirm at the use site.
     11   @max_packet 64 * 1024 * 1024
          # NOTE(review): not referenced in this part of the file; presumably socket
          # error reasons that are not POSIX codes - confirm at the use site.
     12   @nonposix_errors [:closed, :timeout]
          # Default for the :max_rows option in handle_fetch/4.
     13   @max_rows 500
          # NOTE(review): not referenced in this part of the file; 25 looks like the
          # OID of PostgreSQL's `text` type - confirm at the use site.
     14   @text_type_oid 25
     15
          # Connection state handed to and returned from every callback below.
     16   defstruct sock: nil,
     17             connection_id: nil,
     18             connection_key: nil,
     19             peer: nil,
     20             types: nil,
     21             null: nil,
     22             timeout: nil,
     23             ping_timeout: nil,
     24             parameters: %{},
     25             queries: nil,
     26             postgres: :idle,
     27             transactions: :strict,
     28             buffer: nil,
     29             disconnect_on_error_codes: []
     30
          # `sock` is a {transport_module, socket} pair ({:gen_tcp, _} or {:ssl, _}).
          # `postgres` is a plain status, or {status, ref}; the handle_* callbacks
          # match the tuple form against the ref of a query/cursor/copy and answer
          # with lock_error when the refs differ.
          # `parameters` starts as a map and is replaced by a reference once the
          # handshake succeeds (see handshake/2).
     31   @type state :: %__MODULE__{
     32           sock: {module, any},
     33           connection_id: nil | pos_integer,
     34           connection_key: nil | pos_integer,
     35           peer: nil | {:inet.ip_address(), :inet.port_number()},
     36           types: nil | module,
     37           null: atom,
     38           timeout: timeout,
     39           ping_timeout: timeout,
     40           parameters: %{binary => binary} | reference,
     41           queries: nil | :ets.tid(),
     42           postgres: DBConnection.status() | {DBConnection.status(), reference},
     43           transactions: :strict | :naive,
     44           buffer: nil | binary | :active_once,
     45           disconnect_on_error_codes: [atom()]
     46         }
     47
          # Two-argument callback taking binaries.
          # NOTE(review): presumably (channel, payload) of a notification - confirm.
     48   @type notify :: (binary, binary -> any)
     49 
          # Expands, at compile time, into a map literal describing one request:
          # %{notify: ..., mode: ..., messages: [], prepare: false}.
          #
          # `opts` is spliced into calls to notify/1 and mode/1 (defined elsewhere in
          # this module), so those two values are computed at runtime from the
          # caller's options. `fields` must be a literal keyword list: it is merged
          # over the defaults while the macro expands, so e.g.
          # `new_status(opts, mode: :transaction)` replaces the `mode` entry.
     50   defmacrop new_status(opts, fields \\ []) do
     51     defaults =
     52       quote(
     53         do: [
     54           notify: notify(unquote(opts)),
     55           mode: mode(unquote(opts)),
     56           messages: [],
     57           prepare: false
     58         ]
     59       )
     60
            # {:%{}, [], pairs} is the quoted form of the map literal %{...}.
     61     {:%{}, [], Keyword.merge(defaults, fields)}
     62   end
     63 
     64   @impl true
     65   @spec connect(Keyword.t()) ::
     66           {:ok, state}
     67           | {:error, Postgrex.Error.t() | %DBConnection.ConnectionError{}}
     68   def connect(opts) do
     69     endpoints = endpoints(opts)
     70 
     71     timeout = opts[:timeout] || @timeout
     72     ping_timeout = Keyword.get(opts, :ping_timeout, timeout)
     73     sock_opts = [send_timeout: timeout] ++ (opts[:socket_options] || [])
     74     ssl? = opts[:ssl] || false
     75     types_mod = Keyword.fetch!(opts, :types)
     76     disconnect_on_error_codes = opts[:disconnect_on_error_codes] || []
     77     target_server_type = opts[:target_server_type] || :any
     78 
     79     transactions =
     80       case opts[:transactions] || :naive do
     81         :naive -> :naive
     82         :strict -> :strict
     83       end
     84 
     85     prepare =
     86       case opts[:prepare] || :named do
     87         :named -> :named
     88         :unnamed -> :unnamed
     89       end
     90 
     91     s = %__MODULE__{
     92       timeout: timeout,
     93       ping_timeout: ping_timeout,
     94       postgres: :idle,
     95       transactions: transactions,
     96       disconnect_on_error_codes: disconnect_on_error_codes
     97     }
     98 
     99     connect_timeout = Keyword.get(opts, :connect_timeout, timeout)
    100 
    101     status = %{
    102       opts: opts,
    103       types_mod: types_mod,
    104       types_key: nil,
    105       types_lock: nil,
    106       prepare: prepare,
    107       messages: [],
    108       ssl: ssl?,
    109       target_server_type: target_server_type,
    110       search_path: opts[:search_path]
    111     }
    112 
    113     connect_endpoints(endpoints, sock_opts ++ @sock_opts, connect_timeout, s, status, [])
    114   end
    115 
    116   defp endpoints(opts) do
    117     port = opts[:port] || 5432
    118 
    119     case Keyword.fetch(opts, :socket) do
    120       {:ok, file} ->
    121         [{{:local, file}, 0, []}]
    122 
    123       :error ->
    124         case Keyword.fetch(opts, :socket_dir) do
    125           {:ok, dir} ->
    126             [{{:local, "#{dir}/.s.PGSQL.#{port}"}, 0, []}]
    127 
    128           :error ->
    129             case Keyword.fetch(opts, :endpoints) do
    130               {:ok, endpoints} when is_list(endpoints) ->
    131                 Enum.map(endpoints, fn
    132                   {hostname, port} -> {to_charlist(hostname), port, []}
    133                   {hostname, port, extra_opts} -> {to_charlist(hostname), port, extra_opts}
    134                 end)
    135 
    136               {:ok, _} ->
    137                 raise ArgumentError, "expected :endpoints to be a list of tuples"
    138 
    139               :error ->
    140                 case Keyword.fetch(opts, :hostname) do
    141                   {:ok, hostname} ->
    142                     [{to_charlist(hostname), port, []}]
    143 
    144                   :error ->
    145                     raise ArgumentError,
    146                           "expected :hostname, endpoints, :socket_dir, or :socket to be given"
    147                 end
    148             end
    149         end
    150     end
    151   end
    152 
    153   defp connect_endpoints(
    154          [{host, port, extra_opts} | remaining_endpoints],
    155          sock_opts,
    156          timeout,
    157          s,
    158          %{opts: opts, types_mod: types_mod} = status,
    159          previous_errors
    160        ) do
    161     types_key = if types_mod, do: {host, port, Keyword.fetch!(opts, :database)}
    162     opts = Config.Reader.merge(opts, extra_opts)
    163 
    164     status = %{status | types_key: types_key, opts: opts}
    165 
    166     case connect_and_handshake(host, port, sock_opts, timeout, s, status) do
    167       {:ok, _} = ret ->
    168         ret
    169 
    170       {:error, err} ->
    171         connect_endpoints(
    172           remaining_endpoints,
    173           sock_opts,
    174           timeout,
    175           s,
    176           status,
    177           [{host, port, err} | previous_errors]
    178         )
    179     end
    180   end
    181 
    182   defp connect_endpoints([], _, _, _, _, [{_, _, error}]), do: {:error, error}
    183 
    184   defp connect_endpoints([], _, _, _, _, errors) when is_list(errors) do
    185     concat_messages =
    186       errors
    187       |> Enum.reverse()
    188       |> Enum.map_join("\n", fn {host, port, %error_module{} = error} ->
    189         "  * #{host}:#{port}: (#{inspect(error_module)}) #{Exception.message(error)}"
    190       end)
    191 
    192     message = "failed to establish connection to multiple endpoints:\n\n#{concat_messages}"
    193     {:error, %Postgrex.Error{message: message}}
    194   end
    195 
    196   defp connect_and_handshake(host, port, sock_opts, timeout, s, status) do
    197     case connect(host, port, sock_opts, timeout, s) do
    198       {:ok, s} ->
    199         handshake(s, status)
    200 
    201       {:error, _} = error ->
    202         error
    203     end
    204   end
    205 
          # DBConnection callback: tears the connection down. The exception argument
          # is ignored and the result is always :ok.
    206   @impl true
    207   @spec disconnect(Exception.t(), state) :: :ok
    208   def disconnect(_, s) do
    209     # cancel the request first otherwise PostgreSQL will log
    210     # every time the connection is explicitly disconnected
    211     # because the associated PID will no longer exist.
    212     cancel_request(s)
    213     sock_close(s)
            # Result deliberately discarded.
            # NOTE(review): presumably drains socket messages still queued for this
            # process after the close - confirm in recv_buffer/1.
    214     _ = recv_buffer(s)
    215     delete_parameters(s)
    216     queries_delete(s)
    217     :ok
    218   end
    219 
          # DBConnection callback: checks the connection by sending a Sync message
          # and handing the reply to ping_recv/4.
    220   @impl true
    221   @spec ping(state) ::
    222           {:ok, state}
    223           | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state}
          # In :strict mode a connection still inside a transaction is reported
          # through sync_error/2 instead of being pinged.
    224   def ping(%{postgres: :transaction, transactions: :strict} = s) do
    225     sync_error(s, :transaction)
    226   end
    227
    228   def ping(%{buffer: buffer} = s) do
    229     status = new_status([], mode: :transaction)
            # The buffer is taken out of the state and threaded explicitly from here.
    230     s = %{s | buffer: nil}
    231
    232     case msg_send(s, msg_sync(), buffer) do
              # Socket was in active-once mode: ping_recv/4 is told so via its third
              # argument.
    233       :ok when buffer == :active_once ->
    234         ping_recv(s, status, :active_once, buffer)
    235
    236       :ok when is_binary(buffer) ->
    237         ping_recv(s, status, nil, buffer)
    238
    239       {:disconnect, _, _} = dis ->
    240         dis
    241     end
    242   end
    243 
          # DBConnection callback run when the connection is handed to a client.
          # Only two state shapes are accepted: a strict-mode connection still in a
          # transaction (reported via sync_error/2) and a connection whose buffer is
          # :active_once. Any other state fails to match a clause.
    244   @impl true
    245   @spec checkout(state) ::
    246           {:ok, state}
    247           | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state}
    248   def checkout(%{postgres: :transaction, transactions: :strict} = s) do
    249     sync_error(s, :transaction)
    250   end
    251
    252   def checkout(%{buffer: :active_once} = s) do
            # Switch the socket to passive mode, then let recv_buffer/1 produce the
            # resulting state.
    253     case setopts(s, [active: false], :active_once) do
    254       :ok -> recv_buffer(s)
    255       {:disconnect, _, _} = dis -> dis
    256     end
    257   end
    258 
          # Counterpart of checkout/1, run when the client returns the connection.
          # A strict-mode connection still in a transaction is reported via
          # sync_error/2; otherwise the buffer must be a binary and is handed to
          # activate/2.
          # NOTE(review): unlike checkout/1 this carries no @impl - presumably it is
          # not a callback of the DBConnection version in use; confirm.
    259   @spec checkin(state) ::
    260           {:ok, state}
    261           | {:disconnect, Postgrex.Error.t() | %DBConnection.ConnectionError{}, state}
    262   def checkin(%{postgres: :transaction, transactions: :strict} = s) do
    263     sync_error(s, :transaction)
    264   end
    265
    266   def checkin(%{buffer: buffer} = s) when is_binary(buffer) do
    267     activate(s, buffer)
    268   end
    269 
          # DBConnection callback: prepares `query`. Clauses are tried top to bottom:
          #   1. connection locked ({status, ref} in :postgres) -> lock_error
          #   2. query already carries a ref -> strip it and prepare from scratch
          #   3. unnamed query (name "")
          #   4. no query cache -> downgrade to unnamed
          #   5. named query with cache -> reuse the cached query or prepare it
          # The :postgrex_prepare option (default false) selects between the two
          # message sequences in clauses 3 and 5.
    270   @impl true
    271   @spec handle_prepare(Postgrex.Query.t(), Keyword.t(), state) ::
    272           {:ok, Postgrex.Query.t(), state}
    273           | {:error, %ArgumentError{} | Postgrex.Error.t(), state}
    274           | {:error, %DBConnection.TransactionError{}, state}
    275           | {:disconnect, %RuntimeError{}, state}
    276           | {:disconnect, %DBConnection.ConnectionError{}, state}
    277   def handle_prepare(%Query{} = query, _, %{postgres: {_, _}} = s) do
    278     lock_error(s, :prepare, query)
    279   end
    280
    281   def handle_prepare(%Query{ref: ref} = query, opts, s) when is_reference(ref) do
    282     # If the query already has a reference, then it means DBConnection rescued
    283     # a DBConnection.EncodeError and wants us to reprepare a query
    284     %{name: name, statement: statement} = query
            # Only name and statement survive; every other field is reset to its
            # struct default.
    285     handle_prepare(%Query{name: name, statement: statement}, opts, s)
    286   end
    287
    288   def handle_prepare(%Query{name: ""} = query, opts, s) do
    289     prepare = Keyword.get(opts, :postgrex_prepare, false)
    290     status = new_status(opts, prepare: prepare)
    291
    292     case prepare do
    293       true -> parse_describe_close(s, status, query)
    294       false -> parse_describe_flush(s, status, query)
    295     end
    296   end
    297
    298   def handle_prepare(%Query{} = query, opts, %{queries: nil} = s) do
    299     # always use unnamed if no cache
    300     handle_prepare(%Query{query | name: ""}, opts, s)
    301   end
    302
    303   def handle_prepare(%Query{} = query, opts, s) do
    304     if new_query = cached_query(s, query) do
    305       {:ok, new_query, s}
    306     else
    307       prepare = Keyword.get(opts, :postgrex_prepare, false)
    308       status = new_status(opts, prepare: prepare)
    309
    310       case prepare do
    311         true -> close_parse_describe(s, status, query)
    312         false -> close_parse_describe_flush(s, status, query)
    313       end
    314     end
    315   end
    316 
          # DBConnection callback: executes a request. What is executed depends on
          # the struct passed first:
          #   * %Postgrex.Parameters{} - returns the connection's stored parameters
          #   * %Query{}               - runs the query, as a copy when the
          #                              :postgrex_copy option is true
          #   * %Copy{}                - feeds data into, or finishes, a copy
    317   @impl true
    318   @spec handle_execute(Postgrex.Parameters.t(), nil, Keyword.t(), state) ::
    319           {:ok, Postgrex.Parameters.t(), %{binary => binary}, state}
    320           | {:error, Postgrex.Error.t(), state}
    321   def handle_execute(%Postgrex.Parameters{} = p, nil, _, s) do
    322     %{parameters: parameters} = s
    323
            # The state value is passed to Postgrex.Parameters.fetch/1 to obtain
            # the parameters themselves.
    324     case Postgrex.Parameters.fetch(parameters) do
    325       {:ok, parameters} ->
    326         {:ok, p, parameters, s}
    327
    328       :error ->
    329         {:error, %Postgrex.Error{message: "parameters not available"}, s}
    330     end
    331   end
    332
    333   @spec handle_execute(Postgrex.Query.t(), list, Keyword.t(), state) ::
    334           {:ok, Postgrex.Query.t(), Postgrex.Result.t() | Postgrex.Copy.t(), state}
    335           | {:error, %ArgumentError{} | Postgrex.Error.t(), state}
    336           | {:error, %DBConnection.TransactionError{}, state}
    337           | {:disconnect, %RuntimeError{}, state}
    338           | {:disconnect, %DBConnection.ConnectionError{}, state}
    339   def handle_execute(%Query{} = query, params, opts, s) do
            # :postgrex_copy must be exactly true or false; any other value fails
            # the case.
    340     case Keyword.get(opts, :postgrex_copy, false) do
    341       true -> handle_execute_copy(query, params, opts, s)
    342       false -> handle_execute_result(query, params, opts, s)
    343     end
    344   end
    345
    346   @spec handle_execute(Postgrex.Copy.t(), {:copy_data, iodata} | :copy_done, Keyword.t(), state) ::
    347           {:ok, Postgrex.Query.t(), Postgrex.Result.t(), state}
    348           | {:error, %ArgumentError{} | Postgrex.Error.t(), state}
    349           | {:disconnect, %RuntimeError{}, state}
    350           | {:disconnect, %DBConnection.ConnectionError{}, state}
    351   def handle_execute(%Copy{ref: ref} = copy, {:copy_data, iodata}, opts, s) do
    352     case s do
              # The lock belongs to this copy: send the data.
    353       %{postgres: {_, ^ref}} ->
    354         copy_in_data(s, copy, iodata)
    355
              # The lock belongs to something else.
    356       %{postgres: {_, _}} ->
    357         lock_error(s, :execute, copy)
    358
              # No lock held: the 4-arity copy_in_data is used, with a fresh status.
    359       _ ->
    360         copy_in_data(s, new_status(opts), copy, iodata)
    361     end
    362   end
    363
    364   def handle_execute(%Copy{ref: ref, query: query} = copy, :copy_done, opts, s) do
    365     case s do
    366       %{postgres: {_, ^ref}} ->
    367         copy_in_done(s, new_status(opts), copy)
    368
    369       %{postgres: {_, _}} ->
    370         lock_error(s, :execute, copy)
    371
              # No lock held: close the copy and return the original query with the
              # result; any non-{:ok, _, _} reply from close/3 is passed through.
    372       _ ->
    373         with {:ok, result, s} <- close(s, new_status(opts), copy) do
    374           {:ok, query, result, s}
    375         end
    376     end
    377   end
    378 
          # Executes a query whose result is returned in one piece (as opposed to a
          # copy). Three cases, tried in order:
          #   1. the lock ref equals the query's ref (`ref` appears twice in the
          #      head, so both must be the same value)
          #   2. a lock is held by something else -> lock_error
          #   3. no lock -> rebind if query_member?/2 is true, else prepare and
          #      execute
    379   defp handle_execute_result(%{ref: ref} = query, params, opts, %{postgres: {_, ref}} = s) do
    380     # ref in lock so query is prepared
    381     status = new_status(opts)
    382
    383     case query do
    384       %{name: ""} -> bind_execute_close(s, status, query, params)
    385       _ -> bind_execute(s, status, query, params)
    386     end
    387   end
    388
    389   defp handle_execute_result(%{} = query, _, _, %{postgres: {_, _ref}} = s) do
    390     lock_error(s, :execute, query)
    391   end
    392
    393   defp handle_execute_result(query, params, opts, s) do
    394     if query_member?(s, query) do
    395       rebind_execute(s, new_status(opts), query, params)
    396     else
    397       handle_prepare_execute(query, params, opts, s)
    398     end
    399   end
    400 
    401   defp handle_execute_copy(query, params, opts, s) do
    402     %{connection_id: connection_id} = s
    403 
    404     copy = %Copy{
    405       portal: make_portal(),
    406       ref: make_ref(),
    407       query: query,
    408       connection_id: connection_id
    409     }
    410 
    411     handle_bind(query, params, copy, opts, s)
    412   end
    413 
          # DBConnection callback: closes a prepared query.
          #   * lock held with the query's own ref -> flushed_close/3
          #   * lock held by something else        -> lock_error
          #   * no lock                            -> close/3
    414   @impl true
    415   @spec handle_close(Postgrex.Query.t(), Keyword.t(), state) ::
    416           {:ok, Postgrex.Result.t(), state}
    417           | {:error, %ArgumentError{} | Postgrex.Error.t(), state}
    418           | {:disconnect, %RuntimeError{}, state}
    419           | {:disconnect, %DBConnection.ConnectionError{}, state}
          # `ref` is bound twice in this head, so it only matches when both are equal.
    420   def handle_close(%Query{ref: ref} = query, opts, %{postgres: {_, ref}} = s) do
    421     flushed_close(s, new_status(opts), query)
    422   end
    423
    424   def handle_close(%Query{} = query, _, %{postgres: {_, _}} = s) do
    425     lock_error(s, :close, query)
    426   end
    427
    428   def handle_close(%Query{} = query, opts, s) do
    429     close(s, new_status(opts), query)
    430   end
    431 
    432   @impl true
    433   @spec handle_declare(Postgrex.Query.t(), list, Keyword.t(), state) ::
    434           {:ok, Postgrex.Query.t(), Postgrex.Cursor.t(), state}
    435           | {:error, %ArgumentError{} | Postgrex.Error.t(), state}
    436           | {:disconnect, %RuntimeError{}, state}
    437           | {:disconnect, %DBConnection.ConnectionError{}, state}
    438   def handle_declare(query, params, opts, s) do
    439     %{connection_id: connection_id} = s
    440 
    441     cursor = %Cursor{
    442       portal: make_portal(),
    443       ref: make_ref(),
    444       connection_id: connection_id,
    445       mode: mode(opts)
    446     }
    447 
    448     handle_bind(query, params, cursor, opts, s)
    449   end
    450 
          # DBConnection callback: fetches the next batch of rows for `cursor`.
          # At most :max_rows rows (default @max_rows) are requested per call.
    451   @impl true
    452   @spec handle_fetch(Postgrex.Query.t(), Postgrex.Cursor.t(), Keyword.t(), state) ::
    453           {:cont | :halt, Postgrex.Result.t(), state}
    454           | {:error, Postgrex.Error.t(), state}
    455           | {:disconnect, %RuntimeError{}, state}
    456           | {:disconnect, %DBConnection.ConnectionError{}, state}
          # A lock is held: continue only if it belongs to this cursor, using the
          # mode stored on the cursor rather than the one in `opts`.
    457   def handle_fetch(query, cursor, opts, %{postgres: {_, ref}} = s) do
    458     case cursor do
    459       %Cursor{ref: ^ref, mode: mode} ->
    460         status = new_status(opts, mode: mode)
    461         max_rows = Keyword.get(opts, :max_rows, @max_rows)
    462         fetch_copy_out(s, status, query, max_rows)
    463
    464       _ ->
                # NOTE(review): passes the string "fetch" where sibling callbacks pass
                # an atom (:close, :deallocate, ...) - confirm lock_error/3 treats
                # both alike.
    465         lock_error(s, "fetch", cursor)
    466     end
    467   end
    468
          # No lock held: execute the cursor's portal.
    469   def handle_fetch(query, cursor, opts, s) do
    470     max_rows = Keyword.get(opts, :max_rows, @max_rows)
    471     execute(s, new_status(opts), query, cursor, max_rows)
    472   end
    473 
          # DBConnection callback: releases a cursor.
          #   * lock held with the cursor's own ref -> copy_out_done/3
          #   * lock held by something else         -> lock_error
          #   * no lock                             -> close/3 with the status mode
          #                                            forced to :transaction
    474   @impl true
    475   @spec handle_deallocate(Postgrex.Query.t(), Postgrex.Cursor.t(), Keyword.t(), state) ::
    476           {:ok, Postgrex.Result.t(), state}
    477           | {:error, Postgrex.Error.t(), state}
    478           | {:disconnect, %RuntimeError{}, state}
    479           | {:disconnect, %DBConnection.ConnectionError{}, state}
          # `ref` is bound twice in this head, so it only matches when both are equal.
    480   def handle_deallocate(query, %Cursor{ref: ref}, opts, %{postgres: {_, ref}} = s) do
    481     copy_out_done(s, new_status(opts), query)
    482   end
    483
    484   def handle_deallocate(_, %Cursor{} = cursor, _, %{postgres: {_, _}} = s) do
    485     lock_error(s, :deallocate, cursor)
    486   end
    487
    488   def handle_deallocate(_, %Cursor{} = cursor, opts, s) do
    489     status = new_status(opts, mode: :transaction)
    490     close(s, status, cursor)
    491   end
    492 
    493   @impl true
    494   @spec handle_begin(Keyword.t(), state) ::
    495           {:ok, Postgrex.Result.t(), state}
    496           | {DBConnection.status(), state}
    497           | {:disconnect, %RuntimeError{}, state}
    498           | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state}
    499   def handle_begin(_, %{postgres: {_, _}} = s) do
    500     lock_error(s, :begin)
    501   end
    502 
    503   def handle_begin(opts, %{postgres: postgres} = s) do
    504     case Keyword.get(opts, :mode, :transaction) do
    505       :transaction when postgres == :idle ->
    506         statement = "BEGIN"
    507         handle_transaction(statement, opts, s)
    508 
    509       :savepoint when postgres == :transaction ->
    510         statement = "SAVEPOINT postgrex_savepoint"
    511         handle_transaction(statement, opts, s)
    512 
    513       mode when mode in [:transaction, :savepoint] ->
    514         {postgres, s}
    515     end
    516   end
    517 
    518   @impl true
    519   @spec handle_commit(Keyword.t(), state) ::
    520           {:ok, Postgrex.Result.t(), state}
    521           | {DBConnection.status(), state}
    522           | {:disconnect, %RuntimeError{}, state}
    523           | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state}
    524   def handle_commit(_, %{postgres: {_, _}} = s) do
    525     lock_error(s, :commit)
    526   end
    527 
    528   def handle_commit(opts, %{postgres: postgres} = s) do
    529     case Keyword.get(opts, :mode, :transaction) do
    530       :transaction when postgres == :transaction ->
    531         statement = "COMMIT"
    532         handle_transaction(statement, opts, s)
    533 
    534       :savepoint when postgres == :transaction ->
    535         statement = "RELEASE SAVEPOINT postgrex_savepoint"
    536         handle_transaction(statement, opts, s)
    537 
    538       mode when mode in [:transaction, :savepoint] ->
    539         {postgres, s}
    540     end
    541   end
    542 
    543   @impl true
    544   @spec handle_rollback(Keyword.t(), state) ::
    545           {:ok, Postgrex.Result.t(), state}
    546           | {DBConnection.status(), state}
    547           | {:disconnect, %RuntimeError{}, state}
    548           | {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state}
    549   def handle_rollback(_, %{postgres: {_, _}} = s) do
    550     lock_error(s, :rollback)
    551   end
    552 
    553   def handle_rollback(opts, %{postgres: postgres} = s) do
    554     case Keyword.get(opts, :mode, :transaction) do
    555       :transaction when postgres in [:transaction, :error] ->
    556         statement = "ROLLBACK"
    557         handle_transaction(statement, opts, s)
    558 
    559       :savepoint when postgres in [:transaction, :error] ->
    560         stmt = "ROLLBACK TO SAVEPOINT postgrex_savepoint;RELEASE SAVEPOINT postgrex_savepoint"
    561         handle_transaction(stmt, opts, s)
    562 
    563       mode when mode in [:transaction, :savepoint] ->
    564         {postgres, s}
    565     end
    566   end
    567 
    568   @impl true
    569   @spec handle_status(Keyword.t(), state) :: {DBConnection.status(), state}
    570   def handle_status(_, %{postgres: {postgres, _}} = s), do: {postgres, s}
    571   def handle_status(_, %{postgres: postgres} = s), do: {postgres, s}
    572 
          # Handles a message delivered to the connection process. Socket messages
          # are classified by handle_socket/2:
          #   * data for our socket         -> handle_data/3
          #   * close/error for a socket
          #     that is not ours            -> ignored, {:ok, state}
          #   * anything else               -> {:unknown, state}
          #   * close/error for our socket  -> whatever disconnect/4 returned
          # `opts` is optional, so both handle_info/2 and handle_info/3 exist.
    573   @spec handle_info(any, Keyword.t(), state) ::
    574           {:ok, state}
    575           | {:unknown, state}
    576           | {:error, Postgrex.Error.t(), state}
    577           | {:disconnect, %DBConnection.ConnectionError{}, state}
    578   def handle_info(msg, opts \\ [], s) do
    579     case handle_socket(msg, s) do
    580       {:data, data} -> handle_data(s, opts, data)
    581       :ignore -> {:ok, s}
    582       :unknown -> {:unknown, s}
    583       disconnect -> disconnect
    584     end
    585   end
    586 
          # Classifies an incoming message relative to the socket stored in the
          # state. `sock` is bound in both arguments of the first six heads, so those
          # clauses only match messages for the connection's own socket and
          # transport:
          #   * data         -> {:data, data}
          #   * closed/error -> result of disconnect/4
          # Closed/error messages that fall through (another socket or transport)
          # give :ignore; everything else gives :unknown.
    587   defp handle_socket({:tcp, sock, data}, %{sock: {:gen_tcp, sock}}) do
    588     {:data, data}
    589   end
    590
    591   defp handle_socket({:tcp_closed, sock}, %{sock: {:gen_tcp, sock}} = s) do
    592     disconnect(s, :tcp, "async recv", :closed)
    593   end
    594
    595   defp handle_socket({:tcp_error, sock, reason}, %{sock: {:gen_tcp, sock}} = s) do
    596     disconnect(s, :tcp, "async recv", reason)
    597   end
    598
    599   defp handle_socket({:ssl, sock, data}, %{sock: {:ssl, sock}}) do
    600     {:data, data}
    601   end
    602
    603   defp handle_socket({:ssl_closed, sock}, %{sock: {:ssl, sock}} = s) do
    604     disconnect(s, :ssl, "async recv", :closed)
    605   end
    606
    607   defp handle_socket({:ssl_error, sock, reason}, %{sock: {:ssl, sock}} = s) do
    608     disconnect(s, :ssl, "async recv", reason)
    609   end
    610
    611   defp handle_socket({closed, _sock}, _) when closed in [:tcp_closed, :ssl_closed] do
    612     :ignore
    613   end
    614
    615   defp handle_socket({error, _sock, _reason}, _) when error in [:tcp_error, :ssl_error] do
    616     :ignore
    617   end
    618
    619   defp handle_socket(_, _) do
    620     :unknown
    621   end
    622 
    623   ## connect
    624 
          # Opens the TCP (or {:local, path} Unix-socket) connection and stores it in
          # the state as {:gen_tcp, sock}. Returns {:ok, state} or
          # {:error, exception} built by conn_error/3.
    625   defp connect(host, port, sock_opts, timeout, s) do
            # Did the caller choose a :buffer size explicitly?
    626     buffer? = Keyword.has_key?(sock_opts, :buffer)
    627
            # NOTE(review): connect/1 already appends @sock_opts before calling down
            # to here, so on that path they appear twice in this list - harmless but
            # redundant; confirm before removing either.
    628     case :gen_tcp.connect(host, port, sock_opts ++ @sock_opts, timeout) do
    629       {:ok, sock} when buffer? ->
    630         {:ok, %{s | sock: {:gen_tcp, sock}}}
    631
    632       {:ok, sock} ->
    633         # A suitable :buffer is only set if :recbuf is included in
    634         # :socket_options.
    635         {:ok, [sndbuf: sndbuf, recbuf: recbuf, buffer: buffer]} =
    636           :inet.getopts(sock, [:sndbuf, :recbuf, :buffer])
    637
                # Raise :buffer to the largest of the three sizes.
    638         buffer = buffer |> max(sndbuf) |> max(recbuf)
    639         :ok = :inet.setopts(sock, buffer: buffer)
    640         {:ok, %{s | sock: {:gen_tcp, sock}}}
    641
    642       {:error, reason} ->
                # Unix-socket targets are reported by path, network targets as
                # host:port.
    643         case host do
    644           {:local, socket_addr} ->
    645             {:error, conn_error(:tcp, "connect (#{socket_addr})", reason)}
    646
    647           host ->
    648             {:error, conn_error(:tcp, "connect (#{host}:#{port})", reason)}
    649         end
    650     end
    651   end
    652 
    653   ## handshake
    654 
          # Runs the protocol handshake on a freshly connected TCP socket, guarded by
          # a watchdog timer (:handshake_timeout option, default: the state's
          # timeout). Returns {:ok, state} or {:error, exception}.
    655   defp handshake(%{sock: {:gen_tcp, sock}, timeout: timeout} = s, status) do
            # Asserts that the peer address can be read; a failure here raises.
    656     {:ok, peer} = :inet.peername(sock)
    657     %{opts: opts} = status
    658     handshake_timeout = Keyword.get(opts, :handshake_timeout, timeout)
    659     timer = start_handshake_timer(handshake_timeout, sock)
    660
    661     case do_handshake(%{s | peer: peer}, status) do
    662       {:ok, %{parameters: parameters} = s} ->
    663         cancel_handshake_timer(timer)
                # The parameters map collected during the handshake is handed to
                # Postgrex.Parameters and replaced in the state by the returned
                # reference.
    664         ref = Postgrex.Parameters.insert(parameters)
    665         {:ok, %{s | parameters: ref}}
    666
    667       {:disconnect, err, s} ->
    668         cancel_handshake_timer(timer)
                # Clean up through the public disconnect/2 callback, then report the
                # failure as a plain error tuple.
    669         disconnect(err, s)
    670         {:error, err}
    671     end
    672   end
    673 
    674   defp start_handshake_timer(:infinity, _), do: :infinity
    675 
    676   defp start_handshake_timer(timeout, sock) do
    677     args = [timeout, self(), sock]
    678     {:ok, tref} = :timer.apply_after(timeout, __MODULE__, :handshake_shutdown, args)
    679     {:timer, tref}
    680   end
    681 
          # Target of the watchdog armed by start_handshake_timer/2; public only
          # because :timer.apply_after needs an exported function.
          #
          # If the connecting process `pid` still exists, logs the timeout and shuts
          # the socket down in both directions. If the process is gone, does nothing.
    682   @doc false
    683   def handshake_shutdown(timeout, pid, sock) do
    684     if Process.alive?(pid) do
              # The message is built lazily as iodata; the trailing `| "ms"` makes it
              # an improper list, which is valid iodata.
    685       Logger.error(fn ->
    686         [
    687           inspect(__MODULE__),
    688           " (",
    689           inspect(pid),
    690           ") timed out because it was handshaking for longer than ",
    691           to_string(timeout) | "ms"
    692         ]
    693       end)
    694
    695       :gen_tcp.shutdown(sock, :read_write)
    696     end
    697   end
    698 
          # Disarms the watchdog returned by start_handshake_timer/2. :infinity means
          # no timer was started. Always returns :ok; a failed :timer.cancel/1 raises
          # on the match.
          # NOTE(review): declared with `def` although the only caller visible here is
          # handshake/2 - confirm nothing outside the module relies on it before
          # making it private.
    699   def cancel_handshake_timer(:infinity), do: :ok
    700
    701   def cancel_handshake_timer({:timer, tref}) do
    702     {:ok, _} = :timer.cancel(tref)
    703     :ok
    704   end
    705 
    706   defp do_handshake(s, %{ssl: true} = status), do: ssl(s, status)
    707   defp do_handshake(s, %{ssl: false} = status), do: startup(s, status)
    708 
    709   ## ssl
    710 
    711   defp ssl(s, status) do
    712     case msg_send(s, msg_ssl_request(), "") do
    713       :ok -> ssl_recv(s, status)
    714       {:disconnect, _, _} = dis -> dis
    715     end
    716   end
    717 
          # Reads the single-byte reply to the SSL request:
          #   "S" - server accepts, upgrade the socket
          #   "N" - server refuses, disconnect with "ssl not available"
          #   "E" - start of an error message, read the rest and disconnect with it
          # The recv has no timeout of its own (:infinity).
          # NOTE(review): presumably bounded by the handshake watchdog started in
          # handshake/2 - confirm.
    718   defp ssl_recv(%{sock: {:gen_tcp, sock}} = s, status) do
    719     case :gen_tcp.recv(sock, 1, :infinity) do
    720       {:ok, <<?S>>} ->
    721         ssl_connect(s, status)
    722
    723       {:ok, <<?N>>} ->
    724         disconnect(s, %Postgrex.Error{message: "ssl not available"}, "")
    725
    726       {:ok, <<?E>> = buffer} ->
    727         # This can happen for "very ancient servers" according to docs,
    728         # it shouldn't happen in regular operation
    729         # See: https://www.postgresql.org/docs/10/static/protocol-flow.html#idm46428663878176
                # The byte already read is passed on as the buffer so msg_recv/3 sees
                # the message from its first byte.
    730         case msg_recv(s, :infinity, buffer) do
    731           {:ok, msg_error(fields: fields), buffer} ->
    732             disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
    733
    734           {:disconnect, _, _} = dis ->
    735             dis
    736         end
    737
    738       {:error, reason} ->
    739         disconnect(s, :tcp, "recv", reason)
    740     end
    741   end
    742 
    743   defp ssl_connect(%{sock: {:gen_tcp, sock}, timeout: timeout} = s, status) do
    744     case :ssl.connect(sock, status.opts[:ssl_opts] || [], timeout) do
    745       {:ok, ssl_sock} ->
    746         startup(%{s | sock: {:ssl, ssl_sock}}, status)
    747 
    748       {:error, reason} ->
    749         disconnect(s, :ssl, "connect", reason)
    750     end
    751   end
    752 
    753   ## startup
    754 
    755   defp startup(s, %{opts: opts} = status) do
    756     params = opts[:parameters] || []
    757     user = Keyword.fetch!(opts, :username)
    758     database = Keyword.fetch!(opts, :database)
    759     msg = msg_startup(params: [user: user, database: database] ++ params)
    760 
    761     case msg_send(s, msg, "") do
    762       :ok -> auth_recv(s, status, <<>>)
    763       {:disconnect, _, _} = dis -> dis
    764     end
    765   end
    766 
    767   ## auth
    768 
          # Authentication loop: reads the next server message and reacts to the
          # authentication type it announces. Password-style replies go out through
          # auth_send/4, which calls back into this function.
    769   defp auth_recv(s, status, buffer) do
    770     case msg_recv(s, :infinity, buffer) do
              # Authenticated: continue with the post-auth initialisation messages.
    771       {:ok, msg_auth(type: :ok), buffer} ->
    772         init_recv(s, status, buffer)
    773
    774       {:ok, msg_auth(type: :cleartext), buffer} ->
    775         auth_cleartext(s, status, buffer)
    776
    777       {:ok, msg_auth(type: :md5, data: salt), buffer} ->
    778         auth_md5(s, status, salt, buffer)
    779
              # SASL start: the payload of this message is not used.
    780       {:ok, msg_auth(type: :sasl, data: _), buffer} ->
    781         auth_sasl(s, status, buffer)
    782
    783       {:ok, msg_auth(type: :sasl_cont, data: data), buffer} ->
    784         auth_cont(s, status, data, buffer)
    785
              # SASL final: payload ignored, keep reading for the next message.
    786       {:ok, msg_auth(type: :sasl_fin, data: _), buffer} ->
    787         auth_recv(s, status, buffer)
    788
    789       {:ok, msg_error(fields: fields), buffer} ->
    790         disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
    791
    792       {:disconnect, _, _} = dis ->
    793         dis
    794     end
    795   end
    796 
    797   defp auth_cleartext(s, %{opts: opts} = status, buffer) do
    798     pass = Keyword.fetch!(opts, :password)
    799     auth_send(s, msg_password(pass: [pass, 0]), status, buffer)
    800   end
    801 
    802   defp auth_md5(s, %{opts: opts} = status, salt, buffer) do
    803     user = Keyword.fetch!(opts, :username)
    804     pass = Keyword.fetch!(opts, :password)
    805 
    806     digest = :erlang.md5([pass, user]) |> Base.encode16(case: :lower)
    807     digest = :erlang.md5([digest, salt]) |> Base.encode16(case: :lower)
    808     auth_send(s, msg_password(pass: ["md5", digest, 0]), status, buffer)
    809   end
    810 
    811   defp auth_sasl(s, status = _, buffer) do
    812     auth_send(s, msg_password(pass: Postgrex.SCRAM.challenge()), status, buffer)
    813   end
    814 
    815   defp auth_cont(s, %{opts: opts} = status, data, buffer) do
    816     auth_send(s, msg_password(pass: Postgrex.SCRAM.verify(data, opts)), status, buffer)
    817   end
    818 
    819   defp auth_send(s, msg, status, buffer) do
    820     case msg_send(s, msg, buffer) do
    821       :ok -> auth_recv(s, status, buffer)
    822       {:disconnect, _, _} = dis -> dis
    823     end
    824   end
    825 
    826   ## init
    827 
    828   defp init_recv(s, status, buffer) do
    829     case msg_recv(s, :infinity, buffer) do
    830       {:ok, msg_backend_key(pid: pid, key: key), buffer} ->
    831         init_recv(%{s | connection_id: pid, connection_key: key}, status, buffer)
    832 
    833       {:ok, msg_ready(), buffer} ->
    834         set_search_path(s, status, buffer)
    835 
    836       {:ok, msg_error(fields: fields), buffer} ->
    837         disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
    838 
    839       {:ok, msg, buffer} ->
    840         {s, status} = handle_msg(s, status, msg)
    841         init_recv(s, status, buffer)
    842 
    843       {:disconnect, _, _} = dis ->
    844         dis
    845     end
    846   end
    847 
    848   ## set search path on connection startup
    849 
    850   defp set_search_path(s, %{search_path: nil} = status, buffer),
    851     do: set_search_path_done(s, status, buffer)
    852 
    853   defp set_search_path(s, %{search_path: search_path} = status, buffer)
    854        when is_list(search_path),
    855        do: set_search_path_send(s, status, buffer)
    856 
    857   defp set_search_path(_, %{search_path: search_path}, _) do
    858     raise ArgumentError,
    859           "expected :search_path to be a list of strings, got: #{inspect(search_path)}"
    860   end
    861 
    862   defp set_search_path_send(s, status, buffer) do
    863     search_path = Enum.intersperse(status.search_path, ",")
    864     msg = msg_query(statement: ["set search_path to " | search_path])
    865 
    866     case msg_send(s, msg, buffer) do
    867       :ok ->
    868         set_search_path_recv(s, status, buffer)
    869 
    870       {:disconnect, _, _} = dis ->
    871         dis
    872     end
    873   end
    874 
    875   defp set_search_path_recv(s, status, buffer) do
    876     case msg_recv(s, :infinity, buffer) do
    877       {:ok, msg_row_desc(fields: fields), buffer} ->
    878         {[@text_type_oid], ["search_path"]} = columns(fields)
    879         set_search_path_recv(s, status, buffer)
    880 
    881       {:ok, msg_data_row(), buffer} ->
    882         set_search_path_recv(s, status, buffer)
    883 
    884       {:ok, msg_command_complete(), buffer} ->
    885         set_search_path_recv(s, status, buffer)
    886 
    887       {:ok, msg_ready(status: :idle), buffer} ->
    888         set_search_path_done(s, status, buffer)
    889 
    890       {:ok, msg_ready(status: postgres), _buffer} ->
    891         err = %Postgrex.Error{message: "unexpected postgres status: #{postgres}"}
    892         {:disconnect, err, s}
    893 
    894       {:ok, msg_error(fields: fields), buffer} ->
    895         err = Postgrex.Error.exception(postgres: fields)
    896         {:disconnect, err, %{s | buffer: buffer}}
    897 
    898       {:ok, msg, buffer} ->
    899         {s, status} = handle_msg(s, status, msg)
    900         set_search_path_recv(s, status, buffer)
    901 
    902       {:disconnect, _, _} = dis ->
    903         dis
    904     end
    905   end
    906 
    907   defp set_search_path_done(s, status, buffer),
    908     do: check_target_server_type(s, status, buffer)
    909 
    910   ## check_target_server_type
    911 
    912   defp check_target_server_type(s, %{target_server_type: :any} = status, buffer),
    913     do: check_target_server_type_done(s, status, buffer)
    914 
    915   defp check_target_server_type(s, status, buffer),
    916     do: check_target_server_type_send(s, status, buffer)
    917 
    918   defp check_target_server_type_send(s, status, buffer) do
    919     msg = msg_query(statement: "show transaction_read_only")
    920 
    921     case msg_send(s, msg, buffer) do
    922       :ok ->
    923         check_target_server_type_recv(s, status, buffer)
    924 
    925       {:disconnect, err, s} ->
    926         check_target_server_type_fail(s, err, status)
    927     end
    928   end
    929 
    930   defp check_target_server_type_recv(
    931          s,
    932          %{target_server_type: expected_server_type} = status,
    933          buffer
    934        ) do
    935     case msg_recv(s, :infinity, buffer) do
    936       {:ok, msg_row_desc(fields: fields), buffer} ->
    937         {[@text_type_oid], ["transaction_read_only"]} = columns(fields)
    938         check_target_server_type_recv(s, status, buffer)
    939 
    940       {:ok, msg_data_row(values: values), buffer} ->
    941         <<len::uint32(), read_only_value::binary(len)>> = values
    942 
    943         actual_server_type =
    944           case read_only_value do
    945             "off" -> :primary
    946             "on" -> :secondary
    947           end
    948 
    949         case {expected_server_type, actual_server_type} do
    950           {:any, _} -> check_target_server_type_recv(s, status, buffer)
    951           {type, type} -> check_target_server_type_recv(s, status, buffer)
    952           _ -> check_target_server_type_fail(s, expected_server_type, actual_server_type)
    953         end
    954 
    955       {:ok, msg_command_complete(), buffer} ->
    956         check_target_server_type_recv(s, status, buffer)
    957 
    958       {:ok, msg_ready(status: :idle), buffer} ->
    959         check_target_server_type_done(s, status, buffer)
    960 
    961       {:ok, msg_ready(status: postgres), _buffer} ->
    962         err = %Postgrex.Error{message: "unexpected postgres status: #{postgres}"}
    963         check_target_server_type_error(s, err, status)
    964 
    965       {:ok, msg_error(fields: fields), buffer} ->
    966         err = Postgrex.Error.exception(postgres: fields)
    967         check_target_server_type_error(s, err, status, buffer)
    968 
    969       {:ok, msg, buffer} ->
    970         {s, status} = handle_msg(s, status, msg)
    971         check_target_server_type_recv(s, status, buffer)
    972 
    973       {:disconnect, err, s} ->
    974         check_target_server_type_error(s, err, status)
    975     end
    976   end
    977 
    978   defp check_target_server_type_done(s, status, buffer), do: bootstrap(s, status, buffer)
    979 
    980   defp check_target_server_type_fail(s, expected, actual) do
    981     msg = "the server type is not as expected. expected: #{expected}. actual: #{actual}"
    982     err = %Postgrex.Error{message: msg}
    983     {:disconnect, err, s}
    984   end
    985 
    986   defp check_target_server_type_error(s, err, _status) do
    987     {:disconnect, err, s}
    988   end
    989 
    990   defp check_target_server_type_error(s, err, status, buffer) do
    991     check_target_server_type_error(%{s | buffer: buffer}, err, status)
    992   end
    993 
    994   ## bootstrap
    995 
    996   defp bootstrap(s, %{types_key: nil}, buffer) do
    997     activate(s, buffer)
    998   end
    999 
   1000   defp bootstrap(s, status, buffer) do
   1001     %{types_mod: types_mod, types_key: types_key} = status
   1002     server = Postgrex.TypeSupervisor.locate(types_mod, types_key)
   1003 
   1004     case TypeServer.fetch(server) do
   1005       {:lock, ref, types} ->
   1006         status = %{status | types_lock: {server, ref}}
   1007         bootstrap_send(%{s | types: types}, status, buffer)
   1008 
   1009       :noproc ->
   1010         bootstrap(s, status, buffer)
   1011 
   1012       :error ->
   1013         {:disconnect, type_fetch_error(), %{s | buffer: buffer}}
   1014     end
   1015   end
   1016 
   1017   defp bootstrap_send(%{types: types} = s, status, buffer) do
   1018     %{parameters: parameters} = s
   1019     version = Postgrex.Utils.parse_version(parameters["server_version"])
   1020     statement = Types.bootstrap_query(version, types)
   1021 
   1022     if statement do
   1023       bootstrap_send(s, status, statement, buffer)
   1024     else
   1025       %{types_lock: {server, ref}} = status
   1026       TypeServer.done(server, ref)
   1027       bootstrap_done(s, status, buffer)
   1028     end
   1029   end
   1030 
   1031   defp bootstrap_send(s, status, statement, buffer) do
   1032     msg = msg_query(statement: statement)
   1033 
   1034     case msg_send(s, msg, buffer) do
   1035       :ok ->
   1036         bootstrap_recv(s, status, [], buffer)
   1037 
   1038       {:disconnect, err, s} ->
   1039         bootstrap_fail(s, err, status)
   1040     end
   1041   end
   1042 
   1043   defp bootstrap_recv(s, status, type_infos, buffer) do
   1044     case msg_recv(s, :infinity, buffer) do
   1045       {:ok, msg_row_desc(), buffer} ->
   1046         bootstrap_recv(s, status, type_infos, buffer)
   1047 
   1048       {:ok, msg_data_row(values: values), buffer} ->
   1049         type_infos = [Types.build_type_info(values) | type_infos]
   1050         bootstrap_recv(s, status, type_infos, buffer)
   1051 
   1052       {:ok, msg_command_complete(), buffer} ->
   1053         bootstrap_types(s, status, Enum.reverse(type_infos), buffer)
   1054 
   1055       {:ok, msg_error(fields: fields), buffer} ->
   1056         err = Postgrex.Error.exception(postgres: fields)
   1057         bootstrap_fail(s, err, status, buffer)
   1058 
   1059       {:ok, msg, buffer} ->
   1060         {s, status} = handle_msg(s, status, msg)
   1061         bootstrap_recv(s, status, type_infos, buffer)
   1062 
   1063       {:disconnect, err, s} ->
   1064         bootstrap_fail(s, err, status)
   1065     end
   1066   end
   1067 
   1068   defp bootstrap_types(s, status, type_infos, buffer) do
   1069     %{types_lock: {server, ref}} = status
   1070     TypeServer.update(server, ref, type_infos)
   1071     bootstrap_sync_recv(s, status, buffer)
   1072   end
   1073 
   1074   defp bootstrap_sync_recv(s, status, buffer) do
   1075     case msg_recv(s, :infinity, buffer) do
   1076       {:ok, msg_ready(status: :idle), buffer} ->
   1077         bootstrap_done(s, status, buffer)
   1078 
   1079       {:ok, msg_ready(status: postgres), buffer} ->
   1080         sync_error(s, postgres, buffer)
   1081 
   1082       {:ok, msg, buffer} ->
   1083         {s, status} = handle_msg(s, status, msg)
   1084         bootstrap_sync_recv(s, status, buffer)
   1085 
   1086       {:disconnect, _, _} = dis ->
   1087         dis
   1088     end
   1089   end
   1090 
   1091   defp bootstrap_done(s, %{prepare: :unnamed}, buffer),
   1092     do: activate(s, buffer)
   1093 
   1094   defp bootstrap_done(s, %{prepare: :named}, buffer),
   1095     do: activate(%{s | queries: queries_new()}, buffer)
   1096 
   1097   defp bootstrap_fail(s, err, %{types_lock: {server, ref}}) do
   1098     TypeServer.done(server, ref)
   1099     {:disconnect, err, s}
   1100   end
   1101 
   1102   defp bootstrap_fail(s, err, status, buffer) do
   1103     bootstrap_fail(%{s | buffer: buffer}, err, status)
   1104   end
   1105 
   1106   defp type_fetch_error() do
   1107     msg = "awaited on another connection that failed to bootstrap types"
   1108     DBConnection.ConnectionError.exception(msg)
   1109   end
   1110 
   1111   ## replication/notifications
   1112 
   1113   @spec handle_simple(String.t() | iolist(), state) ::
   1114           {:ok, [Postgrex.Result.t()], state}
   1115           | {:error, Postgrex.Error.t(), state}
   1116           | {:disconnect, %DBConnection.ConnectionError{}, state}
   1117   def handle_simple(statement, opts \\ [], %{buffer: buffer} = s) do
   1118     status = new_status(opts, mode: :transaction)
   1119     msgs = [msg_query(statement: statement)]
   1120 
   1121     case msg_send(%{s | buffer: nil}, msgs, buffer) do
   1122       :ok ->
   1123         recv_simple(s, status, [], [], [], buffer)
   1124 
   1125       {:disconnect, err, s} ->
   1126         {:disconnect, err, s}
   1127 
   1128       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1129         error_ready(s, status, err, buffer)
   1130     end
   1131   end
   1132 
   1133   defp recv_simple(s, status, results, columns, rows, buffer) do
   1134     case msg_recv(s, :infinity, buffer) do
   1135       {:ok, msg_row_desc(fields: fields), buffer} ->
   1136         columns = column_names(fields)
   1137         recv_simple(s, status, results, columns, rows, buffer)
   1138 
   1139       {:ok, msg_data_row(values: values), buffer} ->
   1140         row = Types.decode_simple(values, s.types)
   1141         recv_simple(s, status, results, columns, [row | rows], buffer)
   1142 
   1143       {:ok, msg_command_complete(tag: tag), buffer} ->
   1144         result = done(s, status, columns, Enum.reverse(rows), [tag])
   1145         recv_simple(s, status, [result | results], [], [], buffer)
   1146 
   1147       {:ok, msg_error(fields: fields), buffer} ->
   1148         err = Postgrex.Error.exception(postgres: fields)
   1149         error_ready(s, status, err, buffer)
   1150 
   1151       {:ok, msg_ready(status: postgres), buffer} ->
   1152         s = %{s | postgres: postgres, buffer: buffer}
   1153         {:ok, Enum.reverse(results), s}
   1154 
   1155       {:ok, msg, buffer} ->
   1156         {s, status} = handle_msg(s, status, msg)
   1157         recv_simple(s, status, results, columns, rows, buffer)
   1158 
   1159       {:disconnect, _, _} = dis ->
   1160         dis
   1161     end
   1162   end
   1163 
   1164   @spec handle_copy_send([binary], state) ::
   1165           :ok
   1166           | {:error, Postgrex.Error.t(), state}
   1167           | {:disconnect, %DBConnection.ConnectionError{}, state}
   1168   def handle_copy_send(binaries, %{buffer: buffer} = s) do
   1169     msgs = Enum.map(binaries, &msg_copy_data(data: &1))
   1170     msg_send(s, msgs, buffer)
   1171   end
   1172 
   1173   @spec handle_copy_recv(any, Keyword.t(), state) ::
   1174           {:ok, [binary | atom], state}
   1175           | :unknown
   1176           | {:error, Postgrex.Error.t(), state}
   1177           | {:disconnect, %DBConnection.ConnectionError{}, state}
   1178   def handle_copy_recv(msg, max_copies, s) do
   1179     case handle_socket(msg, s) do
   1180       {:data, data} -> handle_copy_recv(s, max_copies, [], 0, data)
   1181       :ignore -> {:ok, [], s}
   1182       :unknown -> :unknown
   1183       disconnect -> disconnect
   1184     end
   1185   end
   1186 
   1187   defp handle_copy_recv(s, max_copies, copies, max_copies, buffer) do
   1188     with {:ok, s} <- activate(s, buffer) do
   1189       {:ok, Enum.reverse(copies), s}
   1190     end
   1191   end
   1192 
   1193   defp handle_copy_recv(%{timeout: timeout} = s, max_copies, copies, ncopies, buffer) do
   1194     case msg_recv(s, timeout, buffer) do
   1195       {:ok, msg_error(fields: fields), buffer} ->
   1196         disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
   1197 
   1198       {:ok, msg_copy_data(data: data), <<>>} ->
   1199         with {:ok, s} <- activate(s, <<>>) do
   1200           {:ok, Enum.reverse([data | copies]), s}
   1201         end
   1202 
   1203       {:ok, msg_copy_data(data: data), buffer} ->
   1204         handle_copy_recv(s, max_copies, [data | copies], ncopies + 1, buffer)
   1205 
   1206       {:ok, msg_copy_done(), buffer} ->
   1207         handle_copy_recv(s, max_copies, copies, ncopies, buffer)
   1208 
   1209       {:ok, msg_command_complete(), buffer} ->
   1210         handle_copy_recv(s, max_copies, copies, ncopies, buffer)
   1211 
   1212       {:ok, msg_ready(status: postgres), buffer} ->
   1213         s = %{s | postgres: postgres, buffer: buffer}
   1214         {:ok, Enum.reverse([:copy_done | copies]), s}
   1215 
   1216       {:ok, _msg, buffer} ->
   1217         handle_copy_recv(s, max_copies, copies, ncopies, buffer)
   1218 
   1219       {:disconnect, _, _} = dis ->
   1220         dis
   1221     end
   1222   end
   1223 
   1224   @spec handle_streaming(String.t() | iolist(), state) ::
   1225           {:ok, state}
   1226           | {:error, Postgrex.Error.t(), state}
   1227           | {:disconnect, %DBConnection.ConnectionError{}, state}
   1228   def handle_streaming(statement, %{buffer: buffer} = s) do
   1229     msgs = [msg_query(statement: statement)]
   1230 
   1231     case msg_send(%{s | buffer: nil}, msgs, buffer) do
   1232       :ok ->
   1233         recv_streaming(s, buffer)
   1234 
   1235       {:disconnect, err, s} ->
   1236         {:disconnect, err, s}
   1237 
   1238       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1239         status = new_status([], mode: :transaction)
   1240         error_ready(s, status, err, buffer)
   1241     end
   1242   end
   1243 
   1244   defp recv_streaming(s, buffer) do
   1245     case msg_recv(s, :infinity, buffer) do
   1246       {:ok, msg_copy_both_response(), buffer} ->
   1247         {:ok, %{s | buffer: buffer}}
   1248 
   1249       {:ok, msg_copy_out_response(), buffer} ->
   1250         {:ok, %{s | buffer: buffer}}
   1251 
   1252       {:ok, msg_error(fields: fields), buffer} ->
   1253         status = new_status([], mode: :transaction)
   1254         err = Postgrex.Error.exception(postgres: fields)
   1255         error_ready(s, status, err, buffer)
   1256 
   1257       {:disconnect, _, _} = dis ->
   1258         dis
   1259     end
   1260   end
   1261 
   1262   ## prepare
   1263 
   1264   defp parse_describe(s, %{mode: :transaction} = status, query) do
   1265     msgs = parse_describe_msgs(query, [msg_sync()])
   1266     %{buffer: buffer} = s
   1267 
   1268     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1269          {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer),
   1270          {:ok, s} <- recv_ready(s, status, buffer) do
   1271       {:ok, query, s}
   1272     else
   1273       {:reload, oids, s, buffer} ->
   1274         reload_ready(s, status, query, oids, buffer)
   1275 
   1276       {:disconnect, err, s} ->
   1277         {:disconnect, err, s}
   1278 
   1279       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1280         error_ready(s, status, err, buffer)
   1281     end
   1282   end
   1283 
   1284   defp parse_describe(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query) do
   1285     %{buffer: buffer} = s
   1286 
   1287     msgs =
   1288       [msg_query(statement: "SAVEPOINT postgrex_query")] ++
   1289         parse_describe_msgs(query, [msg_query(statement: "RELEASE SAVEPOINT postgrex_query")])
   1290 
   1291     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1292          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   1293          {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer),
   1294          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   1295       {:ok, query, s}
   1296     else
   1297       {:reload, oids, s, buffer} ->
   1298         reload_transaction(s, status, query, oids, buffer)
   1299 
   1300       {:disconnect, err, s} ->
   1301         {:disconnect, err, s}
   1302 
   1303       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1304         rollback_flushed(s, status, err, buffer)
   1305     end
   1306   end
   1307 
   1308   defp parse_describe(%{postgres: postgres} = s, %{mode: :savepoint}, _)
   1309        when postgres in [:idle, :error] do
   1310     transaction_error(s, postgres)
   1311   end
   1312 
   1313   defp parse_describe_close(s, %{mode: :transaction} = status, query) do
   1314     %Query{name: name} = query
   1315     %{buffer: buffer} = s
   1316     msgs = parse_describe_msgs(query, [msg_close(type: :statement, name: name), msg_sync()])
   1317 
   1318     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1319          {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer),
   1320          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1321          _ = query_delete(s, query),
   1322          {:ok, s} <- recv_ready(s, status, buffer) do
   1323       {:ok, query, s}
   1324     else
   1325       {:reload, oids, s, buffer} ->
   1326         reload_closed(s, status, query, oids, buffer)
   1327 
   1328       {:disconnect, err, s} ->
   1329         {:disconnect, err, s}
   1330 
   1331       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1332         error_ready(s, status, err, buffer)
   1333     end
   1334   end
   1335 
   1336   defp parse_describe_close(s, %{mode: :savepoint} = status, query) do
   1337     # only used for unnamed queries and the savepoint release will close the query
   1338     parse_describe(s, status, query)
   1339   end
   1340 
   1341   defp parse_describe_flush(s, %{mode: :transaction} = status, query) do
   1342     %{buffer: buffer} = s
   1343     msgs = parse_describe_msgs(query, [msg_flush()])
   1344 
   1345     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1346          {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <-
   1347            recv_parse_describe(s, status, query, buffer) do
   1348       # lock state with unique query reference as not synced
   1349       {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}}
   1350     else
   1351       {:error, err, s, buffer} ->
   1352         error_flushed(s, status, err, buffer)
   1353 
   1354       {:reload, oids, s, buffer} ->
   1355         reload_flushed(s, status, query, oids, buffer)
   1356 
   1357       {:disconnect, _err, _s} = disconnect ->
   1358         disconnect
   1359     end
   1360   end
   1361 
   1362   defp parse_describe_flush(
   1363          %{postgres: :transaction, buffer: buffer} = s,
   1364          %{mode: :savepoint} = status,
   1365          query
   1366        ) do
   1367     msgs =
   1368       [msg_query(statement: "SAVEPOINT postgrex_query")] ++
   1369         parse_describe_msgs(query, [msg_flush()])
   1370 
   1371     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1372          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   1373          {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <-
   1374            recv_parse_describe(s, status, query, buffer) do
   1375       # lock state with unique query reference as not synced
   1376       {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}}
   1377     else
   1378       {:error, err, s, buffer} ->
   1379         rollback_flushed(s, status, err, buffer)
   1380 
   1381       {:reload, oids, s, buffer} ->
   1382         reload_flushed(s, status, query, oids, buffer)
   1383 
   1384       {:disconnect, _err, _s} = disconnect ->
   1385         disconnect
   1386     end
   1387   end
   1388 
   1389   defp parse_describe_flush(%{postgres: postgres} = s, %{mode: :savepoint}, _)
   1390        when postgres in [:idle, :error] do
   1391     transaction_error(s, postgres)
   1392   end
   1393 
   1394   defp close_parse_describe(s, %{mode: :transaction} = status, query) do
   1395     %Query{name: name} = query
   1396     %{buffer: buffer} = s
   1397 
   1398     msgs = [msg_close(type: :statement, name: name)] ++ parse_describe_msgs(query, [msg_sync()])
   1399 
   1400     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1401          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1402          _ = query_delete(s, query),
   1403          {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer),
   1404          {:ok, s} <- recv_ready(s, status, buffer) do
   1405       {:ok, query, s}
   1406     else
   1407       {:reload, oids, s, buffer} ->
   1408         reload_ready(s, status, query, oids, buffer)
   1409 
   1410       {:disconnect, err, s} ->
   1411         {:disconnect, err, s}
   1412 
   1413       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1414         error_ready(s, status, err, buffer)
   1415     end
   1416   end
   1417 
   1418   defp close_parse_describe(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query) do
   1419     %Query{name: name} = query
   1420     %{buffer: buffer} = s
   1421 
   1422     msgs =
   1423       [
   1424         msg_query(statement: "SAVEPOINT postgrex_query"),
   1425         msg_close(type: :statement, name: name)
   1426       ] ++
   1427         parse_describe_msgs(query, [msg_query(statement: "RELEASE SAVEPOINT postgrex_query")])
   1428 
   1429     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1430          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   1431          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1432          _ = query_delete(s, query),
   1433          {:ok, query, s, buffer} <- recv_parse_describe(s, status, query, buffer),
   1434          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   1435       {:ok, query, s}
   1436     else
   1437       {:reload, oids, s, buffer} ->
   1438         reload_transaction(s, status, query, oids, buffer)
   1439 
   1440       {:disconnect, err, s} ->
   1441         {:disconnect, err, s}
   1442 
   1443       {:error, %Postgrex.Error{} = err, s, buffer} ->
   1444         rollback_flushed(s, status, err, buffer)
   1445     end
   1446   end
   1447 
   1448   defp close_parse_describe(%{postgres: postgres} = s, %{mode: :savepoint}, _)
   1449        when postgres in [:idle, :error] do
   1450     transaction_error(s, postgres)
   1451   end
   1452 
   1453   defp close_parse_describe_flush(s, %{mode: :transaction} = status, query) do
   1454     %Query{name: name} = query
   1455     %{buffer: buffer} = s
   1456 
   1457     msgs = [msg_close(type: :statement, name: name)] ++ parse_describe_msgs(query, [msg_flush()])
   1458 
   1459     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1460          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1461          _ = query_delete(s, query),
   1462          {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <-
   1463            recv_parse_describe(s, status, query, buffer) do
   1464       # lock state with unique query reference as not synced
   1465       {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}}
   1466     else
   1467       {:error, err, s, buffer} ->
   1468         error_flushed(s, status, err, buffer)
   1469 
   1470       {:reload, oids, s, buffer} ->
   1471         reload_flushed(s, status, query, oids, buffer)
   1472 
   1473       {:disconnect, _err, _s} = disconnect ->
   1474         disconnect
   1475     end
   1476   end
   1477 
   1478   defp close_parse_describe_flush(
   1479          %{postgres: :transaction, buffer: buffer} = s,
   1480          %{mode: :savepoint} = status,
   1481          query
   1482        ) do
   1483     %Query{name: name} = query
   1484 
   1485     msgs =
   1486       [
   1487         msg_query(statement: "SAVEPOINT postgrex_query"),
   1488         msg_close(type: :statement, name: name)
   1489       ] ++ parse_describe_msgs(query, [msg_flush()])
   1490 
   1491     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   1492          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   1493          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1494          _ = query_delete(s, query),
   1495          {:ok, %Query{ref: ref} = query, %{postgres: postgres} = s, buffer} <-
   1496            recv_parse_describe(s, status, query, buffer) do
   1497       # lock state with unique query reference as not synced
   1498       {:ok, query, %{s | postgres: {postgres, ref}, buffer: buffer}}
   1499     else
   1500       {:error, err, s, buffer} ->
   1501         rollback_flushed(s, status, err, buffer)
   1502 
   1503       {:reload, oids, s, buffer} ->
   1504         reload_flushed(s, status, query, oids, buffer)
   1505 
   1506       {:disconnect, _err, _s} = disconnect ->
   1507         disconnect
   1508     end
   1509   end
   1510 
   1511   defp close_parse_describe_flush(%{postgres: postgres} = s, %{mode: :savepoint}, _)
   1512        when postgres in [:idle, :error] do
   1513     transaction_error(s, postgres)
   1514   end
   1515 
   1516   defp parse_describe_msgs(query, tail) do
   1517     %Query{name: name, statement: statement, param_oids: param_oids} = query
   1518     type_oids = param_oids || []
   1519 
   1520     [
   1521       msg_parse(name: name, statement: statement, type_oids: type_oids),
   1522       msg_describe(type: :statement, name: name) | tail
   1523     ]
   1524   end
   1525 
   1526   defp recv_parse_describe(
   1527          %{types: protocol_types} = s,
   1528          status,
   1529          %Query{ref: ref, types: query_types} = query,
   1530          buffer
   1531        )
   1532        when ref == nil or protocol_types != query_types do
   1533     with {:ok, s, buffer} <- recv_parse(s, status, buffer),
   1534          {:ok, param_oids, result_oids, columns, s, buffer} <- recv_describe(s, status, buffer) do
   1535       describe(s, query, param_oids, result_oids, columns, buffer)
   1536     else
   1537       {:error, %Postgrex.Error{} = error, s, buffer} ->
   1538         {:error, %{error | query: query.statement}, s, buffer}
   1539 
   1540       {:disconnect, _, _} = disconnect ->
   1541         disconnect
   1542     end
   1543   end
   1544 
   1545   defp recv_parse_describe(s, status, query, buffer) do
   1546     %Query{param_oids: param_oids, result_oids: result_oids, columns: columns} = query
   1547 
   1548     with {:ok, s, buffer} <- recv_parse(s, status, buffer),
   1549          {:ok, ^param_oids, ^result_oids, ^columns, s, buffer} <-
   1550            recv_describe(s, status, param_oids, buffer) do
   1551       query_put(s, query)
   1552       {:ok, query, s, buffer}
   1553     else
   1554       {:ok, ^param_oids, new_result_oids, new_columns, s, buffer} ->
   1555         redescribe(s, query, new_result_oids, new_columns, buffer)
   1556 
   1557       {:error, %Postgrex.Error{}, _, _} = error ->
   1558         error
   1559 
   1560       {:disconnect, _, _} = disconnect ->
   1561         disconnect
   1562     end
   1563   end
   1564 
   1565   defp recv_parse(s, status, buffer) do
   1566     case msg_recv(s, :infinity, buffer) do
   1567       {:ok, msg_parse_complete(), buffer} ->
   1568         {:ok, s, buffer}
   1569 
   1570       {:ok, msg_error(fields: fields), buffer} ->
   1571         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   1572 
   1573       {:ok, msg, buffer} ->
   1574         {s, status} = handle_msg(s, status, msg)
   1575         recv_parse(s, status, buffer)
   1576 
   1577       {:disconnect, _, _} = dis ->
   1578         dis
   1579     end
   1580   end
   1581 
   1582   defp recv_describe(s, status, param_oids \\ [], buffer) do
   1583     case msg_recv(s, :infinity, buffer) do
   1584       {:ok, msg_no_data(), buffer} ->
   1585         {:ok, param_oids, nil, nil, s, buffer}
   1586 
   1587       {:ok, msg_parameter_desc(type_oids: param_oids), buffer} ->
   1588         recv_describe(s, status, param_oids, buffer)
   1589 
   1590       {:ok, msg_row_desc(fields: fields), buffer} ->
   1591         {result_oids, columns} = columns(fields)
   1592         {:ok, param_oids, result_oids, columns, s, buffer}
   1593 
   1594       {:ok, msg_too_many_parameters(len: len, max_len: max), buffer} ->
   1595         msg = "postgresql protocol can not handle #{len} parameters, the maximum is #{max}"
   1596         err = Postgrex.QueryError.exception(msg)
   1597         {:disconnect, err, %{s | buffer: buffer}}
   1598 
   1599       {:ok, msg_error(fields: fields), buffer} ->
   1600         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   1601 
   1602       {:ok, msg, buffer} ->
   1603         {s, status} = handle_msg(s, status, msg)
   1604         recv_describe(s, status, param_oids, buffer)
   1605 
   1606       {:disconnect, _, _} = dis ->
   1607         dis
   1608     end
   1609   end
   1610 
   1611   defp describe(s, query, param_oids, result_oids, columns, buffer) do
   1612     case describe_params(s, query, param_oids) do
   1613       {:ok, query} ->
   1614         redescribe(s, query, result_oids, columns, buffer)
   1615 
   1616       {:reload, oids} ->
   1617         reload_describe_result(s, oids, result_oids, buffer)
   1618 
   1619       {:error, err} ->
   1620         {:disconnect, err, %{s | buffer: buffer}}
   1621     end
   1622   end
   1623 
   1624   defp redescribe(s, query, result_oids, columns, buffer) do
   1625     with {:ok, query} <- describe_result(s, query, result_oids, columns) do
   1626       query_put(s, query)
   1627       {:ok, query, s, buffer}
   1628     else
   1629       {:reload, oids} ->
   1630         {:reload, oids, s, buffer}
   1631 
   1632       {:error, err} ->
   1633         {:disconnect, err, %{s | buffer: buffer}}
   1634     end
   1635   end
   1636 
   1637   defp describe_params(%{types: types}, query, param_oids) do
   1638     with {:ok, param_info} <- fetch_type_info(param_oids, types),
   1639          {param_formats, param_types} = Enum.unzip(param_info) do
   1640       query = %Query{
   1641         query
   1642         | param_oids: param_oids,
   1643           param_formats: param_formats,
   1644           param_types: param_types
   1645       }
   1646 
   1647       {:ok, query}
   1648     end
   1649   end
   1650 
   1651   defp reload_describe_result(s, param_oids, nil, buffer) do
   1652     {:reload, param_oids, s, buffer}
   1653   end
   1654 
   1655   defp reload_describe_result(%{types: types} = s, param_oids, result_oids, buffer) do
   1656     case fetch_type_info(result_oids, types) do
   1657       {:ok, _} ->
   1658         {:reload, param_oids, s, buffer}
   1659 
   1660       {:reload, reload_oids} ->
   1661         {:reload, MapSet.union(param_oids, reload_oids), s, buffer}
   1662 
   1663       {:error, err} ->
   1664         {:disconnect, err, %{s | buffer: buffer}}
   1665     end
   1666   end
   1667 
   1668   defp describe_result(%{types: types}, query, nil, nil) do
   1669     query = %Query{
   1670       query
   1671       | ref: make_ref(),
   1672         types: types,
   1673         columns: nil,
   1674         result_oids: nil,
   1675         result_formats: [],
   1676         result_types: nil
   1677     }
   1678 
   1679     {:ok, query}
   1680   end
   1681 
   1682   defp describe_result(%{types: types}, query, result_oids, columns) do
   1683     with {:ok, result_info} <- fetch_type_info(result_oids, types),
   1684          {result_formats, result_types} = Enum.unzip(result_info) do
   1685       query = %Query{
   1686         query
   1687         | ref: make_ref(),
   1688           types: types,
   1689           columns: columns,
   1690           result_oids: result_oids,
   1691           result_formats: result_formats,
   1692           result_types: result_types
   1693       }
   1694 
   1695       {:ok, query}
   1696     end
   1697   end
   1698 
   1699   defp error_flushed(s, %{mode: :transaction} = status, err, buffer) do
   1700     with :ok <- msg_send(s, [msg_sync()], buffer) do
   1701       error_ready(s, status, err, buffer)
   1702     else
   1703       {:disconnect, _err, _s} = disconnect ->
   1704         disconnect
   1705     end
   1706   end
   1707 
   1708   defp rollback_flushed(s, %{mode: :savepoint} = status, err, buffer) do
   1709     stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query"
   1710     msgs = [msg_sync(), msg_query(statement: stmt)]
   1711 
   1712     with :ok <- msg_send(s, msgs, buffer),
   1713          {:error, err, %{buffer: buffer} = s} <- error_ready(s, status, err, buffer),
   1714          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   1715       {:error, err, s}
   1716     else
   1717       {:disconnect, _err, _s} = disconnect ->
   1718         disconnect
   1719     end
   1720   end
   1721 
   1722   defp reload_transaction(s, status, query, oids, buffer) do
   1723     %Query{name: name} = query
   1724     msgs = [msg_close(type: :statement, name: name), msg_sync()]
   1725 
   1726     with {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   1727          :ok <- msg_send(s, msgs, buffer) do
   1728       reload_closed(s, status, query, oids, buffer)
   1729     else
   1730       {:disconnect, _err, _s} = disconnect ->
   1731         disconnect
   1732     end
   1733   end
   1734 
   1735   defp reload_flushed(s, %{mode: :transaction} = status, query, oids, buffer) do
   1736     %Query{name: name} = query
   1737     msgs = [msg_close(type: :statement, name: name), msg_sync()]
   1738 
   1739     with :ok <- msg_send(s, msgs, buffer) do
   1740       reload_closed(s, status, query, oids, buffer)
   1741     else
   1742       {:disconnect, _err, _s} = disconnect ->
   1743         disconnect
   1744     end
   1745   end
   1746 
   1747   defp reload_flushed(s, %{mode: :savepoint} = status, query, oids, buffer) do
   1748     %Query{name: name} = query
   1749     stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query"
   1750     msgs = [msg_close(type: :statement, name: name), msg_query(statement: stmt)]
   1751 
   1752     with :ok <- msg_send(s, msgs, buffer),
   1753          {:ok, s, buffer} <- recv_close(s, status, buffer),
   1754          {:ok, _, %{buffer: buffer} = s} <-
   1755            recv_transaction(s, status, buffer) do
   1756       reload_spawn(%{s | buffer: nil}, status, query, oids, buffer)
   1757     else
   1758       {:disconnect, _err, _s} = disconnect ->
   1759         disconnect
   1760     end
   1761   end
   1762 
   1763   defp reload_ready(s, status, query, oids, buffer) do
   1764     %Query{name: name} = query
   1765     msgs = [msg_close(type: :statement, name: name), msg_sync()]
   1766 
   1767     with {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer),
   1768          :ok <- msg_send(s, msgs, buffer) do
   1769       reload_closed(s, status, query, oids, buffer)
   1770     else
   1771       {:disconnect, _err, _s} = disconnect ->
   1772         disconnect
   1773     end
   1774   end
   1775 
   1776   defp reload_closed(s, status, query, oids, buffer) do
   1777     with {:ok, s, buffer} <- recv_close(s, status, buffer),
   1778          {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer) do
   1779       reload_spawn(%{s | buffer: nil}, status, query, oids, buffer)
   1780     else
   1781       {:disconnect, _err, _s} = disconnect ->
   1782         disconnect
   1783     end
   1784   end
   1785 
   1786   defp fetch_type_info(oids, types, infos \\ [], reloads \\ MapSet.new())
   1787 
   1788   defp fetch_type_info([], _, infos, reloads) do
   1789     case MapSet.size(reloads) do
   1790       0 ->
   1791         {:ok, Enum.reverse(infos)}
   1792 
   1793       _ ->
   1794         {:reload, reloads}
   1795     end
   1796   end
   1797 
   1798   defp fetch_type_info([oid | oids], types, infos, reloads) do
   1799     case Postgrex.Types.fetch(oid, types) do
   1800       {:ok, info} ->
   1801         fetch_type_info(oids, types, [info | infos], reloads)
   1802 
   1803       {:error, %Postgrex.TypeInfo{} = info, mod} ->
   1804         msg = Postgrex.Utils.type_msg(info, mod)
   1805         {:error, Postgrex.QueryError.exception(msg)}
   1806 
   1807       {:error, nil, _} ->
   1808         fetch_type_info(oids, types, infos, MapSet.put(reloads, oid))
   1809     end
   1810   end
   1811 
   1812   defp reload_spawn(s, status, query, oids, buffer) do
   1813     Logger.debug(fn ->
   1814       [
   1815         inspect(query),
   1816         " uses unknown oid(s) ",
   1817         Enum.join(oids, ", ")
   1818         | "forcing us to reload type information from the database. " <>
   1819             "This is expected behaviour whenever you migrate your database."
   1820       ]
   1821     end)
   1822 
   1823     ref = make_ref()
   1824     {_, mon} = spawn_monitor(fn -> reload_init(s, status, oids, ref, buffer) end)
   1825 
   1826     receive do
   1827       {:DOWN, ^mon, _, _, {^ref, s, buffer}} ->
   1828         reload_fetch(s, status, query, oids, buffer)
   1829 
   1830       {:DOWN, ^mon, _, _, _} ->
   1831         {:disconnect, type_fetch_error(), %{s | buffer: buffer}}
   1832     end
   1833   end
   1834 
   1835   defp reload_init(%{types: types} = s, status, oids, exit_ref, buffer) do
   1836     with {:ok, server} <- Postgrex.Types.owner(types),
   1837          {:lock, lock_ref, ^types} <- TypeServer.fetch(server),
   1838          status = Map.put(status, :types_lock, {server, lock_ref}),
   1839          acc = {[], MapSet.new(), MapSet.new(), MapSet.new()},
   1840          {:ok, s} <- reload(s, status, oids, acc, buffer) do
   1841       %{buffer: buffer} = s
   1842       exit({exit_ref, %{s | buffer: nil}, buffer})
   1843     else
   1844       :noproc ->
   1845         exit(:normal)
   1846 
   1847       :error ->
   1848         exit(:normal)
   1849 
   1850       {error, err, _} when error in [:error, :disconnect] ->
   1851         raise err
   1852     end
   1853   end
   1854 
   1855   defp reload(%{types: types} = s, status, oids, acc, buffer) do
   1856     %{parameters: parameters} = s
   1857 
   1858     with {:ok, parameters} <- Postgrex.Parameters.fetch(parameters) do
   1859       version = Postgrex.Utils.parse_version(parameters["server_version"])
   1860       statement = Types.reload_query(version, Enum.to_list(oids), types)
   1861 
   1862       if statement do
   1863         reload_send(s, status, statement, acc, buffer)
   1864       else
   1865         %{types_lock: {server, ref}} = status
   1866         {type_infos, _, _, _} = acc
   1867         sorted_infos = Enum.sort_by(type_infos, & &1.oid)
   1868         TypeServer.update(server, ref, sorted_infos)
   1869         {:ok, %{s | buffer: buffer}}
   1870       end
   1871     else
   1872       :error ->
   1873         s = %{s | buffer: buffer}
   1874         {:error, %Postgrex.Error{message: "parameters not available"}, s}
   1875     end
   1876   end
   1877 
   1878   defp reload_send(s, status, statement, acc, buffer) do
   1879     msg = msg_query(statement: statement)
   1880 
   1881     case msg_send(s, msg, buffer) do
   1882       :ok ->
   1883         reload_recv(s, status, acc, buffer)
   1884 
   1885       {:disconnect, err, s} ->
   1886         bootstrap_fail(s, err, status)
   1887     end
   1888   end
   1889 
   1890   defp reload_recv(%{types: types} = s, status, acc, buffer) do
   1891     case msg_recv(s, :infinity, buffer) do
   1892       {:ok, msg_row_desc(), buffer} ->
   1893         reload_recv(s, status, acc, buffer)
   1894 
   1895       {:ok, msg_data_row(values: values), buffer} ->
   1896         reload_recv(s, status, reload_row(acc, values, types), buffer)
   1897 
   1898       {:ok, msg_command_complete(), buffer} ->
   1899         reload_complete(s, status, acc, buffer)
   1900 
   1901       {:ok, msg_error(fields: fields), buffer} ->
   1902         err = Postgrex.Error.exception(postgres: fields)
   1903         bootstrap_fail(s, err, status, buffer)
   1904 
   1905       {:ok, msg, buffer} ->
   1906         {s, status} = handle_msg(s, status, msg)
   1907         reload_recv(s, status, acc, buffer)
   1908 
   1909       {:disconnect, err, s} ->
   1910         bootstrap_fail(s, err, status)
   1911     end
   1912   end
   1913 
   1914   defp reload_row({type_infos, oids, missing, current}, values, types) do
   1915     %Postgrex.TypeInfo{oid: oid} = type_info = Types.build_type_info(values)
   1916 
   1917     missing =
   1918       missing
   1919       |> put_missing_oids(type_info, oids, types)
   1920       |> MapSet.delete(oid)
   1921 
   1922     {[type_info | type_infos], MapSet.put(oids, oid), missing, current}
   1923   end
   1924 
   1925   defp put_missing_oids(missing, type_info, new, types) do
   1926     %Postgrex.TypeInfo{array_elem: array_elem, base_type: base_type, comp_elems: comp_elems} =
   1927       type_info
   1928 
   1929     for oid <- [array_elem, base_type | comp_elems],
   1930         oid !== 0,
   1931         not MapSet.member?(new, oid),
   1932         not bootstrapped?(types, oid),
   1933         do: oid,
   1934         into: missing
   1935   end
   1936 
   1937   defp bootstrapped?(types, oid) do
   1938     case Postgrex.Types.fetch(oid, types) do
   1939       {:ok, _} ->
   1940         true
   1941 
   1942       {:error, %Postgrex.TypeInfo{}, _} ->
   1943         true
   1944 
   1945       {:error, nil, _} ->
   1946         false
   1947     end
   1948   end
   1949 
   1950   defp reload_complete(s, status, {type_infos, new, missing, prev}, buffer) do
   1951     case sync_recv(s, status, buffer) do
   1952       {:ok, %{buffer: buffer} = s} ->
   1953         s = %{s | buffer: nil}
   1954         next = MapSet.delete(missing, prev)
   1955         current = MapSet.union(next, prev)
   1956         reload(s, status, Enum.to_list(next), {type_infos, new, MapSet.new(), current}, buffer)
   1957 
   1958       {:disconnect, _, _} = error ->
   1959         error
   1960     end
   1961   end
   1962 
   1963   defp reload_fetch(%{types: types} = s, status, query, oids, buffer) do
   1964     case oids |> Enum.to_list() |> fetch_type_info(types) do
   1965       {:ok, _} ->
   1966         reload_prepare(%{s | buffer: buffer}, status, query)
   1967 
   1968       {:error, err} ->
   1969         disconnect(s, err, buffer)
   1970 
   1971       {:reload, oids} ->
   1972         msg = "oid(s) #{Enum.join(oids, ", ")} lack type information after bootstrap"
   1973         disconnect(s, RuntimeError.exception(message: msg), buffer)
   1974     end
   1975   end
   1976 
   1977   defp reload_prepare(s, %{prepare: prepare} = status, query) do
   1978     %Query{name: name} = query
   1979 
   1980     case prepare do
   1981       true when name == "" ->
   1982         # unnamed queries closed on prepare when not re-using
   1983         parse_describe_close(s, status, query)
   1984 
   1985       true ->
   1986         # named queries closed when oid not found
   1987         parse_describe(s, status, query)
   1988 
   1989       _ ->
   1990         # flush awaiting execute or declare
   1991         parse_describe_flush(s, status, query)
   1992     end
   1993   end
   1994 
   1995   ## execute
   1996 
   1997   defp lock_error(s, fun) do
   1998     msg = "connection is locked copying to or from the database and can not #{fun} transaction"
   1999 
   2000     {:disconnect, RuntimeError.exception(msg), s}
   2001   end
   2002 
   2003   defp lock_error(s, fun, query) do
   2004     msg =
   2005       "connection is locked copying to or from the database and can not #{fun} #{inspect(query)}"
   2006 
   2007     {:disconnect, RuntimeError.exception(msg), s}
   2008   end
   2009 
   2010   defp transaction_error(s, status) do
   2011     {:error, DBConnection.TransactionError.exception(status), s}
   2012   end
   2013 
   2014   defp handle_prepare_execute(%Query{name: ""} = query, params, opts, s) do
   2015     status = new_status(opts)
   2016 
   2017     case parse_describe_flush(s, status, query) do
   2018       {:ok, query, s} ->
   2019         bind_execute_close(s, status, query, params)
   2020 
   2021       {error, _, _} = other when error in [:error, :disconnect] ->
   2022         other
   2023     end
   2024   end
   2025 
   2026   defp handle_prepare_execute(%Query{} = query, params, opts, s) do
   2027     status = new_status(opts)
   2028 
   2029     case close_parse_describe_flush(s, status, query) do
   2030       {:ok, query, s} ->
   2031         bind_execute(s, status, query, params)
   2032 
   2033       {error, _, _} = other when error in [:error, :disconnect] ->
   2034         other
   2035     end
   2036   end
   2037 
   2038   defp bind_execute_close(s, %{mode: :transaction} = status, query, params) do
   2039     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2040     %{buffer: buffer} = s
   2041 
   2042     msgs = [
   2043       msg_bind(
   2044         name_port: "",
   2045         name_stat: name,
   2046         param_formats: pfs,
   2047         params: params,
   2048         result_formats: rfs
   2049       ),
   2050       msg_execute(name_port: "", max_rows: 0),
   2051       msg_close(type: :statement, name: name),
   2052       msg_sync()
   2053     ]
   2054 
   2055     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2056          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2057          {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
   2058          {:ok, s, buffer} <- recv_close(s, status, buffer),
   2059          {:ok, s} <- recv_ready(s, status, buffer) do
   2060       {:ok, query, result, s}
   2061     else
   2062       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2063         error_ready(s, status, err, buffer)
   2064         |> maybe_disconnect()
   2065 
   2066       {:disconnect, _err, _s} = disconnect ->
   2067         disconnect
   2068     end
   2069   end
   2070 
   2071   defp bind_execute_close(s, %{mode: :savepoint} = status, query, params) do
   2072     # only used for un-named and query will always get closed by release
   2073     bind_execute(s, status, query, params)
   2074   end
   2075 
   2076   defp bind_execute(s, %{mode: :transaction} = status, query, params) do
   2077     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2078     %{buffer: buffer} = s
   2079 
   2080     msgs = [
   2081       msg_bind(
   2082         name_port: "",
   2083         name_stat: name,
   2084         param_formats: pfs,
   2085         params: params,
   2086         result_formats: rfs
   2087       ),
   2088       msg_execute(name_port: "", max_rows: 0),
   2089       msg_sync()
   2090     ]
   2091 
   2092     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2093          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2094          {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
   2095          {:ok, s} <- recv_ready(s, status, buffer) do
   2096       {:ok, query, result, s}
   2097     else
   2098       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2099         query_delete_on_error(s, err, query)
   2100 
   2101         error_ready(s, status, err, buffer)
   2102         |> maybe_disconnect()
   2103 
   2104       {:disconnect, _err, _s} = disconnect ->
   2105         disconnect
   2106     end
   2107   end
   2108 
   2109   defp bind_execute(s, %{mode: :savepoint} = status, query, params) do
   2110     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2111     %{buffer: buffer} = s
   2112 
   2113     msgs = [
   2114       msg_bind(
   2115         name_port: "",
   2116         name_stat: name,
   2117         param_formats: pfs,
   2118         params: params,
   2119         result_formats: rfs
   2120       ),
   2121       msg_execute(name_port: "", max_rows: 0),
   2122       msg_query(statement: "RELEASE SAVEPOINT postgrex_query")
   2123     ]
   2124 
   2125     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2126          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2127          {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
   2128          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2129       {:ok, query, result, s}
   2130     else
   2131       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2132         query_delete_on_error(s, err, query)
   2133         rollback_flushed(s, status, err, buffer)
   2134 
   2135       {:disconnect, _err, _s} = disconnect ->
   2136         disconnect
   2137     end
   2138   end
   2139 
   2140   defp maybe_disconnect({:error, _, %{disconnect_on_error_codes: []}} = result), do: result
   2141 
   2142   defp maybe_disconnect(
   2143          {:error, %Postgrex.Error{postgres: %{code: code}} = error,
   2144           %{disconnect_on_error_codes: codes} = state} = result
   2145        ) do
   2146     if code in codes do
   2147       {:disconnect, error, state}
   2148     else
   2149       result
   2150     end
   2151   end
   2152 
   2153   defp maybe_disconnect(other), do: other
   2154 
   2155   defp rebind_execute(s, %{mode: :transaction} = status, query, params) do
   2156     # using a cached query is same as using it for the first time when don't
   2157     # need to setup savepoints
   2158     bind_execute(s, status, query, params)
   2159   end
   2160 
   2161   defp rebind_execute(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query, params) do
   2162     # using a named cache query so savepoint/simple query does not unprepare
   2163     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2164     %{buffer: buffer} = s
   2165 
   2166     msgs = [
   2167       msg_query(statement: "SAVEPOINT postgrex_query"),
   2168       msg_bind(
   2169         name_port: "",
   2170         name_stat: name,
   2171         param_formats: pfs,
   2172         params: params,
   2173         result_formats: rfs
   2174       ),
   2175       msg_execute(name_port: "", max_rows: 0),
   2176       msg_query(statement: "RELEASE SAVEPOINT postgrex_query")
   2177     ]
   2178 
   2179     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2180          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   2181          {:ok, s, buffer} <- recv_bind(%{s | buffer: nil}, status, buffer),
   2182          {:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
   2183          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2184       {:ok, query, result, s}
   2185     else
   2186       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2187         query_delete_on_error(s, err, query)
   2188         rollback_flushed(s, status, err, buffer)
   2189 
   2190       {:disconnect, _err, _s} = disconnect ->
   2191         disconnect
   2192     end
   2193   end
   2194 
   2195   defp rebind_execute(%{postgres: postgres} = s, %{mode: :savepoint}, _, _)
   2196        when postgres in [:idle, :error] do
   2197     transaction_error(s, postgres)
   2198   end
   2199 
   2200   defp recv_bind(s, status, buffer) do
   2201     case msg_recv(s, :infinity, buffer) do
   2202       {:ok, msg_bind_complete(), buffer} ->
   2203         {:ok, s, buffer}
   2204 
   2205       {:ok, msg_error(fields: fields), buffer} ->
   2206         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2207 
   2208       {:ok, msg, buffer} ->
   2209         {s, status} = handle_msg(s, status, msg)
   2210         recv_bind(s, status, buffer)
   2211 
   2212       {:disconnect, _, _} = dis ->
   2213         dis
   2214     end
   2215   end
   2216 
   2217   defp recv_execute(s, status, query, rows \\ [], buffer) do
   2218     %Query{result_types: types} = query
   2219 
   2220     case rows_recv(s, types, rows, buffer) do
   2221       {:ok, msg_command_complete(tag: tag), rows, buffer} ->
   2222         {:ok, done(s, status, query, rows, tag), s, buffer}
   2223 
   2224       {:ok, msg_error(fields: fields), _, buffer} ->
   2225         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2226 
   2227       {:ok, msg_empty_query(), [], buffer} ->
   2228         {:ok, done(s, status, query, nil, nil), s, buffer}
   2229 
   2230       {:ok, msg_copy_in_response(), [], buffer} ->
   2231         copy_in_disconnect(s, query, buffer)
   2232 
   2233       {:ok, msg_copy_out_response(), [], buffer} ->
   2234         recv_copy_out(s, status, query, buffer)
   2235 
   2236       {:ok, msg_copy_both_response(), [], buffer} ->
   2237         copy_both_disconnect(s, query, buffer)
   2238 
   2239       {:ok, msg, rows, buffer} ->
   2240         {s, status} = handle_msg(s, status, msg)
   2241         recv_execute(s, status, query, rows, buffer)
   2242 
   2243       {:disconnect, _, _} = dis ->
   2244         dis
   2245     end
   2246   end
   2247 
   2248   defp copy_in_disconnect(s, query, buffer) do
   2249     msg = "query #{inspect(query)} is trying to copy in but no copy data to send"
   2250     {:disconnect, RuntimeError.exception(msg), %{s | buffer: buffer}}
   2251   end
   2252 
   2253   defp copy_both_disconnect(s, query, buffer) do
   2254     msg = "query #{inspect(query)} is trying to copy both ways but it is not supported"
   2255     {:disconnect, RuntimeError.exception(msg), %{s | buffer: buffer}}
   2256   end
   2257 
   2258   defp recv_copy_out(s, status, query, acc \\ [], buffer) do
   2259     case msg_recv(s, :infinity, buffer) do
   2260       {:ok, msg_copy_data(data: data), buffer} ->
   2261         recv_copy_out(s, status, query, [data | acc], buffer)
   2262 
   2263       {:ok, msg_copy_done(), buffer} ->
   2264         recv_copy_out(s, status, query, acc, buffer)
   2265 
   2266       {:ok, msg_command_complete(tag: tag), buffer} ->
   2267         {:ok, done(s, status, query, acc, tag), s, buffer}
   2268 
   2269       {:ok, msg_error(fields: fields), buffer} ->
   2270         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2271 
   2272       {:ok, msg, buffer} ->
   2273         {s, status} = handle_msg(s, status, msg)
   2274         recv_copy_out(s, status, query, acc, buffer)
   2275 
   2276       {:disconnect, _, _} = dis ->
   2277         dis
   2278     end
   2279   end
   2280 
   2281   defp make_portal() do
   2282     System.unique_integer([:positive])
   2283     |> Integer.to_string(36)
   2284   end
   2285 
   2286   defp handle_bind(%Query{ref: ref} = query, params, res, opts, %{postgres: {_, ref}} = s) do
   2287     bind(s, new_status(opts), query, params, res)
   2288   end
   2289 
   2290   defp handle_bind(query, _, _, _, %{postgres: {_, _}} = s) do
   2291     lock_error(s, :bind, query)
   2292   end
   2293 
   2294   defp handle_bind(query, params, res, opts, s) do
   2295     if query_member?(s, query) do
   2296       rebind(s, new_status(opts), query, params, res)
   2297     else
   2298       handle_prepare_bind(query, params, res, opts, s)
   2299     end
   2300   end
   2301 
   2302   defp handle_prepare_bind(%Query{name: ""} = query, params, res, opts, s) do
   2303     status = new_status(opts)
   2304 
   2305     case parse_describe_flush(s, status, query) do
   2306       {:ok, query, s} ->
   2307         bind(s, status, query, params, res)
   2308 
   2309       {error, _, _} = other when error in [:error, :disconnect] ->
   2310         other
   2311     end
   2312   end
   2313 
   2314   defp handle_prepare_bind(%Query{} = query, params, res, opts, s) do
   2315     status = new_status(opts)
   2316 
   2317     case close_parse_describe_flush(s, status, query) do
   2318       {:ok, query, s} ->
   2319         bind(s, status, query, params, res)
   2320 
   2321       {error, _, _} = other when error in [:error, :disconnect] ->
   2322         other
   2323     end
   2324   end
   2325 
   2326   defp bind(s, %{mode: :transaction} = status, query, params, cursor) do
   2327     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2328     %{portal: portal} = cursor
   2329     %{buffer: buffer} = s
   2330 
   2331     msgs = [
   2332       msg_bind(
   2333         name_port: portal,
   2334         name_stat: name,
   2335         param_formats: pfs,
   2336         params: params,
   2337         result_formats: rfs
   2338       ),
   2339       msg_sync()
   2340     ]
   2341 
   2342     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2343          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2344          {:ok, s} <- recv_ready(s, status, buffer) do
   2345       {:ok, query, cursor, s}
   2346     else
   2347       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2348         error_ready(s, status, err, buffer)
   2349 
   2350       {:disconnect, _err, _s} = disconnect ->
   2351         disconnect
   2352     end
   2353   end
   2354 
   2355   defp bind(s, %{mode: :savepoint} = status, query, params, cursor) do
   2356     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2357     %{portal: portal} = cursor
   2358     %{buffer: buffer} = s
   2359 
   2360     msgs = [
   2361       msg_bind(
   2362         name_port: portal,
   2363         name_stat: name,
   2364         param_formats: pfs,
   2365         params: params,
   2366         result_formats: rfs
   2367       ),
   2368       msg_query(statement: "RELEASE SAVEPOINT postgrex_query")
   2369     ]
   2370 
   2371     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2372          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2373          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2374       {:ok, query, cursor, s}
   2375     else
   2376       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2377         rollback_flushed(s, status, err, buffer)
   2378 
   2379       {:disconnect, _err, _s} = disconnect ->
   2380         disconnect
   2381     end
   2382   end
   2383 
   2384   defp rebind(s, %{mode: :transaction} = status, query, params, cursor) do
   2385     # using a cached query is same as using it for the first time when don't
   2386     # need to setup savepoints
   2387     bind(s, status, query, params, cursor)
   2388   end
   2389 
   2390   defp rebind(%{postgres: :transaction} = s, %{mode: :savepoint} = status, query, params, cursor) do
   2391     %Query{param_formats: pfs, result_formats: rfs, name: name} = query
   2392     %{portal: portal} = cursor
   2393     %{buffer: buffer} = s
   2394 
   2395     msgs = [
   2396       msg_query(statement: "SAVEPOINT postgrex_query"),
   2397       msg_bind(
   2398         name_port: portal,
   2399         name_stat: name,
   2400         param_formats: pfs,
   2401         params: params,
   2402         result_formats: rfs
   2403       ),
   2404       msg_query(statement: "RELEASE SAVEPOINT postgrex_query")
   2405     ]
   2406 
   2407     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2408          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   2409          {:ok, s, buffer} <- recv_bind(s, status, buffer),
   2410          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2411       {:ok, query, cursor, s}
   2412     else
   2413       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2414         rollback_flushed(s, status, err, buffer)
   2415 
   2416       {:disconnect, _err, _s} = disconnect ->
   2417         disconnect
   2418     end
   2419   end
   2420 
   2421   defp rebind(%{postgres: postgres} = s, %{mode: :savepoint}, _, _, _)
   2422        when postgres in [:idle, :error] do
   2423     transaction_error(s, postgres)
   2424   end
   2425 
   2426   defp execute(s, %{mode: :transaction} = status, query, cursor, max_rows) do
   2427     %Cursor{portal: portal} = cursor
   2428     msgs = [msg_execute(name_port: portal, max_rows: max_rows), msg_sync()]
   2429     %{buffer: buffer} = s
   2430 
   2431     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2432          {ok, result, s, buffer} when ok in [:cont, :halt] <-
   2433            recv_execute(s, status, query, cursor, max_rows, [], buffer),
   2434          {:ok, s} <- recv_ready(s, status, buffer) do
   2435       {ok, result, s}
   2436     else
   2437       {:copy_out, result, s} ->
   2438         {:cont, result, s}
   2439 
   2440       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2441         error_ready(s, status, err, buffer)
   2442 
   2443       {:disconnect, _err, _s} = disconnect ->
   2444         disconnect
   2445     end
   2446   end
   2447 
   2448   defp execute(
   2449          %{postgres: :transaction} = s,
   2450          %{mode: :savepoint} = status,
   2451          query,
   2452          cursor,
   2453          max_rows
   2454        ) do
   2455     %Cursor{portal: portal} = cursor
   2456     %{buffer: buffer} = s
   2457 
   2458     msgs = [
   2459       msg_query(statement: "SAVEPOINT postgrex_query"),
   2460       msg_execute(name_port: portal, max_rows: max_rows),
   2461       msg_query(statement: "RELEASE SAVEPOINT postgrex_query")
   2462     ]
   2463 
   2464     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2465          {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
   2466          {ok, result, s, buffer} when ok in [:cont, :halt] <-
   2467            recv_execute(s, status, query, cursor, max_rows, [], buffer),
   2468          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2469       {ok, result, s}
   2470     else
   2471       {:copy_out, result, s} ->
   2472         {:cont, result, s}
   2473 
   2474       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2475         rollback_flushed(s, status, err, buffer)
   2476 
   2477       {:disconnect, _err, _s} = disconnect ->
   2478         disconnect
   2479     end
   2480   end
   2481 
   2482   defp recv_execute(s, status, query, cursor, max_rows, rows, buffer) do
   2483     %Query{result_types: types} = query
   2484 
   2485     case rows_recv(s, types, rows, buffer) do
   2486       {:ok, msg_command_complete(tag: tag), rows, buffer} ->
   2487         {:halt, halt(s, status, query, rows, tag), s, buffer}
   2488 
   2489       {:ok, msg_portal_suspend(), rows, buffer} ->
   2490         {:cont, done(s, status, query, rows, :stream, max_rows), s, buffer}
   2491 
   2492       {:ok, msg_error(fields: fields), _, buffer} ->
   2493         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2494 
   2495       {:ok, msg_empty_query(), [], buffer} ->
   2496         {:halt, done(s, status, query, nil, nil), s, buffer}
   2497 
   2498       {:ok, msg_copy_in_response(), [], buffer} ->
   2499         copy_in_disconnect(s, query, buffer)
   2500 
   2501       {:ok, msg_copy_out_response(), [], buffer} ->
   2502         %{postgres: postgres} = s
   2503         %Cursor{ref: ref} = cursor
   2504         s = %{s | postgres: {postgres, ref}}
   2505         recv_copy_out(s, status, query, max_rows, [], buffer)
   2506 
   2507       {:ok, msg_copy_both_response(), [], buffer} ->
   2508         copy_both_disconnect(s, query, buffer)
   2509 
   2510       {:ok, msg, rows, buffer} ->
   2511         {s, status} = handle_msg(s, status, msg)
   2512         recv_execute(s, status, query, cursor, max_rows, rows, buffer)
   2513 
   2514       {:disconnect, _, _} = dis ->
   2515         dis
   2516     end
   2517   end
   2518 
   2519   defp fetch_copy_out(%{buffer: buffer} = s, %{mode: :transaction} = status, query, max_rows) do
   2520     s = %{s | buffer: nil}
   2521 
   2522     with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, max_rows, [], buffer),
   2523          {:ok, s} <- recv_ready(s, status, buffer) do
   2524       {:halt, result, s}
   2525     else
   2526       {:copy_out, result, s} ->
   2527         {:cont, result, s}
   2528 
   2529       {:error, err, s, buffer} ->
   2530         error_ready(s, status, err, buffer)
   2531 
   2532       {:disconnect, _err, _s} = disconnect ->
   2533         disconnect
   2534     end
   2535   end
   2536 
   2537   defp fetch_copy_out(%{buffer: buffer} = s, %{mode: :savepoint} = status, query, max_rows) do
   2538     s = %{s | buffer: nil}
   2539 
   2540     with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, max_rows, [], buffer),
   2541          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2542       {:halt, result, s}
   2543     else
   2544       {:copy_out, result, s} ->
   2545         {:cont, result, s}
   2546 
   2547       {:error, err, s, buffer} ->
   2548         rollback_flushed(s, status, err, buffer)
   2549 
   2550       {:disconnect, _err, _s} = disconnect ->
   2551         disconnect
   2552     end
   2553   end
   2554 
   2555   defp recv_copy_out(s, status, query, max_rows, [], buffer) do
   2556     max_rows = if max_rows == 0, do: :infinity, else: max_rows
   2557     recv_copy_out(s, status, query, max_rows, [], 0, buffer)
   2558   end
   2559 
   2560   defp recv_copy_out(s, status, query, max_rows, acc, max_rows, buffer) do
   2561     s = %{s | buffer: buffer}
   2562     {:copy_out, done(s, status, query, acc, :copy_stream, max_rows), s}
   2563   end
   2564 
   2565   defp recv_copy_out(s, status, query, max_rows, acc, nrows, buffer) do
   2566     case msg_recv(s, :infinity, buffer) do
   2567       {:ok, msg_copy_data(data: data), buffer} ->
   2568         recv_copy_out(s, status, query, max_rows, [data | acc], nrows + 1, buffer)
   2569 
   2570       {:ok, msg_copy_done(), buffer} ->
   2571         recv_copy_out(s, status, query, max_rows, acc, nrows, buffer)
   2572 
   2573       {:ok, msg_command_complete(tag: tag), buffer} ->
   2574         {:halt, halt(s, status, query, acc, tag), s, buffer}
   2575 
   2576       {:ok, msg_error(fields: fields), buffer} ->
   2577         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2578 
   2579       {:ok, msg, buffer} ->
   2580         {s, status} = handle_msg(s, status, msg)
   2581         recv_copy_out(s, status, query, max_rows, acc, nrows, buffer)
   2582 
   2583       {:disconnect, _, _} = dis ->
   2584         dis
   2585     end
   2586   end
   2587 
   2588   defp copy_in_data(s, %{mode: :transaction}, copy, data) do
   2589     %Copy{portal: portal, ref: ref, query: query} = copy
   2590     %{postgres: postgres, buffer: buffer} = s
   2591     msgs = [msg_execute(name_port: portal, max_rows: 0), data]
   2592 
   2593     case msg_send(s, msgs, buffer) do
   2594       :ok ->
   2595         {:ok, query, copied(s), %{s | postgres: {postgres, ref}}}
   2596 
   2597       {:disconnect, _err, _s} = disconnect ->
   2598         disconnect
   2599     end
   2600   end
   2601 
   2602   defp copy_in_data(s, %{mode: :savepoint} = status, copy, data) do
   2603     %Copy{portal: portal, ref: ref, query: query} = copy
   2604     %{buffer: buffer} = s
   2605 
   2606     msgs = [
   2607       msg_query(statement: "SAVEPOINT postgrex_query"),
   2608       msg_execute(name_port: portal, max_rows: 0),
   2609       data
   2610     ]
   2611 
   2612     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2613          {:ok, _, %{postgres: postgres} = s} <- recv_transaction(s, status, buffer) do
   2614       {:ok, query, copied(s), %{s | postgres: {postgres, ref}}}
   2615     else
   2616       {:disconnect, _err, _s} = disconnect ->
   2617         disconnect
   2618     end
   2619   end
   2620 
   2621   defp copy_in_data(%{sock: {mod, sock}} = s, %{query: query}, data) do
   2622     case mod.send(sock, data) do
   2623       :ok ->
   2624         {:ok, query, copied(s), s}
   2625 
   2626       {:error, reason} ->
   2627         disconnect(s, tag(mod), "send", reason)
   2628     end
   2629   end
   2630 
   2631   defp copied(%{connection_id: connection_id}) do
   2632     %Postgrex.Result{
   2633       command: :copy_stream,
   2634       num_rows: :copy_stream,
   2635       rows: nil,
   2636       columns: nil,
   2637       connection_id: connection_id
   2638     }
   2639   end
   2640 
   2641   defp copy_in_done(s, %{mode: :transaction} = status, %Copy{query: query}) do
   2642     %{buffer: buffer} = s
   2643     msgs = [msg_copy_done(), msg_sync()]
   2644 
   2645     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2646          {:ok, result, s, buffer} <- recv_copy_in(s, status, query, buffer),
   2647          {:ok, s} <- recv_ready(s, status, buffer) do
   2648       {:ok, query, result, s}
   2649     else
   2650       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2651         error_ready(s, status, err, buffer)
   2652 
   2653       {:disconnect, _err, _s} = disconnect ->
   2654         disconnect
   2655     end
   2656   end
   2657 
   2658   defp copy_in_done(s, %{mode: :savepoint} = status, %Copy{query: query}) do
   2659     %{buffer: buffer} = s
   2660     msgs = [msg_copy_done(), msg_query(statement: "RELEASE SAVEPOINT postgrex_query")]
   2661 
   2662     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2663          {:ok, result, s, buffer} <- recv_copy_in(s, status, query, buffer),
   2664          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2665       {:ok, query, result, s}
   2666     else
   2667       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2668         rollback_flushed(s, status, err, buffer)
   2669 
   2670       {:disconnect, _err, _s} = disconnect ->
   2671         disconnect
   2672     end
   2673   end
   2674 
   2675   defp recv_copy_in(s, status, query, buffer) do
   2676     %Query{result_types: types} = query
   2677 
   2678     case rows_recv(s, types, [], buffer) do
   2679       {:ok, msg_copy_in_response(), [], buffer} ->
   2680         recv_copy_in_done(s, status, query, buffer)
   2681 
   2682       {:ok, msg_command_complete(tag: tag), rows, buffer} ->
   2683         {:ok, done(s, status, query, rows, tag), s, buffer}
   2684 
   2685       {:ok, msg_empty_query(), [], buffer} ->
   2686         {:ok, done(s, status, query, nil, nil), s, buffer}
   2687 
   2688       {:ok, msg_error(fields: fields), _, buffer} ->
   2689         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2690 
   2691       {:ok, msg_copy_out_response(), [], buffer} ->
   2692         recv_copy_out(s, status, query, buffer)
   2693 
   2694       {:ok, msg_copy_both_response(), [], buffer} ->
   2695         copy_both_disconnect(s, query, buffer)
   2696 
   2697       {:ok, msg, [], buffer} ->
   2698         {s, status} = handle_msg(s, status, msg)
   2699         recv_copy_in(s, status, query, buffer)
   2700 
   2701       {:ok, msg, [_ | _] = rows, buffer} ->
   2702         {s, status} = handle_msg(s, status, msg)
   2703         recv_execute(s, status, query, rows, buffer)
   2704 
   2705       {:disconnect, _, _} = dis ->
   2706         dis
   2707     end
   2708   end
   2709 
   2710   defp recv_copy_in_done(s, status, query, buffer) do
   2711     case msg_recv(s, :infinity, buffer) do
   2712       {:ok, msg_command_complete(tag: tag), buffer} ->
   2713         {:ok, done(s, status, query, nil, tag), s, buffer}
   2714 
   2715       {:ok, msg_error(fields: fields), buffer} ->
   2716         {:error, Postgrex.Error.exception(postgres: fields), s, buffer}
   2717 
   2718       {:ok, msg, buffer} ->
   2719         {s, status} = handle_msg(s, status, msg)
   2720         recv_copy_in_done(s, status, query, buffer)
   2721 
   2722       {:disconnect, _, _} = dis ->
   2723         dis
   2724     end
   2725   end
   2726 
   2727   ## close
   2728 
   2729   defp copy_out_done(s, %{mode: :transaction} = status, query) do
   2730     %{buffer: buffer} = s
   2731     s = %{s | buffer: nil}
   2732 
   2733     with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, :infinity, [], buffer),
   2734          {:ok, s} <- recv_ready(s, status, buffer) do
   2735       {:ok, result, s}
   2736     else
   2737       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2738         error_ready(s, status, err, buffer)
   2739 
   2740       {:disconnect, _err, _s} = disconnect ->
   2741         disconnect
   2742     end
   2743   end
   2744 
   2745   defp copy_out_done(s, %{mode: :savepoint} = status, query) do
   2746     %{buffer: buffer} = s
   2747     s = %{s | buffer: nil}
   2748 
   2749     with {:halt, result, s, buffer} <- recv_copy_out(s, status, query, :infinity, [], buffer),
   2750          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2751       {:ok, result, s}
   2752     else
   2753       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2754         rollback_flushed(s, status, err, buffer)
   2755 
   2756       {:disconnect, _err, _s} = disconnect ->
   2757         disconnect
   2758     end
   2759   end
   2760 
   2761   defp flushed_close(s, %{mode: :transaction} = status, query) do
   2762     # closing query without transaction if not flushed is the same as if doing
   2763     # with preparing immediately before.
   2764     close(s, status, query)
   2765   end
   2766 
   2767   defp flushed_close(s, %{mode: :savepoint} = status, query) do
   2768     %Query{name: name} = query
   2769     %{buffer: buffer} = s
   2770     stmt = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query"
   2771     msgs = [msg_close(type: :statement, name: name), msg_query(statement: stmt)]
   2772 
   2773     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2774          {:ok, s, buffer} <- recv_close(s, status, buffer),
   2775          _ = query_delete(s, query),
   2776          {:ok, _, s} <- recv_transaction(s, status, buffer) do
   2777       %{connection_id: connection_id} = s
   2778       {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s}
   2779     else
   2780       {:disconnect, _err, _s} = disconnect ->
   2781         disconnect
   2782     end
   2783   end
   2784 
   2785   defp close(s, status, %Query{name: name} = query) do
   2786     %{buffer: buffer} = s
   2787     msgs = [msg_close(type: :statement, name: name), msg_sync()]
   2788 
   2789     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2790          {:ok, s, buffer} <- recv_close(s, status, buffer),
   2791          _ = query_delete(s, query),
   2792          {:ok, s} <- recv_ready(s, status, buffer) do
   2793       %{connection_id: connection_id} = s
   2794       {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s}
   2795     else
   2796       {:disconnect, _err, _s} = disconnect ->
   2797         disconnect
   2798     end
   2799   end
   2800 
   2801   defp close(s, status, %{portal: portal}) do
   2802     %{buffer: buffer} = s
   2803     msgs = [msg_close(type: :portal, name: portal), msg_sync()]
   2804 
   2805     with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
   2806          {:ok, s, buffer} <- recv_close(s, status, buffer),
   2807          {:ok, s} <- recv_ready(s, status, buffer) do
   2808       %{connection_id: connection_id} = s
   2809       {:ok, %Postgrex.Result{command: :close, connection_id: connection_id}, s}
   2810     else
   2811       {:disconnect, _err, _s} = disconnect ->
   2812         disconnect
   2813     end
   2814   end
   2815 
   2816   ## ping
   2817 
   2818   defp ping_recv(s, status, old_buffer, buffer) do
   2819     %{ping_timeout: timeout, postgres: postgres, transactions: transactions} = s
   2820 
   2821     case msg_recv(s, timeout, buffer) do
   2822       {:ok, msg_ready(status: :idle), buffer}
   2823       when postgres == :transaction and transactions == :strict ->
   2824         sync_error(s, :idle, buffer)
   2825 
   2826       {:ok, msg_ready(status: :transaction), buffer}
   2827       when postgres == :idle and transactions == :strict ->
   2828         sync_error(s, :transaction, buffer)
   2829 
   2830       {:ok, msg_ready(status: :error), buffer}
   2831       when postgres == :idle and transactions == :strict ->
   2832         sync_error(s, :error, buffer)
   2833 
   2834       {:ok, msg_ready(status: postgres), buffer} when old_buffer == :active_once ->
   2835         activate(%{s | postgres: postgres}, buffer)
   2836 
   2837       {:ok, msg_ready(status: postgres), buffer} when is_nil(old_buffer) ->
   2838         {:ok, %{s | postgres: postgres, buffer: buffer}}
   2839 
   2840       {:ok, msg_error(fields: fields), buffer} ->
   2841         disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
   2842 
   2843       {:ok, msg, buffer} ->
   2844         {s, status} = handle_msg(s, status, msg)
   2845         ping_recv(s, status, old_buffer, buffer)
   2846 
   2847       {:disconnect, _, _} = dis ->
   2848         dis
   2849     end
   2850   end
   2851 
   2852   ## transaction
   2853 
   2854   defp handle_transaction(statement, opts, %{buffer: buffer} = s) do
   2855     status = new_status(opts, mode: :transaction)
   2856     msgs = [msg_query(statement: statement)]
   2857 
   2858     case msg_send(%{s | buffer: nil}, msgs, buffer) do
   2859       :ok ->
   2860         recv_transaction(s, status, buffer)
   2861 
   2862       {:disconnect, err, s} ->
   2863         {:disconnect, err, s}
   2864 
   2865       {:error, %Postgrex.Error{} = err, s, buffer} ->
   2866         error_ready(s, status, err, buffer)
   2867     end
   2868   end
   2869 
   2870   defp recv_transaction(s, status, tags \\ [], buffer) do
   2871     case msg_recv(s, :infinity, buffer) do
   2872       {:ok, msg_command_complete(tag: tag), buffer} ->
   2873         recv_transaction(s, status, [tag | tags], buffer)
   2874 
   2875       {:ok, msg_error(fields: fields), buffer} ->
   2876         err = Postgrex.Error.exception(postgres: fields)
   2877         {:disconnect, err, %{s | buffer: buffer}}
   2878 
   2879       {:ok, msg_ready(status: postgres), buffer} ->
   2880         s = %{s | postgres: postgres, buffer: buffer}
   2881         {:ok, done(s, status, Enum.reverse(tags)), s}
   2882 
   2883       {:ok, msg, buffer} ->
   2884         {s, status} = handle_msg(s, status, msg)
   2885         recv_transaction(s, status, tags, buffer)
   2886 
   2887       {:disconnect, _, _} = dis ->
   2888         dis
   2889     end
   2890   end
   2891 
   2892   defp recv_close(s, status, buffer) do
   2893     case msg_recv(s, :infinity, buffer) do
   2894       {:ok, msg_close_complete(), buffer} ->
   2895         {:ok, s, buffer}
   2896 
   2897       {:ok, msg_error(fields: fields), buffer} ->
   2898         err = Postgrex.Error.exception(postgres: fields)
   2899         {:disconnect, err, %{s | buffer: buffer}}
   2900 
   2901       {:ok, msg, buffer} ->
   2902         {s, status} = handle_msg(s, status, msg)
   2903         recv_close(s, status, buffer)
   2904 
   2905       {:disconnect, _, _} = dis ->
   2906         dis
   2907     end
   2908   end
   2909 
   2910   defp recv_ready(%{transactions: :naive} = s, status, buffer) do
   2911     case msg_recv(s, :infinity, buffer) do
   2912       {:ok, msg_ready(status: postgres), buffer} ->
   2913         {:ok, %{s | postgres: postgres, buffer: buffer}}
   2914 
   2915       {:ok, msg_error(fields: fields), buffer} ->
   2916         err = Postgrex.Error.exception(postgres: fields)
   2917         {:disconnect, err, %{s | buffer: buffer}}
   2918 
   2919       {:ok, msg, buffer} ->
   2920         {s, status} = handle_msg(s, status, msg)
   2921         recv_ready(s, status, buffer)
   2922 
   2923       {:disconnect, _, _} = dis ->
   2924         dis
   2925     end
   2926   end
   2927 
   2928   defp recv_ready(%{transactions: :strict, postgres: {postgres, _}} = s, status, buffer) do
   2929     recv_strict_ready(s, status, postgres, buffer)
   2930   end
   2931 
   2932   defp recv_ready(%{transactions: :strict, postgres: postgres} = s, status, buffer) do
   2933     recv_strict_ready(s, status, postgres, buffer)
   2934   end
   2935 
   2936   defp recv_strict_ready(s, status, expected, buffer) do
   2937     case msg_recv(s, :infinity, buffer) do
   2938       {:ok, msg_ready(status: ^expected), buffer} ->
   2939         {:ok, %{s | postgres: expected, buffer: buffer}}
   2940 
   2941       {:ok, msg_ready(status: :error), buffer} when expected == :transaction ->
   2942         {:ok, %{s | postgres: :error, buffer: buffer}}
   2943 
   2944       {:ok, msg_ready(status: unexpected), buffer} ->
   2945         sync_error(s, unexpected, buffer)
   2946 
   2947       {:ok, msg, buffer} ->
   2948         {s, status} = handle_msg(s, status, msg)
   2949         recv_strict_ready(s, status, expected, buffer)
   2950 
   2951       {:disconnect, _, _} = dis ->
   2952         dis
   2953     end
   2954   end
   2955 
   2956   defp error_ready(s, status, %Postgrex.Error{} = err, buffer) do
   2957     case recv_ready(s, status, buffer) do
   2958       {:ok, s} ->
   2959         %{connection_id: connection_id} = s
   2960         {:error, %Postgrex.Error{err | connection_id: connection_id}, s}
   2961 
   2962       {:disconnect, _, _} = disconnect ->
   2963         disconnect
   2964     end
   2965   end
   2966 
   2967   defp error_ready(s, status, err, buffer) do
   2968     case recv_ready(s, status, buffer) do
   2969       {:ok, s} -> {:error, err, s}
   2970       {:disconnect, _, _} = disconnect -> disconnect
   2971     end
   2972   end
   2973 
   2974   defp done(%{connection_id: connection_id}, %{messages: messages}, tags) do
   2975     {command, _} = decode_tags(tags)
   2976 
   2977     %Postgrex.Result{
   2978       command: command,
   2979       num_rows: nil,
   2980       rows: nil,
   2981       columns: nil,
   2982       connection_id: connection_id,
   2983       messages: messages
   2984     }
   2985   end
   2986 
   2987   defp done(s, status, %Query{} = query, rows, tag) do
   2988     {command, nrows} = if tag, do: decode_tag(tag), else: {nil, nil}
   2989     done(s, status, query, rows, command, nrows)
   2990   end
   2991 
   2992   defp done(%{connection_id: connection_id}, %{messages: messages}, columns, rows, tags) do
   2993     {command, _} = decode_tags(tags)
   2994 
   2995     %Postgrex.Result{
   2996       command: command,
   2997       num_rows: length(rows),
   2998       rows: rows,
   2999       columns: columns,
   3000       connection_id: connection_id,
   3001       messages: messages
   3002     }
   3003   end
   3004 
   3005   defp done(s, status, query, rows, command, nrows) do
   3006     %{connection_id: connection_id} = s
   3007     %{messages: messages} = status
   3008     %Query{columns: cols} = query
   3009 
   3010     # Fix for PostgreSQL 8.4 (doesn't include number of selected rows in tag)
   3011     nrows = if is_nil(nrows) and command == :select, do: length(rows), else: nrows
   3012     rows = if is_nil(cols) and rows == [] and command != :copy, do: nil, else: rows
   3013 
   3014     %Postgrex.Result{
   3015       command: command,
   3016       num_rows: nrows || 0,
   3017       rows: rows,
   3018       columns: cols,
   3019       connection_id: connection_id,
   3020       messages: messages
   3021     }
   3022   end
   3023 
   3024   defp halt(s, status, query, rows, tag) do
   3025     case done(s, status, query, rows, tag) do
   3026       %Postgrex.Result{rows: rows} = result when is_list(rows) ->
   3027         # shows rows for all streamed results but we only want for last chunk.
   3028         %Postgrex.Result{result | num_rows: length(rows)}
   3029 
   3030       result ->
   3031         result
   3032     end
   3033   end
   3034 
   3035   ## data
   3036 
   3037   defp handle_data(s, opts, buffer) do
   3038     data(s, new_status(opts, mode: :transaction), buffer)
   3039   end
   3040 
   3041   defp data(%{timeout: timeout} = s, status, buffer) do
   3042     case msg_recv(s, timeout, buffer) do
   3043       {:ok, msg_error(fields: fields), buffer} ->
   3044         disconnect(s, Postgrex.Error.exception(postgres: fields), buffer)
   3045 
   3046       {:ok, msg, <<>>} ->
   3047         {s, _} = handle_msg(s, status, msg)
   3048         activate(s, <<>>)
   3049 
   3050       {:ok, msg, buffer} ->
   3051         {s, status} = handle_msg(s, status, msg)
   3052         data(s, status, buffer)
   3053 
   3054       {:disconnect, _, _} = dis ->
   3055         dis
   3056     end
   3057   end
   3058 
   3059   ## helpers
   3060   defp notify(opts) do
   3061     opts[:notify] || fn _, _ -> :ok end
   3062   end
   3063 
   3064   defp mode(opts) do
   3065     case opts[:mode] || :transaction do
   3066       :transaction -> :transaction
   3067       :savepoint -> :savepoint
   3068     end
   3069   end
   3070 
   3071   defp columns(fields) do
   3072     fields
   3073     |> Enum.map(fn row_field(type_oid: oid, name: name) -> {oid, name} end)
   3074     |> :lists.unzip()
   3075   end
   3076 
   3077   defp column_names(fields) do
   3078     Enum.map(fields, fn row_field(name: name) -> name end)
   3079   end
   3080 
   3081   defp tag(:gen_tcp), do: :tcp
   3082   defp tag(:ssl), do: :ssl
   3083 
   3084   defp decode_tags([tag]), do: decode_tag(tag)
   3085   defp decode_tags(tags), do: Enum.map_reduce(tags, nil, &decode_tags/2)
   3086 
   3087   defp decode_tags(tag, acc) do
   3088     case decode_tag(tag) do
   3089       {command, nil} -> {command, acc}
   3090       {command, nrows} -> {command, nrows + (acc || 0)}
   3091     end
   3092   end
   3093 
   3094   defp decode_tag("INSERT " <> rest) do
   3095     [_oid, nrows] = :binary.split(rest, " ")
   3096     {:insert, String.to_integer(nrows)}
   3097   end
   3098 
   3099   defp decode_tag("SELECT " <> int), do: {:select, String.to_integer(int)}
   3100   defp decode_tag("UPDATE " <> int), do: {:update, String.to_integer(int)}
   3101   defp decode_tag("DELETE " <> int), do: {:delete, String.to_integer(int)}
   3102   defp decode_tag("FETCH " <> int), do: {:fetch, String.to_integer(int)}
   3103   defp decode_tag("MOVE " <> int), do: {:move, String.to_integer(int)}
   3104   defp decode_tag("COPY " <> int), do: {:copy, String.to_integer(int)}
   3105   defp decode_tag("BEGIN"), do: {:begin, nil}
   3106   defp decode_tag("COMMIT"), do: {:commit, nil}
   3107   defp decode_tag("ROLLBACK"), do: {:rollback, nil}
   3108   defp decode_tag(tag), do: decode_tag(tag, "")
   3109   defp decode_tag(<<>>, acc), do: {String.to_atom(acc), nil}
   3110   defp decode_tag(<<?\s, t::binary>>, acc), do: decode_tag(t, <<acc::binary, ?_>>)
   3111 
   3112   defp decode_tag(<<h, t::binary>>, acc) when h in ?A..?Z,
   3113     do: decode_tag(t, <<acc::binary, h + 32>>)
   3114 
   3115   # Valid SQL statements in PostgreSQL are only
   3116   # uppercase A..Z and space. Therefore any other
   3117   # character prompts a return of the accumulator
   3118   # ignoring anything from the invalid character
   3119   # and any trailing space.
   3120   defp decode_tag(<<_h, _t::binary>>, acc) do
   3121     tag =
   3122       acc
   3123       |> String.trim_trailing("_")
   3124       |> String.to_atom()
   3125 
   3126     {tag, nil}
   3127   end
   3128 
   3129   # It is ok to use infinity timeout here if in client process as timer is
   3130   # running.
   3131   defp msg_recv(%{sock: {:gen_tcp, sock}} = s, timeout, :active_once) do
   3132     receive do
   3133       {:tcp, ^sock, buffer} ->
   3134         msg_recv(s, timeout, buffer)
   3135 
   3136       {:tcp_closed, ^sock} ->
   3137         disconnect(s, :tcp, "async_recv", :closed, :active_once)
   3138 
   3139       {:tcp_error, ^sock, reason} ->
   3140         disconnect(s, :tcp, "async_recv", reason, :active_once)
   3141     after
   3142       timeout ->
   3143         disconnect(s, :tcp, "async_recv", :timeout, :active_one)
   3144     end
   3145   end
   3146 
   3147   defp msg_recv(%{sock: {:ssl, sock}} = s, timeout, :active_once) do
   3148     receive do
   3149       {:ssl, ^sock, buffer} ->
   3150         msg_recv(s, timeout, buffer)
   3151 
   3152       {:ssl_closed, ^sock} ->
   3153         disconnect(s, :ssl, "async_recv", :closed, :active_once)
   3154 
   3155       {:ssl_error, ^sock, reason} ->
   3156         disconnect(s, :ssl, "async_recv", reason, :active_once)
   3157     after
   3158       timeout ->
   3159         disconnect(s, :ssl, "async_recv", :timeout, :active_once)
   3160     end
   3161   end
   3162 
   3163   defp msg_recv(s, timeout, buffer) do
   3164     case msg_decode(buffer) do
   3165       {:ok, _, _} = ok -> ok
   3166       {:more, more} -> msg_recv(s, timeout, buffer, more)
   3167     end
   3168   end
   3169 
   3170   defp msg_recv(%{sock: {mod, sock}} = s, timeout, buffer, more) do
   3171     case mod.recv(sock, min(more, @max_packet), timeout) do
   3172       {:ok, data} when byte_size(data) < more ->
   3173         msg_recv(s, timeout, [buffer | data], more - byte_size(data))
   3174 
   3175       {:ok, data} when is_binary(buffer) ->
   3176         msg_recv(s, timeout, buffer <> data)
   3177 
   3178       {:ok, data} when is_list(buffer) ->
   3179         msg_recv(s, timeout, IO.iodata_to_binary([buffer | data]))
   3180 
   3181       {:error, reason} ->
   3182         action =
   3183           if s.postgres == :idle do
   3184             "recv (idle)"
   3185           else
   3186             "recv"
   3187           end
   3188 
   3189         disconnect(s, tag(mod), action, reason, IO.iodata_to_binary(buffer))
   3190     end
   3191   end
   3192 
   3193   defp msg_decode(bin) when byte_size(bin) < 5 do
   3194     {:more, 0}
   3195   end
   3196 
   3197   defp msg_decode(<<type::int8(), size::int32(), rest::binary>>) do
   3198     size = size - 4
   3199 
   3200     case rest do
   3201       <<body::binary(size), rest::binary>> ->
   3202         {:ok, parse(body, type, size), rest}
   3203 
   3204       _ ->
   3205         {:more, size - byte_size(rest)}
   3206     end
   3207   end
   3208 
   3209   defp rows_recv(%{types: types} = s, result_types, rows, buffer) do
   3210     case Types.decode_rows(buffer, result_types, rows, types) do
   3211       {:ok, rows, buffer} ->
   3212         rows_msg(s, rows, buffer)
   3213 
   3214       {:more, buffer, rows, more} ->
   3215         rows_recv(s, result_types, rows, buffer, more)
   3216     end
   3217   end
   3218 
   3219   defp rows_recv(%{sock: {mod, sock}} = s, result_types, rows, buffer, more) do
   3220     case mod.recv(sock, 0, :infinity) do
   3221       {:ok, data} when byte_size(data) < more ->
   3222         rows_recv(s, result_types, rows, [buffer | data], more - byte_size(data))
   3223 
   3224       {:ok, data} when is_binary(buffer) ->
   3225         rows_recv(s, result_types, rows, buffer <> data)
   3226 
   3227       {:ok, data} when is_list(buffer) ->
   3228         rows_recv(s, result_types, rows, IO.iodata_to_binary([buffer | data]))
   3229 
   3230       {:error, reason} ->
   3231         disconnect(s, tag(mod), "recv", reason, IO.iodata_to_binary(buffer))
   3232     end
   3233   end
   3234 
   3235   defp rows_msg(s, rows, buffer) do
   3236     case msg_recv(s, :infinity, buffer) do
   3237       {:ok, msg, buffer} ->
   3238         {:ok, msg, rows, buffer}
   3239 
   3240       {:disconnect, _, _} = dis ->
   3241         dis
   3242     end
   3243   end
   3244 
   # Sends one message, or a list of messages / pre-encoded iodata, over the
   # socket via `do_send/3`. List elements are encoded with
   # `maybe_encode_msg/1` and accumulated as nested iodata in their original
   # order.
   defp msg_send(s, msgs, buffer) when is_list(msgs) do
     iodata = Enum.reduce(msgs, [], fn msg, acc -> [acc | maybe_encode_msg(msg)] end)
     do_send(s, iodata, buffer)
   end

   defp msg_send(s, msg, buffer), do: do_send(s, encode_msg(msg), buffer)
   3253 
   # Tuples are encoded with `encode_msg/1`; binaries and lists are returned
   # untouched, so callers can mix messages with already-encoded iodata.
   defp maybe_encode_msg(msg) when is_tuple(msg) do
     encode_msg(msg)
   end

   defp maybe_encode_msg(iodata) when is_binary(iodata) or is_list(iodata) do
     iodata
   end
   3256 
   # Writes `data` to the socket. Returns `:ok`, or a `:disconnect` tuple
   # (built by `disconnect/5`, which stores `buffer` in the state) when the
   # transport reports an error.
   defp do_send(%{sock: {mod, sock}} = s, data, buffer) do
     case mod.send(sock, data) do
       {:error, reason} -> disconnect(s, tag(mod), "send", reason, buffer)
       :ok -> :ok
     end
   end
   3266 
   # Handles messages that can arrive at any point in an exchange. Each clause
   # returns the (possibly updated) `{s, status}` pair.

   # Parameter status: record the name/value pair, either in the shared
   # `Postgrex.Parameters` store (when `parameters` is a reference) or in the
   # map kept in the state.
   defp handle_msg(s, status, msg_parameter(name: name, value: value)) do
     %{parameters: store} = s

     # Copy so that only these small binaries are retained long term; they are
     # likely sub-binaries of a much larger receive buffer.
     key = :binary.copy(name)
     val = :binary.copy(value)

     cond do
       is_map(store) ->
         {%{s | parameters: Map.put(store, key, val)}, status}

       is_reference(store) ->
         _ = Postgrex.Parameters.put(store, key, val)
         {s, status}
     end
   end

   # Notification: invoke the `notify` callback carried in `status`.
   defp handle_msg(s, status, msg_notify(channel: channel, payload: payload)) do
     %{notify: callback} = status
     _ = callback.(channel, payload)
     {s, status}
   end

   # Notice: prepend its fields to the messages collected in `status`.
   defp handle_msg(s, status, msg_notice(fields: fields)) do
     {s, Map.update!(status, :messages, fn collected -> [fields | collected] end)}
   end
   3294 
   # Builds a `{:disconnect, error, state}` tuple for a transport failure.
   # The 5-arity form first stores `buffer` in the state; the error itself is
   # produced by `conn_error/3` from the transport tag, action and reason.
   defp disconnect(s, tag, action, reason, buffer) do
     s = %{s | buffer: buffer}
     disconnect(s, tag, action, reason)
   end

   defp disconnect(s, tag, action, reason) do
     error = conn_error(tag, action, reason)
     {:disconnect, error, s}
   end
   3302 
   # Builds a `DBConnection.ConnectionError` describing a failed socket action.
   #
   # Reasons listed in `@nonposix_errors` are printed as-is; any other reason
   # is rendered with the transport's own `format_error/1` and followed by the
   # inspected raw reason.
   defp conn_error(mod, action, reason) when reason in @nonposix_errors do
     message = "#{mod} #{action}: #{reason}"
     conn_error(message)
   end

   defp conn_error(:tcp, action, reason) do
     conn_error("tcp #{action}: #{:inet.format_error(reason)} - #{inspect(reason)}")
   end

   defp conn_error(:ssl, action, reason) do
     conn_error("ssl #{action}: #{:ssl.format_error(reason)} - #{inspect(reason)}")
   end

   defp conn_error(message), do: DBConnection.ConnectionError.exception(message)
   3320 
   # Builds a `{:disconnect, err, state}` tuple from an existing exception,
   # storing `buffer` in the state. A `Postgrex.Error` is additionally stamped
   # with the state's `connection_id`; a `RuntimeError` is passed on unchanged.
   defp disconnect(%{connection_id: connection_id} = s, %Postgrex.Error{} = err, buffer) do
     stamped = %{err | connection_id: connection_id}
     {:disconnect, stamped, %{s | buffer: buffer}}
   end

   defp disconnect(s, %RuntimeError{} = err, buffer) do
     s = %{s | buffer: buffer}
     {:disconnect, err, s}
   end
   3328 
   # Consumes messages until a ready-for-query message arrives.
   #
   # In `:strict` transaction mode a reported status that contradicts the
   # status tracked in the state (idle vs. transaction/error) is treated as a
   # sync error and yields a disconnect. Otherwise the reported status and the
   # remaining buffer are stored and `{:ok, state}` is returned. An error
   # message disconnects with a `Postgrex.Error`; any other message goes to
   # `handle_msg/3` and the loop continues.
   defp sync_recv(s, status, buffer) do
     %{postgres: current, transactions: transactions} = s
     strict? = transactions == :strict

     case msg_recv(s, :infinity, buffer) do
       {:ok, msg_ready(status: :idle), rest} when strict? and current == :transaction ->
         sync_error(s, :idle, rest)

       {:ok, msg_ready(status: :transaction), rest} when strict? and current == :idle ->
         sync_error(s, :transaction, rest)

       {:ok, msg_ready(status: :error), rest} when strict? and current == :idle ->
         sync_error(s, :error, rest)

       {:ok, msg_ready(status: reported), rest} ->
         {:ok, %{s | postgres: reported, buffer: rest}}

       {:ok, msg_error(fields: fields), rest} ->
         error = Postgrex.Error.exception(postgres: fields)
         disconnect(s, error, rest)

       {:ok, msg, rest} ->
         {s, status} = handle_msg(s, status, msg)
         sync_recv(s, status, rest)

       {:disconnect, _, _} = disconnect ->
         disconnect
     end
   end
   3359 
   # Produces a disconnect for an unexpected ready-for-query status. The
   # 3-arity form stores `buffer` in the state first.
   defp sync_error(s, postgres, buffer), do: sync_error(%{s | buffer: buffer}, postgres)

   defp sync_error(s, postgres) do
     message = "unexpected postgres status: #{postgres}"
     {:disconnect, %Postgrex.Error{message: message}, s}
   end
   3368 
   # Non-blocking poll of the process mailbox for a message delivered by the
   # socket while it was in active mode.
   #
   # Returns `{:ok, s}` with `buffer` set to the delivered data, or to `<<>>`
   # when no message for this socket is waiting (`after 0` never blocks). A
   # close or error notification is turned into a `:disconnect` tuple with an
   # empty buffer. All branches use the same "async recv" action label so the
   # resulting error messages are consistent across transports and reasons.
   defp recv_buffer(%{sock: {:gen_tcp, sock}} = s) do
     receive do
       {:tcp, ^sock, buffer} ->
         {:ok, %{s | buffer: buffer}}

       {:tcp_closed, ^sock} ->
         disconnect(s, :tcp, "async recv", :closed, "")

       {:tcp_error, ^sock, reason} ->
         disconnect(s, :tcp, "async recv", reason, "")
     after
       0 ->
         {:ok, %{s | buffer: <<>>}}
     end
   end

   defp recv_buffer(%{sock: {:ssl, sock}} = s) do
     receive do
       {:ssl, ^sock, buffer} ->
         {:ok, %{s | buffer: buffer}}

       {:ssl_closed, ^sock} ->
         disconnect(s, :ssl, "async recv", :closed, "")

       {:ssl_error, ^sock, reason} ->
         disconnect(s, :ssl, "async recv", reason, "")
     after
       0 ->
         {:ok, %{s | buffer: <<>>}}
     end
   end
   3400 
   ## Switch the socket to [active: once], or fake it when data is already
   ## buffered.
   #
   # With an empty buffer the socket option is really set and the state's
   # buffer is marked `:active_once`; a failure from `setopts/3` is returned
   # as-is. With leftover data, the process sends itself the message the
   # socket would have delivered and the state is returned unchanged.
   defp activate(s, <<>>) do
     with :ok <- setopts(s, [active: :once], <<>>) do
       {:ok, %{s | buffer: :active_once}}
     end
   end

   defp activate(%{sock: {mod, sock}} = s, buffer) do
     fake_delivery = {tag(mod), sock, buffer}
     send(self(), fake_delivery)
     {:ok, s}
   end
   3413 
   # Sets socket options. Called with the connection state it returns `:ok` or
   # a `:disconnect` tuple (with `buffer` stored in the state); called with a
   # transport module atom it dispatches to that transport's setopts function.
   defp setopts(%{sock: {mod, sock}} = s, opts, buffer) do
     with {:error, reason} <- setopts(mod, sock, opts) do
       disconnect(s, tag(mod), "setopts", reason, buffer)
     end
   end

   defp setopts(:gen_tcp, sock, opts) do
     :inet.setopts(sock, opts)
   end

   defp setopts(:ssl, sock, opts) do
     :ssl.setopts(sock, opts)
   end
   3426 
   # Asks the server to cancel whatever this connection's backend is running.
   # Does nothing when no connection key is known. A failed attempt is only
   # logged, never raised.
   defp cancel_request(%{connection_key: nil}) do
     :ok
   end

   defp cancel_request(s) do
     case do_cancel_request(s) do
       {:error, action, reason} ->
         error = conn_error(:tcp, action, reason)

         Logger.error(fn ->
           prefix = "#{inspect(__MODULE__)} #{inspect(self())} could not cancel backend: "
           [prefix | Exception.message(error)]
         end)

       :ok ->
         :ok
     end
   end
   3445 
   # Opens a separate, passive, binary-mode TCP connection to the peer and
   # runs the cancel exchange over it. A `{:local, _}` peer is passed as the
   # address with port 0. Returns the result of `cancel_send_recv/2`, or
   # `{:error, :connect, reason}` when the connection cannot be established.
   defp do_cancel_request(%{peer: {:local, _} = peer} = s) do
     do_cancel_request(peer, 0, s)
   end

   defp do_cancel_request(%{peer: {ip, port}} = s) do
     do_cancel_request(ip, port, s)
   end

   defp do_cancel_request(ip, port, %{timeout: timeout} = s) do
     connect_opts = [mode: :binary, active: false]

     case :gen_tcp.connect(ip, port, connect_opts, timeout) do
       {:error, reason} -> {:error, :connect, reason}
       {:ok, sock} -> cancel_send_recv(s, sock)
     end
   end
   3455 
   # Sends the cancel request for this connection's backend (identified by
   # `connection_id` and `connection_key`) over the throwaway socket `sock`.
   #
   # On success, `cancel_recv/2` waits for the peer and closes the socket. On a
   # send failure the socket is closed here before `{:error, :send, reason}` is
   # returned; otherwise the port would stay open for as long as the calling
   # process lives.
   defp cancel_send_recv(%{connection_id: pid, connection_key: key} = s, sock) do
     msg = msg_cancel_request(pid: pid, key: key)

     case :gen_tcp.send(sock, encode_msg(msg)) do
       :ok ->
         cancel_recv(s, sock)

       {:error, reason} ->
         _ = :gen_tcp.close(sock)
         {:error, :send, reason}
     end
   end
   3464 
   # Waits up to `timeout` on the cancel socket, then closes it and returns
   # the result of the close.
   defp cancel_recv(%{timeout: timeout}, sock) do
     # The recv result is deliberately discarded: the socket is expected to be
     # closed by the peer, and nothing can be done about any other outcome.
     :gen_tcp.recv(sock, 0, timeout)
     :gen_tcp.close(sock)
   end
   3470 
   # Closes the connection's socket using its transport module.
   defp sock_close(%{sock: {mod, sock}}) do
     mod.close(sock)
   end
   3472 
   # Removes this connection's entry from the `Postgrex.Parameters` store when
   # the state holds a reference to it; in every other case there is nothing
   # to delete and `:ok` is returned.
   defp delete_parameters(%{parameters: params_ref}) when is_reference(params_ref) do
     Postgrex.Parameters.delete(params_ref)
   end

   defp delete_parameters(_state) do
     :ok
   end
   3478 
   # Creates the public `:set` ETS table that holds cached queries.
   defp queries_new() do
     table_opts = [:set, :public]
     :ets.new(__MODULE__, table_opts)
   end
   3480 
   # Deletes the query cache table, if the state has one. Returns `true`
   # either way.
   defp queries_delete(%{queries: nil}) do
     true
   end

   defp queries_delete(%{queries: table}) do
     :ets.delete(table)
   end
   3483 
   # Records a prepared query in the cache table, keyed by its name.
   #
   # Nothing is stored when there is no table, the query has no ref, or the
   # query is unnamed. With `cache: :statement` the whole query is stored next
   # to its ref; with `cache: :reference` only the ref is. Always returns
   # `:ok`.
   defp query_put(%{queries: nil}, _query), do: :ok
   defp query_put(_s, %Query{ref: nil}), do: :ok
   defp query_put(_s, %Query{name: ""}), do: :ok

   defp query_put(%{queries: table}, %Query{name: name, cache: :statement, ref: ref} = query) do
     true = :ets.insert(table, {name, ref, query})
     :ok
   rescue
     # ets table deleted, socket will be closed, rescue here and get nice
     # error when trying to recv on socket.
     ArgumentError -> :ok
   end

   defp query_put(%{queries: table}, %Query{name: name, cache: :reference, ref: ref}) do
     true = :ets.insert(table, {name, ref})
     :ok
   rescue
     # ets table deleted, socket will be closed, rescue here and get nice
     # error when trying to recv on socket.
     ArgumentError -> :ok
   end
   3513 
   # Drops the cache entry stored under the query's name. A missing table, an
   # unnamed query, or an already-deleted table are all treated as success.
   defp query_delete(%{queries: nil}, _query), do: :ok
   defp query_delete(_s, %Query{name: ""}), do: :ok

   defp query_delete(%{queries: table}, %Query{name: name}) do
     true = :ets.delete(table, name)
     :ok
   rescue
     ArgumentError -> :ok
   end
   3526 
   # Evicts the query's cache entry when the server error code is
   # `:feature_not_supported` or `:invalid_sql_statement_name` and a cache
   # table exists. Any other combination is a no-op. Always returns `:ok`.
   defp query_delete_on_error(%{queries: table}, %{postgres: %{code: code}}, %Query{name: name})
        when table != nil and code in [:feature_not_supported, :invalid_sql_statement_name] do
     true = :ets.delete(table, name)
     :ok
   rescue
     ArgumentError -> :ok
   end

   defp query_delete_on_error(_s, _error, _query) do
     :ok
   end
   3543 
   # Tells whether the query is present in the cache under the same ref.
   #
   # Always false when there is no table, the query is unnamed, or the query's
   # types differ from the connection's. Otherwise the ref stored under the
   # query's name (tuple position 2) must be identical to the query's ref; a
   # missing entry or deleted table counts as false.
   defp query_member?(%{queries: nil}, _query), do: false
   defp query_member?(_s, %{name: ""}), do: false

   defp query_member?(%{types: protocol_types}, %Query{types: query_types})
        when protocol_types != query_types do
     false
   end

   defp query_member?(%{queries: table}, %Query{name: name, ref: ref}) do
     :ets.lookup_element(table, name, 2) === ref
   rescue
     ArgumentError -> false
   end
   3561 
   # Looks up the full query stored in the cache (tuple position 3) for a
   # `cache: :statement` query. It is returned only when its statement equals
   # the given query's statement; otherwise, and whenever the lookup fails,
   # the result is `nil`. Queries with any other cache mode never hit.
   defp cached_query(
          %{queries: table},
          %Query{cache: :statement, name: name, statement: statement}
        ) do
     case :ets.lookup_element(table, name, 3) do
       %{statement: ^statement} = cached -> cached
       _ -> nil
     end
   rescue
     ArgumentError -> nil
   end

   defp cached_query(_s, _query), do: nil
   3578 end