defmodule Finch do @external_resource "README.md" @moduledoc "README.md" |> File.read!() |> String.split("") |> Enum.fetch!(1) alias Finch.{Pool, Request, Response} require Finch.Pool.Manager use Supervisor @default_pool_size 50 @default_pool_count 1 @default_connect_timeout 5_000 @pool_config_schema [ protocols: [ type: {:list, {:in, [:http1, :http2]}}, doc: """ The type of connections to support. If using `:http1` only, an HTTP1 pool without multiplexing is used. \ If using `:http2` only, an HTTP2 pool with multiplexing is used. \ If both are listed, then both HTTP1/HTTP2 connections are \ supported (via ALPN), but there is no multiplexing. """, default: [:http1] ], count: [ type: :pos_integer, doc: """ How many **shards** to start for this pool key. - **HTTP/1**: Each shard is a `NimblePool`. HTTP/1 shards are able to re-use connections in the same shard and establish new ones only when necessary. A higher `:count` under moderate traffic scatters work so idle connections stay **per shard**, which **reduces HTTP/1 connection reuse**. Prefer the **lowest `:count`** that still meets latency and throughput; raise **`count`** when you see checkout queue timeouts or heavy load on one shard. Use `t:pool_metrics/0`, `get_pool_status/2`, and Finch telemetry to inspect connections per shard. - **HTTP/2**: Each shard is a single connection process, able to multiplex requests. Shards register under the same registry key, so increasing `:count` spreads concurrent load across more processes and can relieve pressure when a **single** pool process (message handling, socket operations) becomes the bottleneck. Prefer the **lowest `:count`** unless one shard is the limit; raise **`count`** when telemetry or `get_pool_status/2` shows a shard consistently hot (e.g. high `in_flight_requests`). When `:count` > 1, **`:pool_strategy`** selects the shard per request—see [Multiple shards (`count` > 1) and `:pool_strategy`](#start_link/1-multiple-shards-count-1-and-pool_strategy). """, default: @default_pool_count ], size: [ type: :pos_integer, doc: """ It is the maximum number of HTTP/1 connections per pool **shard**. Connections are opened **lazily** up to this cap; the value is an upper bound, not a reservation. When every connection in that shard is busy, further requests wait in the checkout queue until one is returned or `:pool_timeout` (see `request/3`) is exceeded. This applies only to HTTP/1 pools. For HTTP/2, this setting is ignored. A single connection multiplexes streams per pool process; use `:count` for more HTTP/2 connections in parallel. Combined with `:count`, the upper bound on concurrent HTTP/1 connections to one origin is roughly **`count * size`**. Actual open connections may be lower. """, default: @default_pool_size ], conn_opts: [ type: :keyword_list, doc: """ These options are passed to `Mint.HTTP.connect/4` whenever a new connection is established. \ `:mode` is not configurable as Finch must control this setting. Typically these options are \ used to configure proxying, https settings, or connect timeouts. """, default: [] ], pool_max_idle_time: [ type: :timeout, doc: """ The maximum number of milliseconds that a pool can be idle before being terminated, used only by HTTP1 pools. \ This options is forwarded to NimblePool and it starts and idle verification cycle that may impact \ performance if misused. For instance setting a very low timeout may lead to pool restarts. \ For more information see NimblePool's `handle_ping/2` documentation. """, default: :infinity ], conn_max_idle_time: [ type: :timeout, doc: """ The maximum number of milliseconds an HTTP1 connection is allowed to be idle \ before being closed during a checkout attempt. """, default: :infinity ], start_pool_metrics?: [ type: :boolean, doc: "When true, pool metrics will be collected and available through `get_pool_status/2`", default: false ], http2: [ type: :keyword_list, doc: "HTTP/2-specific options. Only relevant when `protocols` includes `:http2`.", default: [ wait_for_server_settings?: false, ping_interval: :infinity, max_connection_age: :infinity, max_connection_age_jitter: 0 ], keys: [ wait_for_server_settings?: [ type: :boolean, doc: """ When true, the pool does not send any request until the server's SETTINGS frame \ has been received and applied. If a request arrives before that, it fails with \ `Finch.Error` reason `:connection_not_ready` (callers should retry). When false, \ behaviour is unchanged and requests may be sent before SETTINGS. """, default: false ], ping_interval: [ type: :timeout, default: :infinity, doc: """ Interval in milliseconds between HTTP/2 PING frames sent to keep the connection \ alive. The timer resets on any connection activity, so PINGs are only sent after \ the connection has been idle for this duration. When set to `:infinity` (default), \ no PINGs are sent. """ ], max_connection_age: [ type: :timeout, doc: """ Maximum lifetime in milliseconds for an HTTP/2 connection before it is gracefully \ drained and replaced with a fresh one. When the timer expires the pool unregisters \ from the Registry (so new requests go to a fresh connection), finishes any in-flight \ requests, then terminates normally — the supervisor restarts it with a new DNS lookup. \ Useful for Kubernetes headless-service load balancing where DNS entries rotate. \ Defaults to `:infinity` (no age limit). """, default: :infinity ], max_connection_age_jitter: [ type: :non_neg_integer, doc: """ Random jitter in milliseconds added to `:max_connection_age`. Prevents multiple \ pool shards from draining simultaneously (thundering-herd). The actual age used is \ `max_connection_age + :rand.uniform(max_connection_age_jitter)`. \ Defaults to `0` (no jitter). """, default: 0 ] ] ] ] @typedoc """ The `:name` provided to Finch in `start_link/1`. """ @type name() :: atom() @type scheme() :: :http | :https @type scheme_host_port() :: {scheme(), host :: String.t(), port :: :inet.port_number()} @type pool_identifier() :: url :: String.t() | scheme_host_port() | Finch.Pool.t() @typedoc """ Pool metrics returned by `get_pool_status/2` for a single pool. """ @type pool_metrics() :: [Finch.HTTP1.PoolMetrics.t()] | [Finch.HTTP2.PoolMetrics.t()] @typedoc """ Pool metrics grouped by pool identifier when querying the `:default` configuration. """ @type default_pool_metrics() :: %{required(Finch.Pool.t()) => pool_metrics()} @type request_opt() :: {:pool_timeout, timeout()} | {:receive_timeout, timeout()} | {:request_timeout, timeout()} | {:pool_strategy, pool_strategy()} @typedoc """ Pool strategy to choose a pool from a list of pools. * `{module, state}` - a module implementing `Finch.Pool.Strategy` (e.g. `{Finch.Pool.Strategy.RoundRobin, counter}`) * `{&module.select/2, state}` - same as above but avoids dynamic dispatch; use for performance-critical paths * `module` - a module implementing `Finch.Pool.Strategy` (e.g. `Finch.Pool.Strategy.Random`) that needs no state: `nil` will be passed as a default * a 1-arity function `fn entries -> chosen end` where `entries` is `nonempty_list(term())` """ @type pool_strategy_state() :: term() @type pool_strategy_module() :: module() @type pool_strategy_module_with_state() :: {module(), pool_strategy_state()} @type pool_strategy_fun() :: (nonempty_list(term()) -> term()) @type pool_strategy_fun_with_state() :: {(nonempty_list(term()), pool_strategy_state() -> term()), pool_strategy_state()} @type pool_strategy() :: pool_strategy_fun() | pool_strategy_fun_with_state() | pool_strategy_module_with_state() | pool_strategy_module() @typedoc """ Options used by request functions. """ @type request_opts() :: [request_opt()] @typedoc """ Errors returned by Finch request functions. """ @type error() :: Finch.Error.t() | Finch.HTTPError.t() | Finch.TransportError.t() @doc """ Returns true if the term is any Finch error struct (`Finch.error()`). """ defguard is_finch_error(term) when is_struct(term, Finch.Error) or is_struct(term, Finch.HTTPError) or is_struct(term, Finch.TransportError) @typedoc """ The reference used to identify a request sent using `async_request/3`. Use the `is_request_ref/1` guard when matching on async response messages in `c:GenServer.handle_info/2` or similar callbacks to ensure your code keeps working if the internal structure of the reference changes. """ @opaque request_ref() :: Finch.Pool.Manager.request_ref() @doc """ A guard that returns true if `ref` is a valid request reference from `async_request/3`. Use this guard when matching on async response messages in `c:GenServer.handle_info/2` so your code remains valid if the internal structure of the reference changes. ## Example require Finch def handle_info({ref, response}, state) when Finch.is_request_ref(ref) do # handle async response from Finch.async_request/3 end """ defguard is_request_ref(ref) when Finch.Pool.Manager.is_request_ref(ref) @typedoc """ The stream function given to `stream/5`. """ @type stream(acc) :: ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary} | {:trailers, Mint.Types.headers()}, acc -> acc) @typedoc """ The stream function given to `stream_while/5`. """ @type stream_while(acc) :: ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary} | {:trailers, Mint.Types.headers()}, acc -> {:cont, acc} | {:halt, acc}) @typedoc """ The request body function used with `{:stream, req_body_fun}` in `build/5`. """ @type req_body_fun(acc) :: (acc -> {:data, binary(), acc} | {:done, acc} | {:halt, acc}) @doc """ Start an instance of Finch. ## Options * `:name` - The name of your Finch instance. Required. * `:pools` - A map of pool identifiers to configuration options. See the ":pools" subsection below. ### :name The name of your Finch instance. It is used to identify the instance when making requests and when calling other functions like `Finch.start_pool/3` or `Finch.get_pool_status/2`. #### Examples Finch.start_link(name: MyFinch) ### :pools A map where each key identifies a pool and each value is a keyword list of pool configuration options (see "[Pool Configuration Options](#start_link/1-pool-configuration-options)" below). Default is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`. **Pool keys** may be: * URL string – A binary URL. Pools created from URLs use the `:default` tag unless you use a `t:Finch.Pool.t/0` struct as the key instead. * `t:Finch.Pool.t/0` struct – Created with `Finch.Pool.new/2`. Use this when you need tagged pools (e.g. to run multiple pools for the same host with different configs). * URL string with `http+unix://` or `https+unix://` – For Unix domain sockets (e.g. `"http+unix:///tmp/socket"`) * `:default` – Catch-all. Any request whose pool is not in the map will use this config when its pool is started. When making a request with a `:pool_tag` option, that tag must exist in your pool configuration. If it does not, the request uses the `:default` configuration. #### Examples # URL keys (pool uses :default tag) Finch.start_link( name: MyFinch, pools: %{ "https://api.example.com" => [size: 10, count: 2] } ) # Tagged pools via Finch.Pool.new/2 Finch.start_link( name: MyFinch, pools: %{ Finch.Pool.new("https://api.example.com", tag: :bulk) => [size: 100, count: 1], Finch.Pool.new("https://api.example.com", tag: :realtime) => [size: 10, count: 2] } ) # Unix socket Finch.start_link( name: MyFinch, pools: %{ "http+unix:///tmp/socket" => [size: 5] } ) # Custom default configuration Finch.start_link( name: MyFinch, pools: %{ :default => [size: 25, count: 2] } ) ### Multiple shards (`count` > 1) and `:pool_strategy` When `:count` is greater than 1, Finch starts that many shards for the same pool. Each request must pick one shard. **By default** Finch picks uniformly at random, which matches `Finch.Pool.Strategy.Random`. You can override selection per request with the `:pool_strategy` option (see `t:pool_strategy/0` and `Finch.Pool.Strategy`). Built-in modules include `Finch.Pool.Strategy.RoundRobin` and `Finch.Pool.Strategy.Hash` (stable mapping from a key to a shard, useful for affinity). You can also pass a custom module or function. ### Pool Configuration Options #{NimbleOptions.docs(@pool_config_schema)} """ def start_link(opts) do name = finch_name!(opts) pools = Keyword.get(opts, :pools, []) |> pool_options!() {default_pool_config, pools} = Map.pop(pools, :default) config = %{ registry_name: name, registry_listeners: Keyword.get(opts, :registry_listeners, []), supervisor_name: Pool.Manager.supervisor_name(name), supervisor_registry_name: Pool.Manager.supervisor_registry_name(name), default_pool_config: default_pool_config, pools: pools } Supervisor.start_link(__MODULE__, config, name: concat_name(name, "Supervisor")) end @doc """ Finds a pool by its configuration and returns the pool pid. Returns `{:ok, pid}` if the pool exists, `:error` otherwise. This is useful for checking if a pool is available before making requests, or for advanced use cases where you need direct access to the pool process. ## Example case Finch.find_pool(MyFinch, Finch.Pool.new("https://api.internal", tag: :api)) do {:ok, pid} -> # Pool exists :error -> # Pool not found end """ @spec find_pool(name(), Finch.Pool.t()) :: {:ok, pid()} | :error def find_pool(name, %Finch.Pool{} = pool) do case Pool.Manager.get_pool(name, pool, %{start_pool?: false}) do {pid, _mod} -> {:ok, pid} _ -> :error end end @doc """ Starts a pool dynamically under Finch's internal supervision tree. Returns `:ok` if the pool was started or already exists. ## Options Same pool configuration options as `Finch.start_link/1`: `:size`, `:count`, `:protocols`, `:conn_opts`, etc. ## Example Finch.start_pool(MyFinch, Finch.Pool.new("https://api.example.com", tag: :api), size: 10) """ @spec start_pool(name(), Finch.Pool.t(), keyword()) :: :ok def start_pool(name, pool, opts \\ []) def start_pool(name, %Finch.Pool{} = pool, opts) do # Avoid building the child_spec (cast_pool_opts, sanitize, etc.) if the pool already exists if Process.whereis(name) do supervisor_registry_name = Pool.Manager.supervisor_registry_name(name) case Pool.Manager.get_pool(supervisor_registry_name, pool, %{start_pool?: false}) do :not_found -> spec = Finch.Pool.child_spec([finch: name, pool: pool] ++ opts) supervisor_name = Pool.Manager.supervisor_name(name) case DynamicSupervisor.start_child(supervisor_name, spec) do {:ok, _pid} -> :ok {:error, {:already_started, _pid}} -> :ok end _ -> :ok end else raise ArgumentError, "Finch instance #{inspect(name)} is not running" end end def child_spec(opts) do %{ id: finch_name!(opts), start: {__MODULE__, :start_link, [opts]} } end @impl true def init(config) do Finch.PoolMetrics.new(config.registry_name) children = [ {Registry, keys: :duplicate, name: config.registry_name, listeners: config.registry_listeners, meta: [config: config]}, {Registry, keys: :unique, name: config.supervisor_registry_name}, {DynamicSupervisor, name: config.supervisor_name, strategy: :one_for_one}, {Pool.Manager, config} ] Supervisor.init(children, strategy: :one_for_all) end defp finch_name!(opts) do Keyword.get(opts, :name) || raise(ArgumentError, "must supply a name") end defp pool_options!(pools) do {:ok, default} = NimbleOptions.validate([], @pool_config_schema) Enum.reduce(pools, %{default: valid_opts_to_map(default)}, fn {destination, opts}, acc -> with {:ok, valid_destination} <- cast_destination(destination), {:ok, valid_pool_opts} <- cast_pool_opts(opts) do Map.put(acc, valid_destination, valid_pool_opts) else {:error, reason} -> raise reason end end) end defp cast_destination(destination) do case destination do :default -> {:ok, destination} %Finch.Pool{} = pool -> {:ok, pool} {scheme, {:local, path}} when is_atom(scheme) and is_binary(path) -> ## TODO: Remove in Finch v1.0 IO.warn(""" Using {scheme, {:local, path}} as a pool key is deprecated. Use "#{scheme}+unix://#{path}" instead. For example: pools: %{ "#{scheme}+unix://#{path}" => ... } """) {:ok, Finch.Pool.new({scheme, {:local, path}})} url when is_binary(url) -> {:ok, Finch.Pool.new(url)} _ -> {:error, %ArgumentError{message: "invalid destination: #{inspect(destination)}"}} end end @doc false def cast_pool_opts(opts) do with {:ok, valid} <- NimbleOptions.validate(opts, @pool_config_schema) do {:ok, valid_opts_to_map(valid)} end end defp valid_opts_to_map(valid) do # We need to enable keepalive and set the nodelay flag to true by default. transport_opts = valid |> get_in([:conn_opts, :transport_opts]) |> List.wrap() |> Keyword.put_new(:timeout, @default_connect_timeout) |> Keyword.put_new(:nodelay, true) |> Keyword.put(:keepalive, true) conn_opts = valid[:conn_opts] |> List.wrap() # Only relevant to HTTP2, but just gracefully ignored in HTTP1. # Since we cannot handle server push responses, we need to disable the feature. client_settings = conn_opts |> Keyword.get(:client_settings, []) |> Keyword.put(:enable_push, false) ssl_key_log_file = Keyword.get(conn_opts, :ssl_key_log_file) || System.get_env("SSLKEYLOGFILE") ssl_key_log_file_device = ssl_key_log_file && File.open!(ssl_key_log_file, [:append]) conn_opts = conn_opts |> Keyword.put(:ssl_key_log_file_device, ssl_key_log_file_device) |> Keyword.put(:transport_opts, transport_opts) |> Keyword.put(:protocols, valid[:protocols]) |> Keyword.put(:client_settings, client_settings) mod = if :http1 in valid[:protocols] do Finch.HTTP1.Pool else Finch.HTTP2.Pool end %{ mod: mod, size: valid[:size], count: valid[:count], conn_opts: conn_opts, conn_max_idle_time: to_native(valid[:conn_max_idle_time]), pool_max_idle_time: valid[:pool_max_idle_time], start_pool_metrics?: valid[:start_pool_metrics?], wait_for_server_settings?: valid[:http2][:wait_for_server_settings?], ping_interval: valid[:http2][:ping_interval], max_connection_age: valid[:http2][:max_connection_age], max_connection_age_jitter: valid[:http2][:max_connection_age_jitter] } end defp to_native(:infinity), do: :infinity defp to_native(time), do: System.convert_time_unit(time, :millisecond, :native) defp concat_name(name, suffix), do: :"#{name}.#{suffix}" defmacrop request_span(request, name, do: block) do quote do start_meta = %{request: unquote(request), name: unquote(name)} Finch.Telemetry.span(:request, start_meta, fn -> result = unquote(block) end_meta = Map.put(start_meta, :result, result) {result, end_meta} end) end end @doc """ Builds an HTTP request to be sent with `request/3`, `async_request/3`, `stream/5`, or `stream_while/5`. Request `body` can be one of: * `nil` - no body is sent with the request. * `iodata` - the body to send for the request. * `{:stream, enumerable}` - stream request body chunks emitted by an `Enumerable`. * `{:stream, req_body_fun}` - stream request body chunks emitted by `req_body_fun`. Can only be used with `Finch.stream_while/5` on HTTP/1 pools. See `Finch.stream_while/5` for more information. ## Options * `:unix_socket` - Path to a Unix domain socket to connect to instead of the URL host/port. The URL scheme still determines whether HTTP or HTTPS is used. * `:pool_tag` - The tag to use when selecting which pool to use for this request. Defaults to `:default`. """ @spec build( Request.method(), Request.url(), Request.headers(), Request.body(), Request.build_opts() ) :: Request.t() defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request @doc """ Streams an HTTP request and returns the accumulator. `resp_fun` receives a response entry and the accumulator `acc`, and must return the updated accumulator. Response entries are: * `{:status, status}` - the HTTP response status * `{:headers, headers}` - the HTTP response headers * `{:data, data}` - the HTTP response body chunk * `{:trailers, trailers}` - the HTTP response trailers See also `request/3`, `stream_while/5`. > ### HTTP2 streaming and back-pressure {: .warning} > > At the moment, streaming over HTTP2 connections do not provide > any back-pressure mechanism: this means the response will be > sent to the client as quickly as possible. Therefore, you must > not use streaming over HTTP2 for non-terminating responses or > when streaming large responses which you do not intend to keep > in memory. > ### Connection draining {: .info} > > If the HTTP/2 pool this request is dispatched to is currently draining (see > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh > pool. The retry is transparent to the caller. See `async_request/3` for the async > variant, which does not retry automatically. ## Options Shares options with `request/3`. ## Examples path = "/tmp/archive.zip" file = File.open!(path, [:write, :exclusive]) url = "https://example.com/archive.zip" request = Finch.build(:get, url) Finch.stream(request, MyFinch, nil, fn {:status, status}, _acc -> IO.inspect(status) {:headers, headers}, _acc -> IO.inspect(headers) {:data, data}, _acc -> IO.binwrite(file, data) end) File.close(file) """ @spec stream(Request.t(), name(), acc, stream(acc), request_opts()) :: {:ok, acc} | {:error, Finch.error(), acc} when acc: term() def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do validate_no_req_body_fun!(req, "Finch.stream/5") fun = fn entry, acc -> {:cont, fun.(entry, acc)} end stream_while(req, name, acc, fun, opts) end @doc """ Streams an HTTP request until it finishes or is cancelled. ## Request body streaming When the request body is set to `{:stream, req_body_fun}` (see `build/5`), `req_body_fun` receives the accumulator `acc` and must return one of: * `{:data, chunk, acc}` - emit request body `chunk` and continue streaming * `{:done, acc}` - request body is done, `acc` is passed to `resp_fun` * `{:halt, acc}` - cancel the request and close the connection `{:stream, req_body_fun}` is currently only supported on HTTP/1 pools. ## Response streaming `resp_fun` receives a response entry and the accumulator `acc`, and must return one of: * `{:cont, acc}` - continue streaming * `{:halt, acc}` - cancel the request. On HTTP/1, this also closes the connection Response entries are: * `{:status, status}` - the HTTP response status * `{:headers, headers}` - the HTTP response headers * `{:data, data}` - the HTTP response body chunk * `{:trailers, trailers}` - the HTTP response trailers See also `request/3`, `stream/5`. > ### HTTP2 streaming and back-pressure {: .warning} > > At the moment, streaming over HTTP2 connections do not provide > any back-pressure mechanism: this means the response will be > sent to the client as quickly as possible. Therefore, you must > not use streaming over HTTP2 for non-terminating responses or > when streaming large responses which you do not intend to keep > in memory. > ### Connection draining {: .info} > > If the HTTP/2 pool this request is dispatched to is currently draining (see > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh > pool. The retry is transparent to the caller. See `async_request/3` for the async > variant, which does not retry automatically. ## Options Shares options with `request/3`. ## Examples path = "/tmp/archive.zip" file = File.open!(path, [:write, :exclusive]) request = Finch.build(:get, "https://example.com/archive.zip") Finch.stream_while(request, MyFinch, nil, fn {:status, status}, acc -> IO.inspect(status) {:cont, acc} {:headers, headers}, acc -> IO.inspect(headers) {:cont, acc} {:data, data}, acc -> IO.binwrite(file, data) {:cont, acc} end) File.close(file) Uploading a file using `req_body_fun`: file = File.open!("/tmp/archive.zip", [:read]) req_body_fun = fn file -> case IO.binread(file, 4096) do :eof -> {:done, file} data -> {:data, data, file} end end request = Finch.build(:post, "https://example.com/upload", [], {:stream, req_body_fun}) resp_fun = fn {:status, status}, acc -> IO.inspect(status) {:cont, acc} {:headers, headers}, acc -> IO.inspect(headers) {:cont, acc} {:data, data}, acc -> IO.inspect(data) {:cont, acc} end {:ok, file} = Finch.stream_while(request, MyFinch, file, resp_fun) File.close(file) """ @spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) :: {:ok, acc} | {:error, Finch.error(), acc} when acc: term() def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do request_span req, name do __stream__(req, name, acc, fun, opts) end end defp __stream__(req, name, acc, fun, opts, retries \\ 3) defp __stream__(%Request{} = req, name, acc, fun, opts, retries) do case get_pool(req, name, opts) do {pool, pool_mod} -> case pool_mod.request(pool, req, acc, fun, name, opts) do {:error, %Finch.Error{reason: :read_only}, _acc} when retries > 0 -> __stream__(req, name, acc, fun, opts, retries - 1) other -> other end _ -> {:error, Finch.Error.exception(:pool_not_available), acc} end end @doc """ Sends an HTTP request and returns a `Finch.Response` struct. It can still raise exceptions if it was not possible to check out a connection in the given `:pool_timeout`. See also `stream/5`. > ### Connection draining {: .info} > > If the HTTP/2 pool this request is dispatched to is currently draining (see > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh > pool. The retry is transparent to the caller. See `async_request/3` for the async > variant, which does not retry automatically. ## Options * `:pool_timeout` - This timeout is applied when we check out a connection from the pool. Default value is `5_000`. * `:receive_timeout` - The maximum time to wait for each chunk to be received before returning an error. Default value is `15_000`. * `:request_timeout` - The amount of time to wait for a complete response before returning an error. This timeout only applies to HTTP/1, and its current implementation is a best effort timeout, it does not guarantee the call will return precisely when the time has elapsed. Default value is `:infinity`. * `:pool_strategy` - When the pool has multiple shards (`count: N`), selects which shards handles the request. Default is random selection. See `t:pool_strategy/0` for details. """ @spec request(Request.t(), name(), request_opts()) :: {:ok, Response.t()} | {:error, Finch.error()} def request(req, name, opts \\ []) def request(%Request{} = req, name, opts) do validate_no_req_body_fun!(req, "Finch.request/3") Keyword.validate!(opts, [:pool_timeout, :receive_timeout, :request_timeout, :pool_strategy]) request_span req, name do acc = {nil, [], [], []} fun = fn {:status, value}, {_, headers, body, trailers} -> {:cont, {value, headers, body, trailers}} {:headers, value}, {status, headers, body, trailers} -> {:cont, {status, headers ++ value, body, trailers}} {:data, value}, {status, headers, body, trailers} -> {:cont, {status, headers, [body | value], trailers}} {:trailers, value}, {status, headers, body, trailers} -> {:cont, {status, headers, body, trailers ++ value}} end case __stream__(req, name, acc, fun, opts) do {:ok, {status, headers, body, trailers}} -> {:ok, %Response{ status: status, headers: headers, body: IO.iodata_to_binary(body), trailers: trailers }} {:error, error, _acc} -> {:error, error} end end end @doc """ Sends an HTTP request and returns a `Finch.Response` struct or raises an exception in case of failure. See `request/3` for more detailed information. """ @spec request!(Request.t(), name(), request_opts()) :: Response.t() def request!(%Request{} = req, name, opts \\ []) do case request(req, name, opts) do {:ok, resp} -> resp {:error, exception} -> raise exception end end @doc """ Sends an HTTP request asynchronously, returning a request reference. If the request is sent using HTTP1, an extra process is spawned to consume messages from the underlying socket. The messages are sent to the current process as soon as they arrive, as a firehose. If you wish to maximize request rate or have more control over how messages are streamed, a strategy using `request/3` or `stream/5` should be used instead. ## Receiving the response Response information is sent to the calling process as it is received in `{ref, response}` tuples. If the calling process exits before the request has completed, the request will be canceled. Responses include: * `{:status, status}` - HTTP response status * `{:headers, headers}` - HTTP response headers * `{:data, data}` - section of the HTTP response body * `{:error, exception}` - an error occurred during the request * `:done` - request has completed successfully On a successful request, a single `:status` message will be followed by a single `:headers` message, after which more than one `:data` messages may be sent. If trailing headers are present, a final `:headers` message may be sent. Any `:done` or `:error` message indicates that the request has succeeded or failed and no further messages are expected. ## Example iex> req = Finch.build(:get, "https://httpbin.org/stream/5") iex> ref = Finch.async_request(req, MyFinch) iex> flush() {ref, {:status, 200}} {ref, {:headers, [...]}} {ref, {:data, "..."}} {ref, :done} > ### Connection draining {: .info} > > Unlike `request/3` and `stream/5`, async requests are not automatically retried when a > pool is draining (see `http2: [max_connection_age: ...]`). If the caller receives > `{ref, {:error, %Finch.Error{reason: :read_only}}}`, it should retry by calling > `async_request/3` again. ## Options Shares options with `request/3`. """ @spec async_request(Request.t(), name(), request_opts()) :: request_ref() def async_request(%Request{} = req, name, opts \\ []) do validate_no_req_body_fun!(req, "Finch.async_request/3") case get_pool(req, name, opts) do {pool, pool_mod} -> pool_mod.async_request(pool, req, name, opts) _ -> raise Finch.Error, :pool_not_available end end @doc """ Cancels a request sent with `async_request/3`. """ @spec cancel_async_request(request_ref()) :: :ok def cancel_async_request(request_ref) when Finch.Pool.Manager.is_request_ref(request_ref) do {pool_mod, _cancel_ref} = request_ref pool_mod.cancel_async_request(request_ref) end defp validate_no_req_body_fun!(%Request{body: {:stream, fun}}, caller) when is_function(fun, 1) do raise ArgumentError, "streaming request body with an accumulator function is not supported in #{caller}, " <> "use Finch.stream_while/5 instead" end defp validate_no_req_body_fun!(_req, _caller), do: :ok defp get_pool(%Request{scheme: scheme, unix_socket: unix_socket, pool_tag: tag}, name, opts) when is_binary(unix_socket) do pool = Finch.Pool.from_name({scheme, {:local, unix_socket}, 0, tag}) Pool.Manager.get_pool(name, pool, opts) end defp get_pool(%Request{scheme: scheme, host: host, port: port, pool_tag: tag}, name, opts) do pool = Finch.Pool.from_name({scheme, host, port, tag}) Pool.Manager.get_pool(name, pool, opts) end @doc """ Get pool metrics. When given a URL or pool identifier tuple, this returns the metrics list for that specific pool. The number of items in the metrics list depends on the configured `:count` option and each entry will have a `pool_index` going from 1 to `:count`. When `:default` is provided, Finch returns the metrics for all pools started from the `:default` configuration. In this case the return value is a map keyed by each pool's `{scheme, host, port}` tuple with the corresponding metrics list as the value. The metrics struct depends on the pool scheme defined in the `:protocols` option: `Finch.HTTP1.PoolMetrics` for `:http1` and `Finch.HTTP2.PoolMetrics` for `:http2`. See the documentation for those modules for more details. `{:error, :not_found}` is returned in the following scenarios: * There is no pool registered for the given Finch instance and pool identifier. * The pool has `start_pool_metrics?: false` (the default). * `:default` is provided but no pools have been started from the `:default` configuration (or none have metrics enabled). ## Examples iex> Finch.get_pool_status(MyFinch, "https://httpbin.org") {:ok, [ %Finch.HTTP1.PoolMetrics{ pool_index: 1, pool_size: 50, available_connections: 43, in_use_connections: 7 }, %Finch.HTTP1.PoolMetrics{ pool_index: 2, pool_size: 50, available_connections: 37, in_use_connections: 13 }] } iex> Finch.get_pool_status(MyFinch, :default) {:ok, %{ %Finch.Pool{host: "httpbin.com", port: 443, scheme: :https, tag: :default} => [ %Finch.HTTP1.PoolMetrics{ pool_index: 1, pool_size: 50, available_connections: 43, in_use_connections: 7 } ] }} """ @spec get_pool_status(name(), :default | pool_identifier()) :: {:ok, pool_metrics()} | {:ok, default_pool_metrics()} | {:error, :not_found} def get_pool_status(finch_name, :default) do finch_name |> Pool.Manager.get_default_pools() |> Enum.reduce(%{}, fn {_sup_pid, {pool_name, pool_mod}}, acc -> case pool_mod.get_pool_status(finch_name, pool_name) do {:ok, metrics} -> pool_id = Pool.from_name(pool_name) Map.put(acc, pool_id, metrics) {:error, :not_found} -> acc end end) |> case do result when result == %{} -> {:error, :not_found} result -> {:ok, result} end end def get_pool_status(finch_name, pool_identifier) do pool = resolve_pool(pool_identifier) case Pool.Manager.get_pool_mod(finch_name, pool) do {pool_name, pool_mod} -> pool_mod.get_pool_status(finch_name, pool_name) :not_found -> {:error, :not_found} end end @doc """ Sends an HTTP/2 PING frame and waits for PONG. Returns `{:ok, rtt_ms}` where `rtt_ms` is the round-trip time in native time units, or `{:error, reason}` if the ping fails. This is only supported for HTTP/2 pools. Returns `{:error, :not_http2}` for HTTP/1 pools. ## Examples {:ok, rtt} = Finch.ping(MyFinch, "https://example.com") IO.puts("RTT: \#{rtt}ms") """ @spec ping(name(), pool_identifier()) :: {:ok, integer()} | {:error, term()} def ping(name, pool_identifier) do pool = resolve_pool(pool_identifier) case Pool.Manager.get_pool(name, pool) do {pid, Finch.HTTP2.Pool} -> Finch.HTTP2.Pool.ping(pid) {_pid, _mod} -> {:error, :not_http2} :not_found -> {:error, :not_found} end end @doc """ Stops the pool of processes associated with the given pool identifier. This function can be invoked to manually stop the pool for the given identifier when you know it's not going to be used anymore. Note that this function is not safe with respect to concurrent requests. Invoking it while another request to the same pool is taking place might result in the failure of that request. It is the responsibility of the client to ensure that no request to the same pool is taking place while this function is being invoked. """ @spec stop_pool(name(), pool_identifier()) :: :ok | {:error, :not_found} def stop_pool(finch_name, pool_identifier) do pool = resolve_pool(pool_identifier) case Pool.Manager.get_pool_supervisor(finch_name, pool) do :not_found -> {:error, :not_found} {pid, pool_name, _pool_mod, _pool_count, _pool_config} -> result = Supervisor.stop(pid) Finch.PoolMetrics.delete_pool(finch_name, pool_name) result end end @doc """ Returns the current worker count for the given pool. Returns `{:ok, count}` if the pool exists, `{:error, :not_found}` otherwise. ## Examples {:ok, count} = Finch.get_pool_count(MyFinch, "https://example.com") """ @spec get_pool_count(name(), pool_identifier()) :: {:ok, pos_integer()} | {:error, :not_found} def get_pool_count(finch_name, pool_identifier) do pool = resolve_pool(pool_identifier) case Pool.Manager.get_pool_supervisor(finch_name, pool) do :not_found -> {:error, :not_found} {_pid, _pool_name, _pool_mod, pool_count, _pool_config} -> {:ok, pool_count} end end @doc """ Dynamically changes the number of pool workers for the given pool. Returns `:ok` on success, `{:error, :not_found}` if the pool doesn't exist. Works with all kinds of pools, but note that `:default` pools must have been materialized by at least one request before they can be resized. ## Examples :ok = Finch.set_pool_count(MyFinch, "https://example.com", 4) """ @spec set_pool_count(name(), pool_identifier(), pos_integer()) :: :ok | {:error, term()} def set_pool_count(finch_name, pool_identifier, count) when is_integer(count) and count > 0 do pool = resolve_pool(pool_identifier) Pool.Manager.set_pool_count(finch_name, pool, count) end defp resolve_pool(%Finch.Pool{} = pool), do: pool defp resolve_pool({scheme, host, port}), do: Finch.Pool.from_name({scheme, host, port, :default}) defp resolve_pool(url) when is_binary(url), do: Finch.Pool.new(url) end