zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

subscribe_self.ex (4113B)


      1 defmodule Absinthe.Phase.Subscription.SubscribeSelf do
      2   use Absinthe.Phase
      3   alias Absinthe.Phase
      4 
      5   @moduledoc false
      6 
      7   alias Absinthe.Blueprint
      8 
      9   @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
     10   def run(blueprint, options) do
     11     with %{type: :subscription} = op <- Blueprint.current_operation(blueprint) do
     12       do_subscription(op, blueprint, options)
     13     else
     14       _ -> {:ok, blueprint}
     15     end
     16   end
     17 
     18   def do_subscription(%{type: :subscription} = op, blueprint, options) do
     19     context = blueprint.execution.context
     20     pubsub = ensure_pubsub!(context)
     21 
     22     %{selections: [field]} = op
     23 
     24     with {:ok, config} <- get_config(field, context, blueprint) do
     25       field_keys = get_field_keys(field, config)
     26       subscription_id = get_subscription_id(config, blueprint, options)
     27 
     28       for field_key <- field_keys,
     29           do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)
     30 
     31       {:replace, blueprint,
     32        [
     33          {Phase.Subscription.Result, topic: subscription_id},
     34          {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
     35        ]}
     36     else
     37       {:error, error} ->
     38         blueprint = update_in(blueprint.execution.validation_errors, &[error | &1])
     39 
     40         error_pipeline = [
     41           {Phase.Document.Result, options}
     42         ]
     43 
     44         {:replace, blueprint, error_pipeline}
     45     end
     46   end
     47 
     48   defp get_config(
     49          %{schema_node: schema_node, argument_data: argument_data} = field,
     50          context,
     51          blueprint
     52        ) do
     53     name = schema_node.identifier
     54 
     55     config =
     56       case Absinthe.Type.function(schema_node, :config) do
     57         fun when is_function(fun, 2) ->
     58           apply(fun, [argument_data, %{context: context, document: blueprint}])
     59 
     60         fun when is_function(fun, 1) ->
     61           IO.write(
     62             :stderr,
     63             "Warning: 1-arity topic functions are deprecated, upgrade to 2 arity before 1.4.0 release"
     64           )
     65 
     66           apply(fun, [argument_data])
     67 
     68         nil ->
     69           {:ok, topic: Atom.to_string(name)}
     70       end
     71 
     72     case config do
     73       {:ok, config} ->
     74         {:ok, config}
     75 
     76       {:error, msg} ->
     77         error = %Phase.Error{
     78           phase: __MODULE__,
     79           message: msg,
     80           locations: [field.source_location]
     81         }
     82 
     83         {:error, error}
     84 
     85       val ->
     86         raise """
     87         Invalid return from config function!
     88 
     89         A config function must return `{:ok, config}` or `{:error, msg}`. You returned:
     90 
     91         #{inspect(val)}
     92         """
     93     end
     94   end
     95 
     96   defp get_field_keys(%{schema_node: schema_node} = _field, config) do
     97     name = schema_node.identifier
     98 
     99     find_field_keys!(config)
    100     |> Enum.map(fn key -> {name, key} end)
    101   end
    102 
    103   defp ensure_pubsub!(context) do
    104     case Absinthe.Subscription.extract_pubsub(context) do
    105       {:ok, pubsub} ->
    106         pubsub
    107 
    108       _ ->
    109         raise """
    110         Pubsub not configured!
    111 
    112         Subscriptions require a configured pubsub module.
    113         """
    114     end
    115   end
    116 
    117   defp find_field_keys!(config) do
    118     topic =
    119       config[:topic] ||
    120         raise """
    121         Subscription config must include a non null topic!
    122 
    123         #{inspect(config)}
    124         """
    125 
    126     case topic do
    127       [] ->
    128         raise """
    129         Subscription config must not provide an empty list of topics!
    130 
    131         #{inspect(config)}
    132         """
    133 
    134       val ->
    135         List.wrap(val)
    136         |> Enum.map(&to_string/1)
    137     end
    138   end
    139 
    140   defp get_subscription_id(config, blueprint, options) do
    141     context_id = get_context_id(config)
    142     document_id = get_document_id(config, blueprint, options)
    143 
    144     "__absinthe__:doc:#{context_id}:#{document_id}"
    145   end
    146 
    147   defp get_context_id(config) do
    148     context_id = config[:context_id] || :erlang.unique_integer()
    149     to_string(context_id)
    150   end
    151 
    152   defp get_document_id(config, blueprint, options) do
    153     case config[:document_id] do
    154       nil ->
    155         binary =
    156           {blueprint.source || blueprint.input, options[:variables] || %{}}
    157           |> :erlang.term_to_binary()
    158 
    159         :crypto.hash(:sha256, binary)
    160         |> Base.encode16()
    161 
    162       val ->
    163         val
    164     end
    165   end
    166 end