diff --git a/.changeset/unlucky-files-protect.md b/.changeset/unlucky-files-protect.md new file mode 100644 index 0000000000..ba4cf044e0 --- /dev/null +++ b/.changeset/unlucky-files-protect.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Add SQLite connection pool scaling to minimize memory usage in quiet instances and disable SQLite metrics collection by default diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index f4e69fccbc..58424820c6 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -283,6 +283,7 @@ config :electric, shape_db_synchronous: env!("ELECTRIC_SHAPE_DB_SYNCHRONOUS", :string, nil), shape_db_cache_size: env!("ELECTRIC_SHAPE_DB_CACHE_SIZE", &Electric.Config.parse_human_readable_size!/1, nil), + shape_db_enable_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_STATS", :boolean, nil), shape_db_enable_memory_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_MEMORY_STATS", :boolean, nil) if Electric.telemetry_enabled?() do diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 5c93526cf0..dc74c2fe5d 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -155,6 +155,7 @@ defmodule Electric.Application do end), synchronous: get_env(opts, :shape_db_synchronous), cache_size: get_env(opts, :shape_db_cache_size), + enable_stats?: get_env(opts, :shape_db_enable_stats), enable_memory_stats?: get_env(opts, :shape_db_enable_memory_stats) ] ) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 130b009129..a4d29f3fcb 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -113,6 +113,7 @@ defmodule Electric.Config do shape_db_synchronous: Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:synchronous), shape_db_cache_size: Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:cache_size), + shape_db_enable_stats: false, shape_db_enable_memory_stats: false ] diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex index 26814178d3..5f6e6f7fa4 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex @@ -4,6 +4,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do alias Exqlite.Sqlite3 alias Electric.ShapeCache.ShapeStatus.ShapeDb.Query alias Electric.ShapeCache.ShapeStatus.ShapeDb.PoolRegistry + alias Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics alias Electric.Telemetry.OpenTelemetry require Logger @@ -65,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do defguardp is_raw_connection(conn) when is_reference(conn) - defstruct [:conn, :stmts] + defstruct [:conn, :mode, :stmts] def migrate(conn, opts) when is_raw_connection(conn) do # because we embed the storage version into the db path @@ -143,6 +144,13 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do @impl NimblePool def init_worker(pool_state) do + with {:ok, conn} <- init_worker_for_pool(pool_state) do + :ok = Statistics.worker_start(Keyword.get(pool_state, :stack_id)) + {:ok, conn, pool_state} + end + end + + defp init_worker_for_pool(pool_state) do if Keyword.get(pool_state, :exclusive_mode, false) do init_worker_exclusive(pool_state) else @@ -151,9 +159,11 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end defp init_worker_pooled(pool_state) do + mode = Keyword.get(pool_state, :mode, :readwrite) + with {:ok, conn} <- open(pool_state), - stmts <- Query.prepare!(conn, pool_state) do - {:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state} + stmts <- Query.prepare!(conn, mode) do + {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} end end @@ -163,10 +173,12 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do # over the same, single, connection since every connection is a separate db # in in-memory mode. defp init_worker_exclusive(pool_state) do + mode = :readwrite + with {:ok, conn} <- open(pool_state, integrity_check: true), {:ok, _version} <- migrate(conn, pool_state), - stmts <- Query.prepare!(conn, Keyword.put(pool_state, :mode, :readwrite)) do - {:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state} + stmts <- Query.prepare!(conn, mode) do + {:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}} end end @@ -198,6 +210,31 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do {:ok, client_state, pool_state} end + @impl NimblePool + def handle_ping(%__MODULE__{} = conn, pool_state) do + if Keyword.get(pool_state, :exclusive_mode, false) do + # keep the write connection alive in exclusive mode — closing it + # would destroy an in-memory database + {:ok, conn} + else + # the idle timeout is only enabled in non-exclusive mode, so we're free + # to close all the connections, including write. + {:remove, :idle} + end + end + + @impl NimblePool + def terminate_worker(reason, conn, pool_state) do + Logger.debug(fn -> + ["Closing SQLite ", to_string(conn.mode), " connection for reason: ", inspect(reason)] + end) + + _ = close(conn) + :ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id)) + + {:ok, pool_state} + end + @max_recovery_attempts 2 def open(pool_state, opts \\ []) do @@ -270,6 +307,26 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end end + def close(%__MODULE__{conn: conn, stmts: stmts}) do + # Need to release all the prepared statements on the connection or the + # close doesn't actually release the resources. + stmts + |> Query.active_stmts() + |> Enum.each(fn {name, stmt} -> + case Sqlite3.release(conn, stmt) do + :ok -> + :ok + + {:error, reason} -> + Logger.warning( + "Failed to release prepared statement #{inspect(name)}: #{inspect(reason)}" + ) + end + end) + + close(conn) + end + def close(conn) when is_raw_connection(conn) do Sqlite3.close(conn) end @@ -480,7 +537,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do end) end - # used in testing + # used in testing and logging def db_path(pool_state) do # Manage compatibility by embedding all versions into the db name rather # than embed the values in the db itself and then have to manage schema @@ -515,15 +572,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do path = Path.join(storage_dir, "meta/shape-db/#{version}.sqlite") with :ok <- File.mkdir_p(Path.dirname(path)) do - if Keyword.get(pool_state, :mode) == :write do - Logger.notice("Shape database file: #{inspect(path)}") - end - {:ok, path} end end end end + def db_path!(pool_state) do + {:ok, path} = db_path(pool_state) + path + end + defp now, do: System.monotonic_time() end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex index aef60795fc..215019f219 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/migrator.ex @@ -16,6 +16,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Migrator do {:ok, stack_id} = Keyword.fetch(args, :stack_id) exclusive_mode = Keyword.get(args, :exclusive_mode, false) + Logger.notice("Shape database file: #{inspect(ShapeDb.Connection.db_path!(args))}") + Process.set_label({:shape_db_migrator, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex index da8ef01345..7d97306783 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/query.ex @@ -52,7 +52,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do Keyword.take(@read_queries, [:handle_lookup]) ) - defstruct Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries)) + @stmt_names Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries)) + + defstruct @stmt_names alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection, as: Conn @@ -68,8 +70,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do stream_query: 3 ] - def prepare!(conn, opts) do - case Keyword.get(opts, :mode, :readwrite) do + def active_stmts(nil) do + [] + end + + def active_stmts(%__MODULE__{} = query) do + @stmt_names + |> Enum.map(&{&1, Map.fetch!(query, &1)}) + |> Enum.reject(&is_nil(elem(&1, 1))) + end + + def prepare!(conn, mode) do + case mode do :readwrite -> struct(__MODULE__, prepare_stmts!(conn, @read_queries ++ @write_queries)) diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex index 2adb0a9c11..1803db99d3 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/statistics.ex @@ -15,10 +15,17 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do use GenServer - alias Electric.ShapeCache.ShapeStatus.ShapeDb + alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection require Logger + defstruct total_memory: 0, + page_cache_overflow: 0, + disk_size: 0, + data_size: 0, + connections: 0, + updated_at: nil + @measurement_period 60_000 def name(stack_ref) do @@ -33,6 +40,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do GenServer.call(name(stack_id), :statistics) end + def initialize(stack_id) do + GenServer.cast(name(stack_id), :initialize) + end + + def worker_start(stack_id) do + GenServer.cast(name(stack_id), {:worker_incr, 1}) + end + + def worker_stop(stack_id) do + GenServer.cast(name(stack_id), {:worker_incr, -1}) + end + @impl GenServer def init(args) do stack_id = Keyword.fetch!(args, :stack_id) @@ -41,74 +60,152 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) + measurement_period = Keyword.get(args, :statistics_collection_period, @measurement_period) + enable_stats? = Keyword.get(args, :enable_stats?, false) + # don't need to && with enable_stats because if enable_stats? is false, + # we never test this secondary flag enable_memory_stats? = Keyword.get(args, :enable_memory_stats?, false) {:ok, %{ stack_id: stack_id, - page_size: 0, - memstat_available?: false, - stats: %{} - }, {:continue, {:initialize_stats, enable_memory_stats?}}} + stats: %__MODULE__{}, + connections: 0, + enable_stats?: enable_stats?, + enable_memory_stats?: enable_memory_stats?, + measurement_period: measurement_period, + task: nil + }} end @impl GenServer - def handle_continue({:initialize_stats, enable_memory_stats?}, state) do - %{stack_id: stack_id} = state - - {:ok, {page_size, memstat_available?}} = - ShapeDb.Connection.checkout_write!(stack_id, :read_stats, fn %{conn: conn} -> - memstat_available? = - if enable_memory_stats? do - # don't even try to load the extension unless enabled -- loading the extension - # may be the cause of segfaults we've seen in prod - case ShapeDb.Connection.enable_extension(conn, "memstat") do - :ok -> - Logger.notice("SQLite memory statistics enabled") - - true - - {:error, reason} -> - Logger.warning( - "Failed to load memstat SQLite extension: #{inspect(reason)}. " <> - "Memory statistics will not be available." - ) - - false - end - else - false - end + def handle_info(:read_stats, state) do + {:noreply, read_stats(state), :hibernate} + end - {:ok, [page_size]} = ShapeDb.Connection.fetch_one(conn, "PRAGMA page_size", []) + def handle_info({ref, read_stats_result}, %{task: %{ref: ref}} = state) do + state = + case read_stats_result do + {:ok, stats} -> + %{state | stats: stats} - {:ok, {page_size, memstat_available?}} - end) + {:error, reason} -> + Logger.warning(["Failed to read SQLite statistics: ", inspect(reason)]) + state - {:noreply, - read_stats(%{state | page_size: page_size, memstat_available?: memstat_available?}), - :hibernate} + :error -> + Logger.warning("Failed to read SQLite statistics") + state + end + + {:noreply, state} end - @impl GenServer - def handle_info(:read_stats, state) do - {:noreply, read_stats(state), :hibernate} + def handle_info({:DOWN, ref, :process, _pid, _reason}, %{task: %{ref: ref}} = state) do + Process.send_after(self(), :read_stats, state.measurement_period) + {:noreply, %{state | task: nil}} + end + + def handle_info(msg, state) do + Logger.warning(["Received unexpected message: ", inspect(msg)]) + {:noreply, state} end @impl GenServer def handle_call(:statistics, _from, state) do - {:reply, {:ok, state.stats}, state} + {:reply, {:ok, Map.from_struct(%{state.stats | connections: state.connections})}, state} end - defp read_stats(%{stack_id: stack_id, memstat_available?: memstat_available?} = state) do - {:ok, stats} = - ShapeDb.Connection.checkout_write!(stack_id, :read_stats, fn %{conn: conn} -> - ShapeDb.Connection.fetch_all(conn, stats_query(memstat_available?), []) + @impl GenServer + def handle_cast({:worker_incr, incr}, state) do + state = + state + |> Map.update!(:connections, &(&1 + incr)) + |> tap(fn state -> + Logger.debug([ + if(incr > 0, do: "Opening ", else: "Closing "), + "ShapeDb connection: #{state.connections} active connections" + ]) + end) + + {:noreply, state} + end + + def handle_cast(:initialize, state) do + {:noreply, read_stats(state, _force = true, _first = true)} + end + + defp read_stats(state, force? \\ false, first? \\ false) + + # If stats are globally disabled then the only call to this function + # is from the cast(:initialize) startup event. Since we don't actually + # collect the stats here we also never schedule the periodic collection + defp read_stats(%{enable_stats?: false} = state, _force?, _first?) do + Logger.notice("SQLite statistics disabled") + state + end + + # If the pools have no open connections, then don't read memory usage because + # the report would only include memory used by the temporary statistics + # connection. We're assuming that 0 open connections == 0 sqlite memory + # usage, which seems reasonable + defp read_stats(%{connections: 0} = state, _force? = false, first?) do + do_read_stats(state, false, first?) + end + + defp read_stats(state, _force?, first?) do + do_read_stats(state, true, first?) + end + + defp do_read_stats(state, include_memory?, first?) do + # Read the stats in an async task so that we don't block reading the stats + # and get spurious timeout errors. + # We use the writer pool to read the stats. This will keep this pooled + # connection open but that's ok. + task = + Task.async(fn -> + Connection.checkout_write!(state.stack_id, :read_stats, fn %{conn: conn} -> + with {:ok, memstat_available?, page_size} <- + initialize_connection(conn, first?, state.enable_memory_stats?), + {:ok, stats} <- + Connection.fetch_all( + conn, + stats_query(memstat_available? && include_memory?), + [] + ) do + {:ok, analyze_stats(stats, page_size)} + end + end) end) - Process.send_after(self(), :read_stats, @measurement_period) + %{state | task: task} + end + + defp initialize_connection(conn, first_run?, enable_memory_stats?) do + memstat_available? = + if enable_memory_stats? do + # have to re-enable the extension with every run because the writer + # pool may have scaled to 0 in between collection runs + case Connection.enable_extension(conn, "memstat") do + :ok -> + if first_run?, do: Logger.notice("SQLite memory statistics enabled") + true + + {:error, reason} -> + Logger.warning( + "Failed to load memstat SQLite extension: #{inspect(reason)}. " <> + "Memory statistics will not be available." + ) - %{state | stats: analyze_stats(stats, state.page_size)} + false + end + else + false + end + + with {:ok, [page_size]} <- Connection.fetch_one(conn, "PRAGMA page_size", []) do + {:ok, memstat_available?, page_size} + end end defp stats_query(true) do @@ -143,11 +240,12 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do # # 3. PAGECACHE_OVERFLOW (heap fallback): When the pre-allocated page # cache is full, overflow goes to heap. This is already in bytes. - %{ + %__MODULE__{ total_memory: memory_used + pagecache_used + pagecache_overflow, page_cache_overflow: pagecache_overflow, disk_size: disk_size, - data_size: data_size + data_size: data_size, + updated_at: DateTime.utc_now() } end) end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex index 0616307629..e8305d3617 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/supervisor.ex @@ -13,11 +13,17 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do Supervisor.start_link(__MODULE__, opts, name: name(opts)) end + @default_connection_idle_timeout 30_000 + def init(opts) do shape_db_opts = Keyword.fetch!(opts, :shape_db_opts) stack_id = Keyword.fetch!(opts, :stack_id) opts = Keyword.put(shape_db_opts, :stack_id, stack_id) exclusive_mode = Keyword.get(opts, :exclusive_mode, false) + idle_timeout = Keyword.get(opts, :connection_idle_timeout, @default_connection_idle_timeout) + # don't close the write connection in exclusive mode + # NimblePool treats `worker_idle_timeout: nil` as no idle timeout + write_pool_idle_timeout = if(exclusive_mode, do: nil, else: idle_timeout) read_pool_spec = if exclusive_mode do @@ -30,7 +36,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do NimblePool, worker: {ShapeDb.Connection, Keyword.put(opts, :mode, :read)}, pool_size: Keyword.get(opts, :read_pool_size, 2 * System.schedulers_online()), - name: ShapeDb.PoolRegistry.pool_name(stack_id, :read, exclusive_mode) + name: ShapeDb.PoolRegistry.pool_name(stack_id, :read, exclusive_mode), + worker_idle_timeout: idle_timeout, + lazy: true }, id: {:pool, :read} ) @@ -41,6 +49,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do Enum.concat([ [ {ShapeDb.PoolRegistry, stack_id: stack_id}, + {ShapeDb.Statistics, opts}, {ShapeDb.Migrator, opts} ], read_pool_spec, @@ -51,15 +60,21 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Supervisor do {NimblePool, worker: {ShapeDb.Connection, Keyword.put(opts, :mode, :write)}, pool_size: 1, - name: ShapeDb.PoolRegistry.pool_name(stack_id, :write, exclusive_mode)}, + name: ShapeDb.PoolRegistry.pool_name(stack_id, :write, exclusive_mode), + worker_idle_timeout: write_pool_idle_timeout, + lazy: not exclusive_mode}, id: {:pool, :write} ), - # write buffer for batching SQLite writes to avoid timeout cascades + # Write buffer for batching SQLite writes to avoid timeout cascades. {ShapeDb.WriteBuffer, opts}, - {ShapeDb.Statistics, opts} + {Task, fn -> ShapeDb.Statistics.initialize(stack_id) end} ] ]) - Supervisor.init(children, strategy: :one_for_one) + # Because the full state of the system is split between the actual db, the + # writeBuffer and the ShapeStatus ets caches, we are not safe to adopt a + # one_for_one strategy and need to propagate an exit in the children of + # this supervisor to the parent + Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) end end diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex index 2459f65e5a..3fa7f699cb 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/write_buffer.ex @@ -274,8 +274,6 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.WriteBuffer do @impl GenServer def init(opts) do - Process.flag(:trap_exit, true) - stack_id = Keyword.fetch!(opts, :stack_id) manual_flush_only = Keyword.get(opts, :manual_flush_only, false) diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b222dddf6b..1b5d221836 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -165,6 +165,10 @@ defmodule Electric.StackSupervisor do exclusive_mode: [type: :boolean], synchronous: [type: :string], cache_size: [type: :integer], + connection_idle_timeout: [type: :integer], + read_pool_size: [type: :integer], + statistics_collection_period: [type: :integer], + enable_stats?: [type: :boolean], enable_memory_stats?: [type: :boolean] ] ], diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 6f760a85f2..f22716372a 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -54,7 +54,8 @@ defmodule Electric.StackSupervisor.Telemetry do unit: :byte ), Telemetry.Metrics.last_value("electric.shape_db.sqlite.total_memory", unit: :byte), - Telemetry.Metrics.last_value("electric.shape_db.sqlite.disk_size", unit: :byte) + Telemetry.Metrics.last_value("electric.shape_db.sqlite.disk_size", unit: :byte), + Telemetry.Metrics.last_value("electric.shape_db.sqlite.connections") ] end diff --git a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs index 27f2e606d4..baa1aa589c 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status/shape_db_test.exs @@ -546,19 +546,55 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end describe "statistics" do - @tag shape_db_opts: [enable_memory_stats?: true] + @tag shape_db_opts: [enable_stats?: true, enable_memory_stats?: true] test "export memory and disk usage when enabled", ctx do - assert {:ok, %{total_memory: memory, disk_size: disk_size}} = - ShapeDb.statistics(ctx.stack_id) + assert {:ok, %{total_memory: memory, disk_size: disk_size}} = wait_statistics(ctx) assert memory > 0 assert disk_size > 0 end + @tag shape_db_opts: [enable_stats?: true] test "only exports disk usage by default", ctx do - assert {:ok, %{total_memory: 0, disk_size: disk_size}} = ShapeDb.statistics(ctx.stack_id) + assert {:ok, %{total_memory: 0, disk_size: disk_size}} = wait_statistics(ctx) assert disk_size > 0 end + + @tag shape_db_opts: [] + test "only returns number of active connections if stats disabled", ctx do + parent = self() + + pid = + start_supervised!( + {Task, + fn -> + ShapeDb.Connection.checkout!(ctx.stack_id, :read_call, fn _conn -> + send(parent, :ready) + + receive(do: (_msg -> :ok)) + end) + end} + ) + + assert_receive :ready + assert {:ok, %{connections: 1}} = ShapeDb.statistics(ctx.stack_id) + send(pid, :done) + end + + defp wait_statistics(ctx, attempts \\ 10) + + defp wait_statistics(_ctx, 0), do: :error + + defp wait_statistics(ctx, remaining_attempts) do + case ShapeDb.statistics(ctx.stack_id) do + {:ok, %{updated_at: %DateTime{}} = stats} -> + {:ok, stats} + + {:ok, _} -> + Process.sleep(10) + wait_statistics(ctx, remaining_attempts - 1) + end + end end describe "recovery" do @@ -621,5 +657,60 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDbTest do end) end end + + test "crashing WriteBuffer restarts entire supervision tree", ctx do + assert supervisor_pid = GenServer.whereis(ShapeDb.Supervisor.name(ctx.stack_id)) + super_ref = Process.monitor(supervisor_pid) + assert write_buffer_pid = GenServer.whereis(ShapeDb.WriteBuffer.name(ctx.stack_id)) + + buffer_ref = Process.monitor(write_buffer_pid) + Process.exit(write_buffer_pid, :some_reason) + assert_receive {:DOWN, ^buffer_ref, :process, ^write_buffer_pid, :some_reason} + assert_receive {:DOWN, ^super_ref, :process, ^supervisor_pid, _} + end + end + + describe "pool scaling" do + @tag shape_db_opts: [ + connection_idle_timeout: 5, + statistics_collection_period: 5 + ] + test "scales to 0 if nothing is active", ctx do + ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> + assert :ok = assert_stats_match(ctx, connections: 1) + end) + + assert :ok = assert_stats_match(ctx, connections: 0) + end + + @tag shape_db_opts: [ + exclusive_mode: true, + connection_idle_timeout: 10, + statistics_collection_period: 1000 + ] + test "does not scale the pool in exclusive mode", ctx do + ShapeDb.Connection.checkout!(ctx.stack_id, :test, fn _conn -> + assert :ok = assert_stats_match(ctx, connections: 1) + end) + + assert :error = assert_stats_match(ctx, connections: 0) + end + + defp assert_stats_match(ctx, match, repeats \\ 10) + + defp assert_stats_match(_ctx, _match, 0) do + :error + end + + defp assert_stats_match(ctx, match, repeats) do + {:ok, stats} = ShapeDb.statistics(ctx.stack_id) + + if Enum.all?(match, fn {k, v} -> stats[k] == v end) do + :ok + else + Process.sleep(10) + assert_stats_match(ctx, match, repeats - 1) + end + end end end