zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

httpc.ex (3461B)


      1 defmodule Zenflows.HTTPC do
      2 @moduledoc """
      3 An HTTP client implemented for Zenswarm and Restroom.
      4 """
      5 use GenServer
      6 
      7 require Logger
      8 
      9 alias Mint.HTTP
     10 
     11 defstruct [:conn, conn_info: {}, requests: %{}]
     12 
     13 def start_link(opts) do
     14 	scheme = Keyword.fetch!(opts, :scheme)
     15 	host = Keyword.fetch!(opts, :host)
     16 	port = Keyword.fetch!(opts, :port)
     17 	name = Keyword.fetch!(opts, :name)
     18 	GenServer.start_link(__MODULE__, {scheme, host, port}, name: name)
     19 end
     20 
     21 @spec request(term(), term(), term(), term())
     22 	:: {:ok, term()} | {:error, term()}
     23 def request(name, method, path, headers \\ [], body \\ nil, max \\ 5) do
     24 	headers =
     25 		case :lists.keyfind("user-agent", 1, headers) do
     26 		  {"user-agent", _} -> headers
     27 		  false -> [{"user-agent", "zenflows/#{Application.spec(:zenflows, :vsn)}"} | headers]
     28 		end
     29 	Enum.reduce_while(1..max, nil, fn x, _ ->
     30 		case GenServer.call(name, {:request, method, path, headers, body}) do
     31 			{:ok, result} ->
     32 				{:halt, {:ok, result}}
     33 			{:error_conn, reason} ->
     34 				if x != max, do: Process.sleep(5)
     35 				{:cont, {:error, reason}}
     36 			{:error_req, reason} ->
     37 				{:halt, {:error, reason}}
     38 		end
     39 	end)
     40 end
     41 
     42 @impl true
     43 def init({scheme, host, port}) do
     44 	{:ok, %__MODULE__{conn_info: {scheme, host, port}}}
     45 end
     46 
     47 @impl true
     48 def handle_call({:request, method, path, headers, body}, from, state) do
     49 	if state.conn && HTTP.open?(state.conn) do
     50 		{:ok, state}
     51 	else
     52 		{scheme, host, port} = state.conn_info
     53 		case HTTP.connect(scheme, host, port) do
     54 			{:ok, conn} ->
     55 		 		state = put_in(state.conn, conn)
     56 				{:ok, state}
     57 			{:error, reason} ->
     58 				{:error, reason}
     59 		end
     60 	end
     61 	|> case do
     62 		{:ok, state} ->
     63 			case HTTP.request(state.conn, method, path, headers, body) do
     64 				{:ok, conn, request_ref} ->
     65 					state = put_in(state.conn, conn)
     66 					state = put_in(state.requests[request_ref], %{from: from, response: %{}})
     67 					{:noreply, state}
     68 
     69 				{:error, conn, reason} ->
     70 					state = put_in(state.conn, conn)
     71 					{:reply, {:error_req, reason}, state}
     72 			end
     73 		{:error, reason} ->
     74 			{:reply, {:error_conn, reason}, state}
     75 	end
     76 end
     77 
     78 @impl true
     79 def handle_info(message, state) do
     80 	case HTTP.stream(state.conn, message) do
     81 		:unknown ->
     82 			_ = Logger.error(fn -> "Received unknown message: " <> inspect(message) end)
     83 			{:noreply, state}
     84 
     85 		{:ok, conn, responses} ->
     86 			state = put_in(state.conn, conn)
     87 			state = Enum.reduce(responses, state, &process_response/2)
     88 
     89 			{:noreply, state}
     90 		{:error, conn, _reason, responses} ->
     91 			state = put_in(state.conn, conn)
     92 			# Send a response to all the succesful request
     93 			state = Enum.reduce(responses, state, &process_response/2)
     94 
     95 			{:noreply, state}
     96 	end
     97 end
     98 
     99 defp process_response({:status, request_ref, status}, state) do
    100 	put_in(state.requests[request_ref].response[:status], status)
    101 end
    102 
    103 defp process_response({:headers, request_ref, headers}, state) do
    104 	put_in(state.requests[request_ref].response[:headers], headers)
    105 end
    106 
    107 defp process_response({:data, request_ref, new_data}, state) do
    108 	update_in(state.requests[request_ref].response[:data], fn data -> [(data || ""), new_data] end)
    109 end
    110 
    111 defp process_response({:error, request_ref, error}, state) do
    112 	update_in(state.requests[request_ref].response[:error], error)
    113 end
    114 
    115 defp process_response({:done, request_ref}, state) do
    116 	state = update_in(state.requests[request_ref].response[:data], &IO.iodata_to_binary/1)
    117 	{%{response: response, from: from}, state} = pop_in(state.requests[request_ref])
    118 	GenServer.reply(from, {:ok, response})
    119 	state
    120 end
    121 end