zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

proxy.ex (1313B)


      defmodule Absinthe.Subscription.Proxy do
        @moduledoc false

        # Per-shard proxy process. On start it subscribes to the shard's proxy topic
        # on the given pubsub module. For every payload whose `node` differs from
        # `pubsub.node_name()`, it starts a task under the given task supervisor that
        # calls `Absinthe.Subscription.Local.publish_mutation/3`.

        use GenServer

        alias Absinthe.Subscription

        # Interval, in milliseconds, between forced garbage collections of this process.
        @gc_interval 5_000

        defstruct [
          :pubsub,
          :node,
          :task_super
        ]

        # Child spec keyed by shard, so that several proxies (one per shard) can be
        # started under the same supervisor without id clashes.
        # `args` must be a three-element list whose last element is the shard.
        @spec child_spec(list()) :: Supervisor.child_spec()
        def child_spec([_, _, shard] = args) do
          %{
            id: {__MODULE__, shard},
            start: {__MODULE__, :start_link, [args]}
          }
        end

        # Starts the proxy. `args` is `[task_super, pubsub, shard]` (see `init/1`).
        @spec start_link(list()) :: GenServer.on_start()
        def start_link(args) do
          GenServer.start_link(__MODULE__, args)
        end

        # Name of the pubsub topic the proxy for `shard` subscribes to.
        @spec topic(term()) :: String.t()
        def topic(shard), do: "__absinthe__:proxy:#{shard}"

        # Records the pubsub's node name, subscribes to the shard topic (crashing if
        # the subscription does not return `:ok`) and schedules the first GC tick.
        @impl true
        def init([task_super, pubsub, shard]) do
          node_name = pubsub.node_name()
          :ok = pubsub.subscribe(topic(shard))
          schedule_gc()
          {:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}}
        end

        # Periodic tick: force a garbage collection of this process and re-arm the timer.
        @impl true
        def handle_info(:gc, state) do
          :erlang.garbage_collect()
          schedule_gc()
          {:noreply, state}
        end

        # A payload received on the proxy topic. Payloads whose `node` equals this
        # pubsub's node name are skipped; all others are published via a supervised
        # task.
        def handle_info(%{node: origin} = payload, state) do
          # There's no meaningful form of backpressure to have here, and we can't
          # bottleneck execution inside each proxy process
          if origin != state.pubsub.node_name() do
            Task.Supervisor.start_child(
              state.task_super,
              Subscription.Local,
              :publish_mutation,
              [state.pubsub, payload.mutation_result, payload.subscribed_fields]
            )
          end

          {:noreply, state}
        end

        # Anything else is not a proxy payload. Log and drop it instead of crashing
        # the proxy on a `payload.node` access.
        def handle_info(message, state) do
          :logger.warning("~p ignoring unexpected message: ~p", [__MODULE__, message])
          {:noreply, state}
        end

        # Arms the timer that delivers the next `:gc` message to this process.
        defp schedule_gc do
          Process.send_after(self(), :gc, @gc_interval)
        end
      end