Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ config :sequin, Sequin.Repo,
hostname: "localhost",
database: "sequin_test",
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: 20,
pool_size: 40,
port: 5432,
queue_target: 100,
queue_interval: 1000,
queue_target: 500,
queue_interval: 2000,
ssl: false,
types: PostgrexTypes

Expand Down
2 changes: 1 addition & 1 deletion lib/sequin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule Sequin.Application do
children =
base_children() ++
[
Sequin.SystemMetricsServer,
SequinWeb.Telemetry,
MutexedSupervisor.child_spec(
Sequin.Runtime.MutexedSupervisor,
Expand Down Expand Up @@ -91,7 +92,6 @@ defmodule Sequin.Application do
Sequin.Sinks.Nats.ConnectionCache,
Sequin.Sinks.RabbitMq.ConnectionCache,
SequinWeb.Presence,
Sequin.SystemMetricsServer,
{Task, fn -> enqueue_workers() end},
# Start to serve requests, typically the last entry
SequinWeb.Endpoint,
Expand Down
8 changes: 5 additions & 3 deletions lib/sequin/runtime/message_consistency_check_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ defmodule Sequin.Runtime.MessageConsistencyCheckWorker do
def audit_and_trim_undelivered_cursors(consumer_id, older_than_timestamp) do
case MessageLedgers.count_undelivered_wal_cursors(consumer_id, older_than_timestamp) do
{:ok, 0} ->
:ok
{:ok, 0}

{:ok, undelivered_cursor_count} ->
Logger.warning("[MessageConsistencyCheckWorker] Found undelivered cursors (count=#{undelivered_cursor_count})",
Logger.warning(
"[MessageConsistencyCheckWorker] Found undelivered cursors (count=#{undelivered_cursor_count})",
consumer_id: consumer_id,
undelivered_cursor_count: undelivered_cursor_count
)

MessageLedgers.trim_stale_undelivered_wal_cursors(consumer_id, older_than_timestamp)
:ok = MessageLedgers.trim_stale_undelivered_wal_cursors(consumer_id, older_than_timestamp)
{:ok, undelivered_cursor_count}
end
end

Expand Down
7 changes: 4 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ defmodule Sequin.MixProject do
def application do
[
mod: {Sequin.Application, []},
extra_applications: [:logger, :runtime_tools, :os_mon] ++ extra_applications(Mix.env()),
extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env()),
included_applications: [:aws_credentials]
]
end

defp extra_applications(:dev), do: [:wx, :observer]
defp extra_applications(_), do: []
defp extra_applications(:dev), do: [:os_mon, :wx, :observer]
defp extra_applications(:test), do: []
defp extra_applications(_), do: [:os_mon]

# Specifies which paths to compile per environment.
defp elixirc_paths(:test), do: ["lib", "test/support"]
Expand Down
85 changes: 46 additions & 39 deletions test/sequin/gcp_pubsub_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,30 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
setup do
consumer = ConsumersFactory.insert_sink_consumer!(type: :gcp_pubsub, batch_size: 10)

# Setup auth token expectation for all tests
Req.Test.expect(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
end
end)

{:ok, %{consumer: consumer}}
end

test "events are sent to PubSub", %{consumer: consumer} do
message = ConsumersFactory.consumer_message()

# Mock PubSub publish
Req.Test.expect(PubSub, fn conn ->
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"
assert String.contains?(conn.request_path, ":publish")
Req.Test.stub(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
else
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"
assert String.contains?(conn.request_path, ":publish")

{:ok, body, _} = Plug.Conn.read_body(conn)
body = Jason.decode!(body)
{:ok, body, _} = Plug.Conn.read_body(conn)
body = Jason.decode!(body)

data = get_in(body, ["messages", Access.at(0), "data"])
data = data |> Base.decode64!() |> Jason.decode!()
assert Map.has_key?(data, "record")
assert Map.has_key?(data, "metadata")
data = get_in(body, ["messages", Access.at(0), "data"])
data = data |> Base.decode64!() |> Jason.decode!()
assert Map.has_key?(data, "record")
assert Map.has_key?(data, "metadata")

Req.Test.json(conn, %{})
Req.Test.json(conn, %{})
end
end)

start_pipeline!(consumer)
Expand All @@ -55,16 +51,19 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
message1 = ConsumersFactory.consumer_message(group_id: group_id)
message2 = ConsumersFactory.consumer_message(group_id: group_id)

# Mock PubSub publish and verify batch
Req.Test.expect(PubSub, fn conn ->
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"
Req.Test.stub(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
else
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"

{:ok, body, _} = Plug.Conn.read_body(conn)
body = Jason.decode!(body)
assert length(body["messages"]) == 2
{:ok, body, _} = Plug.Conn.read_body(conn)
body = Jason.decode!(body)
assert length(body["messages"]) == 2

Req.Test.json(conn, %{})
Req.Test.json(conn, %{})
end
end)

start_pipeline!(consumer)
Expand All @@ -78,7 +77,9 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
message1 = ConsumersFactory.consumer_message(group_id: group_id)
message2 = ConsumersFactory.consumer_message(group_id: group_id)

Req.Test.expect(PubSub, 3, fn conn ->
# Use stub to handle non-deterministic auth request count (1 or 2 auth requests
# depending on whether the token is cached before the second batch processor starts)
Req.Test.stub(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
else
Expand All @@ -102,11 +103,14 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do

@tag capture_log: true
test "failed PubSub publish results in failed events", %{consumer: consumer} do
# Mock failed PubSub publish
Req.Test.expect(PubSub, fn conn ->
conn
|> Plug.Conn.put_status(500)
|> Req.Test.json(%{"error" => "Failed to publish to PubSub"})
Req.Test.stub(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
else
conn
|> Plug.Conn.put_status(500)
|> Req.Test.json(%{"error" => "Failed to publish to PubSub"})
end
end)

start_pipeline!(consumer)
Expand All @@ -133,13 +137,16 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do

message = ConsumersFactory.consumer_message()

# Mock PubSub publish and verify topic
Req.Test.expect(PubSub, fn conn ->
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"
assert String.contains?(conn.request_path, "/my_topic:publish")
Req.Test.stub(PubSub, fn conn ->
if conn.host == "oauth2.googleapis.com" do
Req.Test.json(conn, %{"access_token" => "test_token"})
else
assert conn.method == "POST"
assert conn.host == "pubsub.googleapis.com"
assert String.contains?(conn.request_path, "/my_topic:publish")

Req.Test.json(conn, %{})
Req.Test.json(conn, %{})
end
end)

start_pipeline!(consumer)
Expand Down
33 changes: 18 additions & 15 deletions test/sequin/message_consistency_check_worker_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Sequin.MessageConsistencyCheckWorkerTest do
use Sequin.DataCase, async: true

import ExUnit.CaptureLog

alias Sequin.Factory.ConsumersFactory
alias Sequin.Runtime.MessageConsistencyCheckWorker
alias Sequin.Runtime.MessageLedgers
Expand All @@ -26,10 +24,11 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
# Set timestamp to 2 minutes ago
two_minutes_ago = DateTime.add(DateTime.utc_now(), -2 * 60, :second)

# Capture logs to verify output
assert capture_log(fn ->
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, two_minutes_ago)
end) =~ "Found undelivered cursors (count=3)"
assert {:ok, 3} =
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
consumer.id,
two_minutes_ago
)

# Verify that the undelivered cursors set was trimmed
assert {:ok, 0} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, two_minutes_ago)
Expand All @@ -42,10 +41,12 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
# Set timestamp to 2 minutes ago
two_minutes_ago = DateTime.add(DateTime.utc_now(), -2 * 60, :second)

# Capture logs to verify output
assert capture_log(fn ->
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, two_minutes_ago)
end) == ""
# Should return :ok without logging (0-count path)
assert {:ok, 0} =
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
consumer.id,
two_minutes_ago
)

# Verify that the undelivered cursors set is empty
assert {:ok, 0} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, two_minutes_ago)
Expand All @@ -67,13 +68,15 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
# Set timestamp to 1 minute ago (not stale enough)
one_minute_ago = DateTime.add(DateTime.utc_now(), -60, :second)

# Capture logs to verify output
assert capture_log(fn ->
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, one_minute_ago)
end) == ""
assert {:ok, 0} =
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
consumer.id,
one_minute_ago
)

