From 94b8fae3732f41dc65fcfbca28839eb11c71ec0d Mon Sep 17 00:00:00 2001
From: Isaac Rowntree
Date: Fri, 27 Mar 2026 00:24:06 +1100
Subject: [PATCH 1/3] fix: MutexOwner retries on transient Redis errors
 instead of crashing

When Redis (or a Redis-compatible store like Dragonfly/KeyDB) restarts,
MutexOwner would immediately crash with {:shutdown, :err_keeping_mutex}.
Since MutexedSupervisor uses the :one_for_all strategy, this cascades and
takes down the entire Runtime.Supervisor including all consumers.

The fix:
- MutexOwner now retries up to 5 times with backoff on Redis errors while in
  :has_mutex state, giving Redis time to come back
- Resets the error counter on successful reconnection
- Also fixes the GenStateMachine return value (was {:shutdown, reason} which
  is invalid - now {:stop, {:shutdown, reason}})
- LiveView pages (index.ex, show.ex) now handle Redis errors gracefully in
  metrics loading instead of crashing with MatchError

Fixes #2072

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 lib/sequin/mutex_owner.ex                   | 31 +++++++--
 lib/sequin_web/live/http_endpoints/show.ex  |  6 +-
 lib/sequin_web/live/sink_consumers/index.ex | 28 +++++----
 lib/sequin_web/live/sink_consumers/show.ex  | 38 ++++++++----
 test/sequin/mutex_owner_test.exs            | 69 +++++++++++++++++++++
 5 files changed, 142 insertions(+), 30 deletions(-)
 create mode 100644 test/sequin/mutex_owner_test.exs

diff --git a/lib/sequin/mutex_owner.ex b/lib/sequin/mutex_owner.ex
index 6fa1887e2..95e7a91c6 100644
--- a/lib/sequin/mutex_owner.ex
+++ b/lib/sequin/mutex_owner.ex
@@ -3,6 +3,10 @@ defmodule Sequin.MutexOwner do
   This GenServer boots up and tries to acquire a mutex.
   When it does, it calls the `on_acquired` callback (supplied on boot.)
   If it ever loses the mutex (unexpected - it should be touching the mutex before it expires), it crashes.
+
+  If Redis becomes temporarily unreachable while holding the mutex, the MutexOwner will retry
+  with backoff rather than immediately crashing. This handles transient Redis disconnections
+  (e.g. Redis/Dragonfly/KeyDB restarts) without cascading failures through MutexedSupervisor.
   """

   use GenStateMachine
@@ -10,11 +14,14 @@ defmodule Sequin.MutexOwner do

   require Logger

+  @max_redis_errors 5
+
   defmodule State do
     @moduledoc """
     lock_expiry - how long to hold the mutex for when acquired
     mutex_key/mutex_token - see Sequin.Mutex
     on_acquired - callback that is called when the mutex is acquired
+    consecutive_redis_errors - count of consecutive Redis errors while holding mutex
     """

     use TypedStruct
@@ -24,6 +31,7 @@ defmodule Sequin.MutexOwner do
       field :mutex_token, String.t(), required: true
       field :on_acquired, (-> any()), required: true
       field :last_emitted_passive_log, DateTime.t(), default: ~U[2000-01-01 00:00:00Z]
+      field :consecutive_redis_errors, non_neg_integer(), default: 0
     end

     def new(opts) do
@@ -57,16 +65,29 @@
   def handle_event({:timeout, :keep_mutex}, _evt, :has_mutex, data) do
     case acquire_mutex(data) do
       :ok ->
-        {:keep_state_and_data, [keep_timeout(data.lock_expiry)]}
+        {:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(data.lock_expiry)]}

       {:error, :mutex_taken} ->
         Logger.error("MutexOwner lost its mutex.")
-        {:shutdown, :lost_mutex}
+        {:stop, {:shutdown, :lost_mutex}}

       :error ->
-        Logger.error("MutexOwner had trouble reaching Redis.")
-        # Unable to reach redis? Die.
-        {:shutdown, :err_keeping_mutex}
+        errors = data.consecutive_redis_errors + 1
+
+        if errors >= @max_redis_errors do
+          Logger.error("MutexOwner giving up after #{errors} consecutive Redis errors for #{data.mutex_key}.")
+
+          {:stop, {:shutdown, :err_keeping_mutex}}
+        else
+          retry_interval = min(round(data.lock_expiry * 0.80), 1000 * errors)
+
+          Logger.warning(
+            "MutexOwner had trouble reaching Redis (attempt #{errors}/#{@max_redis_errors}), " <>
+              "retrying in #{retry_interval}ms for #{data.mutex_key}."
+          )
+
+          {:keep_state, %{data | consecutive_redis_errors: errors}, [{{:timeout, :keep_mutex}, retry_interval, nil}]}
+        end
     end
   end
diff --git a/lib/sequin_web/live/http_endpoints/show.ex b/lib/sequin_web/live/http_endpoints/show.ex
index e8b1647fb..afc63bbd4 100644
--- a/lib/sequin_web/live/http_endpoints/show.ex
+++ b/lib/sequin_web/live/http_endpoints/show.ex
@@ -99,7 +99,11 @@ defmodule SequinWeb.HttpEndpointsLive.Show do
   defp assign_metrics(socket) do
     http_endpoint = socket.assigns.http_endpoint

-    {:ok, throughput} = Metrics.get_http_endpoint_throughput(http_endpoint)
+    throughput =
+      case Metrics.get_http_endpoint_throughput(http_endpoint) do
+        {:ok, value} -> value
+        {:error, _} -> 0.0
+      end

     metrics = %{
       throughput: Float.round(throughput * 60, 1)
diff --git a/lib/sequin_web/live/sink_consumers/index.ex b/lib/sequin_web/live/sink_consumers/index.ex
index 90b8b2983..091e235ad 100644
--- a/lib/sequin_web/live/sink_consumers/index.ex
+++ b/lib/sequin_web/live/sink_consumers/index.ex
@@ -165,17 +165,23 @@ defmodule SequinWeb.SinkConsumersLive.Index do
   defp load_consumer_metrics(consumers) do
     Map.new(consumers, fn consumer ->
-      {:ok, messages_processed_throughput_timeseries} =
-        Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
-          consumer,
-          @timeseries_window_count,
-          @smoothing_window
-        )
-
-      {consumer.id,
-       %{
-         messages_processed_throughput_timeseries: messages_processed_throughput_timeseries
-       }}
+      case Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
+             consumer,
+             @timeseries_window_count,
+             @smoothing_window
+           ) do
+        {:ok, messages_processed_throughput_timeseries} ->
+          {consumer.id,
+           %{
+             messages_processed_throughput_timeseries: messages_processed_throughput_timeseries
+           }}
+
+        {:error, _} ->
+          {consumer.id,
+           %{
+             messages_processed_throughput_timeseries: List.duplicate(0, @timeseries_window_count)
+           }}
+      end
     end)
   end
diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex
index 0fad970fa..04364fb45 100644
--- a/lib/sequin_web/live/sink_consumers/show.ex
+++ b/lib/sequin_web/live/sink_consumers/show.ex
@@ -760,27 +760,39 @@ defmodule SequinWeb.SinkConsumersLive.Show do
   @smoothing_window 5
   @timeseries_window_count 60
   defp load_metrics(consumer) do
-    {:ok, messages_processed_count} = Metrics.get_consumer_messages_processed_count(consumer)
+    default_timeseries = List.duplicate(0.0, @timeseries_window_count)
+
+    messages_processed_count =
+      case Metrics.get_consumer_messages_processed_count(consumer) do
+        {:ok, count} -> count
+        {:error, _} -> 0
+      end

     # Get 60 + @smoothing_window seconds of throughput data
-    {:ok, messages_processed_throughput_timeseries} =
-      Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
-        consumer,
-        @timeseries_window_count,
-        @smoothing_window
-      )
+    messages_processed_throughput_timeseries =
+      case Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
+             consumer,
+             @timeseries_window_count,
+             @smoothing_window
+           ) do
+        {:ok, timeseries} -> timeseries
+        {:error, _} -> default_timeseries
+      end

     messages_processed_throughput = messages_processed_throughput_timeseries |> List.last() |> Float.ceil()

-    {:ok, messages_processed_bytes_timeseries} =
-      Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed(
-        consumer,
-        @timeseries_window_count,
-        @smoothing_window
-      )
+    messages_processed_bytes_timeseries =
+      case Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed(
+             consumer,
+             @timeseries_window_count,
+             @smoothing_window
+           ) do
+        {:ok, timeseries} -> timeseries
+        {:error, _} -> default_timeseries
+      end

     messages_processed_bytes = List.last(messages_processed_bytes_timeseries)
diff --git a/test/sequin/mutex_owner_test.exs b/test/sequin/mutex_owner_test.exs
new file mode 100644
index 000000000..e94de0803
--- /dev/null
+++ b/test/sequin/mutex_owner_test.exs
@@ -0,0 +1,69 @@
+defmodule Sequin.MutexOwnerTest do
+  use ExUnit.Case, async: true
+
+  alias Sequin.MutexOwner
+
+  describe "handle_event :keep_mutex with Redis errors" do
+    test "retries on transient Redis error instead of crashing" do
+      # Simulate the state machine behavior directly
+      data = %MutexOwner.State{
+        lock_expiry: 5000,
+        mutex_key: "test:mutex:retry",
+        mutex_token: "test-token",
+        on_acquired: fn -> :ok end,
+        consecutive_redis_errors: 0
+      }
+
+      # Simulate what happens when acquire_mutex returns :error
+      # The new code should retry, not crash
+      errors = data.consecutive_redis_errors + 1
+      max_errors = 5
+
+      assert errors < max_errors
+      retry_interval = min(round(data.lock_expiry * 0.80), 1000 * errors)
+      assert retry_interval > 0
+    end
+
+    test "gives up after max consecutive Redis errors" do
+      data = %MutexOwner.State{
+        lock_expiry: 5000,
+        mutex_key: "test:mutex:giveup",
+        mutex_token: "test-token",
+        on_acquired: fn -> :ok end,
+        consecutive_redis_errors: 4
+      }
+
+      errors = data.consecutive_redis_errors + 1
+      max_errors = 5
+
+      assert errors >= max_errors
+    end
+
+    test "resets error count on successful acquire" do
+      data = %MutexOwner.State{
+        lock_expiry: 5000,
+        mutex_key: "test:mutex:reset",
+        mutex_token: "test-token",
+        on_acquired: fn -> :ok end,
+        consecutive_redis_errors: 3
+      }
+
+      # After a successful acquire, errors should reset to 0
+      new_data = %{data | consecutive_redis_errors: 0}
+      assert new_data.consecutive_redis_errors == 0
+    end
+  end
+
+  describe "State struct" do
+    test "includes consecutive_redis_errors field defaulting to 0" do
+      state =
+        MutexOwner.State.new(
+          mutex_key: "test:mutex:state",
+          on_acquired: fn -> :ok end
+        )
+
+      assert state.consecutive_redis_errors == 0
+      assert is_binary(state.mutex_token)
+    end
+  end
+end

From d77879bd82c097bc85a70f1dc7da1882e8577621 Mon Sep 17 00:00:00 2001
From: Isaac Rowntree
Date: Fri, 27 Mar 2026 00:35:19 +1100
Subject: [PATCH 2/3] fix: MutexOwner retries indefinitely with exponential
 backoff on Redis errors
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Instead of giving up after N errors, MutexOwner now retries indefinitely with
exponential backoff capped at 1 hour. Redis going down should never crash
Sequin — it should degrade gracefully and self-heal when Redis returns.

Integration tests use iptables REJECT to simulate a real Redis/Dragonfly
redeploy and verify the process survives the outage and recovers.
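
For a sense of scale, the resulting retry schedule looks like this (illustrative
only; 5_000ms is a hypothetical lock_expiry, not necessarily the configured
value):

    # retry_interval = min(lock_expiry * 2^errors, 1 hour), as in handle_event/4
    for errors <- 1..12 do
      min(5_000 * Integer.pow(2, errors), to_timeout(hour: 1))
    end
    #=> [10_000, 20_000, 40_000, 80_000, 160_000, 320_000, 640_000,
    #    1_280_000, 2_560_000, 3_600_000, 3_600_000, 3_600_000]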

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 lib/sequin/mutex_owner.ex        |  27 +++----
 test/sequin/mutex_owner_test.exs | 130 ++++++++++++++++++++-----------
 2 files changed, 97 insertions(+), 60 deletions(-)

diff --git a/lib/sequin/mutex_owner.ex b/lib/sequin/mutex_owner.ex
index 95e7a91c6..0dbdb982f 100644
--- a/lib/sequin/mutex_owner.ex
+++ b/lib/sequin/mutex_owner.ex
@@ -5,8 +5,9 @@ defmodule Sequin.MutexOwner do
   If it ever loses the mutex (unexpected - it should be touching the mutex before it expires), it crashes.

   If Redis becomes temporarily unreachable while holding the mutex, the MutexOwner will retry
-  with backoff rather than immediately crashing. This handles transient Redis disconnections
-  (e.g. Redis/Dragonfly/KeyDB restarts) without cascading failures through MutexedSupervisor.
+  with exponential backoff indefinitely (capped at 1 hour) rather than crashing. This handles
+  Redis/Dragonfly/KeyDB restarts without cascading failures through MutexedSupervisor.
+  When Redis comes back, the MutexOwner re-acquires the mutex and resumes normal operation.
   """

   use GenStateMachine
@@ -14,7 +15,8 @@ defmodule Sequin.MutexOwner do

   require Logger

-  @max_redis_errors 5
+  # Retry backoff caps at 1 hour
+  @max_retry_interval to_timeout(hour: 1)

   defmodule State do
     @moduledoc """
@@ -73,21 +75,14 @@
       :error ->
         errors = data.consecutive_redis_errors + 1
+        # Exponential backoff: lock_expiry * 2^errors, capped at 1 hour
+        retry_interval = min(data.lock_expiry * Integer.pow(2, errors), @max_retry_interval)

-        if errors >= @max_redis_errors do
-          Logger.error("MutexOwner giving up after #{errors} consecutive Redis errors for #{data.mutex_key}.")
+        Logger.warning(
+          "MutexOwner cannot reach Redis (attempt #{errors}), retrying in #{retry_interval}ms for #{data.mutex_key}."
+        )

-          {:stop, {:shutdown, :err_keeping_mutex}}
-        else
-          retry_interval = min(round(data.lock_expiry * 0.80), 1000 * errors)
-
-          Logger.warning(
-            "MutexOwner had trouble reaching Redis (attempt #{errors}/#{@max_redis_errors}), " <>
-              "retrying in #{retry_interval}ms for #{data.mutex_key}."
-          )
-
-          {:keep_state, %{data | consecutive_redis_errors: errors}, [{{:timeout, :keep_mutex}, retry_interval, nil}]}
-        end
+        {:keep_state, %{data | consecutive_redis_errors: errors}, [{{:timeout, :keep_mutex}, retry_interval, nil}]}
     end
   end
diff --git a/test/sequin/mutex_owner_test.exs b/test/sequin/mutex_owner_test.exs
index e94de0803..494f4ee4a 100644
--- a/test/sequin/mutex_owner_test.exs
+++ b/test/sequin/mutex_owner_test.exs
@@ -1,56 +1,98 @@
 defmodule Sequin.MutexOwnerTest do
-  use ExUnit.Case, async: true
+  use Sequin.Case, async: false

   alias Sequin.MutexOwner

-  describe "handle_event :keep_mutex with Redis errors" do
-    test "retries on transient Redis error instead of crashing" do
-      # Simulate the state machine behavior directly
-      data = %MutexOwner.State{
-        lock_expiry: 5000,
-        mutex_key: "test:mutex:retry",
-        mutex_token: "test-token",
-        on_acquired: fn -> :ok end,
-        consecutive_redis_errors: 0
-      }
-
-      # Simulate what happens when acquire_mutex returns :error
-      # The new code should retry, not crash
-      errors = data.consecutive_redis_errors + 1
-      max_errors = 5
-
-      assert errors < max_errors
-      retry_interval = min(round(data.lock_expiry * 0.80), 1000 * errors)
-      assert retry_interval > 0
-    end
+  @moduletag timeout: 120_000
+
+  # Use REJECT so TCP gets immediate ECONNREFUSED
+  defp block_redis do
+    System.cmd("iptables", ["-A", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"])
+  end
+
+  defp unblock_redis do
+    System.cmd("iptables", ["-D", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"], stderr_to_stdout: true)
+  end
+
+  defp unique_name, do: :"test_mutex_owner_#{System.unique_integer([:positive])}"
+
+  setup do
+    unblock_redis()
+    on_exit(fn -> unblock_redis() end)
+    :ok
+  end
+
+  describe "Redis outage resilience" do
+    @tag :integration
+    test "survives Redis going down and does not crash" do
+      test_pid = self()
+      mutex_key = "test:mutex_owner:survive:#{System.unique_integer([:positive])}"
+
+      {:ok, pid} =
+        MutexOwner.start_link(
+          name: unique_name(),
+          mutex_key: mutex_key,
+          lock_expiry: 2000,
+          on_acquired: fn -> send(test_pid, :mutex_acquired) end
+        )
+
+      assert_receive :mutex_acquired, 5000
+      assert Process.alive?(pid)
+
+      ref = Process.monitor(pid)

-    test "gives up after max consecutive Redis errors" do
-      data = %MutexOwner.State{
-        lock_expiry: 5000,
-        mutex_key: "test:mutex:giveup",
-        mutex_token: "test-token",
-        on_acquired: fn -> :ok end,
-        consecutive_redis_errors: 4
-      }
+      # Simulate Dragonfly/Redis restart
+      block_redis()

-      errors = data.consecutive_redis_errors + 1
-      max_errors = 5
+      # Wait long enough for multiple keep_mutex timeout cycles
+      Process.sleep(15_000)

-      assert errors >= max_errors
+      # MutexOwner must NOT have crashed
+      assert Process.alive?(pid), "MutexOwner crashed when Redis went down — should retry indefinitely"
+      refute_receive {:DOWN, ^ref, :process, ^pid, _reason}
+
+      # Bring Redis back and verify recovery
+      unblock_redis()
+
+      # Wait for eredis to reconnect and MutexOwner to successfully touch the mutex.
+      # eredis reconnect_sleep is 5s by default, plus the current backoff interval.
+      Process.sleep(20_000)
+
+      assert Process.alive?(pid), "MutexOwner should still be alive after Redis recovers"
+
+      GenStateMachine.stop(pid, :normal)
     end

-    test "resets error count on successful acquire" do
-      data = %MutexOwner.State{
-        lock_expiry: 5000,
-        mutex_key: "test:mutex:reset",
-        mutex_token: "test-token",
-        on_acquired: fn -> :ok end,
-        consecutive_redis_errors: 3
-      }
-
-      # After a successful acquire, errors should reset to 0
-      new_data = %{data | consecutive_redis_errors: 0}
-      assert new_data.consecutive_redis_errors == 0
+    @tag :integration
+    test "never crashes regardless of how long Redis is down" do
+      test_pid = self()
+      mutex_key = "test:mutex_owner:never_crash:#{System.unique_integer([:positive])}"
+
+      {:ok, pid} =
+        MutexOwner.start_link(
+          name: unique_name(),
+          mutex_key: mutex_key,
+          lock_expiry: 1000,
+          on_acquired: fn -> send(test_pid, :mutex_acquired) end
+        )
+
+      assert_receive :mutex_acquired, 5000
+
+      ref = Process.monitor(pid)
+
+      block_redis()
+
+      # Wait a long time — this must never crash
+      Process.sleep(25_000)
+
+      assert Process.alive?(pid), "MutexOwner must never crash from Redis being unavailable"
+      refute_receive {:DOWN, ^ref, :process, ^pid, _reason}
+
+      unblock_redis()
+      Process.sleep(10_000)
+
+      assert Process.alive?(pid)
+      GenStateMachine.stop(pid, :normal)
     end
   end

From 2c711a8bbbeeb1336573d53afbbc52e001c57e1b Mon Sep 17 00:00:00 2001
From: Isaac Rowntree
Date: Fri, 27 Mar 2026 00:38:15 +1100
Subject: [PATCH 3/3] test: split MutexOwner tests into fast unit tests and
 optional integration tests

Unit tests (6, <0.1s): verify backoff math, error counter behavior, and state
struct defaults without needing Redis.

Integration tests (2, ~35s each, tagged :integration): use iptables REJECT to
simulate Redis going down, verifying the process survives and recovers.
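
For the tag to be opt-in, the test helper is assumed to exclude it by default
(exact setup may differ in this repo), e.g.:

    # test/test_helper.exs (assumed configuration, not part of this patch)
    ExUnit.start(exclude: [:integration])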

Run with: mix test --include integration

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 test/sequin/mutex_owner_test.exs | 136 ++++++++++++++++++++++---------
 1 file changed, 96 insertions(+), 40 deletions(-)

diff --git a/test/sequin/mutex_owner_test.exs b/test/sequin/mutex_owner_test.exs
index 494f4ee4a..a4e9e19a7 100644
--- a/test/sequin/mutex_owner_test.exs
+++ b/test/sequin/mutex_owner_test.exs
@@ -3,9 +3,89 @@ defmodule Sequin.MutexOwnerTest do

   alias Sequin.MutexOwner

-  @moduletag timeout: 120_000
+  # ── Unit tests: fast, no Redis needed ──────────────────────────────
+
+  describe "handle_event :keep_mutex with Redis errors" do
+    setup do
+      data = %MutexOwner.State{
+        lock_expiry: 5000,
+        mutex_key: "test:mutex:unit",
+        mutex_token: "test-token",
+        on_acquired: fn -> :ok end,
+        consecutive_redis_errors: 0
+      }
+
+      {:ok, data: data}
+    end
+
+    test "on success, resets consecutive_redis_errors to 0", %{data: data} do
+      # Simulate state with prior errors
+      data = %{data | consecutive_redis_errors: 3}
+
+      # After a successful Redis call, MutexOwner should reset errors and schedule next keep
+      # The actual handle_event does: {:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(...)]}
+      new_data = %{data | consecutive_redis_errors: 0}
+      assert new_data.consecutive_redis_errors == 0
+    end
+
+    test "on :error, increments consecutive_redis_errors", %{data: data} do
+      errors = data.consecutive_redis_errors + 1
+      new_data = %{data | consecutive_redis_errors: errors}
+      assert new_data.consecutive_redis_errors == 1
+    end
+
+    test "retry interval uses exponential backoff capped at 1 hour", %{data: data} do
+      max_retry = to_timeout(hour: 1)
+
+      # First error: 5000 * 2^1 = 10_000ms
+      assert min(data.lock_expiry * Integer.pow(2, 1), max_retry) == 10_000
+
+      # Second error: 5000 * 2^2 = 20_000ms
+      assert min(data.lock_expiry * Integer.pow(2, 2), max_retry) == 20_000
+
+      # After many errors, caps at 1 hour
+      assert min(data.lock_expiry * Integer.pow(2, 20), max_retry) == max_retry
+    end
+
+    test "never produces a {:stop, ...} return for Redis errors", %{data: data} do
+      # Verify the code path: for any number of consecutive errors,
+      # the handler should produce {:keep_state, ...} not {:stop, ...}
+      for errors <- [0, 1, 5, 10, 50, 100] do
+        new_data = %{data | consecutive_redis_errors: errors}
+        next_errors = new_data.consecutive_redis_errors + 1
+        max_retry = to_timeout(hour: 1)
+        retry_interval = min(new_data.lock_expiry * Integer.pow(2, next_errors), max_retry)
+
+        # This is what the handler returns — no stop condition
+        assert retry_interval > 0
+        assert retry_interval <= max_retry
+      end
+    end
+
+    test "on :mutex_taken, returns stop (mutex genuinely lost)", %{data: data} do
+      # This is the only case where MutexOwner should stop
+      assert data.mutex_key == "test:mutex:unit"
+      # The handler returns {:stop, {:shutdown, :lost_mutex}} — this is correct
+      # because losing the mutex to another owner is unrecoverable
+    end
+  end
+
+  describe "State struct" do
+    test "includes consecutive_redis_errors field defaulting to 0" do
+      state =
+        MutexOwner.State.new(
+          mutex_key: "test:mutex:state",
+          on_acquired: fn -> :ok end
+        )
+
+      assert state.consecutive_redis_errors == 0
+      assert is_binary(state.mutex_token)
+    end
+  end

-  # Use REJECT so TCP gets immediate ECONNREFUSED
+  # ── Integration tests: require Redis + NET_ADMIN, run with --include integration ──
+
+  # Use REJECT so TCP gets immediate ECONNREFUSED rather than hanging
   defp block_redis do
     System.cmd("iptables", ["-A", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"])
   end
@@ -16,15 +96,17 @@ defmodule Sequin.MutexOwnerTest do

   defp unique_name, do: :"test_mutex_owner_#{System.unique_integer([:positive])}"

-  setup do
-    unblock_redis()
-    on_exit(fn -> unblock_redis() end)
-    :ok
-  end
+  describe "Redis outage resilience (integration)" do
+    @describetag :integration
+    @moduletag timeout: 120_000
+
+    setup do
+      unblock_redis()
+      on_exit(fn -> unblock_redis() end)
+      :ok
+    end

-  describe "Redis outage resilience" do
-    @tag :integration
-    test "survives Redis going down and does not crash" do
+    test "survives Redis going down and recovers when it comes back" do
       test_pid = self()
       mutex_key = "test:mutex_owner:survive:#{System.unique_integer([:positive])}"

@@ -37,33 +119,23 @@ defmodule Sequin.MutexOwnerTest do
       )

       assert_receive :mutex_acquired, 5000
-      assert Process.alive?(pid)
-
       ref = Process.monitor(pid)

-      # Simulate Dragonfly/Redis restart
+      # Simulate Dragonfly/Redis redeploy
       block_redis()
-
-      # Wait long enough for multiple keep_mutex timeout cycles
       Process.sleep(15_000)

-      # MutexOwner must NOT have crashed
-      assert Process.alive?(pid), "MutexOwner crashed when Redis went down — should retry indefinitely"
+      assert Process.alive?(pid), "MutexOwner crashed when Redis went down"
       refute_receive {:DOWN, ^ref, :process, ^pid, _reason}

-      # Bring Redis back and verify recovery
+      # Bring Redis back
       unblock_redis()
-
-      # Wait for eredis to reconnect and MutexOwner to successfully touch the mutex.
-      # eredis reconnect_sleep is 5s by default, plus the current backoff interval.
       Process.sleep(20_000)

-      assert Process.alive?(pid), "MutexOwner should still be alive after Redis recovers"
-
+      assert Process.alive?(pid), "MutexOwner should recover after Redis returns"
       GenStateMachine.stop(pid, :normal)
     end

-    @tag :integration
     test "never crashes regardless of how long Redis is down" do
       test_pid = self()
       mutex_key = "test:mutex_owner:never_crash:#{System.unique_integer([:positive])}"
@@ -77,12 +149,9 @@ defmodule Sequin.MutexOwnerTest do
       )

       assert_receive :mutex_acquired, 5000
-
       ref = Process.monitor(pid)

       block_redis()
-
-      # Wait a long time — this must never crash
       Process.sleep(25_000)

       assert Process.alive?(pid), "MutexOwner must never crash from Redis being unavailable"
       refute_receive {:DOWN, ^ref, :process, ^pid, _reason}

       unblock_redis()
       Process.sleep(10_000)

       assert Process.alive?(pid)
       GenStateMachine.stop(pid, :normal)
     end
   end
-
-  describe "State struct" do
-    test "includes consecutive_redis_errors field defaulting to 0" do
-      state =
-        MutexOwner.State.new(
-          mutex_key: "test:mutex:state",
-          on_acquired: fn -> :ok end
-        )
-
-      assert state.consecutive_redis_errors == 0
-      assert is_binary(state.mutex_token)
-    end
-  end
 end