subscription.ex (5389B)
defmodule Absinthe.Subscription do
  @moduledoc """
  Real-time updates via GraphQL subscriptions.

  For a how-to guide on getting started with subscriptions in a Phoenix
  project, see the `Absinthe.Phoenix` package.

  Subscription fields are defined in your schema via
  `Absinthe.Schema.subscription/2`.

  ## Basic Usage

  Start the subscription process tree with `start_link/1` (this module also
  exposes `child_spec/1`, so it can be listed as a supervisor child), then
  push updates to subscription fields with `publish/3`.

  ## Performance Characteristics

  The beta release of subscriptions has a couple of limitations that are worth
  keeping in mind before using it in production.

  By design, every subscription document triggered by a mutation is run inside
  the mutation process, as a form of back pressure.

  At the moment, however, database batching does not happen across the set of
  subscription documents. If you have many subscription documents and each of
  them performs a lot of extra DB lookups, incoming mutation responses are
  delayed by however long all of that work takes.

  Before the final version of 1.4.0 we want:

  - Batching across subscriptions
  - More user control over the back pressure / async balance
  """

  alias __MODULE__

  alias Absinthe.Subscription.PipelineSerializer

  @doc """
  Starts `Absinthe.Subscription` for the given `pubsub`.

  Use this to add `Absinthe.Subscription` to your process tree. The call is
  delegated to `Absinthe.Subscription.Supervisor.start_link/1`.
  """
  defdelegate start_link(pubsub), to: Subscription.Supervisor

  @doc """
  Returns a supervisor child specification that starts
  `Absinthe.Subscription.Supervisor` with `pubsub`.
  """
  def child_spec(pubsub) do
    start = {Subscription.Supervisor, :start_link, [pubsub]}

    %{id: __MODULE__, start: start, type: :supervisor}
  end

  @type subscription_field_spec :: {atom, term | (term -> term)}

  @doc """
  Publish a mutation result to subscription fields.

  This is generally used to publish to one or more subscription fields
  "out of band" from any particular mutation.

  The third argument is either an `Absinthe.Resolution` struct, in which case
  the subscription fields are derived from the triggers of the resolved
  mutation field, or an explicit list of `{subscription_field, config}` pairs.

  Always returns `:ok`.

  ## Examples

  Note: As with all subscription examples, if you're using Absinthe.Phoenix
  `pubsub` will be `MyAppWeb.Endpoint`.

  ```
  Absinthe.Subscription.publish(pubsub, user, [new_users: user.account_id])
  ```
  ```
  # publish to two subscription fields
  Absinthe.Subscription.publish(pubsub, user, [
    new_users: user.account_id,
    other_user_subscription_field: user.id,
  ])
  ```
  """
  @spec publish(
          Absinthe.Subscription.Pubsub.t(),
          term,
          Absinthe.Resolution.t() | [subscription_field_spec]
        ) :: :ok
  def publish(pubsub, mutation_result, %Absinthe.Resolution{} = info) do
    publish(pubsub, mutation_result, get_subscription_fields(info))
  end

  def publish(pubsub, mutation_result, subscribed_fields) do
    # Both results are deliberately discarded; the function always answers :ok.
    _ = publish_remote(pubsub, mutation_result, subscribed_fields)
    _ = Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
    :ok
  end

  # Builds the `{subscription_field_id, trigger_config}` pairs for the mutation
  # field that produced `resolution_info`.
  defp get_subscription_fields(resolution_info) do
    mutation_field = resolution_info.definition.schema_node

    # A schema without a subscription type is treated as having no fields.
    subscription_type =
      Absinthe.Schema.lookup_type(resolution_info.schema, :subscription) || %{fields: []}

    subscription_type.fields
    |> fetch_fields(mutation_field.triggers)
    |> Enum.map(fn {sub_field_id, sub_field} ->
      config =
        sub_field
        |> Absinthe.Type.function(:triggers)
        |> Map.fetch!(mutation_field.identifier)

      {sub_field_id, config}
    end)
  end

  # TODO: normalize the `.fields` type.
  # Only a map of fields can be narrowed down to the triggered ones; any other
  # shape yields no fields at all.
  defp fetch_fields(fields, triggers) when is_map(fields), do: Map.take(fields, triggers)
  defp fetch_fields(_fields, _triggers), do: []

  # Registers the calling process for `field_key` in the pubsub's registry.
  # Two entries are written: `field_key -> {doc_id, packed doc}` and a reverse
  # entry `{self(), doc_id} -> field_key` that `unsubscribe/2` looks up.
  @doc false
  def subscribe(pubsub, field_key, doc_id, doc) do
    registry = registry_name(pubsub)

    packed_doc = %{
      initial_phases: PipelineSerializer.pack(doc.initial_phases),
      source: doc.source
    }

    {:ok, _} = Registry.register(registry, field_key, {doc_id, packed_doc})
    {:ok, _} = Registry.register(registry, {self(), doc_id}, field_key)
  end

  # Removes the calling process's registry entries for `doc_id`: first the
  # per-field entries found through the reverse entry, then the reverse entry
  # itself.
  @doc false
  def unsubscribe(pubsub, doc_id) do
    registry = registry_name(pubsub)
    owner = self()
    reverse_key = {owner, doc_id}

    registry
    |> Registry.lookup(reverse_key)
    |> Enum.each(fn
      {^owner, field_key} ->
        Registry.unregister_match(registry, field_key, {doc_id, :_})

      # Entries registered by any other process are left untouched.
      _other ->
        :ok
    end)

    Registry.unregister(registry, reverse_key)
    :ok
  end

  # Returns the `{doc_id, doc}` pairs registered under `key`, with each doc's
  # `:initial_phases` unpacked again.
  @doc false
  def get(pubsub, key) do
    entries = Registry.lookup(registry_name(pubsub), key)

    Enum.map(entries, fn entry ->
      {_owner, {doc_id, packed_doc}} = entry

      {doc_id, Map.update!(packed_doc, :initial_phases, &PipelineSerializer.unpack/1)}
    end)
  end

  # The registry for a pubsub is named `<pubsub>.Registry`.
  @doc false
  def registry_name(pubsub), do: Module.concat([pubsub, :Registry])

  # Hashes the mutation result into one of `pool_size` shards and publishes the
  # mutation on that shard's proxy topic through the pubsub module.
  @doc false
  def publish_remote(pubsub, mutation_result, subscribed_fields) do
    {:ok, pool_size} = Registry.meta(registry_name(pubsub), :pool_size)

    proxy_topic =
      mutation_result
      |> :erlang.phash2(pool_size)
      |> Subscription.Proxy.topic()

    :ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields)
  end

  ## Middleware callback

  # Publishes the resolved value of an error-free resolution when a usable
  # pubsub is present in the context. The resolution is always returned as-is.
  @doc false
  def call(%{state: :resolved, errors: [], value: value} = res, _) do
    case extract_pubsub(res.context) do
      {:ok, pubsub} -> __MODULE__.publish(pubsub, value, res)
      _no_pubsub -> :ok
    end

    res
  end

  def call(res, _), do: res

  # Returns `{:ok, pubsub}` only when the context carries a `:pubsub` whose
  # registry name resolves to a live pid; otherwise `:error`.
  @doc false
  def extract_pubsub(context) do
    case Map.fetch(context, :pubsub) do
      {:ok, pubsub} ->
        if is_pid(Process.whereis(registry_name(pubsub))) do
          {:ok, pubsub}
        else
          :error
        end

      _missing ->
        :error
    end
  end

  # Appends this module to the end of the middleware list.
  @doc false
  def add_middleware(middleware), do: middleware ++ [{__MODULE__, []}]
end