# Verify that the undelivered cursors set still contains the messages
assert {:ok, 2} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, DateTime.utc_now())
assert {:ok, 2} =
MessageLedgers.count_undelivered_wal_cursors(consumer.id, DateTime.utc_now())
end
end
end
4 changes: 2 additions & 2 deletions test/sequin/message_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ defmodule Sequin.MessageHandlerTest do
database = DatabasesFactory.insert_postgres_database!(account_id: account.id)

field = ReplicationFactory.field()
table_schema1 = Factory.postgres_object()
table_schema2 = Factory.postgres_object()
table_schema1 = "schema_one_#{Factory.sequence()}"
table_schema2 = "schema_two_#{Factory.sequence()}"

message1 =
ReplicationFactory.postgres_message(table_oid: 123, action: :insert, fields: [field], table_schema: table_schema1)
Expand Down
2 changes: 1 addition & 1 deletion test/sequin/postgres/benchmark_source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ defmodule Sequin.Postgres.BenchmarkSourceTest do
receive do
{:messages_received, messages} -> receive_messages(count, acc ++ messages)
after
100 -> acc
1_000 -> acc
end
end
end
16 changes: 16 additions & 0 deletions test/sequin/postgres_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Sequin.PostgresReplicationTest do
alias Sequin.TestSupport.SimpleHttpServer

