zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

fs_poll.ex (3069B)


      1 require Logger
      2 
      defmodule FileSystem.Backends.FSPoll do
        @moduledoc """
        FileSystem backend for any OS: a GenServer that regularly scans the file
        system to detect changes and sends them to the worker process.

        ## Backend Options

          * `:interval` (integer, default: 1000), polling interval in milliseconds

        ## Use FSPoll Backend

        Unlike other backends, the polling backend is never automatically chosen in
        any OS environment, despite being usable on all platforms.

        To use the polling backend, one has to explicitly specify it in the backend
        option.
        """

        use GenServer
        @behaviour FileSystem.Backend

        # `Logger.info/1` is a macro, so the module requires Logger itself instead
        # of relying on a file-level `require`.
        require Logger

        @typedoc "Snapshot of the watched tree: file path => modification time."
        @type mtimes :: %{optional(Path.t()) => term()}

        @doc "This backend needs no external executable, so there is nothing to set up."
        @impl true
        def bootstrap, do: :ok

        @doc "Lists the `{os_family, os_name}` pairs this backend declares as supported."
        @impl true
        def supported_systems do
          [{:unix, :linux}, {:unix, :freebsd}, {:unix, :openbsd}, {:unix, :darwin}, {:win32, :nt}]
        end

        @doc "Lists the event atoms this backend can report."
        @impl true
        def known_events do
          [:created, :deleted, :modified]
        end

        @doc """
        Starts the polling process.

        `args` is a keyword list with the required keys `:worker_pid` (the process
        that receives the events) and `:dirs` (the paths to scan), and the optional
        key `:interval` (milliseconds between scans, default 1000).
        """
        @spec start_link(keyword()) :: GenServer.on_start()
        def start_link(args) do
          GenServer.start_link(__MODULE__, args, [])
        end

        # The state is `{worker_pid, dirs, interval, mtimes}`. The snapshot starts
        # empty and is filled by the `:first_check` message, so the initial scan does
        # not run inside `init/1`.
        @impl true
        def init(args) do
          worker_pid = Keyword.fetch!(args, :worker_pid)
          dirs = Keyword.fetch!(args, :dirs)
          interval = Keyword.get(args, :interval, 1000)

          Logger.info("Polling file changes every #{interval}ms...")
          send(self(), :first_check)

          {:ok, {worker_pid, dirs, interval, %{}}}
        end

        # `:first_check` only records the baseline snapshot; nothing is reported, so
        # files that already exist are not announced as `:created`.
        @impl true
        def handle_info(:first_check, {worker_pid, dirs, interval, _empty_map}) do
          schedule_check(interval)
          {:noreply, {worker_pid, dirs, interval, files_mtimes(dirs)}}
        end

        # `:check` takes a fresh snapshot, reports the differences against the
        # previous one (created, then deleted, then modified) and re-arms the timer.
        def handle_info(:check, {worker_pid, dirs, interval, stale_mtimes}) do
          fresh_mtimes = files_mtimes(dirs)
          {created, deleted, modified} = diff(stale_mtimes, fresh_mtimes)

          report_change({created, :created}, worker_pid)
          report_change({deleted, :deleted}, worker_pid)
          report_change({modified, :modified}, worker_pid)

          schedule_check(interval)
          {:noreply, {worker_pid, dirs, interval, fresh_mtimes}}
        end

        # Arms the timer that delivers the next `:check` message to this process.
        defp schedule_check(interval) do
          Process.send_after(self(), :check, interval)
        end

        # Walks `paths` recursively and returns a map of regular-file path => mtime.
        #
        # `File.stat/1` is used instead of `File.stat!/1`: a path can disappear
        # between `Path.wildcard/1` listing it and the stat call, be a dangling
        # symlink, or (for a watched root) not exist yet. Such paths are skipped
        # rather than crashing the poller; a vanished file is simply absent from the
        # snapshot and therefore shows up as `:deleted` in the next diff.
        defp files_mtimes(paths, mtimes \\ %{}) do
          Enum.reduce(paths, mtimes, fn path, acc ->
            case File.stat(path) do
              {:ok, %File.Stat{type: :regular, mtime: mtime}} ->
                Map.put(acc, path, mtime)

              {:ok, %File.Stat{type: :directory}} ->
                path
                |> Path.join("*")
                |> Path.wildcard()
                |> files_mtimes(acc)

              {:ok, _other_type} ->
                acc

              {:error, _reason} ->
                acc
            end
          end)
        end

        # Compares two snapshots and returns
        # `{created_paths, deleted_paths, modified_paths}`:
        #
        #   * created  - keys only present in `fresh_mtimes`
        #   * deleted  - keys only present in `stale_mtimes`
        #   * modified - keys present in both whose values differ
        @doc false
        @spec diff(mtimes(), mtimes()) :: {[Path.t()], [Path.t()], [Path.t()]}
        def diff(stale_mtimes, fresh_mtimes) do
          fresh_file_paths = fresh_mtimes |> Map.keys() |> MapSet.new()
          stale_file_paths = stale_mtimes |> Map.keys() |> MapSet.new()

          created_file_paths =
            fresh_file_paths |> MapSet.difference(stale_file_paths) |> MapSet.to_list()

          deleted_file_paths =
            stale_file_paths |> MapSet.difference(fresh_file_paths) |> MapSet.to_list()

          modified_file_paths =
            for file_path <- MapSet.intersection(stale_file_paths, fresh_file_paths),
                Map.fetch!(stale_mtimes, file_path) != Map.fetch!(fresh_mtimes, file_path),
                do: file_path

          {created_file_paths, deleted_file_paths, modified_file_paths}
        end

        # Sends one `{:backend_file_event, self(), {path, [event]}}` message to the
        # worker for every path in `file_paths`.
        defp report_change({file_paths, event}, worker_pid) do
          Enum.each(file_paths, fn file_path ->
            send(worker_pid, {:backend_file_event, self(), {file_path, [event]}})
          end)
        end
      end