httpc.ex (3461B)
defmodule Zenflows.HTTPC do
  @moduledoc """
  An HTTP client implemented for Zenswarm and Restroom.

  Each client is a named `GenServer` that owns a single `Mint.HTTP`
  connection to one `{scheme, host, port}` target.  The connection is
  opened lazily on the first request and re-opened whenever it is found
  closed.  Callers are blocked in `GenServer.call/2` until the response
  for their request has been fully streamed (or has failed).
  """
  use GenServer

  require Logger

  alias Mint.HTTP

  # conn      - the current Mint connection, `nil` until the first request
  # conn_info - `{scheme, host, port}` used to (re)connect
  # requests  - in-flight requests: `request_ref => %{from: from, response: map}`
  defstruct [:conn, conn_info: {}, requests: %{}]

  @doc """
  Starts a client process.

  Required options (a missing one raises `KeyError`):

    * `:scheme` - passed as-is to `Mint.HTTP.connect/3`
    * `:host` - passed as-is to `Mint.HTTP.connect/3`
    * `:port` - passed as-is to `Mint.HTTP.connect/3`
    * `:name` - the name the process is registered under
  """
  def start_link(opts) do
    scheme = Keyword.fetch!(opts, :scheme)
    host = Keyword.fetch!(opts, :host)
    port = Keyword.fetch!(opts, :port)
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, {scheme, host, port}, name: name)
  end

  @doc """
  Performs an HTTP request through the client registered as `name`.

  A `"user-agent"` header is added unless `headers` already has one
  (the lookup is an exact, case-sensitive match on `"user-agent"`).

  When the connection cannot be established, the request is attempted
  again, up to `max` attempts in total, sleeping 5ms between attempts.
  Errors that occur while sending the request or while receiving the
  response are not retried.

  Returns `{:ok, response}` where `response` is a map with `:status`,
  `:headers` and `:data` (the body as a binary, `""` when the response
  has no body), or `{:error, reason}`.
  """
  @spec request(term(), term(), term(), term(), term(), term()) ::
          {:ok, term()} | {:error, term()}
  def request(name, method, path, headers \\ [], body \\ nil, max \\ 5) do
    headers =
      case :lists.keyfind("user-agent", 1, headers) do
        {"user-agent", _} -> headers
        false -> [{"user-agent", "zenflows/#{Application.spec(:zenflows, :vsn)}"} | headers]
      end

    Enum.reduce_while(1..max, nil, fn attempt, _acc ->
      case GenServer.call(name, {:request, method, path, headers, body}) do
        {:ok, result} ->
          {:halt, {:ok, result}}

        {:error_conn, reason} ->
          # Only pause when another attempt is going to follow.
          if attempt != max, do: Process.sleep(5)
          {:cont, {:error, reason}}

        {:error_req, reason} ->
          {:halt, {:error, reason}}
      end
    end)
  end

  @impl true
  def init({scheme, host, port}) do
    # The connection itself is opened lazily by the first request.
    {:ok, %__MODULE__{conn_info: {scheme, host, port}}}
  end

  @impl true
  def handle_call({:request, method, path, headers, body}, from, state) do
    case ensure_connected(state) do
      {:ok, state} ->
        case HTTP.request(state.conn, method, path, headers, body) do
          {:ok, conn, request_ref} ->
            pending = %{from: from, response: %{}}
            state = %{state | conn: conn, requests: Map.put(state.requests, request_ref, pending)}
            # The reply is sent later, once the response has been streamed.
            {:noreply, state}

          {:error, conn, reason} ->
            {:reply, {:error_req, reason}, %{state | conn: conn}}
        end

      {:error, reason} ->
        {:reply, {:error_conn, reason}, state}
    end
  end

  @impl true
  def handle_info(message, %__MODULE__{conn: nil} = state) do
    # Without a connection the message cannot belong to Mint, and
    # `Mint.HTTP.stream/2` must not be called with `nil`.
    log_unknown(message)
    {:noreply, state}
  end

  def handle_info(message, state) do
    case HTTP.stream(state.conn, message) do
      :unknown ->
        log_unknown(message)
        {:noreply, state}

      {:ok, conn, responses} ->
        state = Enum.reduce(responses, %{state | conn: conn}, &process_response/2)
        {:noreply, state}

      {:error, conn, reason, responses} ->
        # Send a response to all the successful requests first...
        state = Enum.reduce(responses, %{state | conn: conn}, &process_response/2)

        # ...then, if the connection is gone, nothing more will arrive for
        # the remaining requests: answer them instead of letting their
        # callers block until the call times out.
        state = if HTTP.open?(conn), do: state, else: fail_pending(state, reason)
        {:noreply, state}
    end
  end

  # Returns `{:ok, state}` with an open connection, connecting if needed,
  # or `{:error, reason}` when the connection attempt fails.
  defp ensure_connected(%__MODULE__{conn: conn} = state) do
    if conn && HTTP.open?(conn) do
      {:ok, state}
    else
      {scheme, host, port} = state.conn_info

      case HTTP.connect(scheme, host, port) do
        {:ok, conn} -> {:ok, %{state | conn: conn}}
        {:error, reason} -> {:error, reason}
      end
    end
  end

  # Replies `{:error_req, reason}` to every in-flight caller and forgets them.
  # `:error_req` (not `:error_conn`) is used so that `request/6` does not
  # silently re-send a request the server may already have acted upon.
  defp fail_pending(state, reason) do
    Enum.each(state.requests, fn {_request_ref, %{from: from}} ->
      GenServer.reply(from, {:error_req, reason})
    end)

    %{state | requests: %{}}
  end

  defp log_unknown(message) do
    _ = Logger.error(fn -> "Received unknown message: " <> inspect(message) end)
    :ok
  end

  defp process_response({:status, request_ref, status}, state) do
    put_in(state.requests[request_ref].response[:status], status)
  end

  defp process_response({:headers, request_ref, headers}, state) do
    put_in(state.requests[request_ref].response[:headers], headers)
  end

  defp process_response({:data, request_ref, new_data}, state) do
    # Accumulate the chunks as iodata; flattened once when the request is done.
    update_in(state.requests[request_ref].response[:data], fn data -> [data || "", new_data] end)
  end

  defp process_response({:error, request_ref, reason}, state) do
    # No `:done` follows a per-request error, so the caller is answered here.
    {%{from: from}, state} = pop_in(state.requests[request_ref])
    GenServer.reply(from, {:error_req, reason})
    state
  end

  defp process_response({:done, request_ref}, state) do
    {%{response: response, from: from}, state} = pop_in(state.requests[request_ref])
    # A response without a body never produced a `:data` chunk; default to "".
    response = Map.update(response, :data, "", &IO.iodata_to_binary/1)
    GenServer.reply(from, {:ok, response})
    state
  end
end