zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

batch.ex (6152B)


      1 defmodule Absinthe.Middleware.Batch do
      2   @moduledoc """
      3   Batch the resolution of multiple fields.
      4 
      5   ## Motivation
      6   Consider the following graphql query:
      7   ```
      8   {
      9     posts {
     10       author {
     11         name
     12       }
     13     }
     14   }
     15   ```
     16 
     17   `posts` returns a list of `post` objects, which has an associated `author` field.
     18   If the `author` field makes a call to the database we have the classic N + 1 problem.
     19   What we want is a way to load all authors for all posts in one database request.
     20 
     21   This plugin provides this, without any eager loading at the parent level. That is,
     22   the code for the `posts` field does not need to do anything to facilitate the
     23   efficient loading of its children.
     24 
     25   ## Example Usage
     26   The API for this plugin is a little on the verbose side because it is not specific
     27   to any particular batching mechanism. That is, this API is just as useful for an Ecto
     28   based DB as it is for talking to S3 or the File System. Thus we anticipate people
     29   (including ourselves) will be creating additional functions more tailored to each
     30   of those specific use cases.
     31 
     32   Here is an example using the `Absinthe.Resolution.Helpers.batch/3` helper.
     33   ```elixir
     34   object :post do
     35     field :name, :string
     36     field :author, :user do
     37       resolve fn post, _, _ ->
     38         batch({__MODULE__, :users_by_id}, post.author_id, fn batch_results ->
     39           {:ok, Map.get(batch_results, post.author_id)}
     40         end)
     41       end
     42     end
     43   end
     44 
     45   def users_by_id(_, user_ids) do
     46     users = Repo.all from u in User, where: u.id in ^user_ids
     47     Map.new(users, fn user -> {user.id, user} end)
     48   end
     49   ```
     50 
     51   Let's look at this piece by piece:
     52   - `{__MODULE__, :users_by_id}`: is the batching function which will be used. It must
     53   be a 2 arity function. For details see the `batch_fun` typedoc.
     54   - `post.author_id`: This is the information to be aggregated. The aggregated values
     55   are the second argument to the batching function.
     56   - `fn batch_results`: This function takes the results from the batching function.
     57   it should return one of the resolution function values.
     58   """
     59 
     60   @behaviour Absinthe.Middleware
     61   @behaviour Absinthe.Plugin
     62 
     63   @typedoc """
     64   The function to be called with the aggregate batch information.
     65 
     66   It comes in both a 2 tuple and 3 tuple form. The first two elements are the module
     67   and function name. The third element is an arbitrary parameter that is passed
     68   as the first argument to the batch function.
     69 
     70   For example, one could parameterize the `users_by_id` function from the moduledoc
     71   to make it more generic. Instead of doing `{__MODULE__, :users_by_id}` you could do
     72   `{__MODULE__, :by_id, User}`. Then the function would be:
     73 
     74   ```elixir
     75   def by_id(model, ids) do
     76     model
     77     |> where([m], m.id in ^ids)
     78     |> Repo.all()
     79     |> Map.new(&{&1.id, &1})
     80   end
     81   ```
     82   It could also be used to set options unique to the execution of a particular
     83   batching function.
     84   """
     85   @type batch_fun :: {module, atom} | {module, atom, term}
     86 
     87   @type post_batch_fun :: (term -> Absinthe.Type.Field.result())
     88 
     89   def before_resolution(exec) do
     90     case exec.acc do
     91       %{__MODULE__ => _} ->
     92         put_in(exec.acc[__MODULE__][:input], [])
     93 
     94       _ ->
     95         put_in(exec.acc[__MODULE__], %{input: [], output: %{}})
     96     end
     97   end
     98 
     99   def call(%{state: :unresolved} = res, {batch_key, field_data, post_batch_fun, batch_opts}) do
    100     acc = res.acc
    101 
    102     acc =
    103       update_in(acc[__MODULE__][:input], fn
    104         nil -> [{{batch_key, batch_opts}, field_data}]
    105         data -> [{{batch_key, batch_opts}, field_data} | data]
    106       end)
    107 
    108     %{
    109       res
    110       | state: :suspended,
    111         middleware: [{__MODULE__, {batch_key, post_batch_fun}} | res.middleware],
    112         acc: acc
    113     }
    114   end
    115 
    116   def call(%{state: :suspended} = res, {batch_key, post_batch_fun}) do
    117     batch_data_for_fun =
    118       res.acc
    119       |> Map.fetch!(__MODULE__)
    120       |> Map.fetch!(:output)
    121       |> Map.fetch!(batch_key)
    122 
    123     res
    124     |> Absinthe.Resolution.put_result(post_batch_fun.(batch_data_for_fun))
    125   end
    126 
    127   def after_resolution(exec) do
    128     output = do_batching(exec.acc[__MODULE__][:input])
    129     put_in(exec.acc[__MODULE__][:output], output)
    130   end
    131 
    132   defp do_batching(input) do
    133     input
    134     |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
    135     |> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
    136       system_time = System.system_time()
    137       start_time_mono = System.monotonic_time()
    138 
    139       task =
    140         Task.async(fn ->
    141           {batch_fun, call_batch_fun(batch_fun, batch_data)}
    142         end)
    143 
    144       metadata = emit_start_event(system_time, batch_fun, batch_opts, batch_data)
    145 
    146       {batch_opts, task, start_time_mono, metadata}
    147     end)
    148     |> Map.new(fn {batch_opts, task, start_time_mono, metadata} ->
    149       timeout = Keyword.get(batch_opts, :timeout, 5_000)
    150       result = Task.await(task, timeout)
    151 
    152       duration = System.monotonic_time() - start_time_mono
    153       emit_stop_event(duration, metadata, result)
    154 
    155       result
    156     end)
    157   end
    158 
    159   @batch_start [:absinthe, :middleware, :batch, :start]
    160   @batch_stop [:absinthe, :middleware, :batch, :stop]
    161   defp emit_start_event(system_time, batch_fun, batch_opts, batch_data) do
    162     id = :erlang.unique_integer()
    163 
    164     metadata = %{
    165       id: id,
    166       telemetry_span_context: id,
    167       batch_fun: batch_fun,
    168       batch_opts: batch_opts,
    169       batch_data: batch_data
    170     }
    171 
    172     :telemetry.execute(
    173       @batch_start,
    174       %{system_time: system_time},
    175       metadata
    176     )
    177 
    178     metadata
    179   end
    180 
    181   defp emit_stop_event(duration, metadata, result) do
    182     :telemetry.execute(
    183       @batch_stop,
    184       %{duration: duration},
    185       Map.put(metadata, :result, result)
    186     )
    187   end
    188 
    189   defp call_batch_fun({module, fun}, batch_data) do
    190     call_batch_fun({module, fun, []}, batch_data)
    191   end
    192 
    193   defp call_batch_fun({module, fun, config}, batch_data) do
    194     apply(module, fun, [config, batch_data])
    195   end
    196 
    197   # If the flag is set we need to do another resolution phase.
    198   # otherwise, we do not
    199   def pipeline(pipeline, exec) do
    200     case exec.acc[__MODULE__][:input] do
    201       [_ | _] ->
    202         [Absinthe.Phase.Document.Execution.Resolution | pipeline]
    203 
    204       _ ->
    205         pipeline
    206     end
    207   end
    208 end