zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

worker.ex (3129B)


      1 defmodule Credo.Check.Worker do
      2   @moduledoc false
      3 
      4   @doc """
      5   Runs all members of `workloads` using ``.
      6   """
      7   def run(workloads, max_concurrency, work_fn) do
      8     {:ok, server_pid} = GenServer.start_link(__MODULE__.Server, workloads)
      9 
     10     worker_context = %{
     11       runner_pid: self(),
     12       server_pid: server_pid,
     13       max_concurrency: max_concurrency,
     14       work_fn: work_fn,
     15       results: []
     16     }
     17 
     18     outer_loop(worker_context, 0)
     19   end
     20 
     21   @doc """
     22   Called when a workload has finished.
     23   """
     24   def send_workload_finished_to_runner(worker_context, _workload, result) do
     25     send(worker_context.runner_pid, {self(), {:workload_finished, result}})
     26   end
     27 
     28   defp outer_loop(worker_context, taken) do
     29     available = worker_context.max_concurrency - taken
     30 
     31     cond do
     32       available <= 0 ->
     33         wait_for_workload_finished(worker_context, taken)
     34 
     35       taken_workloads = __MODULE__.Server.take_workloads(worker_context.server_pid, available) ->
     36         inner_loop(worker_context, taken_workloads, taken)
     37 
     38       # we fall thru here if there are no checks left
     39       # there are two options: we are done ...
     40       taken == 0 ->
     41         {:ok, worker_context.results}
     42 
     43       # ... or we need for the very last batch to finish up
     44       true ->
     45         wait_for_workload_finished(worker_context, taken)
     46     end
     47   end
     48 
     49   defp wait_for_workload_finished(worker_context, taken) do
     50     receive do
     51       {_spawned_pid, {:workload_finished, result}} ->
     52         # IO.puts("Finished #{workload}")
     53         new_worker_context = %{worker_context | results: [result | worker_context.results]}
     54 
     55         outer_loop(new_worker_context, taken - 1)
     56     end
     57   end
     58 
     59   defp inner_loop(worker_context, [], taken) do
     60     outer_loop(worker_context, taken)
     61   end
     62 
     63   defp inner_loop(worker_context, [workload | rest], taken) do
     64     spawn_fn = fn ->
     65       result = worker_context.work_fn.(workload)
     66 
     67       send_workload_finished_to_runner(worker_context, workload, result)
     68     end
     69 
     70     spawn_link(spawn_fn)
     71 
     72     inner_loop(worker_context, rest, taken + 1)
     73   end
     74 
     75   defmodule Server do
     76     @moduledoc false
     77     @timeout :infinity
     78 
     79     use GenServer
     80 
     81     def take_workloads(pid, count) do
     82       GenServer.call(pid, {:take_workloads, count}, @timeout)
     83     end
     84 
     85     #
     86     # Server
     87     #
     88 
     89     @impl true
     90     def init(workloads) do
     91       state = %{
     92         waiting: nil,
     93         workloads: workloads
     94       }
     95 
     96       {:ok, state}
     97     end
     98 
     99     @impl true
    100     def handle_call({:take_workloads, count}, from, %{waiting: nil} = state) do
    101       {:noreply, take_workloads(%{state | waiting: {from, count}})}
    102     end
    103 
    104     defp take_workloads(%{waiting: nil} = state) do
    105       state
    106     end
    107 
    108     defp take_workloads(%{waiting: {from, _count}, workloads: []} = state) do
    109       GenServer.reply(from, nil)
    110 
    111       %{state | waiting: nil}
    112     end
    113 
    114     defp take_workloads(%{workloads: []} = state) do
    115       state
    116     end
    117 
    118     defp take_workloads(%{waiting: {from, count}, workloads: workloads} = state) do
    119       {reply, workloads} = Enum.split(workloads, count)
    120 
    121       GenServer.reply(from, reply)
    122 
    123       %{state | workloads: workloads, waiting: nil}
    124     end
    125   end
    126 end