zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

local.ex (2670B)


      defmodule Absinthe.Subscription.Local do
        @moduledoc """
        Runs and broadcasts subscription documents that are local to this node.
        """

        require Logger

        alias Absinthe.{Phase, Pipeline}
        alias Absinthe.Pipeline.BatchResolver

        @doc """
        Publish a mutation to the local node only.

        `subscribed_fields` is a list of `{field, key_strategy}` pairs. For each pair
        the topic key(s) are derived from `key_strategy`:

          * `[topic: fun]`, where `fun` is a 1-arity function - `fun` is called with
            `mutation_result` and its return value supplies the key(s);
          * anything else - the value itself supplies the key(s).

        Keys are wrapped in a list and converted to strings, then the documents
        registered under `{field, key}` are fetched with `Absinthe.Subscription.get/2`.
        Each document is re-run with `mutation_result` as its root value and the
        result is passed to `pubsub.publish_subscription/2`.

        Exceptions raised while running or publishing a single document are rescued
        and handed to `Absinthe.Pipeline.BatchResolver.pipeline_error/2`.

        Returns `:ok`.

        See also `Absinthe.Subscription.publish/3`
        """
        @spec publish_mutation(
                Absinthe.Subscription.Pubsub.t(),
                term,
                [Absinthe.Subscription.subscription_field_spec()]
              ) :: :ok
        def publish_mutation(pubsub, mutation_result, subscribed_fields) do
          docs_and_topics =
            for {field, key_strategy} <- subscribed_fields,
                {topic, doc} <- get_docs(pubsub, field, mutation_result, key_strategy) do
              {topic, key_strategy, doc}
            end

          run_docset(pubsub, docs_and_topics, mutation_result)

          :ok
        end

        # Runs and publishes every `{topic, key_strategy, doc}` entry. Executed purely
        # for its side effects, hence `Enum.each/2` rather than a comprehension.
        defp run_docset(pubsub, docs_and_topics, mutation_result) do
          Enum.each(docs_and_topics, &publish_doc(pubsub, &1, mutation_result))
        end

        # Re-runs one subscription document against `mutation_result`, logs the
        # outcome at debug level and publishes it on `topic`. Anything raised in the
        # body (including a failed `{:ok, ...}` / `:ok` match) is rescued and reported
        # through `BatchResolver.pipeline_error/2`, so a single failing document does
        # not propagate an exception to the caller.
        defp publish_doc(pubsub, {topic, key_strategy, doc}, mutation_result) do
          pipeline = publication_pipeline(doc, mutation_result)

          {:ok, %{result: data}, _} = Pipeline.run(doc.source, pipeline)

          Logger.debug("""
          Absinthe Subscription Publication
          Field Topic: #{inspect(key_strategy)}
          Subscription id: #{inspect(topic)}
          Data: #{inspect(data)}
          """)

          :ok = pubsub.publish_subscription(topic, data)
        rescue
          e ->
            BatchResolver.pipeline_error(e, __STACKTRACE__)
        end

        # Builds the pipeline used to re-run `doc` for a publication, starting from
        # the document's own `initial_phases`:
        #
        #   * the telemetry phase is replaced with one emitting the publish `:start`
        #     event;
        #   * `Phase.Subscription.SubscribeSelf` is removed (presumably so the re-run
        #     does not register the subscription again - confirm against Absinthe);
        #   * `Phase.Document.OverrideRoot` is inserted before resolution so the
        #     mutation result becomes the root value;
        #   * everything after resolution is cut off and replaced by the result phase
        #     plus a telemetry phase emitting the publish `:stop` event.
        #
        # The return value is a nested list of phases, exactly as handed to
        # `Pipeline.run/2`.
        defp publication_pipeline(doc, mutation_result) do
          resolution_phases =
            doc.initial_phases
            |> Pipeline.replace(
              Phase.Telemetry,
              {Phase.Telemetry, event: [:subscription, :publish, :start]}
            )
            |> Pipeline.without(Phase.Subscription.SubscribeSelf)
            |> Pipeline.insert_before(
              Phase.Document.Execution.Resolution,
              {Phase.Document.OverrideRoot, root_value: mutation_result}
            )
            |> Pipeline.upto(Phase.Document.Execution.Resolution)

          [
            resolution_phases,
            [
              Phase.Document.Result,
              {Phase.Telemetry, event: [:subscription, :publish, :stop]}
            ]
          ]
        end

        # Key strategy given as `[topic: fun]`: the key(s) are computed from the
        # mutation result.
        defp get_docs(pubsub, field, mutation_result, topic: topic_fun)
             when is_function(topic_fun, 1) do
          do_get_docs(pubsub, field, topic_fun.(mutation_result))
        end

        # Any other key strategy is used as the key (or list of keys) directly.
        defp get_docs(pubsub, field, _mutation_result, key) do
          do_get_docs(pubsub, field, key)
        end

        # Accepts a single key or a list of keys, stringifies each one and collects
        # whatever `Absinthe.Subscription.get/2` returns for `{field, key}`.
        defp do_get_docs(pubsub, field, keys) do
          keys
          |> List.wrap()
          |> Enum.map(&to_string/1)
          |> Enum.flat_map(&Absinthe.Subscription.get(pubsub, {field, &1}))
        end
      end