zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

subscription.ex (5389B)


      defmodule Absinthe.Subscription do
        @moduledoc """
        Real-time updates over GraphQL subscriptions.

        Subscription fields are declared in your schema with
        `Absinthe.Schema.subscription/2`. For a how-to guide on wiring
        Absinthe.Subscriptions into a Phoenix project, see the `Absinthe.Phoenix`
        package.

        ## Basic Usage

        Add this module to your process tree, passing your pubsub module
        (see `child_spec/1`), and push data to subscription fields with `publish/3`.

        ## Performance Characteristics

        The beta release of subscriptions has a couple of limitations worth keeping
        in mind before using it in production:

        By design, every subscription doc triggered by a mutation is run inside the
        mutation process, as a form of back pressure.

        For now, however, database batching does not happen across that set of
        subscription docs. If you have many subscription docs and each of them does
        a lot of extra DB lookups, incoming mutation responses are delayed by
        however long all of that work takes.

        Wanted before the final version of 1.4.0:

        - Batching across subscriptions
        - More user control over back pressure / async balance.
        """

        alias __MODULE__
        alias __MODULE__.PipelineSerializer

        @doc """
        Starts the subscription process tree for `pubsub`.

        Delegates to `Absinthe.Subscription.Supervisor.start_link/1`.
        """
        defdelegate start_link(pubsub), to: Subscription.Supervisor

        @doc """
        Returns a supervisor child spec that starts
        `Absinthe.Subscription.Supervisor` with `pubsub`.
        """
        def child_spec(pubsub) do
          start = {Subscription.Supervisor, :start_link, [pubsub]}

          %{id: __MODULE__, type: :supervisor, start: start}
        end

        @type subscription_field_spec :: {atom, term | (term -> term)}

        @doc """
        Publishes a mutation result to subscription fields.

        Generally used to publish to one or more subscription fields "out of band"
        from any particular mutation.

        The third argument is either an `Absinthe.Resolution` struct, in which case
        the subscription fields are derived from the triggers of the resolved
        mutation field, or an explicit list of `{subscription_field, topic}` specs.

        ## Examples

        Note: as with all subscription examples, if you're using Absinthe.Phoenix
        `pubsub` will be `MyAppWeb.Endpoint`.

        ```
        Absinthe.Subscription.publish(pubsub, user, [new_users: user.account_id])
        ```
        ```
        # publish to two subscription fields
        Absinthe.Subscription.publish(pubsub, user, [
          new_users: user.account_id,
          other_user_subscription_field: user.id,
        ])
        ```
        """
        @spec publish(
                Absinthe.Subscription.Pubsub.t(),
                term,
                Absinthe.Resolution.t() | [subscription_field_spec]
              ) :: :ok
        def publish(pubsub, mutation_result, %Absinthe.Resolution{} = info) do
          publish(pubsub, mutation_result, get_subscription_fields(info))
        end

        def publish(pubsub, mutation_result, subscribed_fields) do
          # Both results are deliberately discarded; this clause always returns `:ok`.
          _ = publish_remote(pubsub, mutation_result, subscribed_fields)
          _ = Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
          :ok
        end

        # Builds the `{subscription_field_id, trigger_config}` list for the mutation
        # field that `resolution_info` resolved, using that field's `triggers`.
        defp get_subscription_fields(resolution_info) do
          mutation_node = resolution_info.definition.schema_node

          # Fall back to an empty field set when no `:subscription` type is found.
          subscription_type =
            Absinthe.Schema.lookup_type(resolution_info.schema, :subscription) || %{fields: []}

          subscription_type.fields
          |> fetch_fields(mutation_node.triggers)
          |> Enum.map(fn {field_id, field} ->
            trigger_configs = Absinthe.Type.function(field, :triggers)
            {field_id, Map.fetch!(trigger_configs, mutation_node.identifier)}
          end)
        end

        # TODO: normalize the `.fields` type.
        # Only a map of fields can be filtered by trigger; anything else yields `[]`.
        defp fetch_fields(fields, triggers) do
          if is_map(fields), do: Map.take(fields, triggers), else: []
        end

        # Registers `doc` under `field_key`, plus a reverse entry keyed by
        # `{self(), doc_id}` that `unsubscribe/2` uses to find the field keys again.
        @doc false
        def subscribe(pubsub, field_key, doc_id, doc) do
          registry = registry_name(pubsub)

          stored_doc = %{
            initial_phases: PipelineSerializer.pack(doc.initial_phases),
            source: doc.source
          }

          {:ok, _} = Registry.register(registry, field_key, {doc_id, stored_doc})
          {:ok, _} = Registry.register(registry, {self(), doc_id}, field_key)
        end

        # Removes every registry entry the calling process holds for `doc_id`.
        @doc false
        def unsubscribe(pubsub, doc_id) do
          registry = registry_name(pubsub)
          owner = self()
          owner_key = {owner, doc_id}

          registry
          |> Registry.lookup(owner_key)
          |> Enum.each(fn
            {^owner, field_key} -> Registry.unregister_match(registry, field_key, {doc_id, :_})
            _other -> :ok
          end)

          Registry.unregister(registry, owner_key)
          :ok
        end

        # Returns the `{doc_id, doc}` pairs registered under `key`, with each doc's
        # `:initial_phases` unpacked again.
        @doc false
        def get(pubsub, key) do
          entries = Registry.lookup(registry_name(pubsub), key)

          Enum.map(entries, fn entry ->
            {_owner, {doc_id, stored_doc}} = entry
            {doc_id, Map.update!(stored_doc, :initial_phases, &PipelineSerializer.unpack/1)}
          end)
        end

        # Name of the registry belonging to `pubsub`: `pubsub` with `Registry` appended.
        @doc false
        def registry_name(pubsub) do
          Module.concat([pubsub, :Registry])
        end

        # Hashes the mutation result into one of `pool_size` shards and publishes it
        # on that shard's proxy topic through the pubsub module.
        @doc false
        def publish_remote(pubsub, mutation_result, subscribed_fields) do
          {:ok, pool_size} = Registry.meta(registry_name(pubsub), :pool_size)

          proxy_topic =
            mutation_result
            |> :erlang.phash2(pool_size)
            |> Subscription.Proxy.topic()

          :ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields)
        end

        ## Middleware callback

        # Publishes the value of a resolution that resolved without errors, provided
        # a usable pubsub is found in its context. The resolution is always returned
        # unchanged.
        @doc false
        def call(%{state: :resolved, errors: [], value: value} = res, _) do
          case extract_pubsub(res.context) do
            {:ok, pubsub} -> __MODULE__.publish(pubsub, value, res)
            _ -> :ok
          end

          res
        end

        def call(res, _), do: res

        # Returns `{:ok, pubsub}` when `context` has a `:pubsub` entry whose registry
        # name is currently registered to a pid; otherwise `:error`.
        @doc false
        def extract_pubsub(context) do
          case Map.fetch(context, :pubsub) do
            {:ok, pubsub} ->
              registry_pid = Process.whereis(registry_name(pubsub))
              if is_pid(registry_pid), do: {:ok, pubsub}, else: :error

            :error ->
              :error
          end
        end

        # Appends this module to the end of a middleware list.
        @doc false
        def add_middleware(middleware) do
          own_entry = {__MODULE__, []}
          middleware ++ [own_entry]
        end
      end