proxy.ex (1313B)
1 defmodule Absinthe.Subscription.Proxy do 2 @moduledoc false 3 4 use GenServer 5 6 defstruct [ 7 :pubsub, 8 :node, 9 :task_super 10 ] 11 12 def child_spec([_, _, shard] = args) do 13 %{ 14 id: {__MODULE__, shard}, 15 start: {__MODULE__, :start_link, [args]} 16 } 17 end 18 19 alias Absinthe.Subscription 20 21 @gc_interval 5_000 22 23 def start_link(args) do 24 GenServer.start_link(__MODULE__, args) 25 end 26 27 def topic(shard), do: "__absinthe__:proxy:#{shard}" 28 29 def init([task_super, pubsub, shard]) do 30 node_name = pubsub.node_name() 31 :ok = pubsub.subscribe(topic(shard)) 32 Process.send_after(self(), :gc, @gc_interval) 33 {:ok, %__MODULE__{pubsub: pubsub, node: node_name, task_super: task_super}} 34 end 35 36 def handle_info(:gc, state) do 37 :erlang.garbage_collect() 38 Process.send_after(self(), :gc, @gc_interval) 39 {:noreply, state} 40 end 41 42 def handle_info(payload, state) do 43 # There's no meaningful form of backpressure to have here, and we can't 44 # bottleneck execution inside each proxy process 45 46 unless payload.node == state.pubsub.node_name() do 47 Task.Supervisor.start_child(state.task_super, Subscription.Local, :publish_mutation, [ 48 state.pubsub, 49 payload.mutation_result, 50 payload.subscribed_fields 51 ]) 52 end 53 54 {:noreply, state} 55 end 56 end