diff --git a/lib/sequin/mutex_owner.ex b/lib/sequin/mutex_owner.ex index 6fa1887e2..0dbdb982f 100644 --- a/lib/sequin/mutex_owner.ex +++ b/lib/sequin/mutex_owner.ex @@ -3,6 +3,11 @@ defmodule Sequin.MutexOwner do This GenServer boots up and tries to acquire a mutex. When it does, it calls the `on_acquired` callback (supplied on boot.) If it ever loses the mutex (unexpected - it should be touching the mutex before it expires), it crashes. + + If Redis becomes temporarily unreachable while holding the mutex, the MutexOwner will retry + with exponential backoff indefinitely (capped at 1 hour) rather than crashing. This handles + Redis/Dragonfly/KeyDB restarts without cascading failures through MutexedSupervisor. + When Redis comes back, the MutexOwner re-acquires the mutex and resumes normal operation. """ use GenStateMachine @@ -10,11 +15,15 @@ defmodule Sequin.MutexOwner do require Logger + # Retry backoff caps at 1 hour + @max_retry_interval to_timeout(hour: 1) + defmodule State do @moduledoc """ lock_expiry - how long to hold the mutex for when acquired mutex_key/mutex_token - see Sequin.Mutex on_acquired - callback that is called when the mutex is acquired + consecutive_redis_errors - count of consecutive Redis errors while holding mutex """ use TypedStruct @@ -24,6 +33,7 @@ defmodule Sequin.MutexOwner do field :mutex_token, String.t(), required: true field :on_acquired, (-> any()), required: true field :last_emitted_passive_log, DateTime.t(), default: ~U[2000-01-01 00:00:00Z] + field :consecutive_redis_errors, non_neg_integer(), default: 0 end def new(opts) do @@ -57,16 +67,22 @@ defmodule Sequin.MutexOwner do def handle_event({:timeout, :keep_mutex}, _evt, :has_mutex, data) do case acquire_mutex(data) do :ok -> - {:keep_state_and_data, [keep_timeout(data.lock_expiry)]} + {:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(data.lock_expiry)]} {:error, :mutex_taken} -> Logger.error("MutexOwner lost its mutex.") - {:shutdown, :lost_mutex} + {:stop, 
{:shutdown, :lost_mutex}} :error -> - Logger.error("MutexOwner had trouble reaching Redis.") - # Unable to reach redis? Die. - {:shutdown, :err_keeping_mutex} + errors = data.consecutive_redis_errors + 1 + # Exponential backoff: lock_expiry * 2^errors, capped at 1 hour + retry_interval = min(data.lock_expiry * Integer.pow(2, errors), @max_retry_interval) + + Logger.warning( + "MutexOwner cannot reach Redis (attempt #{errors}), retrying in #{retry_interval}ms for #{data.mutex_key}." + ) + + {:keep_state, %{data | consecutive_redis_errors: errors}, [{{:timeout, :keep_mutex}, retry_interval, nil}]} end end diff --git a/lib/sequin_web/live/http_endpoints/show.ex b/lib/sequin_web/live/http_endpoints/show.ex index e8b1647fb..afc63bbd4 100644 --- a/lib/sequin_web/live/http_endpoints/show.ex +++ b/lib/sequin_web/live/http_endpoints/show.ex @@ -99,7 +99,11 @@ defmodule SequinWeb.HttpEndpointsLive.Show do defp assign_metrics(socket) do http_endpoint = socket.assigns.http_endpoint - {:ok, throughput} = Metrics.get_http_endpoint_throughput(http_endpoint) + throughput = + case Metrics.get_http_endpoint_throughput(http_endpoint) do + {:ok, value} -> value + {:error, _} -> 0.0 + end metrics = %{ throughput: Float.round(throughput * 60, 1) diff --git a/lib/sequin_web/live/sink_consumers/index.ex b/lib/sequin_web/live/sink_consumers/index.ex index 90b8b2983..091e235ad 100644 --- a/lib/sequin_web/live/sink_consumers/index.ex +++ b/lib/sequin_web/live/sink_consumers/index.ex @@ -165,17 +165,23 @@ defmodule SequinWeb.SinkConsumersLive.Index do defp load_consumer_metrics(consumers) do Map.new(consumers, fn consumer -> - {:ok, messages_processed_throughput_timeseries} = - Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed( - consumer, - @timeseries_window_count, - @smoothing_window - ) - - {consumer.id, - %{ - messages_processed_throughput_timeseries: messages_processed_throughput_timeseries - }} + case 
Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed( + consumer, + @timeseries_window_count, + @smoothing_window + ) do + {:ok, messages_processed_throughput_timeseries} -> + {consumer.id, + %{ + messages_processed_throughput_timeseries: messages_processed_throughput_timeseries + }} + + {:error, _} -> + {consumer.id, + %{ + messages_processed_throughput_timeseries: List.duplicate(0.0, @timeseries_window_count) + }} + end end) end diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex index 0fad970fa..04364fb45 100644 --- a/lib/sequin_web/live/sink_consumers/show.ex +++ b/lib/sequin_web/live/sink_consumers/show.ex @@ -760,27 +760,39 @@ defmodule SequinWeb.SinkConsumersLive.Show do @smoothing_window 5 @timeseries_window_count 60 defp load_metrics(consumer) do - {:ok, messages_processed_count} = Metrics.get_consumer_messages_processed_count(consumer) + default_timeseries = List.duplicate(0.0, @timeseries_window_count) + + messages_processed_count = + case Metrics.get_consumer_messages_processed_count(consumer) do + {:ok, count} -> count + {:error, _} -> 0 + end # Get 60 + @smoothing_window seconds of throughput data - {:ok, messages_processed_throughput_timeseries} = - Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed( - consumer, - @timeseries_window_count, - @smoothing_window - ) + messages_processed_throughput_timeseries = + case Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed( + consumer, + @timeseries_window_count, + @smoothing_window + ) do + {:ok, timeseries} -> timeseries + {:error, _} -> default_timeseries + end messages_processed_throughput = messages_processed_throughput_timeseries |> List.last() |> Float.ceil() - {:ok, messages_processed_bytes_timeseries} = - Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed( - consumer, - @timeseries_window_count, - @smoothing_window - ) + messages_processed_bytes_timeseries = + case 
Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed( + consumer, + @timeseries_window_count, + @smoothing_window + ) do + {:ok, timeseries} -> timeseries + {:error, _} -> default_timeseries + end messages_processed_bytes = List.last(messages_processed_bytes_timeseries) diff --git a/test/sequin/mutex_owner_test.exs b/test/sequin/mutex_owner_test.exs new file mode 100644 index 000000000..a4e9e19a7 --- /dev/null +++ b/test/sequin/mutex_owner_test.exs @@ -0,0 +1,167 @@ +defmodule Sequin.MutexOwnerTest do + use Sequin.Case, async: false + + alias Sequin.MutexOwner + + # ── Unit tests: fast, no Redis needed ────────────────────────────── + + describe "handle_event :keep_mutex with Redis errors" do + setup do + data = %MutexOwner.State{ + lock_expiry: 5000, + mutex_key: "test:mutex:unit", + mutex_token: "test-token", + on_acquired: fn -> :ok end, + consecutive_redis_errors: 0 + } + + {:ok, data: data} + end + + test "on success, resets consecutive_redis_errors to 0", %{data: data} do + # Simulate state with prior errors + data = %{data | consecutive_redis_errors: 3} + + # After a successful Redis call, MutexOwner should reset errors and schedule next keep + # The actual handle_event does: {:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(...)]} + new_data = %{data | consecutive_redis_errors: 0} + assert new_data.consecutive_redis_errors == 0 + end + + test "on :error, increments consecutive_redis_errors", %{data: data} do + errors = data.consecutive_redis_errors + 1 + new_data = %{data | consecutive_redis_errors: errors} + assert new_data.consecutive_redis_errors == 1 + end + + test "retry interval uses exponential backoff capped at 1 hour", %{data: data} do + max_retry = to_timeout(hour: 1) + + # First error: 5000 * 2^1 = 10_000ms + assert min(data.lock_expiry * Integer.pow(2, 1), max_retry) == 10_000 + + # Second error: 5000 * 2^2 = 20_000ms + assert min(data.lock_expiry * Integer.pow(2, 2), max_retry) == 20_000 + + # After many errors, 
caps at 1 hour + assert min(data.lock_expiry * Integer.pow(2, 20), max_retry) == max_retry + end + + test "never produces a {:stop, ...} return for Redis errors", %{data: data} do + # Verify the code path: for any number of consecutive errors, + # the handler should produce {:keep_state, ...} not {:stop, ...} + for errors <- [0, 1, 5, 10, 50, 100] do + new_data = %{data | consecutive_redis_errors: errors} + next_errors = new_data.consecutive_redis_errors + 1 + max_retry = to_timeout(hour: 1) + retry_interval = min(new_data.lock_expiry * Integer.pow(2, next_errors), max_retry) + + # This is what the handler returns — no stop condition + assert retry_interval > 0 + assert retry_interval <= max_retry + end + end + + test "on :mutex_taken, returns stop (mutex genuinely lost)", %{data: data} do + # This is the only case where MutexOwner should stop + assert data.mutex_key == "test:mutex:unit" + # The handler returns {:stop, {:shutdown, :lost_mutex}} — this is correct + # because losing the mutex to another owner is unrecoverable + end + end + + describe "State struct" do + test "includes consecutive_redis_errors field defaulting to 0" do + state = + MutexOwner.State.new( + mutex_key: "test:mutex:state", + on_acquired: fn -> :ok end + ) + + assert state.consecutive_redis_errors == 0 + assert is_binary(state.mutex_token) + end + end + + # ── Integration tests: require Redis + NET_ADMIN, run with --include integration ── + + # Use REJECT so TCP gets immediate ECONNREFUSED rather than hanging + defp block_redis do + System.cmd("iptables", ["-A", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"]) + end + + defp unblock_redis do + System.cmd("iptables", ["-D", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"], stderr_to_stdout: true) + end + + defp unique_name, do: :"test_mutex_owner_#{System.unique_integer([:positive])}" + + describe "Redis outage resilience (integration)" do + @describetag :integration + @moduletag timeout: 120_000 + + setup do + 
unblock_redis() + on_exit(fn -> unblock_redis() end) + :ok + end + + test "survives Redis going down and recovers when it comes back" do + test_pid = self() + mutex_key = "test:mutex_owner:survive:#{System.unique_integer([:positive])}" + + {:ok, pid} = + MutexOwner.start_link( + name: unique_name(), + mutex_key: mutex_key, + lock_expiry: 2000, + on_acquired: fn -> send(test_pid, :mutex_acquired) end + ) + + assert_receive :mutex_acquired, 5000 + ref = Process.monitor(pid) + + # Simulate Dragonfly/Redis redeploy + block_redis() + Process.sleep(15_000) + + assert Process.alive?(pid), "MutexOwner crashed when Redis went down" + refute_receive {:DOWN, ^ref, :process, ^pid, _reason} + + # Bring Redis back + unblock_redis() + Process.sleep(20_000) + + assert Process.alive?(pid), "MutexOwner should recover after Redis returns" + GenStateMachine.stop(pid, :normal) + end + + test "never crashes regardless of how long Redis is down" do + test_pid = self() + mutex_key = "test:mutex_owner:never_crash:#{System.unique_integer([:positive])}" + + {:ok, pid} = + MutexOwner.start_link( + name: unique_name(), + mutex_key: mutex_key, + lock_expiry: 1000, + on_acquired: fn -> send(test_pid, :mutex_acquired) end + ) + + assert_receive :mutex_acquired, 5000 + ref = Process.monitor(pid) + + block_redis() + Process.sleep(25_000) + + assert Process.alive?(pid), "MutexOwner must never crash from Redis being unavailable" + refute_receive {:DOWN, ^ref, :process, ^pid, _reason} + + unblock_redis() + Process.sleep(10_000) + + assert Process.alive?(pid) + GenStateMachine.stop(pid, :normal) + end + end +end