stream.ex (1063B)
1 defmodule Ecto.Adapters.SQL.Stream do 2 @moduledoc false 3 4 defstruct [:meta, :statement, :params, :opts] 5 6 def build(meta, statement, params, opts) do 7 %__MODULE__{meta: meta, statement: statement, params: params, opts: opts} 8 end 9 end 10 11 alias Ecto.Adapters.SQL.Stream 12 13 defimpl Enumerable, for: Stream do 14 def count(_), do: {:error, __MODULE__} 15 16 def member?(_, _), do: {:error, __MODULE__} 17 18 def slice(_), do: {:error, __MODULE__} 19 20 def reduce(stream, acc, fun) do 21 %Stream{meta: meta, statement: statement, params: params, opts: opts} = stream 22 Ecto.Adapters.SQL.reduce(meta, statement, params, opts, acc, fun) 23 end 24 end 25 26 defimpl Collectable, for: Stream do 27 def into(stream) do 28 %Stream{meta: meta, statement: statement, params: params, opts: opts} = stream 29 {state, fun} = Ecto.Adapters.SQL.into(meta, statement, params, opts) 30 {state, make_into(fun, stream)} 31 end 32 33 defp make_into(fun, stream) do 34 fn 35 state, :done -> 36 fun.(state, :done) 37 stream 38 39 state, acc -> 40 fun.(state, acc) 41 end 42 end 43 end