subscribe_self.ex (4113B)
defmodule Absinthe.Phase.Subscription.SubscribeSelf do
  @moduledoc false

  # Pipeline phase that registers the current document with the configured
  # pubsub when the current operation is a subscription. For any other
  # operation type the blueprint is passed through unchanged.

  use Absinthe.Phase

  alias Absinthe.Blueprint
  alias Absinthe.Phase

  # Shape of the replacement pipelines built by this phase: a list of
  # `{phase_module, options}` entries.
  @typep replacement_pipeline :: [{module, Keyword.t()}]

  # Entry point of the phase.
  #
  # Returns `{:ok, blueprint}` untouched when the current operation is not a
  # subscription; otherwise returns whatever `do_subscription/3` produces,
  # which is always a `{:replace, blueprint, pipeline}` tuple.
  @spec run(any, Keyword.t()) ::
          {:ok, Blueprint.t()} | {:replace, Blueprint.t(), replacement_pipeline}
  def run(blueprint, options) do
    case Blueprint.current_operation(blueprint) do
      %{type: :subscription} = op -> do_subscription(op, blueprint, options)
      _ -> {:ok, blueprint}
    end
  end

  # Subscribes the document under every field key derived from the
  # subscription field's config, then replaces the remaining pipeline.
  #
  # * On success the pipeline becomes the subscription result phase followed
  #   by the telemetry "operation stop" event.
  # * When the config function returns `{:error, msg}` the error is prepended
  #   to the blueprint's validation errors and the pipeline is replaced by the
  #   plain document result phase.
  #
  # Raises when no pubsub can be extracted from the context, when the
  # operation does not have exactly one selection (match error), or when the
  # config is invalid (see `get_config/3` and `find_field_keys!/1`).
  @spec do_subscription(map, Blueprint.t(), Keyword.t()) ::
          {:replace, Blueprint.t(), replacement_pipeline}
  def do_subscription(%{type: :subscription} = op, blueprint, options) do
    context = blueprint.execution.context
    pubsub = ensure_pubsub!(context)

    # Assertive match: exactly one root selection is expected here.
    %{selections: [field]} = op

    case get_config(field, context, blueprint) do
      {:ok, config} ->
        field_keys = get_field_keys(field, config)
        subscription_id = get_subscription_id(config, blueprint, options)

        # Side effects only; the return values of `subscribe/4` are not used.
        Enum.each(field_keys, fn field_key ->
          Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)
        end)

        {:replace, blueprint,
         [
           {Phase.Subscription.Result, topic: subscription_id},
           {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
         ]}

      {:error, error} ->
        blueprint = update_in(blueprint.execution.validation_errors, &[error | &1])

        error_pipeline = [
          {Phase.Document.Result, options}
        ]

        {:replace, blueprint, error_pipeline}
    end
  end

  # Resolves the subscription config for `field`.
  #
  # Calls the schema node's `:config` function (2-arity preferred, 1-arity
  # deprecated). When no config function is defined, the topic defaults to the
  # field identifier as a string.
  #
  # Returns `{:ok, config}` or `{:error, %Phase.Error{}}`; raises when the
  # config function returns anything else.
  defp get_config(
         %{schema_node: schema_node, argument_data: argument_data} = field,
         context,
         blueprint
       ) do
    name = schema_node.identifier

    config =
      case Absinthe.Type.function(schema_node, :config) do
        fun when is_function(fun, 2) ->
          fun.(argument_data, %{context: context, document: blueprint})

        fun when is_function(fun, 1) ->
          IO.write(
            :stderr,
            "Warning: 1-arity topic functions are deprecated, upgrade to 2 arity before 1.4.0 release"
          )

          fun.(argument_data)

        nil ->
          {:ok, topic: Atom.to_string(name)}
      end

    case config do
      {:ok, config} ->
        {:ok, config}

      {:error, msg} ->
        error = %Phase.Error{
          phase: __MODULE__,
          message: msg,
          locations: [field.source_location]
        }

        {:error, error}

      val ->
        raise """
        Invalid return from config function!

        A config function must return `{:ok, config}` or `{:error, msg}`. You returned:

        #{inspect(val)}
        """
    end
  end

  # Builds the `{field_identifier, topic}` keys the document is subscribed under.
  defp get_field_keys(%{schema_node: schema_node} = _field, config) do
    name = schema_node.identifier

    config
    |> find_field_keys!()
    |> Enum.map(fn key -> {name, key} end)
  end

  # Returns the pubsub extracted from the context, raising when there is none.
  defp ensure_pubsub!(context) do
    case Absinthe.Subscription.extract_pubsub(context) do
      {:ok, pubsub} ->
        pubsub

      _ ->
        raise """
        Pubsub not configured!

        Subscriptions require a configured pubsub module.
        """
    end
  end

  # Normalizes `config[:topic]` into a non-empty list of string topics.
  #
  # Raises when the topic is missing (`nil`/`false`) or an empty list. A single
  # topic is wrapped into a list; every topic is converted with `to_string/1`.
  defp find_field_keys!(config) do
    topic =
      config[:topic] ||
        raise """
        Subscription config must include a non null topic!

        #{inspect(config)}
        """

    case topic do
      [] ->
        raise """
        Subscription config must not provide an empty list of topics!

        #{inspect(config)}
        """

      val ->
        val
        |> List.wrap()
        |> Enum.map(&to_string/1)
    end
  end

  # Combines the context id and the document id into the subscription id
  # (which is also used as the result topic).
  defp get_subscription_id(config, blueprint, options) do
    context_id = get_context_id(config)
    document_id = get_document_id(config, blueprint, options)

    "__absinthe__:doc:#{context_id}:#{document_id}"
  end

  # Uses the configured `:context_id` when given; otherwise falls back to
  # `:erlang.unique_integer/0`, so each call without one yields a fresh id.
  defp get_context_id(config) do
    context_id = config[:context_id] || :erlang.unique_integer()
    to_string(context_id)
  end

  # Uses the configured `:document_id` when given; otherwise derives one as the
  # hex-encoded SHA-256 of the document source (or input) together with the
  # variables passed in the phase options.
  defp get_document_id(config, blueprint, options) do
    case config[:document_id] do
      nil ->
        binary =
          :erlang.term_to_binary(
            {blueprint.source || blueprint.input, options[:variables] || %{}}
          )

        :sha256
        |> :crypto.hash(binary)
        |> Base.encode16()

      val ->
        val
    end
  end
end