batch.ex (6152B)
defmodule Absinthe.Middleware.Batch do
  @moduledoc """
  Batch the resolution of multiple fields.

  ## Motivation
  Consider the following graphql query:
  ```
  {
    posts {
      author {
        name
      }
    }
  }
  ```

  `posts` returns a list of `post` objects, each of which has an associated `author` field.
  If the `author` field makes a call to the database we have the classic N + 1 problem.
  What we want is a way to load all authors for all posts in one database request.

  This plugin provides this, without any eager loading at the parent level. That is,
  the code for the `posts` field does not need to do anything to facilitate the
  efficient loading of its children.

  ## Example Usage
  The API for this plugin is a little on the verbose side because it is not specific
  to any particular batching mechanism. That is, this API is just as useful for an Ecto
  based DB as it is for talking to S3 or the File System. Thus we anticipate people
  (including ourselves) will be creating additional functions more tailored to each
  of those specific use cases.

  Here is an example using the `Absinthe.Resolution.Helpers.batch/3` helper.
  ```elixir
  object :post do
    field :name, :string
    field :author, :user do
      resolve fn post, _, _ ->
        batch({__MODULE__, :users_by_id}, post.author_id, fn batch_results ->
          {:ok, Map.get(batch_results, post.author_id)}
        end)
      end
    end
  end

  def users_by_id(_, user_ids) do
    users = Repo.all from u in User, where: u.id in ^user_ids
    Map.new(users, fn user -> {user.id, user} end)
  end
  ```

  Let's look at this piece by piece:
  - `{__MODULE__, :users_by_id}`: is the batching function which will be used. It must
  be a 2-arity function. For details see the `batch_fun` typedoc.
  - `post.author_id`: This is the information to be aggregated. The aggregated values
  are the second argument to the batching function.
  - `fn batch_results`: This function takes the results from the batching function.
  It should return one of the resolution function values.
  """

  @behaviour Absinthe.Middleware
  @behaviour Absinthe.Plugin

  @typedoc """
  The function to be called with the aggregate batch information.

  It comes in both a 2 tuple and 3 tuple form. The first two elements are the module
  and function name. The third element is an arbitrary parameter that is passed
  as the first argument to the batch function.

  For example, one could parameterize the `users_by_id` function from the moduledoc
  to make it more generic. Instead of doing `{__MODULE__, :users_by_id}` you could do
  `{__MODULE__, :by_id, User}`. Then the function would be:

  ```elixir
  def by_id(model, ids) do
    model
    |> where([m], m.id in ^ids)
    |> Repo.all()
    |> Map.new(&{&1.id, &1})
  end
  ```
  It could also be used to set options unique to the execution of a particular
  batching function.
  """
  @type batch_fun :: {module, atom} | {module, atom, term}

  @typedoc """
  The function applied to the output of a field's batch function. Its return
  value is stored as the result of the field.
  """
  @type post_batch_fun :: (term -> Absinthe.Type.Field.result())

  # Telemetry event names emitted around each batch function invocation.
  @batch_start [:absinthe, :middleware, :batch, :start]
  @batch_stop [:absinthe, :middleware, :batch, :stop]

  # Milliseconds to wait for a batch task when the batch options carry no `:timeout`.
  @default_timeout 5_000

  @doc """
  Prepares the accumulator for a resolution pass.

  When the accumulator already holds an entry for this module only its `:input`
  is reset to `[]`, leaving any existing `:output` in place. Otherwise the entry
  is created with an empty `:input` list and an empty `:output` map.
  """
  @impl Absinthe.Plugin
  def before_resolution(exec) do
    case exec.acc do
      %{__MODULE__ => _} ->
        put_in(exec.acc[__MODULE__][:input], [])

      _ ->
        put_in(exec.acc[__MODULE__], %{input: [], output: %{}})
    end
  end

  @doc """
  Middleware entry point.

  * For an `:unresolved` resolution and a
    `{batch_key, field_data, post_batch_fun, batch_opts}` tuple: queues
    `field_data` under `{batch_key, batch_opts}` in this module's `:input`
    accumulator, marks the resolution `:suspended`, and pushes
    `{__MODULE__, {batch_key, post_batch_fun}}` onto the front of its middleware.
  * For a `:suspended` resolution and a `{batch_key, post_batch_fun}` tuple:
    fetches the batch output stored under `batch_key`, applies `post_batch_fun`
    to it and puts the returned value on the resolution as its result. Raises
    `KeyError` if no output is stored under `batch_key`.
  """
  @impl Absinthe.Middleware
  def call(%{state: :unresolved} = res, {batch_key, field_data, post_batch_fun, batch_opts}) do
    acc = res.acc

    # The `nil` clause covers an accumulator entry that has no `:input` key yet.
    acc =
      update_in(acc[__MODULE__][:input], fn
        nil -> [{{batch_key, batch_opts}, field_data}]
        data -> [{{batch_key, batch_opts}, field_data} | data]
      end)

    %{
      res
      | state: :suspended,
        middleware: [{__MODULE__, {batch_key, post_batch_fun}} | res.middleware],
        acc: acc
    }
  end

  def call(%{state: :suspended} = res, {batch_key, post_batch_fun}) do
    batch_data_for_fun =
      res.acc
      |> Map.fetch!(__MODULE__)
      |> Map.fetch!(:output)
      |> Map.fetch!(batch_key)

    res
    |> Absinthe.Resolution.put_result(post_batch_fun.(batch_data_for_fun))
  end

  @doc """
  Runs every batch queued in `:input` and stores the outcome, keyed by batch
  function, under `:output` (replacing any previous output).
  """
  @impl Absinthe.Plugin
  def after_resolution(exec) do
    output = do_batching(exec.acc[__MODULE__][:input])
    put_in(exec.acc[__MODULE__][:output], output)
  end

  # Groups the queued entries by `{batch_fun, batch_opts}`, runs each group's batch
  # function in its own task and returns a map of `batch_fun => batch function result`.
  #
  # The resulting map is keyed by `batch_fun` alone, so groups that share a batch
  # function but differ in options end up under the same key.
  defp do_batching(input) do
    input
    |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
    # First pass: start a task for every group before awaiting any of them.
    |> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
      system_time = System.system_time()
      start_time_mono = System.monotonic_time()

      task =
        Task.async(fn ->
          {batch_fun, call_batch_fun(batch_fun, batch_data)}
        end)

      metadata = emit_start_event(system_time, batch_fun, batch_opts, batch_data)

      {batch_opts, task, start_time_mono, metadata}
    end)
    # Second pass: await the tasks one by one, each with its own timeout.
    |> Map.new(fn {batch_opts, task, start_time_mono, metadata} ->
      timeout = Keyword.get(batch_opts, :timeout, @default_timeout)
      result = Task.await(task, timeout)

      duration = System.monotonic_time() - start_time_mono
      emit_stop_event(duration, metadata, result)

      result
    end)
  end

  # Emits the batch start event and returns the metadata map so that the matching
  # stop event can reuse it.
  defp emit_start_event(system_time, batch_fun, batch_opts, batch_data) do
    id = :erlang.unique_integer()

    metadata = %{
      id: id,
      telemetry_span_context: id,
      batch_fun: batch_fun,
      batch_opts: batch_opts,
      batch_data: batch_data
    }

    :telemetry.execute(
      @batch_start,
      %{system_time: system_time},
      metadata
    )

    metadata
  end

  # Emits the batch stop event, adding the task result to the start metadata.
  defp emit_stop_event(duration, metadata, result) do
    :telemetry.execute(
      @batch_stop,
      %{duration: duration},
      Map.put(metadata, :result, result)
    )
  end

  # A 2-tuple batch function is called with `[]` as its first argument.
  defp call_batch_fun({module, fun}, batch_data) do
    call_batch_fun({module, fun, []}, batch_data)
  end

  defp call_batch_fun({module, fun, config}, batch_data) do
    apply(module, fun, [config, batch_data])
  end

  @doc """
  Prepends another resolution phase to `pipeline` when this pass queued any
  batch input; otherwise returns `pipeline` unchanged.
  """
  @impl Absinthe.Plugin
  def pipeline(pipeline, exec) do
    case exec.acc[__MODULE__][:input] do
      [_ | _] ->
        [Absinthe.Phase.Document.Execution.Resolution | pipeline]

      _ ->
        pipeline
    end
  end
end