Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ test-config/
typesense-data/
meilisearch-data/
.expert/

# SST auto-generated type stubs (from monorepo builds that scan package.json dirs)
sst-env.d.ts
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ ENV RELEASE_VERSION=${RELEASE_VERSION}
# Compile the release
RUN mix compile

# Ensure stacktraces we send to Sentry are complete
RUN mix sentry.package_source_code
# Ensure stacktraces we send to Sentry are complete (skip for self-hosted — no DSN)
RUN if [ "$SELF_HOSTED" != "1" ]; then mix sentry.package_source_code; fi

# Changes to config/runtime.exs don't require recompiling the code
COPY config/runtime.exs config/
Expand Down
8 changes: 7 additions & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ import Config

self_hosted = System.get_env("SELF_HOSTED", "0") in ~w(1 true)

# Empty string DSN crashes Sentry — treat "" as nil for self-hosted builds
sentry_dsn = case System.get_env("SENTRY_DSN") do
"" -> nil
dsn -> dsn
end

config :sentry,
dsn: System.get_env("SENTRY_DSN"),
dsn: sentry_dsn,
release: System.get_env("RELEASE_VERSION")

config :sequin, Sequin.ConsoleLogger, drop_metadata_keys: [:mfa]
Expand Down
10 changes: 10 additions & 0 deletions lib/sequin/accounts/accounts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ defmodule Sequin.Accounts do
"""
def get_user!(id), do: Repo.get!(User, id)

def get_first_user do
User
|> Ecto.Query.first()
|> Repo.one()
|> case do
nil -> nil
user -> Repo.preload(user, [:accounts, :accounts_users])
end
end

def get_user_with_preloads!(user_id) do
User
|> Repo.get!(user_id)
Expand Down
6 changes: 4 additions & 2 deletions lib/sequin/consumers/meilisearch_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Sequin.Consumers.MeilisearchSink do

import Ecto.Changeset

@derive {Jason.Encoder, only: [:endpoint_url, :index_name, :primary_key]}
@derive {Jason.Encoder, only: [:endpoint_url, :index_name, :primary_key, :document_mode]}
@derive {Inspect, except: [:api_key]}

@primary_key false
Expand All @@ -18,6 +18,7 @@ defmodule Sequin.Consumers.MeilisearchSink do
field(:batch_size, :integer, default: 100)
field(:timeout_seconds, :integer, default: 5)
field(:routing_mode, Ecto.Enum, values: [:dynamic, :static])
field(:document_mode, Ecto.Enum, values: [:replace, :update], default: :replace)
end

def changeset(struct, params) do
Expand All @@ -29,7 +30,8 @@ defmodule Sequin.Consumers.MeilisearchSink do
:api_key,
:batch_size,
:timeout_seconds,
:routing_mode
:routing_mode,
:document_mode
])
|> validate_required([:endpoint_url, :api_key])
|> validate_routing()
Expand Down
10 changes: 9 additions & 1 deletion lib/sequin/sinks/meilisearch/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,15 @@ defmodule Sequin.Sinks.Meilisearch.Client do
body: jsonl
)

case Req.post(req) do
# :replace (default) uses POST = add or replace (full document replacement)
# :update uses PUT = add or update (partial merge — only overwrites fields present)
result =
case sink.document_mode do
:update -> Req.put(req)
_ -> Req.post(req)
end

case result do
{:ok, %{body: %{"taskUid" => task_id}}} ->
wait_for_task(sink, task_id)

Expand Down
6 changes: 4 additions & 2 deletions lib/sequin/transforms/transforms.ex
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ defmodule Sequin.Transforms do
index_name: sink.index_name,
primary_key: sink.primary_key,
api_key: SensitiveValue.new(sink.api_key, show_sensitive),
timeout_seconds: sink.timeout_seconds
timeout_seconds: sink.timeout_seconds,
document_mode: if(sink.document_mode != :replace, do: to_string(sink.document_mode))
})
end

Expand Down Expand Up @@ -1305,7 +1306,8 @@ defmodule Sequin.Transforms do
primary_key: attrs["primary_key"],
api_key: attrs["api_key"],
batch_size: attrs["batch_size"],
timeout_seconds: attrs["timeout_seconds"]
timeout_seconds: attrs["timeout_seconds"],
document_mode: attrs["document_mode"]
}}
end

Expand Down
64 changes: 47 additions & 17 deletions lib/sequin_web/user_auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,19 @@ defmodule SequinWeb.UserAuth do
if socket.assigns.current_user do
{:cont, socket}
else
socket =
socket
|> Phoenix.LiveView.put_flash(:toast, %{kind: :error, title: "Please log in to continue."})
|> Phoenix.LiveView.redirect(to: ~p"/login")
if auth_disabled?() do
case Accounts.get_first_user() do
nil -> {:cont, socket}
user -> {:cont, Phoenix.Component.assign(socket, :current_user, user)}
end
else
socket =
socket
|> Phoenix.LiveView.put_flash(:toast, %{kind: :error, title: "Please log in to continue."})
|> Phoenix.LiveView.redirect(to: ~p"/login")

{:halt, socket}
{:halt, socket}
end
end
end

Expand Down Expand Up @@ -279,20 +286,24 @@ defmodule SequinWeb.UserAuth do
if conn.assigns[:current_user] do
conn
else
{title, redirect_to} =
case Keyword.get(opts, :unauthenticated_redirect, :login) do
:login ->
{"Please log in to continue.", ~p"/login"}
if auth_disabled?() do
auto_login_default_user(conn)
else
{title, redirect_to} =
case Keyword.get(opts, :unauthenticated_redirect, :login) do
:login ->
{"Please log in to continue.", ~p"/login"}

:register ->
{"Please register to continue.", ~p"/register"}
end
:register ->
{"Please register to continue.", ~p"/register"}
end

conn
|> put_flash(:toast, %{kind: :error, title: title})
|> maybe_store_return_to()
|> redirect(to: redirect_to)
|> halt()
conn
|> put_flash(:toast, %{kind: :error, title: title})
|> maybe_store_return_to()
|> redirect(to: redirect_to)
|> halt()
end
end
end

Expand All @@ -309,4 +320,23 @@ defmodule SequinWeb.UserAuth do
defp maybe_store_return_to(conn), do: conn

defp signed_in_path(_conn), do: ~p"/"

defp auth_disabled? do
System.get_env("AUTH_DISABLED") in ~w(true 1)
end

defp auto_login_default_user(conn) do
case Accounts.get_first_user() do
nil ->
conn

user ->
token = Accounts.generate_user_session_token(user)

conn
|> renew_session()
|> put_token_in_session(token)
|> assign(:current_user, user)
end
end
end
53 changes: 53 additions & 0 deletions test/sequin/meilisearch_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,59 @@ defmodule Sequin.Sinks.Meilisearch.ClientTest do
end
end

describe "import_documents/3 with document_mode: :update" do
@update_sink %MeilisearchSink{
type: :meilisearch,
endpoint_url: "http://127.0.0.1:7700",
index_name: "test",
primary_key: "id",
api_key: "token",
document_mode: :update
}

test "uses PUT when document_mode is :update" do
records = [SinkFactory.meilisearch_record()]

Req.Test.expect(Client, fn conn ->
assert conn.method == "PUT"
assert conn.request_path == "/indexes/test/documents"

Req.Test.json(conn, %{"taskUid" => 1})
end)

Req.Test.expect(Client, fn conn ->
assert conn.method == "GET"
assert conn.request_path == "/tasks/1"

response_data = %{"status" => "succeeded"}
send_gzipped_response(conn, 200, response_data)
end)

assert :ok = Client.import_documents(@update_sink, "test", records)
end

test "uses POST when document_mode is :replace (default)" do
records = [SinkFactory.meilisearch_record()]

Req.Test.expect(Client, fn conn ->
assert conn.method == "POST"
assert conn.request_path == "/indexes/test/documents"

Req.Test.json(conn, %{"taskUid" => 1})
end)

Req.Test.expect(Client, fn conn ->
assert conn.method == "GET"
assert conn.request_path == "/tasks/1"

response_data = %{"status" => "succeeded"}
send_gzipped_response(conn, 200, response_data)
end)

assert :ok = Client.import_documents(@sink, "test", records)
end
end

describe "delete_documents/2" do
test "successfully delete batch" do
records = [SinkFactory.meilisearch_record(), SinkFactory.meilisearch_record()]
Expand Down
16 changes: 16 additions & 0 deletions test/sequin/meilisearch_sink_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,21 @@ defmodule Sequin.Consumers.MeilisearchSinkTest do

refute :index_name in changeset.changes
end

test "defaults document_mode to :replace", %{valid_params: params} do
changeset = MeilisearchSink.changeset(%MeilisearchSink{}, params)
assert Ecto.Changeset.get_field(changeset, :document_mode) == :replace
end

test "accepts document_mode :update", %{valid_params: params} do
changeset = MeilisearchSink.changeset(%MeilisearchSink{}, Map.put(params, :document_mode, :update))
assert Sequin.Error.errors_on(changeset) == %{}
assert Ecto.Changeset.get_field(changeset, :document_mode) == :update
end

test "rejects invalid document_mode", %{valid_params: params} do
changeset = MeilisearchSink.changeset(%MeilisearchSink{}, Map.put(params, :document_mode, :invalid))
assert Sequin.Error.errors_on(changeset)[:document_mode] != nil
end
end
end
Loading