zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

worker.ex (1369B)


      1 defmodule FileSystem.Worker do
      2   @moduledoc """
      3   FileSystem Worker Process with the backend GenServer, receive events from Port Process
      4   and forward it to subscribers.
      5   """
      6 
      7   use GenServer
      8 
      9   @doc false
     10   def start_link(args) do
     11     {opts, args} = Keyword.split(args, [:name])
     12     GenServer.start_link(__MODULE__, args, opts)
     13   end
     14 
     15   @doc false
     16   def init(args) do
     17     {backend, rest} = Keyword.pop(args, :backend)
     18     with {:ok, backend} <- FileSystem.Backend.backend(backend),
     19          {:ok, backend_pid} <- backend.start_link([{:worker_pid, self()} | rest])
     20     do
     21       {:ok, %{backend_pid: backend_pid, subscribers: %{}}}
     22     else
     23       _ -> :ignore
     24     end
     25   end
     26 
     27   @doc false
     28   def handle_call(:subscribe, {pid, _}, state) do
     29     ref = Process.monitor(pid)
     30     state = put_in(state, [:subscribers, ref], pid)
     31     {:reply, :ok, state}
     32   end
     33 
     34   @doc false
     35   def handle_info({:backend_file_event, backend_pid, file_event}, %{backend_pid: backend_pid}=state) do
     36     state.subscribers |> Enum.each(fn {_ref, subscriber_pid} ->
     37       send(subscriber_pid, {:file_event, self(), file_event})
     38     end)
     39     {:noreply, state}
     40   end
     41 
     42   def handle_info({:DOWN, ref, _, _pid, _reason}, state) do
     43     subscribers = Map.drop(state.subscribers, [ref])
     44     {:noreply, %{state | subscribers: subscribers}}
     45   end
     46 
     47   def handle_info(_, state) do
     48     {:noreply, state}
     49   end
     50 end