Commit

Do :batch_interval based socket draining
josevalim committed Mar 16, 2023
1 parent b2f110f commit 276b81d
Showing 3 changed files with 34 additions and 39 deletions.
21 changes: 13 additions & 8 deletions lib/phoenix/endpoint.ex
@@ -234,7 +234,7 @@ defmodule Phoenix.Endpoint do
  * `:drainer` - a drainer process waits for any on-going request to finish
  during application shutdown. It accepts the `:shutdown` and
- `:drain_check_interval` options as defined by `Plug.Cowboy.Drainer`.
+ `:check_interval` options as defined by `Plug.Cowboy.Drainer`.
  Note the draining does not terminate any existing connection, it simply
  waits for them to finish. Socket connections run their own drainer
  before this one is invoked. That's because sockets are stateful and
@@ -798,15 +798,20 @@ defmodule Phoenix.Endpoint do
  on application shutdown. The goal is to notify all channels (and
  LiveViews) clients to reconnect. The supported options are:
- * `:shutdown` - How long to wait for connections to drain. Defaults to 15000ms.
- * `:drain_check_interval` - The maximum frequency to terminate batches. Defaults to 1000ms.
+ * `:batch_size` - How many clients to notify at once in a given batch.
+ Defaults to 10000.
+ * `:batch_interval` - The amount of time in miliseconds given for a
+ batch to terminate. Defaults to 2000ms.
+ * `:shutdown` - The maximum amount of time in miliseconds allowed
+ to drain all batches. Defaults to 30000ms.
  For example, if you have 150k connections, the default values will
- split them into 15 batches of 10k connections and notify each batch
- within 1000ms. If the batch shutdowns faster, we move on to the next
- one before the interval. If it takes longer, then we proceed anyway.
- Note that, after the socket drainer runs, the lower level HTTP/HTTPS
- connection drainer will still run, and apply to all connections.
+ split them into 15 batches of 10k connections. Each batch takes
+ 2000ms before the next batch starts. In this case, we will do everything
+ right under the maximum shutdown time of 30000ms. Therefore, as
+ you increase the number of connections, remember to adjust the shutdown
+ accordingly. Finally, after the socket drainer runs, the lower level
+ HTTP/HTTPS connection drainer will still run, and apply to all connections.
  Set it to `false` to disable draining.
  * `:connect_info` - a list of keys that represent data to be copied from
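For reference, a minimal sketch of how the options documented in this hunk might be set, assuming `:drainer` is passed as a socket option (which is how `lib/phoenix/socket.ex` reads it in this commit). The module names and path (`MyAppWeb.Endpoint`, `MyAppWeb.UserSocket`, `"/socket"`) are hypothetical. The values assume roughly 300k connections: 30 batches of 10k at 2000ms each, so the shutdown is raised to 60000ms.

```elixir
defmodule MyAppWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app

  # Hypothetical socket declaration; only the :drainer keys come from this commit.
  socket "/socket", MyAppWeb.UserSocket,
    websocket: true,
    drainer: [
      # Clients notified per batch (default 10_000).
      batch_size: 10_000,
      # Milliseconds given to each batch before the next starts (default 2_000).
      batch_interval: 2_000,
      # Upper bound for the whole drain; 30 batches * 2_000ms here.
      shutdown: 60_000
    ]
end
```

Setting `drainer: false` instead would disable socket draining, per the documentation in this hunk.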
4 changes: 1 addition & 3 deletions lib/phoenix/socket.ex
@@ -445,9 +445,7 @@ defmodule Phoenix.Socket do
     opts = Keyword.merge(socket_options, opts)

     if drainer = Keyword.get(opts, :drainer, []) do
-      shutdown = Keyword.get(drainer, :shutdown, 15_000)
-      interval = Keyword.get(drainer, :drain_check_interval, 1_000)
-      {Phoenix.Socket.PoolDrainer, {endpoint, handler, shutdown, interval}}
+      {Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}}
     else
       :ignore
     end
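The branch in this hunk relies on the empty list being truthy in Elixir: an unset `:drainer` falls back to `[]` and still starts the drainer with defaults, while only `false` (or `nil`) disables it. A small standalone sketch of that behaviour follows; `DrainerOptSketch` is a hypothetical stand-in that returns plain terms rather than the real child spec.

```elixir
defmodule DrainerOptSketch do
  # Hypothetical stand-in for the branch in Phoenix.Socket shown above.
  # It mirrors only the truthiness check, not the real return values.
  def resolve(opts) do
    if drainer = Keyword.get(opts, :drainer, []) do
      # [] and any keyword list are truthy, so draining stays enabled.
      {:drain, drainer}
    else
      # Reached only for `drainer: false` or `drainer: nil`.
      :ignore
    end
  end
end

DrainerOptSketch.resolve([])                            # {:drain, []}
DrainerOptSketch.resolve(drainer: [batch_size: 5_000])  # {:drain, [batch_size: 5_000]}
DrainerOptSketch.resolve(drainer: false)                # :ignore
```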
48 changes: 20 additions & 28 deletions lib/phoenix/socket/pool_supervisor.ex
@@ -69,14 +69,14 @@ defmodule Phoenix.Socket.PoolDrainer do
   use GenServer
   require Logger

-  def child_spec({_endpoint, name, shutdown, _interval} = tuple) do
+  def child_spec({_endpoint, name, opts} = tuple) do
     # The process should terminate within shutdown but,
     # in case it doesn't, we will be killed if we exceed
     # double of that
     %{
       id: {:terminator, name},
       start: {__MODULE__, :start_link, [tuple]},
-      shutdown: shutdown * 2
+      shutdown: Keyword.get(opts, :shutdown, 30_000)
     }
   end

@@ -85,13 +85,15 @@ defmodule Phoenix.Socket.PoolDrainer do
   end

   @impl true
-  def init(tuple) do
+  def init({endpoint, name, opts}) do
     Process.flag(:trap_exit, true)
-    {:ok, tuple}
+    size = Keyword.get(opts, :batch_size, 10_000)
+    interval = Keyword.get(opts, :batch_interval, 2_000)
+    {:ok, {endpoint, name, size, interval}}
   end

   @impl true
-  def terminate(_reason, {endpoint, name, shutdown, interval}) do
+  def terminate(_reason, {endpoint, name, size, interval}) do
     ets = endpoint.config({:socket, name})
     partitions = :ets.lookup_element(ets, :partitions, 2)

@@ -106,33 +108,23 @@ defmodule Phoenix.Socket.PoolDrainer do
         end
       end)

-    rounds = div(shutdown, interval)
-    batch = max(ceil(total / rounds), 1)
+    rounds = div(total, size) + 1

     if total != 0 do
-      Logger.info("Shutting down #{total} sockets in #{shutdown}ms in #{rounds} rounds")
+      Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms")
     end

-    for pids <- collection |> Stream.concat() |> Stream.chunk_every(batch) do
-      {_pid, ref} =
-        spawn_monitor(fn ->
-          refs =
-            for pid <- pids do
-              send(pid, %Phoenix.Socket.Broadcast{event: "phx_draining"})
-              Process.monitor(pid)
-            end
-
-          Enum.each(refs, fn _ ->
-            receive do
-              {:DOWN, _, _, _, _} -> :ok
-            end
-          end)
-        end)
-
-      receive do
-        {:DOWN, ^ref, _, _, _} -> :ok
-      after
-        interval -> Process.demonitor(ref, [:flush])
+    for {pids, index} <-
+          collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do
+
+      spawn(fn ->
+        for pid <- pids do
+          send(pid, %Phoenix.Socket.Broadcast{event: "phx_draining"})
+        end
+      end)
+
+      if index < rounds do
+        Process.sleep(interval)
       end
     end
   end
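To check the arithmetic in the new `terminate/2`, here is a small sketch that reproduces only the batch and sleep counting as read from this diff. `DrainEstimate` is a hypothetical helper, not part of Phoenix, and it ignores the time spent collecting pids and sending messages. One thing it makes visible: when the socket count is an exact multiple of `:batch_size`, `rounds` is one more than the number of chunks, so a sleep also appears to follow the final batch.

```elixir
defmodule DrainEstimate do
  # Hypothetical helper: mirrors the counting in the new terminate/2
  # to estimate the total time spent in Process.sleep/1.
  def estimate(total, size \\ 10_000, interval \\ 2_000) do
    # Number of chunks Stream.chunk_every/2 yields for `total` pids.
    batches = div(total + size - 1, size)
    # The value the commit logs and compares each batch index against.
    rounds = div(total, size) + 1
    # A sleep follows every batch whose 1-based index is below `rounds`.
    sleeps = Enum.count(1..batches//1, fn index -> index < rounds end)
    %{batches: batches, rounds: rounds, sleep_ms: sleeps * interval}
  end
end

DrainEstimate.estimate(150_000)
# %{batches: 15, rounds: 16, sleep_ms: 30_000}
DrainEstimate.estimate(145_000)
# %{batches: 15, rounds: 15, sleep_ms: 28_000}
```

With the defaults, 150k connections therefore sleep for about the full 30000ms `:shutdown`, which matches the "right under the maximum shutdown time" wording in the updated documentation and is a reason to raise `:shutdown` as connection counts grow.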
