Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/unlucky-files-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Add SQLite connection pool scaling to minimize memory usage in quiet instances and disable SQLite metrics collection by default
1 change: 1 addition & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ config :electric,
shape_db_synchronous: env!("ELECTRIC_SHAPE_DB_SYNCHRONOUS", :string, nil),
shape_db_cache_size:
env!("ELECTRIC_SHAPE_DB_CACHE_SIZE", &Electric.Config.parse_human_readable_size!/1, nil),
shape_db_enable_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_STATS", :boolean, nil),
shape_db_enable_memory_stats: env!("ELECTRIC_SHAPE_DB_ENABLE_MEMORY_STATS", :boolean, nil)

if Electric.telemetry_enabled?() do
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ defmodule Electric.Application do
end),
synchronous: get_env(opts, :shape_db_synchronous),
cache_size: get_env(opts, :shape_db_cache_size),
enable_stats?: get_env(opts, :shape_db_enable_stats),
enable_memory_stats?: get_env(opts, :shape_db_enable_memory_stats)
]
)
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ defmodule Electric.Config do
shape_db_synchronous:
Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:synchronous),
shape_db_cache_size: Electric.ShapeCache.ShapeStatus.ShapeDb.Connection.default!(:cache_size),
shape_db_enable_stats: false,
shape_db_enable_memory_stats: false
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
alias Exqlite.Sqlite3
alias Electric.ShapeCache.ShapeStatus.ShapeDb.Query
alias Electric.ShapeCache.ShapeStatus.ShapeDb.PoolRegistry
alias Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics
alias Electric.Telemetry.OpenTelemetry

require Logger
Expand Down Expand Up @@ -65,7 +66,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do

defguardp is_raw_connection(conn) when is_reference(conn)

defstruct [:conn, :stmts]
defstruct [:conn, :mode, :stmts]

def migrate(conn, opts) when is_raw_connection(conn) do
# because we embed the storage version into the db path
Expand Down Expand Up @@ -143,6 +144,13 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do

@impl NimblePool
def init_worker(pool_state) do
with {:ok, conn} <- init_worker_for_pool(pool_state) do
:ok = Statistics.worker_start(Keyword.get(pool_state, :stack_id))
{:ok, conn, pool_state}
end
end

defp init_worker_for_pool(pool_state) do
if Keyword.get(pool_state, :exclusive_mode, false) do
init_worker_exclusive(pool_state)
else
Expand All @@ -151,9 +159,11 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
end

defp init_worker_pooled(pool_state) do
mode = Keyword.get(pool_state, :mode, :readwrite)

with {:ok, conn} <- open(pool_state),
stmts <- Query.prepare!(conn, pool_state) do
{:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state}
stmts <- Query.prepare!(conn, mode) do
{:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}}
end
end

Expand All @@ -163,10 +173,12 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
# over the same, single, connection since every connection is a separate db
# in in-memory mode.
defp init_worker_exclusive(pool_state) do
mode = :readwrite

with {:ok, conn} <- open(pool_state, integrity_check: true),
{:ok, _version} <- migrate(conn, pool_state),
stmts <- Query.prepare!(conn, Keyword.put(pool_state, :mode, :readwrite)) do
{:ok, %__MODULE__{conn: conn, stmts: stmts}, pool_state}
stmts <- Query.prepare!(conn, mode) do
{:ok, %__MODULE__{conn: conn, mode: mode, stmts: stmts}}
end
end

Expand Down Expand Up @@ -198,6 +210,31 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
{:ok, client_state, pool_state}
end

@impl NimblePool
def handle_ping(%__MODULE__{} = conn, pool_state) do
if Keyword.get(pool_state, :exclusive_mode, false) do
# keep the write connection alive in exclusive mode — closing it
# would destroy an in-memory database
{:ok, conn}
else
# the idle timeout is only enabled in non-exclusive mode, so we're free
# to close all the connections, including write.
{:remove, :idle}
end
end

@impl NimblePool
def terminate_worker(reason, conn, pool_state) do
Logger.debug(fn ->
["Closing SQLite ", to_string(conn.mode), " connection for reason: ", inspect(reason)]
end)

_ = close(conn)
:ok = Statistics.worker_stop(Keyword.get(pool_state, :stack_id))

{:ok, pool_state}
end

@max_recovery_attempts 2

def open(pool_state, opts \\ []) do
Expand Down Expand Up @@ -270,6 +307,26 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
end
end

def close(%__MODULE__{conn: conn, stmts: stmts}) do
# Need to release all the prepared statements on the connection or the
# close doesn't actually release the resources.
stmts
|> Query.active_stmts()
|> Enum.each(fn {name, stmt} ->
case Sqlite3.release(conn, stmt) do
:ok ->
:ok

{:error, reason} ->
Logger.warning(
"Failed to release prepared statement #{inspect(name)}: #{inspect(reason)}"
)
end
end)

close(conn)
end

def close(conn) when is_raw_connection(conn) do
Sqlite3.close(conn)
end
Expand Down Expand Up @@ -480,7 +537,7 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
end)
end

# used in testing
# used in testing and logging
def db_path(pool_state) do
# Manage compatibility by embedding all versions into the db name rather
# than embed the values in the db itself and then have to manage schema
Expand Down Expand Up @@ -515,15 +572,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Connection do
path = Path.join(storage_dir, "meta/shape-db/#{version}.sqlite")

with :ok <- File.mkdir_p(Path.dirname(path)) do
if Keyword.get(pool_state, :mode) == :write do
Logger.notice("Shape database file: #{inspect(path)}")
end

{:ok, path}
end
end
end
end

def db_path!(pool_state) do
{:ok, path} = db_path(pool_state)
path
end

defp now, do: System.monotonic_time()
end
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Migrator do
{:ok, stack_id} = Keyword.fetch(args, :stack_id)
exclusive_mode = Keyword.get(args, :exclusive_mode, false)

Logger.notice("Shape database file: #{inspect(ShapeDb.Connection.db_path!(args))}")

Process.set_label({:shape_db_migrator, stack_id})
Logger.metadata(stack_id: stack_id)
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do
Keyword.take(@read_queries, [:handle_lookup])
)

defstruct Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries))
@stmt_names Enum.uniq(Keyword.keys(@read_queries) ++ Keyword.keys(@write_queries))

defstruct @stmt_names

alias Electric.ShapeCache.ShapeStatus.ShapeDb.Connection, as: Conn

Expand All @@ -68,8 +70,18 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Query do
stream_query: 3
]

def prepare!(conn, opts) do
case Keyword.get(opts, :mode, :readwrite) do
def active_stmts(nil) do
[]
end

def active_stmts(%__MODULE__{} = query) do
@stmt_names
|> Enum.map(&{&1, Map.fetch!(query, &1)})
|> Enum.reject(&is_nil(elem(&1, 1)))
end

def prepare!(conn, mode) do
case mode do
:readwrite ->
struct(__MODULE__, prepare_stmts!(conn, @read_queries ++ @write_queries))

Expand Down
Loading
Loading