worker.ex (3129B)
1 defmodule Credo.Check.Worker do 2 @moduledoc false 3 4 @doc """ 5 Runs all members of `workloads` using ``. 6 """ 7 def run(workloads, max_concurrency, work_fn) do 8 {:ok, server_pid} = GenServer.start_link(__MODULE__.Server, workloads) 9 10 worker_context = %{ 11 runner_pid: self(), 12 server_pid: server_pid, 13 max_concurrency: max_concurrency, 14 work_fn: work_fn, 15 results: [] 16 } 17 18 outer_loop(worker_context, 0) 19 end 20 21 @doc """ 22 Called when a workload has finished. 23 """ 24 def send_workload_finished_to_runner(worker_context, _workload, result) do 25 send(worker_context.runner_pid, {self(), {:workload_finished, result}}) 26 end 27 28 defp outer_loop(worker_context, taken) do 29 available = worker_context.max_concurrency - taken 30 31 cond do 32 available <= 0 -> 33 wait_for_workload_finished(worker_context, taken) 34 35 taken_workloads = __MODULE__.Server.take_workloads(worker_context.server_pid, available) -> 36 inner_loop(worker_context, taken_workloads, taken) 37 38 # we fall thru here if there are no checks left 39 # there are two options: we are done ... 40 taken == 0 -> 41 {:ok, worker_context.results} 42 43 # ... or we need for the very last batch to finish up 44 true -> 45 wait_for_workload_finished(worker_context, taken) 46 end 47 end 48 49 defp wait_for_workload_finished(worker_context, taken) do 50 receive do 51 {_spawned_pid, {:workload_finished, result}} -> 52 # IO.puts("Finished #{workload}") 53 new_worker_context = %{worker_context | results: [result | worker_context.results]} 54 55 outer_loop(new_worker_context, taken - 1) 56 end 57 end 58 59 defp inner_loop(worker_context, [], taken) do 60 outer_loop(worker_context, taken) 61 end 62 63 defp inner_loop(worker_context, [workload | rest], taken) do 64 spawn_fn = fn -> 65 result = worker_context.work_fn.(workload) 66 67 send_workload_finished_to_runner(worker_context, workload, result) 68 end 69 70 spawn_link(spawn_fn) 71 72 inner_loop(worker_context, rest, taken + 1) 73 end 74 75 defmodule Server do 76 @moduledoc false 77 @timeout :infinity 78 79 use GenServer 80 81 def take_workloads(pid, count) do 82 GenServer.call(pid, {:take_workloads, count}, @timeout) 83 end 84 85 # 86 # Server 87 # 88 89 @impl true 90 def init(workloads) do 91 state = %{ 92 waiting: nil, 93 workloads: workloads 94 } 95 96 {:ok, state} 97 end 98 99 @impl true 100 def handle_call({:take_workloads, count}, from, %{waiting: nil} = state) do 101 {:noreply, take_workloads(%{state | waiting: {from, count}})} 102 end 103 104 defp take_workloads(%{waiting: nil} = state) do 105 state 106 end 107 108 defp take_workloads(%{waiting: {from, _count}, workloads: []} = state) do 109 GenServer.reply(from, nil) 110 111 %{state | waiting: nil} 112 end 113 114 defp take_workloads(%{workloads: []} = state) do 115 state 116 end 117 118 defp take_workloads(%{waiting: {from, count}, workloads: workloads} = state) do 119 {reply, workloads} = Enum.split(workloads, count) 120 121 GenServer.reply(from, reply) 122 123 %{state | workloads: workloads, waiting: nil} 124 end 125 end 126 end