From bbee19852327f84b9bdf7fc0431e767e05c5a2ac Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Tue, 24 Feb 2026 16:33:17 +0000 Subject: [PATCH 1/7] scale sqlite pool to 0 --- .changeset/unlucky-files-protect.md | 5 + .../shape_status/shape_db/connection.ex | 63 ++++++- .../shape_status/shape_db/query.ex | 18 +- .../shape_status/shape_db/statistics.ex | 168 ++++++++++++------ .../shape_status/shape_db/supervisor.ex | 16 +- .../lib/electric/stack_supervisor.ex | 3 + .../electric/stack_supervisor/telemetry.ex | 3 +- .../shape_status/shape_db_test.exs | 47 +++++ 8 files changed, 258 insertions(+), 65 deletions(-) create mode 100644 .changeset/unlucky-files-protect.md diff --git a/.changeset/unlucky-files-protect.md b/.changeset/unlucky-files-protect.md new file mode 100644 index 0000000000..ba4cf044e0 --- /dev/null +++ b/.changeset/unlucky-files-protect.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Add SQLite connection pool scaling to minimize memory usage in quiet instances and disable SQLite metrics collection by default diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 26814178d3..2eae6c4e40 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -4,6 +4,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do alias Exqlite.Sqlite3 alias Electric.ShapeCache.ShapeStatus.ShapeDb.Query alias Electric.ShapeCache.ShapeStatus.ShapeDb.PoolRegistry + alias Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics alias Electric.Telemetry.OpenTelemetry require Logger @@ -65,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do defguardp is_raw_connection(conn) when is_reference(conn) - defstruct [:conn, :stmts] + defstruct [:conn, :mode, :stmts] def migrate(conn, opts) when is_raw_connection(conn) do # because we embed the storage version into the db path @@ -143,7 +144,14 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do @impl NimblePool def init_worker(pool_state) do - if Keyword.get(pool_state, :exclusive_mode, false) do + with {:ok, conn} <- init_worker_for_pool(pool_state) do + :ok = Statistics.worker_start(Keyword.get(pool_state, :stack_id)) + {:ok, conn, pool_state} + end + end + + defp init_worker_for_pool(pool_state) do + if(Keyword.get(pool_state, :exclusive_mode, false)) do init_worker_exclusive(pool_state) else init_worker_pooled(pool_state) @@ -151,9 +159,10 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end defp init_worker_pooled(pool_state) do - with {:ok, conn} <- open(pool_state), - stmts <- Query.prepare!(conn, pool_state) do - {:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state} + with mode = Keyword.get(pool_state, :mode, :readwrite), + {:ok, conn} <- open(pool_state), + stmts <- Query.prepare!(conn, mode) do + {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} end end @@ -163,10 +172,11 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do # over the same, single, connection since every connection is a separate db # in in-memory mode. defp init_worker_exclusive(pool_state) do - with {:ok, conn} <- open(pool_state, integrity_check: true), + with mode = :readwrite, + {:ok, conn} <- open(pool_state, integrity_check: true), {:ok, _version} <- migrate(conn, pool_state), - stmts <- Query.prepare!(conn, Keyword.put(pool_state, :mode, :readwrite)) do - {:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state} + stmts <- Query.prepare!(conn, mode) do + {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} end end @@ -198,6 +208,23 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do {:ok, client_state, pool_state} end + @impl NimblePool + def handle_ping(%__MODULE__{} = state, pool_state) do + Logger.debug(fn -> ["Closing idle SQLite ", to_string(state.mode), " connection"] end) + + # the idle timeout is only enabled in non-exclusive mode, so we're free + # to close all the connections, including write. + + :ok = close(state) + :ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id)) + + {:remove, :idle} + end + + def shrink_memory(conn) when is_raw_connection(conn) do + execute(conn, "PRAGMA shrink_memory") + end + @max_recovery_attempts 2 def open(pool_state, opts \\ []) do @@ -270,6 +297,26 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end end + def close(%__MODULE__{conn: conn, stmts: stmts}) do + # Need to release all the prepared statements on the connection or the + # close doesn't actually release the resources. + stmts + |> Query.active_stmts() + |> Enum.each(fn {name, stmt} -> + case Sqlite3.release(conn, stmt) do + :ok -> + :ok + + {:error, reason} -> + Logger.warning( + "Failed to release prepared statement #{inspect(name)}: #{inspect(reason)}" + ) + end + end) + + close(conn) + end + def close(conn) when is_raw_connection(conn) do Sqlite3.close(conn) end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex index da8ef01345..7d97306783 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex @@ -52,7 +52,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do Keyword.take(@read_queries, [:handle_lookup]) ) - defstruct Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries)) + @stmt_names Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries)) + + defstruct @stmt_names alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection, as: Conn @@ -68,8 +70,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do stream_query: 3 ] - def prepare!(conn, opts) do - case Keyword.get(opts, :mode, :readwrite) do + def active_stmts(nil) do + [] + end + + def active_stmts(%__MODULE__{} = query) do + @stmt_names + |> Enum.map(&{&1, Map.fetch!(query, &1)}) + |> Enum.reject(&is_nil(elem(&1, 1))) + end + + def prepare!(conn, mode) do + case mode do :readwrite -> struct(__MODULE__, prepare_stmts!(conn, @read_queries ++ @write_queries)) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex index 2adb0a9c11..d7520d16e9 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex @@ -15,7 +15,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do use GenServer - alias Electric.ShapeCache.ShapeStatus.ShapeDb + alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection require Logger @@ -33,6 +33,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do GenServer.call(name(stack_id), :statistics) end + def initialize(stack_id) do + GenServer.cast(name(stack_id), :initialize) + end + + def worker_start(stack_id) do + GenServer.cast(name(stack_id), {:worker_incr, 1}) + end + + def worker_stop(stack_id) do + GenServer.cast(name(stack_id), {:worker_incr, -1}) + end + @impl GenServer def init(args) do stack_id = Keyword.fetch!(args, :stack_id) @@ -40,75 +52,130 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do Process.set_label({:shape_db_statistics, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) - + measurement_period = Keyword.get(args, :statistics_collection_period, @measurement_period) enable_memory_stats? = Keyword.get(args, :enable_memory_stats?, false) {:ok, %{ stack_id: stack_id, page_size: 0, - memstat_available?: false, - stats: %{} - }, {:continue, {:initialize_stats, enable_memory_stats?}}} + stats: %{}, + connections: 0, + use_pool?: Keyword.get(args, :exclusive_mode, false), + enable_memory_stats?: enable_memory_stats?, + measurement_period: measurement_period, + pool_opts: args + }} end @impl GenServer - def handle_continue({:initialize_stats, enable_memory_stats?}, state) do - %{stack_id: stack_id} = state - - {:ok, {page_size, memstat_available?}} = - ShapeDb.Connection.checkout_write!(stack_id, :read_stats, fn %{conn: conn} -> - memstat_available? = - if enable_memory_stats? do - # don't even try to load the extension unless enabled -- loading the extension - # may be the cause of segfaults we've seen in prod - case ShapeDb.Connection.enable_extension(conn, "memstat") do - :ok -> - Logger.notice("SQLite memory statistics enabled") - - true - - {:error, reason} -> - Logger.warning( - "Failed to load memstat SQLite extension: #{inspect(reason)}. " <> - "Memory statistics will not be available." - ) - - false - end - else - false - end + def handle_info(:read_stats, state) do + {:noreply, read_stats(state), :hibernate} + end - {:ok, [page_size]} = ShapeDb.Connection.fetch_one(conn, "PRAGMA page_size", []) + @impl GenServer + def handle_call(:statistics, _from, state) do + {:reply, {:ok, Map.put(state.stats, :connections, state.connections)}, state} + end - {:ok, {page_size, memstat_available?}} + @impl GenServer + def handle_cast({:worker_incr, incr}, state) do + state = + state + |> Map.update!(:connections, &(&1 + incr)) + |> tap(fn state -> + Logger.debug([ + if(incr > 0, do: "Opening ", else: "Closing "), + "ShapeDb connection: #{state.connections} active connections" + ]) end) - {:noreply, - read_stats(%{state | page_size: page_size, memstat_available?: memstat_available?}), - :hibernate} + {:noreply, state} end - @impl GenServer - def handle_info(:read_stats, state) do - {:noreply, read_stats(state), :hibernate} + def handle_cast(:initialize, state) do + {:noreply, read_stats(state, _force = true)} end - @impl GenServer - def handle_call(:statistics, _from, state) do - {:reply, {:ok, state.stats}, state} + defp read_stats(state, force? \\ false) + + # If the pools have no open connections, then don't read memory usage because + # the report would only include memory used by the temporary statistics + # connection. We're assuming that 0 open connections == 0 sqlite memory + # usage, which seems reasonable + defp read_stats(%{connections: 0} = state, false) do + do_read_stats(state, false) end - defp read_stats(%{stack_id: stack_id, memstat_available?: memstat_available?} = state) do - {:ok, stats} = - ShapeDb.Connection.checkout_write!(stack_id, :read_stats, fn %{conn: conn} -> - ShapeDb.Connection.fetch_all(conn, stats_query(memstat_available?), []) - end) + defp read_stats(state, _force?) do + do_read_stats(state, true) + end - Process.send_after(self(), :read_stats, @measurement_period) + defp do_read_stats(state, include_memory?) do + state + |> open_connection(fn conn -> + with {:ok, memstat_available?, page_size} <- + initialize_connection(conn, state.enable_memory_stats?), + {:ok, stats} <- + Connection.fetch_all( + conn, + stats_query(memstat_available? && include_memory?), + [] + ) do + {:ok, analyze_stats(stats, page_size)} + end + end) + |> case do + {:ok, stats} -> + %{state | stats: stats} + + {:error, reason} -> + Logger.warning(["Failed to read statistics: ", inspect(reason)]) + state + end + |> tap(fn state -> + Process.send_after(self(), :read_stats, state.measurement_period) + end) + end - %{state | stats: analyze_stats(stats, state.page_size)} + defp open_connection(%{use_pool?: true} = state, fun) do + Connection.checkout_write!(state.stack_id, :read_stats, fn %{conn: conn} -> + fun.(conn) + end) + end + + defp open_connection(%{use_pool?: false, pool_opts: pool_opts} = _state, fun) do + with {:ok, conn} <- Connection.open(pool_opts) do + try do + fun.(conn) + after + Connection.close(conn) + end + end + end + + defp initialize_connection(conn, enable_memory_stats?) do + memstat_available? = + if enable_memory_stats? do + case Connection.enable_extension(conn, "memstat") do + :ok -> + true + + {:error, reason} -> + Logger.warning( + "Failed to load memstat SQLite extension: #{inspect(reason)}. " <> + "Memory statistics will not be available." + ) + + false + end + else + false + end + + with {:ok, [page_size]} <- Connection.fetch_one(conn, "PRAGMA page_size", []) do + {:ok, memstat_available?, page_size} + end end defp stats_query(true) do @@ -147,7 +214,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do total_memory: memory_used + pagecache_used + pagecache_overflow, page_cache_overflow: pagecache_overflow, disk_size: disk_size, - data_size: data_size + data_size: data_size, + updated_at: DateTime.utc_now() } end) end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex index 0616307629..8abdfd3444 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex @@ -13,11 +13,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do Supervisor.start_link(__MODULE__, opts, name: name(opts)) end + @default_connection_idle_timeout 30_000 + def init(opts) do shape_db_opts = Keyword.fetch!(opts, :shape_db_opts) stack_id = Keyword.fetch!(opts, :stack_id) opts = Keyword.put(shape_db_opts, :stack_id, stack_id) exclusive_mode = Keyword.get(opts, :exclusive_mode, false) + idle_timeout = Keyword.get(opts, :connection_idle_timeout, @default_connection_idle_timeout) + # don't close the write connection in exclusive mode + write_pool_idle_timeout = if(exclusive_mode, do: nil, else: idle_timeout) read_pool_spec = if exclusive_mode do @@ -30,7 +35,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do NimblePool, worker: {ShapeDb.Connection, Keyword.put(opts, :mode, :read)}, pool_size: Keyword.get(opts, :read_pool_size, 2 * System.schedulers_online()), - name: ShapeDb.PoolRegistry.pool_name(stack_id, :read, exclusive_mode) + name: ShapeDb.PoolRegistry.pool_name(stack_id, :read, exclusive_mode), + worker_idle_timeout: idle_timeout, + lazy: true }, id: {:pool, :read} ) @@ -41,6 +48,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do Enum.concat([ [ {ShapeDb.PoolRegistry, stack_id: stack_id}, + {ShapeDb.Statistics, opts}, {ShapeDb.Migrator, opts} ], read_pool_spec, @@ -51,12 +59,14 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do {NimblePool, worker: {ShapeDb.Connection, Keyword.put(opts, :mode, :write)}, pool_size: 1, - name: ShapeDb.PoolRegistry.pool_name(stack_id, :write, exclusive_mode)}, + name: ShapeDb.PoolRegistry.pool_name(stack_id, :write, exclusive_mode), + worker_idle_timeout: write_pool_idle_timeout, + lazy: not exclusive_mode}, id: {:pool, :write} ), # write buffer for batching SQLite writes to avoid timeout cascades {ShapeDb.WriteBuffer, opts}, - {ShapeDb.Statistics, opts} + {Task, fn -> ShapeDb.Statistics.initialize(stack_id) end} ] ]) diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b222dddf6b..c6dcdc0d8a 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -165,6 +165,9 @@ defmodule Electric.StackSupervisor do exclusive_mode: [type: :boolean], synchronous: [type: :string], cache_size: [type: :integer], + connection_idle_timeout: [type: :integer], + read_pool_size: [type: :integer], + statistics_collection_period: [type: :integer], enable_memory_stats?: [type: :boolean] ] ], diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 6f760a85f2..f22716372a 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -54,7 +54,8 @@ defmodule Electric.StackSupervisor.Telemetry do unit: :byte ), Telemetry.Metrics.last_value("electric.shape_db.sqlite.total_memory", unit: :byte), - Telemetry.Metrics.last_value("electric.shape_db.sqlite.disk_size", unit: :byte) + Telemetry.Metrics.last_value("electric.shape_db.sqlite.disk_size", unit: :byte), + Telemetry.Metrics.last_value("electric.shape_db.sqlite.connections") ] end diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index 27f2e606d4..371986403a 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -622,4 +622,51 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end end end + + describe "pool scaling" do + @tag shape_db_opts: [connection_idle_timeout: 10, statistics_collection_period: 10] + test "scales to 0 if nothing is active", ctx do + ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> + Process.sleep(1) + {:ok, %{connections: n}} = ShapeDb.statistics(ctx.stack_id) + assert n == 1 + end) + + assert :ok = assert_zero_memory(ctx) + end + + @tag shape_db_opts: [ + exclusive_mode: true, + connection_idle_timeout: 10, + statistics_collection_period: 10 + ] + test "does not scale the pool in exclusive mode", ctx do + ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> + Process.sleep(1) + {:ok, %{connections: n}} = ShapeDb.statistics(ctx.stack_id) + assert n == 1 + end) + + :error = assert_zero_memory(ctx) + end + + defp assert_zero_memory(ctx, repeats \\ 10) + + defp assert_zero_memory(_ctx, 0) do + :error + end + + defp assert_zero_memory(ctx, repeats) do + {:ok, stats} = ShapeDb.statistics(ctx.stack_id) + + case stats do + %{connections: 0, total_memory: 0} -> + :ok + + _ -> + Process.sleep(10) + assert_zero_memory(ctx, repeats - 1) + end + end + end end From f45284dd26ac2993894e642f92d13b8e8f2dd2f7 Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Thu, 26 Feb 2026 10:16:41 +0000 Subject: [PATCH 2/7] pr feedback --- .../shape_status/shape_db/connection.ex | 20 ++++++------ .../shape_status/shape_db/statistics.ex | 30 +++++++++++++---- .../shape_status/shape_db_test.exs | 32 ++++++++----------- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 2eae6c4e40..b8bdf8c7cc 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -209,20 +209,22 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end @impl NimblePool - def handle_ping(%__MODULE__{} = state, pool_state) do - Logger.debug(fn -> ["Closing idle SQLite ", to_string(state.mode), " connection"] end) - + def handle_ping(%__MODULE__{} = _conn, _pool_state) do # the idle timeout is only enabled in non-exclusive mode, so we're free # to close all the connections, including write. - - :ok = close(state) - :ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id)) - {:remove, :idle} end - def shrink_memory(conn) when is_raw_connection(conn) do - execute(conn, "PRAGMA shrink_memory") + @impl NimblePool + def terminate_worker(reason, conn, pool_state) do + Logger.debug(fn -> + ["Closing SQLite ", to_string(conn.mode), " connection for reason: ", inspect(reason)] + end) + + _ = close(conn) + :ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id)) + + {:ok, pool_state} end @max_recovery_attempts 2 diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex index d7520d16e9..e296bd6356 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex @@ -61,7 +61,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do page_size: 0, stats: %{}, connections: 0, - use_pool?: Keyword.get(args, :exclusive_mode, false), + first_run?: true, + exclusive_mode?: Keyword.get(args, :exclusive_mode, false), enable_memory_stats?: enable_memory_stats?, measurement_period: measurement_period, pool_opts: args @@ -115,7 +116,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do state |> open_connection(fn conn -> with {:ok, memstat_available?, page_size} <- - initialize_connection(conn, state.enable_memory_stats?), + initialize_connection(conn, state.first_run?, state.enable_memory_stats?), {:ok, stats} <- Connection.fetch_all( conn, @@ -130,21 +131,37 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do %{state | stats: stats} {:error, reason} -> - Logger.warning(["Failed to read statistics: ", inspect(reason)]) + Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)]) + state + + :error -> + Logger.warning("Failed to read SQLite statistics") state end + |> then(fn + %{first_run?: true} = state -> + %{state | first_run?: false} + + state -> + state + end) |> tap(fn state -> Process.send_after(self(), :read_stats, state.measurement_period) end) end - defp open_connection(%{use_pool?: true} = state, fun) do + # In exclusive_mode we *must* use a pooled connection because the db maybe + # in-memory. This is ok because in this mode we never close the single + # connection instance so using it won't prevent closing idle connections. + defp open_connection(%{exclusive_mode?: true} = state, fun) do Connection.checkout_write!(state.stack_id, :read_stats, fn %{conn: conn} -> fun.(conn) end) end - defp open_connection(%{use_pool?: false, pool_opts: pool_opts} = _state, fun) do + # read the stats over a completely new connection to avoid waking a pool + # worker and preventing it from reaching the idle timeout + defp open_connection(%{exclusive_mode?: false, pool_opts: pool_opts} = _state, fun) do with {:ok, conn} <- Connection.open(pool_opts) do try do fun.(conn) @@ -154,11 +171,12 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do end end - defp initialize_connection(conn, enable_memory_stats?) do + defp initialize_connection(conn, first_run?, enable_memory_stats?) do memstat_available? = if enable_memory_stats? do case Connection.enable_extension(conn, "memstat") do :ok -> + if first_run?, do: Logger.info("SQLite memory statistics enabled") true {:error, reason} -> diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index 371986403a..b50580fd75 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -624,15 +624,13 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end describe "pool scaling" do - @tag shape_db_opts: [connection_idle_timeout: 10, statistics_collection_period: 10] + @tag shape_db_opts: [connection_idle_timeout: 5, statistics_collection_period: 5] test "scales to 0 if nothing is active", ctx do ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> - Process.sleep(1) - {:ok, %{connections: n}} = ShapeDb.statistics(ctx.stack_id) - assert n == 1 + assert :ok = assert_stats_match(ctx, connections: 1) end) - assert :ok = assert_zero_memory(ctx) + assert :ok = assert_stats_match(ctx, connections: 0, total_memory: 0) end @tag shape_db_opts: [ @@ -642,30 +640,26 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do ] test "does not scale the pool in exclusive mode", ctx do ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> - Process.sleep(1) - {:ok, %{connections: n}} = ShapeDb.statistics(ctx.stack_id) - assert n == 1 + assert :ok = assert_stats_match(ctx, connections: 1) end) - :error = assert_zero_memory(ctx) + assert :error = assert_stats_match(ctx, connections: 0, total_memory: 0) end - defp assert_zero_memory(ctx, repeats \\ 10) + defp assert_stats_match(ctx, match, repeats \\ 10) - defp assert_zero_memory(_ctx, 0) do + defp assert_stats_match(_ctx, _match, 0) do :error end - defp assert_zero_memory(ctx, repeats) do + defp assert_stats_match(ctx, match, repeats) do {:ok, stats} = ShapeDb.statistics(ctx.stack_id) - case stats do - %{connections: 0, total_memory: 0} -> - :ok - - _ -> - Process.sleep(10) - assert_zero_memory(ctx, repeats - 1) + if Enum.all?(match, fn {k, v} -> stats[k] == v end) do + :ok + else + Process.sleep(10) + assert_stats_match(ctx, match, repeats - 1) end end end From acaa5b0b2bef2d44cd38e3aba107a254a357da9f Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Thu, 26 Feb 2026 15:03:22 +0000 Subject: [PATCH 3/7] make stats reading async so stats retrieval is not blocked by concurrent reading of stats --- .../shape_status/shape_db/connection.ex | 14 ++- .../shape_status/shape_db/statistics.ex | 90 +++++++++++-------- .../shape_status/shape_db/supervisor.ex | 1 + .../shape_status/shape_db_test.exs | 24 +++-- 4 files changed, 85 insertions(+), 44 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index b8bdf8c7cc..17f58c3f36 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -209,10 +209,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end @impl NimblePool - def handle_ping(%__MODULE__{} = _conn, _pool_state) do - # the idle timeout is only enabled in non-exclusive mode, so we're free - # to close all the connections, including write. - {:remove, :idle} + def handle_ping(%__MODULE__{} = conn, pool_state) do + if Keyword.get(pool_state, :exclusive_mode, false) do + # keep the write connection alive in exclusive mode — closing it + # would destroy an in-memory database + {:ok, conn} + else + # the idle timeout is only enabled in non-exclusive mode, so we're free + # to close all the connections, including write. + {:remove, :idle} + end end @impl NimblePool diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex index e296bd6356..1f67a0c551 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex @@ -65,6 +65,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do exclusive_mode?: Keyword.get(args, :exclusive_mode, false), enable_memory_stats?: enable_memory_stats?, measurement_period: measurement_period, + task: nil, pool_opts: args }} end @@ -74,6 +75,41 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do {:noreply, read_stats(state), :hibernate} end + def handle_info({ref, read_stats_result}, %{task: %{ref: ref}} = state) do + state = + case read_stats_result do + {:ok, stats} -> + %{state | stats: stats} + + {:error, reason} -> + Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)]) + state + + :error -> + Logger.warning("Failed to read SQLite statistics") + state + end + |> then(fn + %{first_run?: true} = state -> + %{state | first_run?: false} + + state -> + state + end) + + {:noreply, state} + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, %{task: %{ref: ref}} = state) do + Process.send_after(self(), :read_stats, state.measurement_period) + {:noreply, %{state | task: nil}} + end + + def handle_info(msg, state) do + Logger.warning(["Received unexpected message: ", inspect(msg)]) + {:noreply, state} + end + @impl GenServer def handle_call(:statistics, _from, state) do {:reply, {:ok, Map.put(state.stats, :connections, state.connections)}, state} @@ -113,41 +149,25 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do end defp do_read_stats(state, include_memory?) do - state - |> open_connection(fn conn -> - with {:ok, memstat_available?, page_size} <- - initialize_connection(conn, state.first_run?, state.enable_memory_stats?), - {:ok, stats} <- - Connection.fetch_all( - conn, - stats_query(memstat_available? && include_memory?), - [] - ) do - {:ok, analyze_stats(stats, page_size)} - end - end) - |> case do - {:ok, stats} -> - %{state | stats: stats} - - {:error, reason} -> - Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)]) - state - - :error -> - Logger.warning("Failed to read SQLite statistics") - state - end - |> then(fn - %{first_run?: true} = state -> - %{state | first_run?: false} + # Read the stats in an async task so that we don't block reading the stats + # and get spurious timeout errors + task = + Task.async(fn -> + open_connection(state, fn conn -> + with {:ok, memstat_available?, page_size} <- + initialize_connection(conn, state.first_run?, state.enable_memory_stats?), + {:ok, stats} <- + Connection.fetch_all( + conn, + stats_query(memstat_available? && include_memory?), + [] + ) do + {:ok, analyze_stats(stats, page_size)} + end + end) + end) - state -> - state - end) - |> tap(fn state -> - Process.send_after(self(), :read_stats, state.measurement_period) - end) + %{state | task: task} end # In exclusive_mode we *must* use a pooled connection because the db maybe @@ -176,7 +196,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do if enable_memory_stats? do case Connection.enable_extension(conn, "memstat") do :ok -> - if first_run?, do: Logger.info("SQLite memory statistics enabled") + if first_run?, do: Logger.notice("SQLite memory statistics enabled") true {:error, reason} -> diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex index 8abdfd3444..765756f4e5 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex @@ -22,6 +22,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do exclusive_mode = Keyword.get(opts, :exclusive_mode, false) idle_timeout = Keyword.get(opts, :connection_idle_timeout, @default_connection_idle_timeout) # don't close the write connection in exclusive mode + # NimblePool treats `worker_idle_timeout: nil` as no idle timeout write_pool_idle_timeout = if(exclusive_mode, do: nil, else: idle_timeout) read_pool_spec = diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index b50580fd75..626ecba8c1 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -548,17 +548,31 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do describe "statistics" do @tag shape_db_opts: [enable_memory_stats?: true] test "export memory and disk usage when enabled", ctx do - assert {:ok, %{total_memory: memory, disk_size: disk_size}} = - ShapeDb.statistics(ctx.stack_id) + assert {:ok, %{total_memory: memory, disk_size: disk_size}} = wait_statistics(ctx) assert memory > 0 assert disk_size > 0 end test "only exports disk usage by default", ctx do - assert {:ok, %{total_memory: 0, disk_size: disk_size}} = ShapeDb.statistics(ctx.stack_id) + assert {:ok, %{total_memory: 0, disk_size: disk_size}} = wait_statistics(ctx) assert disk_size > 0 end + + defp wait_statistics(ctx, attempts \\ 10) + + defp wait_statistics(_ctx, 0), do: :error + + defp wait_statistics(ctx, remaining_attempts) do + case ShapeDb.statistics(ctx.stack_id) do + {:ok, %{updated_at: %DateTime{}} = stats} -> + {:ok, stats} + + {:ok, _} -> + Process.sleep(10) + wait_statistics(ctx, remaining_attempts - 1) + end + end end describe "recovery" do @@ -636,14 +650,14 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do @tag shape_db_opts: [ exclusive_mode: true, connection_idle_timeout: 10, - statistics_collection_period: 10 + statistics_collection_period: 1000 ] test "does not scale the pool in exclusive mode", ctx do ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> assert :ok = assert_stats_match(ctx, connections: 1) end) - assert :error = assert_stats_match(ctx, connections: 0, total_memory: 0) + assert :error = assert_stats_match(ctx, connections: 0) end defp assert_stats_match(ctx, match, repeats \\ 10) From f5e111290cbe28eea93fd8b64aba50de84978f52 Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Tue, 3 Mar 2026 11:30:06 +0000 Subject: [PATCH 4/7] disable stats collection completely by default and always use a pooled connection for the retreival --- packages/sync-service/config/runtime.exs | 1 + .../sync-service/lib/electric/application.ex | 1 + packages/sync-service/lib/electric/config.ex | 1 + .../shape_status/shape_db/statistics.ex | 84 +++++++++---------- .../lib/electric/stack_supervisor.ex | 1 + .../shape_status/shape_db_test.exs | 31 ++++++- 6 files changed, 70 insertions(+), 49 deletions(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index f4e69fccbc..58424820c6 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -283,6 +283,7 @@ config :electric, shape_db_synchronous: env!("ELECTRIC_SHAPE_DB_SYNCHRONOUS", :string, nil), shape_db_cache_size: env!("ELECTRIC_SHAPE_DB_CACHE_SIZE", &Electric.Config.parse_human_readable_size!/1, nil), + shape_db_enable_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_STATS", :boolean, nil), shape_db_enable_memory_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_MEMORY_STATS", :boolean, nil) if Electric.telemetry_enabled?() do diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 5c93526cf0..dc74c2fe5d 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -155,6 +155,7 @@ defmodule Electric.Application do end), synchronous: get_env(opts, :shape_db_synchronous), cache_size: get_env(opts, :shape_db_cache_size), + enable_stats?: get_env(opts, :shape_db_enable_stats), enable_memory_stats?: get_env(opts, :shape_db_enable_memory_stats) ] ) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 130b009129..a4d29f3fcb 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -113,6 +113,7 @@ defmodule Electric.Config do shape_db_synchronous: Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:synchronous), shape_db_cache_size: Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:cache_size), + shape_db_enable_stats: false, shape_db_enable_memory_stats: false ] diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex index 1f67a0c551..1803db99d3 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex @@ -19,6 +19,13 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do require Logger + defstruct total_memory: 0, + page_cache_overflow: 0, + disk_size: 0, + data_size: 0, + connections: 0, + updated_at: nil + @measurement_period 60_000 def name(stack_ref) do @@ -52,21 +59,22 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do Process.set_label({:shape_db_statistics, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + measurement_period = Keyword.get(args, :statistics_collection_period, @measurement_period) + enable_stats? = Keyword.get(args, :enable_stats?, false) + # don't need to && with enable_stats because if enable_stats? is false, + # we never test this secondary flag enable_memory_stats? = Keyword.get(args, :enable_memory_stats?, false) {:ok, %{ stack_id: stack_id, - page_size: 0, - stats: %{}, + stats: %__MODULE__{}, connections: 0, - first_run?: true, - exclusive_mode?: Keyword.get(args, :exclusive_mode, false), + enable_stats?: enable_stats?, enable_memory_stats?: enable_memory_stats?, measurement_period: measurement_period, - task: nil, - pool_opts: args + task: nil }} end @@ -89,13 +97,6 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do Logger.warning("Failed to read SQLite statistics") state end - |> then(fn - %{first_run?: true} = state -> - %{state | first_run?: false} - - state -> - state - end) {:noreply, state} end @@ -112,7 +113,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do @impl GenServer def handle_call(:statistics, _from, state) do - {:reply, {:ok, Map.put(state.stats, :connections, state.connections)}, state} + {:reply, {:ok, Map.from_struct(%{state.stats | connections: state.connections})}, state} end @impl GenServer @@ -131,31 +132,41 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do end def handle_cast(:initialize, state) do - {:noreply, read_stats(state, _force = true)} + {:noreply, read_stats(state, _force = true, _first = true)} end - defp read_stats(state, force? \\ false) + defp read_stats(state, force? \\ false, first? \\ false) + + # If stats are globally disabled then the only call to this function + # is from the cast(:initialize) startup event. Since we don't actually + # collect the stats here we also never schedule the periodic collection + defp read_stats(%{enable_stats?: false} = state, _force?, _first?) do + Logger.notice("SQLite statistics disabled") + state + end # If the pools have no open connections, then don't read memory usage because # the report would only include memory used by the temporary statistics # connection. We're assuming that 0 open connections == 0 sqlite memory # usage, which seems reasonable - defp read_stats(%{connections: 0} = state, false) do - do_read_stats(state, false) + defp read_stats(%{connections: 0} = state, _force? = false, first?) do + do_read_stats(state, false, first?) end - defp read_stats(state, _force?) do - do_read_stats(state, true) + defp read_stats(state, _force?, first?) do + do_read_stats(state, true, first?) end - defp do_read_stats(state, include_memory?) do + defp do_read_stats(state, include_memory?, first?) do # Read the stats in an async task so that we don't block reading the stats - # and get spurious timeout errors + # and get spurious timeout errors. + # We use the writer pool to read the stats. This will keep this pooled + # connection open but that's ok. task = Task.async(fn -> - open_connection(state, fn conn -> + Connection.checkout_write!(state.stack_id, :read_stats, fn %{conn: conn} -> with {:ok, memstat_available?, page_size} <- - initialize_connection(conn, state.first_run?, state.enable_memory_stats?), + initialize_connection(conn, first?, state.enable_memory_stats?), {:ok, stats} <- Connection.fetch_all( conn, @@ -170,30 +181,11 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do %{state | task: task} end - # In exclusive_mode we *must* use a pooled connection because the db maybe - # in-memory. This is ok because in this mode we never close the single - # connection instance so using it won't prevent closing idle connections. - defp open_connection(%{exclusive_mode?: true} = state, fun) do - Connection.checkout_write!(state.stack_id, :read_stats, fn %{conn: conn} -> - fun.(conn) - end) - end - - # read the stats over a completely new connection to avoid waking a pool - # worker and preventing it from reaching the idle timeout - defp open_connection(%{exclusive_mode?: false, pool_opts: pool_opts} = _state, fun) do - with {:ok, conn} <- Connection.open(pool_opts) do - try do - fun.(conn) - after - Connection.close(conn) - end - end - end - defp initialize_connection(conn, first_run?, enable_memory_stats?) do memstat_available? = if enable_memory_stats? do + # have to re-enable the extension with every run because the writer + # pool may have scaled to 0 in between collection runs case Connection.enable_extension(conn, "memstat") do :ok -> if first_run?, do: Logger.notice("SQLite memory statistics enabled") @@ -248,7 +240,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do # # 3. PAGECACHE_OVERFLOW (heap fallback): When the pre-allocated page # cache is full, overflow goes to heap. This is already in bytes. - %{ + %__MODULE__{ total_memory: memory_used + pagecache_used + pagecache_overflow, page_cache_overflow: pagecache_overflow, disk_size: disk_size, diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index c6dcdc0d8a..1b5d221836 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -168,6 +168,7 @@ defmodule Electric.StackSupervisor do connection_idle_timeout: [type: :integer], read_pool_size: [type: :integer], statistics_collection_period: [type: :integer], + enable_stats?: [type: :boolean], enable_memory_stats?: [type: :boolean] ] ], diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index 626ecba8c1..a0bf7663cd 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -546,7 +546,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end describe "statistics" do - @tag shape_db_opts: [enable_memory_stats?: true] + @tag shape_db_opts: [enable_stats?: true, enable_memory_stats?: true] test "export memory and disk usage when enabled", ctx do assert {:ok, %{total_memory: memory, disk_size: disk_size}} = wait_statistics(ctx) @@ -554,11 +554,33 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do assert disk_size > 0 end + @tag shape_db_opts: [enable_stats?: true] test "only exports disk usage by default", ctx do assert {:ok, %{total_memory: 0, disk_size: disk_size}} = wait_statistics(ctx) assert disk_size > 0 end + @tag shape_db_opts: [] + test "only returns number of active connections if stats disabled", ctx do + parent = self() + + pid = + start_supervised!( + {Task, + fn -> + ShapeDb.Connection.checkout!(ctx.stack_id, :read_call, fn _conn -> + send(parent, :ready) + + receive(do: (_msg -> :ok)) + end) + end} + ) + + assert_receive :ready + assert {:ok, %{connections: 1}} = ShapeDb.statistics(ctx.stack_id) + send(pid, :done) + end + defp wait_statistics(ctx, attempts \\ 10) defp wait_statistics(_ctx, 0), do: :error @@ -638,13 +660,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end describe "pool scaling" do - @tag shape_db_opts: [connection_idle_timeout: 5, statistics_collection_period: 5] + @tag shape_db_opts: [ + connection_idle_timeout: 5, + statistics_collection_period: 5 + ] test "scales to 0 if nothing is active", ctx do ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> assert :ok = assert_stats_match(ctx, connections: 1) end) - assert :ok = assert_stats_match(ctx, connections: 0, total_memory: 0) + assert :ok = assert_stats_match(ctx, connections: 0) end @tag shape_db_opts: [ From 10a7ef7d96e0d2b24fda45d9f0abbae8df0cf6b0 Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Tue, 3 Mar 2026 11:48:45 +0000 Subject: [PATCH 5/7] only log db path on startup, not on connect because now we're maybe shutting the write connection when idle, this will log all the time --- .../shape_cache/shape_status/shape_db/connection.ex | 11 ++++++----- .../shape_cache/shape_status/shape_db/migrator.ex | 2 ++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 17f58c3f36..97ad632fbc 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -535,7 +535,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end) end - # used in testing + # used in testing and logging def db_path(pool_state) do # Manage compatibility by embedding all versions into the db name rather # than embed the values in the db itself and then have to manage schema @@ -570,15 +570,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do path = Path.join(storage_dir, "meta/shape-db/#{version}.sqlite") with :ok <- File.mkdir_p(Path.dirname(path)) do - if Keyword.get(pool_state, :mode) == :write do - Logger.notice("Shape database file: #{inspect(path)}") - end - {:ok, path} end end end end + def db_path!(pool_state) do + {:ok, path} = db_path(pool_state) + path + end + defp now, do: System.monotonic_time() end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex index aef60795fc..215019f219 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex @@ -16,6 +16,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Migrator do {:ok, stack_id} = Keyword.fetch(args, :stack_id) exclusive_mode = Keyword.get(args, :exclusive_mode, false) + Logger.notice("Shape database file: #{inspect(ShapeDb.Connection.db_path!(args))}") + Process.set_label({:shape_db_migrator, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) From 9be685ca80019ec9e79548ccc04eeb064094fd45 Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Tue, 3 Mar 2026 11:48:45 +0000 Subject: [PATCH 6/7] review feedback --- .../shape_cache/shape_status/shape_db/connection.ex | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 97ad632fbc..5f6e6f7fa4 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -151,7 +151,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end defp init_worker_for_pool(pool_state) do - if(Keyword.get(pool_state, :exclusive_mode, false)) do + if Keyword.get(pool_state, :exclusive_mode, false) do init_worker_exclusive(pool_state) else init_worker_pooled(pool_state) @@ -159,8 +159,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end defp init_worker_pooled(pool_state) do - with mode = Keyword.get(pool_state, :mode, :readwrite), - {:ok, conn} <- open(pool_state), + mode = Keyword.get(pool_state, :mode, :readwrite) + + with {:ok, conn} <- open(pool_state), stmts <- Query.prepare!(conn, mode) do {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} end @@ -172,8 +173,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do # over the same, single, connection since every connection is a separate db # in in-memory mode. defp init_worker_exclusive(pool_state) do - with mode = :readwrite, - {:ok, conn} <- open(pool_state, integrity_check: true), + mode = :readwrite + + with {:ok, conn} <- open(pool_state, integrity_check: true), {:ok, _version} <- migrate(conn, pool_state), stmts <- Query.prepare!(conn, mode) do {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} From 872c8bd21e35a7ac4e47b3826b86877542132f8b Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Tue, 3 Mar 2026 11:48:45 +0000 Subject: [PATCH 7/7] change supervisor strategy --- .../shape_cache/shape_status/shape_db/supervisor.ex | 8 ++++++-- .../shape_cache/shape_status/shape_db/write_buffer.ex | 2 -- .../shape_cache/shape_status/shape_db_test.exs | 11 +++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex index 765756f4e5..e8305d3617 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex @@ -65,12 +65,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do lazy: not exclusive_mode}, id: {:pool, :write} ), - # write buffer for batching SQLite writes to avoid timeout cascades + # Write buffer for batching SQLite writes to avoid timeout cascades. {ShapeDb.WriteBuffer, opts}, {Task, fn -> ShapeDb.Statistics.initialize(stack_id) end} ] ]) - Supervisor.init(children, strategy: :one_for_one) + # Because the full state of the system is split between the actual db, the + # writeBuffer and the ShapeStatus ets caches, we are not safe to adopt a + # one_for_one strategy and need to propagate an exit in the children of + # this supervisor to the parent + Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) end end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex index 2459f65e5a..3fa7f699cb 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex @@ -274,8 +274,6 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.WriteBuffer do @impl GenServer def init(opts) do - Process.flag(:trap_exit, true) - stack_id = Keyword.fetch!(opts, :stack_id) manual_flush_only = Keyword.get(opts, :manual_flush_only, false) diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index a0bf7663cd..baa1aa589c 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -657,6 +657,17 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end) end end + + test "crashing WriteBuffer restarts entire supervision tree", ctx do + assert supervisor_pid = GenServer.whereis(ShapeDb.Supervisor.name(ctx.stack_id)) + super_ref = Process.monitor(supervisor_pid) + assert write_buffer_pid = GenServer.whereis(ShapeDb.WriteBuffer.name(ctx.stack_id)) + + buffer_ref = Process.monitor(write_buffer_pid) + Process.exit(write_buffer_pid, :some_reason) + assert_receive {:DOWN, ^buffer_ref, :process, ^write_buffer_pid, :some_reason} + assert_receive {:DOWN, ^super_ref, :process, ^supervisor_pid, _} + end end describe "pool scaling" do