local.ex (2670B)
1 defmodule Absinthe.Subscription.Local do 2 @moduledoc """ 3 This module handles broadcasting documents that are local to this node 4 """ 5 6 require Logger 7 8 alias Absinthe.Pipeline.BatchResolver 9 10 # This module handles running and broadcasting documents that are local to this 11 # node. 12 13 @doc """ 14 Publish a mutation to the local node only. 15 16 See also `Absinthe.Subscription.publish/3` 17 """ 18 @spec publish_mutation( 19 Absinthe.Subscription.Pubsub.t(), 20 term, 21 [Absinthe.Subscription.subscription_field_spec()] 22 ) :: :ok 23 def publish_mutation(pubsub, mutation_result, subscribed_fields) do 24 docs_and_topics = 25 for {field, key_strategy} <- subscribed_fields, 26 {topic, doc} <- get_docs(pubsub, field, mutation_result, key_strategy) do 27 {topic, key_strategy, doc} 28 end 29 30 run_docset(pubsub, docs_and_topics, mutation_result) 31 32 :ok 33 end 34 35 alias Absinthe.{Phase, Pipeline} 36 37 defp run_docset(pubsub, docs_and_topics, mutation_result) do 38 for {topic, key_strategy, doc} <- docs_and_topics do 39 try do 40 pipeline = 41 doc.initial_phases 42 |> Pipeline.replace( 43 Phase.Telemetry, 44 {Phase.Telemetry, event: [:subscription, :publish, :start]} 45 ) 46 |> Pipeline.without(Phase.Subscription.SubscribeSelf) 47 |> Pipeline.insert_before( 48 Phase.Document.Execution.Resolution, 49 {Phase.Document.OverrideRoot, root_value: mutation_result} 50 ) 51 |> Pipeline.upto(Phase.Document.Execution.Resolution) 52 53 pipeline = [ 54 pipeline, 55 [ 56 Absinthe.Phase.Document.Result, 57 {Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]} 58 ] 59 ] 60 61 {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) 62 63 Logger.debug(""" 64 Absinthe Subscription Publication 65 Field Topic: #{inspect(key_strategy)} 66 Subscription id: #{inspect(topic)} 67 Data: #{inspect(data)} 68 """) 69 70 :ok = pubsub.publish_subscription(topic, data) 71 rescue 72 e -> 73 BatchResolver.pipeline_error(e, __STACKTRACE__) 74 end 75 end 76 end 77 78 defp get_docs(pubsub, field, mutation_result, topic: topic_fun) 79 when is_function(topic_fun, 1) do 80 do_get_docs(pubsub, field, topic_fun.(mutation_result)) 81 end 82 83 defp get_docs(pubsub, field, _mutation_result, key) do 84 do_get_docs(pubsub, field, key) 85 end 86 87 defp do_get_docs(pubsub, field, keys) do 88 keys 89 |> List.wrap() 90 |> Enum.map(&to_string/1) 91 |> Enum.flat_map(&Absinthe.Subscription.get(pubsub, {field, &1})) 92 end 93 end