stream.ex (3460B)
1 defmodule Postgrex.Stream do 2 @moduledoc """ 3 Stream struct returned from stream commands. 4 5 All of its fields are private. 6 """ 7 @derive {Inspect, only: []} 8 defstruct [:conn, :query, :params, :options] 9 @type t :: %Postgrex.Stream{} 10 end 11 12 defmodule Postgrex.Cursor do 13 @moduledoc false 14 defstruct [:portal, :ref, :connection_id, :mode] 15 @type t :: %Postgrex.Cursor{} 16 end 17 18 defmodule Postgrex.Copy do 19 @moduledoc false 20 defstruct [:portal, :ref, :connection_id, :query] 21 @type t :: %Postgrex.Copy{} 22 end 23 24 defimpl Enumerable, for: Postgrex.Stream do 25 alias Postgrex.Query 26 27 def reduce(%Postgrex.Stream{query: %Query{} = query} = stream, acc, fun) do 28 %Postgrex.Stream{conn: conn, params: params, options: opts} = stream 29 stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} 30 DBConnection.reduce(stream, acc, fun) 31 end 32 33 def reduce(%Postgrex.Stream{query: statement} = stream, acc, fun) do 34 %Postgrex.Stream{conn: conn, params: params, options: opts} = stream 35 query = %Query{name: "", statement: statement} 36 opts = Keyword.put(opts, :function, :prepare_open) 37 stream = %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts} 38 DBConnection.reduce(stream, acc, fun) 39 end 40 41 def member?(_, _) do 42 {:error, __MODULE__} 43 end 44 45 def count(_) do 46 {:error, __MODULE__} 47 end 48 49 def slice(_) do 50 {:error, __MODULE__} 51 end 52 end 53 54 defimpl Collectable, for: Postgrex.Stream do 55 alias Postgrex.Stream 56 alias Postgrex.Query 57 58 def into(%Stream{conn: %DBConnection{}} = stream) do 59 %Stream{conn: conn, query: query, params: params, options: opts} = stream 60 opts = Keyword.put(opts, :postgrex_copy, true) 61 62 case query do 63 %Query{} -> 64 copy = DBConnection.execute!(conn, query, params, opts) 65 {:ok, make_into(conn, stream, copy, opts)} 66 67 query -> 68 query = %Query{name: "", statement: query} 69 {_, copy} = DBConnection.prepare_execute!(conn, query, params, opts) 70 {:ok, make_into(conn, stream, copy, opts)} 71 end 72 end 73 74 def into(_) do 75 raise ArgumentError, "data can only be copied to database inside a transaction" 76 end 77 78 defp make_into(conn, stream, %Postgrex.Copy{ref: ref} = copy, opts) do 79 fn 80 :ok, {:cont, data} -> 81 _ = DBConnection.execute!(conn, copy, {:copy_data, ref, data}, opts) 82 :ok 83 84 :ok, close when close in [:done, :halt] -> 85 _ = DBConnection.execute!(conn, copy, {:copy_done, ref}, opts) 86 stream 87 end 88 end 89 end 90 91 defimpl DBConnection.Query, for: Postgrex.Copy do 92 alias Postgrex.Copy 93 import Postgrex.Messages 94 95 def parse(copy, _) do 96 raise "can not prepare #{inspect(copy)}" 97 end 98 99 def describe(copy, _) do 100 raise "can not describe #{inspect(copy)}" 101 end 102 103 def encode(%Copy{ref: ref}, {:copy_data, ref, data}, _) do 104 try do 105 encode_msg(msg_copy_data(data: data)) 106 rescue 107 ArgumentError -> 108 reraise ArgumentError, 109 [message: "expected iodata to copy to database, got: " <> inspect(data)], 110 __STACKTRACE__ 111 else 112 iodata -> 113 {:copy_data, iodata} 114 end 115 end 116 117 def encode(%Copy{ref: ref}, {:copy_done, ref}, _) do 118 :copy_done 119 end 120 121 def decode(copy, _result, _opts) do 122 raise "can not describe #{inspect(copy)}" 123 end 124 end 125 126 defimpl String.Chars, for: Postgrex.Copy do 127 def to_string(%Postgrex.Copy{query: query}) do 128 String.Chars.to_string(query) 129 end 130 end