@moduletag :unboxed
@moduletag :capture_log

@publication "characters_publication"

Expand Down Expand Up @@ -1499,6 +1500,21 @@ defmodule Sequin.PostgresReplicationTest do
defp stop_replication!(pg_replication) do
# Stop the supervisor using its via_tuple
stop_supervised!(Supervisor.via_tuple(pg_replication.id))
# Wait for :syn to clean up the process registration asynchronously
wait_for_syn_cleanup({Supervisor, pg_replication.id})
end

defp wait_for_syn_cleanup(key, attempts \\ 50) do
if :syn.lookup(:replication, key) == :undefined do
:ok
else
if attempts > 0 do
Process.sleep(10)
wait_for_syn_cleanup(key, attempts - 1)
else
raise "Timed out waiting for :syn to clean up registration for #{inspect(key)}"
end
end
end

# defp config do
Expand Down
2 changes: 2 additions & 0 deletions test/sequin/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Sequin.PostgresTest do
alias Sequin.Repo
alias Sequin.Test.UnboxedRepo

@moduletag :capture_log

setup do
{:ok, conn} = Postgrex.start_link(UnboxedRepo.config())
%{conn: conn}
Expand Down
1 change: 1 addition & 0 deletions test/sequin/runtime/slot_producer/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Sequin.Runtime.SlotProducer.IntegrationTest do
alias Sequin.TestSupport.ReplicationSlots

@moduletag :unboxed
@moduletag :capture_log
@publication "characters_publication"

def replication_slot, do: ReplicationSlots.slot_name(__MODULE__)
Expand Down
2 changes: 2 additions & 0 deletions test/sequin/runtime/slot_producer/reorder_buffer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Sequin.Runtime.SlotProducer.ReorderBufferTest do
alias Sequin.Runtime.SlotProducer.Message
alias Sequin.Runtime.SlotProducer.ReorderBuffer

@moduletag :capture_log

@reorder_buffer_id Module.concat(__MODULE__, ReorderBuffer)
def reorder_buffer_id, do: @reorder_buffer_id

Expand Down
1 change: 1 addition & 0 deletions test/sequin/runtime/slot_producer/slot_producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Sequin.Runtime.SlotProducerTest do
alias Sequin.TestSupport.ReplicationSlots

@moduletag :unboxed
@moduletag :capture_log
@publication "characters_publication"

def replication_slot, do: ReplicationSlots.slot_name(__MODULE__)
Expand Down
2 changes: 2 additions & 0 deletions test/sequin/slot_message_store_state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Sequin.Runtime.SlotMessageStoreStateTest do
alias Sequin.Runtime.SlotMessageStore.State
alias Sequin.Size

@moduletag :capture_log

setup do
state = %State{
consumer: %SinkConsumer{seq: Factory.unique_integer()},
Expand Down
2 changes: 2 additions & 0 deletions test/sequin/table_reader_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ defmodule Sequin.Runtime.TableReaderServerTest do
alias Sequin.Runtime.TableReaderServer
alias Sequin.TestSupport.Models.CharacterDetailed

@moduletag :capture_log

@task_sup_name Module.concat(__MODULE__, TaskSupervisor)

setup do
Expand Down
6 changes: 6 additions & 0 deletions test/sequin/table_reader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ defmodule Sequin.TableReaderTest do
alias Sequin.Factory.DatabasesFactory
alias Sequin.Runtime.KeysetCursor
alias Sequin.Runtime.TableReader
alias Sequin.TestSupport.Models.Character
alias Sequin.TestSupport.Models.CharacterMultiPK

@moduletag :capture_log

setup do
db = DatabasesFactory.insert_configured_postgres_database!(tables: [])
ConnectionCache.cache_connection(db, Repo)
Expand Down Expand Up @@ -355,6 +358,9 @@ defmodule Sequin.TableReaderTest do
db: db,
characters_table: table
} do
# Clean up any characters committed by concurrent unboxed tests
Repo.delete_all(Character)

# Create a table with nil sort_column_attnum
table_with_nil_sort = %{table | sort_column_attnum: nil}

Expand Down
Loading
Loading