zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

stream.ex (3460B)


      1 defmodule Postgrex.Stream do
      2   @moduledoc """
      3   Stream struct returned from stream commands.
      4 
      5   All of its fields are private.
      6   """
      7   @derive {Inspect, only: []}
      8   defstruct [:conn, :query, :params, :options]
      9   @type t :: %Postgrex.Stream{}
     10 end
     11 
     12 defmodule Postgrex.Cursor do
     13   @moduledoc false
     14   defstruct [:portal, :ref, :connection_id, :mode]
     15   @type t :: %Postgrex.Cursor{}
     16 end
     17 
     18 defmodule Postgrex.Copy do
     19   @moduledoc false
     20   defstruct [:portal, :ref, :connection_id, :query]
     21   @type t :: %Postgrex.Copy{}
     22 end
     23 
     24 defimpl Enumerable, for: Postgrex.Stream do
     25   alias Postgrex.Query
     26 
     27   def reduce(%Postgrex.Stream{query: %Query{} = query} = stream, acc, fun) do
     28     %Postgrex.Stream{conn: conn, params: params, options: opts} = stream
     29     stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
     30     DBConnection.reduce(stream, acc, fun)
     31   end
     32 
     33   def reduce(%Postgrex.Stream{query: statement} = stream, acc, fun) do
     34     %Postgrex.Stream{conn: conn, params: params, options: opts} = stream
     35     query = %Query{name: "", statement: statement}
     36     opts = Keyword.put(opts, :function, :prepare_open)
     37     stream = %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts}
     38     DBConnection.reduce(stream, acc, fun)
     39   end
     40 
     41   def member?(_, _) do
     42     {:error, __MODULE__}
     43   end
     44 
     45   def count(_) do
     46     {:error, __MODULE__}
     47   end
     48 
     49   def slice(_) do
     50     {:error, __MODULE__}
     51   end
     52 end
     53 
     54 defimpl Collectable, for: Postgrex.Stream do
     55   alias Postgrex.Stream
     56   alias Postgrex.Query
     57 
     58   def into(%Stream{conn: %DBConnection{}} = stream) do
     59     %Stream{conn: conn, query: query, params: params, options: opts} = stream
     60     opts = Keyword.put(opts, :postgrex_copy, true)
     61 
     62     case query do
     63       %Query{} ->
     64         copy = DBConnection.execute!(conn, query, params, opts)
     65         {:ok, make_into(conn, stream, copy, opts)}
     66 
     67       query ->
     68         query = %Query{name: "", statement: query}
     69         {_, copy} = DBConnection.prepare_execute!(conn, query, params, opts)
     70         {:ok, make_into(conn, stream, copy, opts)}
     71     end
     72   end
     73 
     74   def into(_) do
     75     raise ArgumentError, "data can only be copied to database inside a transaction"
     76   end
     77 
     78   defp make_into(conn, stream, %Postgrex.Copy{ref: ref} = copy, opts) do
     79     fn
     80       :ok, {:cont, data} ->
     81         _ = DBConnection.execute!(conn, copy, {:copy_data, ref, data}, opts)
     82         :ok
     83 
     84       :ok, close when close in [:done, :halt] ->
     85         _ = DBConnection.execute!(conn, copy, {:copy_done, ref}, opts)
     86         stream
     87     end
     88   end
     89 end
     90 
     91 defimpl DBConnection.Query, for: Postgrex.Copy do
     92   alias Postgrex.Copy
     93   import Postgrex.Messages
     94 
     95   def parse(copy, _) do
     96     raise "can not prepare #{inspect(copy)}"
     97   end
     98 
     99   def describe(copy, _) do
    100     raise "can not describe #{inspect(copy)}"
    101   end
    102 
    103   def encode(%Copy{ref: ref}, {:copy_data, ref, data}, _) do
    104     try do
    105       encode_msg(msg_copy_data(data: data))
    106     rescue
    107       ArgumentError ->
    108         reraise ArgumentError,
    109                 [message: "expected iodata to copy to database, got: " <> inspect(data)],
    110                 __STACKTRACE__
    111     else
    112       iodata ->
    113         {:copy_data, iodata}
    114     end
    115   end
    116 
    117   def encode(%Copy{ref: ref}, {:copy_done, ref}, _) do
    118     :copy_done
    119   end
    120 
    121   def decode(copy, _result, _opts) do
    122     raise "can not describe #{inspect(copy)}"
    123   end
    124 end
    125 
    126 defimpl String.Chars, for: Postgrex.Copy do
    127   def to_string(%Postgrex.Copy{query: query}) do
    128     String.Chars.to_string(query)
    129   end
    130 end