diff --git a/README.md b/README.md index 5a91a25fc..43235418a 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,42 @@ After setup, Sequin will stream new changes to the sink as they occur in real-ti Sequin comes with a web console/UI for configuration and monitoring. You can also configure Sequin as code using [YAML config files](https://sequinstream.com/docs/reference/sequin-yaml) and our [Management API](https://sequinstream.com/docs/management-api/introduction). +## AWS IAM authentication + +When running on AWS (EKS, ECS, or EC2), Sequin supports IAM-based authentication so you don't need to manage long-lived AWS credentials. + +### IRSA (IAM Roles for Service Accounts) + +For EKS deployments, Sequin natively supports [IRSA](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). When the `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` environment variables are present (injected automatically by EKS), Sequin will use them to obtain temporary credentials via STS `AssumeRoleWithWebIdentity`. Credentials are refreshed automatically before expiry. + +ECS task roles and EC2 instance profiles are also supported. + +### Sinks + +AWS sinks (SQS, SNS, Kinesis, and Kafka via MSK IAM) support a **"Use task role"** toggle. When enabled, the sink uses IRSA or task role credentials instead of explicit access keys. Configure this in the web console or via YAML: + +```yaml +sinks: + - type: sqs + queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue + use_task_role: true +``` + +### Source databases (RDS) + +Postgres source databases hosted on RDS can use [IAM database authentication](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.IAMDBAuth.html) instead of a static password. Enable `use_iam_auth` on the database configuration and provide the `iam_region`. Sequin will generate short-lived auth tokens automatically. + +### Sequin's internal database + +Sequin's own internal Postgres database (used for configuration and state) can also use RDS IAM auth. Set these environment variables: + +| Variable | Description | +|----------|-------------| +| `PG_IAM_AUTH=true` | Enable IAM auth for Sequin's internal database | +| `PG_IAM_REGION` | AWS region of the RDS instance (required when `PG_IAM_AUTH` is enabled) | + +When enabled, `PG_PASSWORD` is no longer required. SSL is enforced automatically. + ## Why Sequin We all know Postgres is great for storing and querying data. But what about when you need to stream changes to other systems? diff --git a/config/config.exs b/config/config.exs index eb39155f6..8c56e762b 100644 --- a/config/config.exs +++ b/config/config.exs @@ -12,8 +12,10 @@ sequin_stream_schema = "sequin_streams" config :esbuild, :version, "0.17.11" -# Used by broadway_sqs -config :ex_aws, http_client: ExAws.Request.Req +# Used by broadway_sqs and IRSA credential provider +config :ex_aws, + http_client: ExAws.Request.Req, + json_codec: Jason config :logger, :console, format: {Sequin.ConsoleLogger, :format}, diff --git a/config/runtime.exs b/config/runtime.exs index 32914c665..6dcd27a76 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -111,6 +111,13 @@ if config_env() == :prod and self_hosted do backfill_max_pending_messages = ConfigParser.backfill_max_pending_messages(env_vars) + pg_iam_auth? = System.get_env("PG_IAM_AUTH") in ~w(true 1) + pg_iam_region = System.get_env("PG_IAM_REGION") + + if pg_iam_auth? and is_nil(pg_iam_region) do + raise "PG_IAM_REGION is required when PG_IAM_AUTH is enabled" + end + database_url = case System.get_env("PG_URL") do nil -> @@ -120,14 +127,30 @@ if config_env() == :prod and self_hosted do username = System.get_env("PG_USERNAME") password = System.get_env("PG_PASSWORD") - if Enum.all?([hostname, database, port, username, password], &(not is_nil(&1))) do - "postgres://#{username}:#{password}@#{hostname}:#{port}/#{database}" + # When using IAM auth, password is not required — a token is generated at connection time + required_vars = [hostname, database, port, username] + + if pg_iam_auth? do + if Enum.all?(required_vars, &(not is_nil(&1))) do + # Use a placeholder password in the URL; it will be replaced by the IAM token + "postgres://#{username}:iam-placeholder@#{hostname}:#{port}/#{database}" + else + raise """ + Missing PostgreSQL connection information. + When PG_IAM_AUTH is enabled, provide either PG_URL or all of the following: + PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME + """ + end else - raise """ - Missing PostgreSQL connection information. - Please provide either PG_URL or all of the following environment variables: - PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME, PG_PASSWORD - """ + if Enum.all?([password | required_vars], &(not is_nil(&1))) do + "postgres://#{username}:#{password}@#{hostname}:#{port}/#{database}" + else + raise """ + Missing PostgreSQL connection information. + Please provide either PG_URL or all of the following environment variables: + PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME, PG_PASSWORD + """ + end end url -> @@ -163,17 +186,27 @@ if config_env() == :prod and self_hosted do :ok end + repo_opts = [ + ssl: if(pg_iam_auth?, do: [verify: :verify_none], else: repo_ssl), + pool_size: String.to_integer(System.get_env("PG_POOL_SIZE", "10")), + url: database_url, + socket_options: ConfigParser.ecto_socket_opts(env_vars) + ] + + repo_opts = + if pg_iam_auth? do + Keyword.put(repo_opts, :configure, {Sequin.Aws.RepoIamAuth, :configure, [pg_iam_region]}) + else + repo_opts + end + config :sequin, Sequin.Posthog, req_opts: [base_url: "https://us.i.posthog.com"], api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h", frontend_api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h", is_disabled: System.get_env("SEQUIN_TELEMETRY_DISABLED") in ~w(true 1) - config :sequin, Sequin.Repo, - ssl: repo_ssl, - pool_size: String.to_integer(System.get_env("PG_POOL_SIZE", "10")), - url: database_url, - socket_options: ConfigParser.ecto_socket_opts(env_vars) + config :sequin, Sequin.Repo, repo_opts config :sequin, SequinWeb.Endpoint, # `url` is used for configuring links in the console. So it corresponds to the *external* diff --git a/lib/sequin/application.ex b/lib/sequin/application.ex index c6e0d166f..aeadfb3cd 100644 --- a/lib/sequin/application.ex +++ b/lib/sequin/application.ex @@ -76,6 +76,7 @@ defmodule Sequin.Application do Sequin.Redis.connect_cluster() [ + Sequin.Aws.IrsaCredentials, Sequin.Repo, Sequin.Vault, Sequin.PubSub.child_spec(), diff --git a/lib/sequin/aws/client.ex b/lib/sequin/aws/client.ex index 0029ce3db..149841828 100644 --- a/lib/sequin/aws/client.ex +++ b/lib/sequin/aws/client.ex @@ -12,6 +12,7 @@ defmodule Sequin.Aws.Client do @behaviour Sequin.Aws + alias Sequin.Aws.IrsaCredentials alias Sequin.Error require Logger @@ -34,6 +35,15 @@ defmodule Sequin.Aws.Client do end defp get_credentials do + # IRSA (Kubernetes/EKS) takes priority if available + if IrsaCredentials.available?() do + IrsaCredentials.get_credentials() + else + get_credentials_from_metadata() + end + end + + defp get_credentials_from_metadata do case Application.ensure_all_started(:aws_credentials) do {:ok, _} -> case :aws_credentials.get_credentials() do diff --git a/lib/sequin/aws/irsa_credentials.ex b/lib/sequin/aws/irsa_credentials.ex new file mode 100644 index 000000000..a404c2acc --- /dev/null +++ b/lib/sequin/aws/irsa_credentials.ex @@ -0,0 +1,221 @@ +defmodule Sequin.Aws.IrsaCredentials do + @moduledoc """ + GenServer that manages IRSA (IAM Roles for Service Accounts) credentials for EKS. + + Reads the projected JWT from `AWS_WEB_IDENTITY_TOKEN_FILE`, calls + `STS:AssumeRoleWithWebIdentity` via `ex_aws_sts`, and refreshes the resulting + temporary credentials before they expire. + + The GenServer only starts when both `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` + environment variables are set. Otherwise it returns `:ignore` from `init/1`. + """ + + use GenServer + + alias Sequin.Error + + require Logger + + @default_session_name "sequin-irsa" + # Refresh at 80% of credential TTL + @refresh_ratio 0.8 + # Minimum refresh interval: 60 seconds + @min_refresh_ms to_timeout(minute: 1) + # Retry interval on failure: 30 seconds + @retry_ms to_timeout(second: 30) + + defmodule State do + @moduledoc false + use TypedStruct + + typedstruct do + field :role_arn, String.t(), enforce: true + field :token_file, String.t(), enforce: true + field :session_name, String.t(), enforce: true + field :credentials, map() + field :expires_at, DateTime.t() + field :refresh_timer, reference() + end + end + + # -- Public API -- + + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @doc """ + Returns true if the IRSA GenServer is running and has credentials available. + """ + @spec available?() :: boolean() + def available? do + case GenServer.whereis(__MODULE__) do + nil -> false + pid -> Process.alive?(pid) and GenServer.call(pid, :available?) + end + catch + :exit, _ -> false + end + + @doc """ + Returns the current IRSA credentials. + + Returns `{:ok, %{access_key_id: ..., secret_access_key: ..., token: ...}}` + or `{:error, reason}`. + """ + @spec get_credentials() :: {:ok, map()} | {:error, Error.t()} + def get_credentials do + GenServer.call(__MODULE__, :get_credentials) + catch + :exit, _ -> + {:error, Error.service(service: :aws, message: "IRSA credentials server not available")} + end + + # -- GenServer callbacks -- + + @impl GenServer + def init(opts) do + role_arn = Keyword.get(opts, :role_arn) || System.get_env("AWS_ROLE_ARN") + token_file = Keyword.get(opts, :token_file) || System.get_env("AWS_WEB_IDENTITY_TOKEN_FILE") + session_name = Keyword.get(opts, :session_name) || System.get_env("AWS_ROLE_SESSION_NAME", @default_session_name) + + if is_nil(role_arn) or is_nil(token_file) do + :ignore + else + state = %State{ + role_arn: role_arn, + token_file: token_file, + session_name: session_name + } + + # Fetch credentials immediately on startup + case fetch_credentials(state) do + {:ok, new_state} -> + {:ok, new_state} + + {:error, reason} -> + Logger.error("IRSA initial credential fetch failed: #{inspect(reason)}. Will retry.") + timer = Process.send_after(self(), :refresh, @retry_ms) + {:ok, %{state | refresh_timer: timer}} + end + end + end + + @impl GenServer + def handle_call(:available?, _from, %State{credentials: credentials} = state) do + {:reply, not is_nil(credentials), state} + end + + @impl GenServer + def handle_call(:get_credentials, _from, %State{credentials: nil} = state) do + {:reply, {:error, Error.service(service: :aws, message: "IRSA credentials not yet available")}, state} + end + + @impl GenServer + def handle_call(:get_credentials, _from, %State{credentials: credentials} = state) do + {:reply, {:ok, credentials}, state} + end + + @impl GenServer + def handle_info(:refresh, state) do + case fetch_credentials(state) do + {:ok, new_state} -> + {:noreply, new_state} + + {:error, reason} -> + Logger.error("IRSA credential refresh failed: #{inspect(reason)}. Will retry in #{@retry_ms}ms.") + timer = Process.send_after(self(), :refresh, @retry_ms) + {:noreply, %{state | refresh_timer: timer}} + end + end + + # -- Private -- + + defp fetch_credentials(%State{} = state) do + with {:ok, token} <- read_token_file(state.token_file), + {:ok, result} <- assume_role_with_web_identity(state.role_arn, token, state.session_name) do + credentials = %{ + access_key_id: result.access_key_id, + secret_access_key: result.secret_access_key, + token: result.session_token + } + + expires_at = result.expiration + refresh_ms = calculate_refresh_ms(expires_at) + + if state.refresh_timer, do: Process.cancel_timer(state.refresh_timer) + timer = Process.send_after(self(), :refresh, refresh_ms) + + Logger.info("IRSA credentials fetched successfully. Next refresh in #{div(refresh_ms, 1000)}s.") + + {:ok, %{state | credentials: credentials, expires_at: expires_at, refresh_timer: timer}} + end + end + + defp read_token_file(path) do + case File.read(path) do + {:ok, token} -> + {:ok, String.trim(token)} + + {:error, reason} -> + {:error, Error.service(service: :aws, message: "Failed to read IRSA token file #{path}: #{inspect(reason)}")} + end + end + + defp assume_role_with_web_identity(role_arn, token, session_name) do + op = + ExAws.STS.assume_role_with_web_identity( + role_arn, + session_name, + token, + duration: 3600 + ) + + # AssumeRoleWithWebIdentity doesn't require pre-existing AWS credentials — + # the web identity token IS the authentication. We pass dummy credentials + # to satisfy ExAws's credential resolution (same approach as the built-in + # AssumeRoleWebIdentityAdapter). + config = %{access_key_id: "not-used", secret_access_key: "not-used"} + + case ExAws.request(op, config) do + {:ok, %{body: body}} -> + parse_assume_role_response(body) + + {:error, {:http_error, status, %{body: body}}} -> + {:error, Error.service(service: :aws, message: "STS AssumeRoleWithWebIdentity failed (HTTP #{status}): #{body}")} + + {:error, reason} -> + {:error, Error.service(service: :aws, message: "STS AssumeRoleWithWebIdentity failed: #{inspect(reason)}")} + end + end + + defp parse_assume_role_response(body) do + credentials = body[:credentials] + + if credentials do + expiration = + case DateTime.from_iso8601(credentials[:expiration]) do + {:ok, dt, _offset} -> dt + _ -> DateTime.add(DateTime.utc_now(), 3600, :second) + end + + {:ok, + %{ + access_key_id: credentials[:access_key_id], + secret_access_key: credentials[:secret_access_key], + session_token: credentials[:session_token], + expiration: expiration + }} + else + {:error, Error.service(service: :aws, message: "Unexpected STS response format: #{inspect(body)}")} + end + end + + defp calculate_refresh_ms(expires_at) do + now = DateTime.utc_now() + ttl_seconds = DateTime.diff(expires_at, now, :second) + refresh_seconds = trunc(ttl_seconds * @refresh_ratio) + max(refresh_seconds * 1000, @min_refresh_ms) + end +end diff --git a/lib/sequin/aws/rds_token.ex b/lib/sequin/aws/rds_token.ex new file mode 100644 index 000000000..a1a7817a2 --- /dev/null +++ b/lib/sequin/aws/rds_token.ex @@ -0,0 +1,62 @@ +defmodule Sequin.Aws.RdsToken do + @moduledoc """ + Generates RDS IAM authentication tokens. + + RDS IAM auth tokens are pre-signed URLs using AWS SigV4 that serve as + short-lived passwords (valid for 15 minutes) for Postgres RDS connections. + """ + + @service "rds-db" + # 15 minutes, the maximum for RDS IAM tokens + @ttl 900 + + @doc """ + Generates an RDS IAM authentication token for the given connection parameters. + + The token is a pre-signed URL that can be used as a Postgrex password. + It is valid for 15 minutes. + + ## Parameters + + - `hostname` - The RDS instance hostname + - `port` - The RDS instance port + - `username` - The database username + - `credentials` - Map with `:access_key_id`, `:secret_access_key`, and optionally `:token` (session token) + - `region` - The AWS region + + ## Returns + + - `{:ok, token}` on success + - `{:error, reason}` on failure + """ + @spec generate(String.t(), integer(), String.t(), map(), String.t()) :: {:ok, String.t()} | {:error, term()} + def generate(hostname, port, username, credentials, region) do + url = "https://#{hostname}:#{port}/?Action=connect&DBUser=#{URI.encode_www_form(username)}" + now = NaiveDateTime.to_erl(NaiveDateTime.utc_now()) + + opts = maybe_add_session_token([ttl: @ttl], Map.get(credentials, :token)) + + signed_url = + :aws_signature.sign_v4_query_params( + credentials.access_key_id, + credentials.secret_access_key, + region, + @service, + now, + "GET", + url, + opts + ) + + # Strip the protocol prefix — RDS expects just the host:port/query portion + token = String.replace_leading(signed_url, "https://", "") + + {:ok, token} + rescue + e -> + {:error, Sequin.Error.service(service: :aws, message: "Failed to generate RDS IAM token: #{Exception.message(e)}")} + end + + defp maybe_add_session_token(opts, nil), do: opts + defp maybe_add_session_token(opts, token), do: Keyword.put(opts, :session_token, token) +end diff --git a/lib/sequin/aws/repo_iam_auth.ex b/lib/sequin/aws/repo_iam_auth.ex new file mode 100644 index 000000000..0b96b297e --- /dev/null +++ b/lib/sequin/aws/repo_iam_auth.ex @@ -0,0 +1,31 @@ +defmodule Sequin.Aws.RepoIamAuth do + @moduledoc """ + Ecto Repo `:configure` callback for RDS IAM authentication. + + When Sequin's internal database is hosted on AWS RDS and IAM auth is enabled + via `PG_IAM_AUTH=true`, this module generates a fresh RDS IAM auth token + before each new database connection. + """ + + require Logger + + def configure(region, opts) do + hostname = Keyword.fetch!(opts, :hostname) + port = Keyword.fetch!(opts, :port) + username = Keyword.fetch!(opts, :username) + + with {:ok, client} <- Sequin.Aws.get_client(region), + credentials = %{ + access_key_id: client.access_key_id, + secret_access_key: client.secret_access_key, + token: Map.get(client, :session_token) + }, + {:ok, token} <- Sequin.Aws.RdsToken.generate(hostname, port, username, credentials, region) do + {:ok, Keyword.put(opts, :password, token)} + else + {:error, reason} -> + Logger.error("Failed to generate RDS IAM token for Sequin.Repo: #{inspect(reason)}") + {:error, reason} + end + end +end diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex index 6595a4ed5..edae00053 100644 --- a/lib/sequin/consumers/kafka_sink.ex +++ b/lib/sequin/consumers/kafka_sink.ex @@ -9,7 +9,7 @@ defmodule Sequin.Consumers.KafkaSink do alias Sequin.Encrypted.Field, as: EncryptedField alias Sequin.Sinks.Kafka.AwsMskIam - @derive {Jason.Encoder, only: [:hosts, :topic]} + @derive {Jason.Encoder, only: [:hosts, :topic, :use_task_role]} @derive {Inspect, except: [:password, :aws_secret_access_key]} @primary_key false typed_embedded_schema do @@ -23,6 +23,7 @@ defmodule Sequin.Consumers.KafkaSink do field :aws_region, :string field :aws_access_key_id, :string field :aws_secret_access_key, EncryptedField + field :use_task_role, :boolean, default: false field :connection_id, :string field :routing_mode, Ecto.Enum, values: [:dynamic, :static] field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none @@ -40,6 +41,7 @@ defmodule Sequin.Consumers.KafkaSink do :aws_region, :aws_access_key_id, :aws_secret_access_key, + :use_task_role, :routing_mode, :compression ]) @@ -48,6 +50,7 @@ defmodule Sequin.Consumers.KafkaSink do |> validate_length(:topic, max: 255) |> validate_hosts() |> validate_sasl_credentials() + |> validate_cloud_mode_restrictions() |> put_new_connection_id() end @@ -100,6 +103,7 @@ defmodule Sequin.Consumers.KafkaSink do sasl_mechanism = get_field(changeset, :sasl_mechanism) username = get_field(changeset, :username) password = get_field(changeset, :password) + use_task_role = get_field(changeset, :use_task_role) cond do sasl_mechanism in [:plain, :scram_sha_256, :scram_sha_512] -> @@ -107,6 +111,20 @@ defmodule Sequin.Consumers.KafkaSink do message: "is required when SASL Mechanism is #{sasl_mechanism}" ) + sasl_mechanism == :aws_msk_iam and use_task_role -> + # When using task role, only region is required — credentials come from IRSA/metadata + changeset = + changeset + |> validate_required([:aws_region], message: "is required when SASL Mechanism is #{sasl_mechanism}") + |> put_change(:aws_access_key_id, nil) + |> put_change(:aws_secret_access_key, nil) + + if get_field(changeset, :tls) do + changeset + else + add_error(changeset, :tls, "is required when SASL Mechanism is #{sasl_mechanism}") + end + sasl_mechanism == :aws_msk_iam -> changeset = validate_required(changeset, [:aws_access_key_id, :aws_secret_access_key, :aws_region], @@ -127,6 +145,21 @@ defmodule Sequin.Consumers.KafkaSink do end end + defp validate_cloud_mode_restrictions(changeset) do + self_hosted? = Sequin.Config.self_hosted?() + use_task_role? = get_field(changeset, :use_task_role) + + if not self_hosted? and use_task_role? do + add_error( + changeset, + :use_task_role, + "Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead." + ) + else + changeset + end + end + defp put_new_connection_id(changeset) do case get_field(changeset, :connection_id) do nil -> put_change(changeset, :connection_id, Ecto.UUID.generate()) @@ -186,6 +219,14 @@ defmodule Sequin.Consumers.KafkaSink do defp brod_compression(other), do: other # Add SASL authentication if username/password are configured + defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam, use_task_role: true} = sink) do + Keyword.put( + config, + :sasl, + {:callback, AwsMskIam.Auth, {:AWS_MSK_IAM, :dynamic, sink.aws_region}} + ) + end + defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam} = sink) do Keyword.put( config, diff --git a/lib/sequin/databases/postgres_database.ex b/lib/sequin/databases/postgres_database.ex index 0d2027f46..7d1d094e3 100644 --- a/lib/sequin/databases/postgres_database.ex +++ b/lib/sequin/databases/postgres_database.ex @@ -32,6 +32,8 @@ defmodule Sequin.Databases.PostgresDatabase do :password, :ipv6, :use_local_tunnel, + :use_iam_auth, + :iam_region, :pg_major_version ]} @derive {Inspect, except: [:tables, :password]} @@ -50,6 +52,8 @@ defmodule Sequin.Databases.PostgresDatabase do field :tables_refreshed_at, :utc_datetime field :ipv6, :boolean, default: false field :use_local_tunnel, :boolean, default: false + field :use_iam_auth, :boolean, default: false + field :iam_region, :string field :annotations, :map, default: %{} field :pg_major_version, :integer @@ -83,10 +87,12 @@ defmodule Sequin.Databases.PostgresDatabase do :username, :ipv6, :use_local_tunnel, + :use_iam_auth, + :iam_region, :annotations, :pg_major_version ]) - |> validate_required([:hostname, :database, :username, :password, :name]) + |> validate_iam_auth_or_password() |> validate_number(:port, greater_than_or_equal_to: 0, less_than_or_equal_to: 65_535) |> validate_number(:pool_size, greater_than_or_equal_to: 1) |> validate_not_supabase_pooled() @@ -100,6 +106,7 @@ defmodule Sequin.Databases.PostgresDatabase do |> Sequin.Changeset.validate_name() |> foreign_key_constraint(:account_id, name: "postgres_databases_account_id_fkey") |> Sequin.Changeset.annotations_check_constraint() + |> validate_iam_cloud_mode_restrictions() end def create_changeset(pd, attrs) do @@ -110,6 +117,34 @@ defmodule Sequin.Databases.PostgresDatabase do changeset(pd, attrs) end + defp validate_iam_auth_or_password(changeset) do + use_iam_auth = get_field(changeset, :use_iam_auth) + + if use_iam_auth do + # When using IAM auth, password is not required but region and SSL are + changeset + |> validate_required([:hostname, :database, :username, :name, :iam_region]) + |> put_change(:ssl, true) + else + validate_required(changeset, [:hostname, :database, :username, :password, :name]) + end + end + + defp validate_iam_cloud_mode_restrictions(changeset) do + self_hosted? = Sequin.Config.self_hosted?() + use_iam_auth? = get_field(changeset, :use_iam_auth) + + if not self_hosted? and use_iam_auth? do + add_error( + changeset, + :use_iam_auth, + "IAM authentication is not supported in Sequin Cloud. Please use password authentication instead." + ) + else + changeset + end + end + defp validate_not_supabase_pooled(%Ecto.Changeset{valid?: false} = changeset), do: changeset defp validate_not_supabase_pooled(%Ecto.Changeset{valid?: true} = changeset) do @@ -171,9 +206,10 @@ defmodule Sequin.Databases.PostgresDatabase do end def to_postgrex_opts(%PostgresDatabase{} = pd) do + pd_resolved = with_local_tunnel(pd) + opts = - pd - |> with_local_tunnel() + pd_resolved |> Sequin.Map.from_ecto() |> Map.take([ :database, @@ -211,13 +247,57 @@ defmodule Sequin.Databases.PostgresDatabase do opts = Keyword.put(opts, :ssl, ssl) - if pd.ipv6 do - Keyword.put(opts, :socket_options, [:inet6]) + opts = + if pd.ipv6 do + Keyword.put(opts, :socket_options, [:inet6]) + else + opts + end + + if pd.use_iam_auth do + add_iam_auth_configure(opts, pd_resolved) else opts end end + # Adds a :configure callback that generates a fresh RDS IAM token before each connection. + # This ensures tokens are always fresh since they expire after 15 minutes. + defp add_iam_auth_configure(opts, pd) do + hostname = pd.hostname + port = pd.port + username = pd.username + region = pd.iam_region + + configure_fn = fn opts -> + case Sequin.Aws.get_client(region) do + {:ok, client} -> + credentials = %{ + access_key_id: client.access_key_id, + secret_access_key: client.secret_access_key, + token: Map.get(client, :session_token) + } + + case Sequin.Aws.RdsToken.generate(hostname, port, username, credentials, region) do + {:ok, token} -> + {:ok, Keyword.put(opts, :password, token)} + + {:error, reason} -> + Logger.error("Failed to generate RDS IAM token: #{inspect(reason)}") + {:error, reason} + end + + {:error, reason} -> + Logger.error("Failed to get AWS credentials for RDS IAM auth: #{inspect(reason)}") + {:error, reason} + end + end + + opts + |> Keyword.delete(:password) + |> Keyword.put(:configure, configure_fn) + end + def to_protocol_opts(%PostgresDatabase{} = pd) do pd |> to_postgrex_opts() diff --git a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex index 6128d0e6d..154611134 100644 --- a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex +++ b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex @@ -44,6 +44,27 @@ defmodule Sequin.Sinks.Kafka.AwsMskIam.Auth do {:error, "AWS Region is empty"} end + # Dynamic credential resolution via IRSA/task role + def auth(host, sock, mod, client_id, timeout, {:AWS_MSK_IAM = _mechanism, :dynamic, aws_region} = _sasl_opts) do + case Sequin.Aws.get_client(aws_region) do + {:ok, client} -> + access_key_id = client.access_key_id + secret_access_key = client.secret_access_key + + auth( + host, + sock, + mod, + client_id, + timeout, + {:AWS_MSK_IAM, access_key_id, secret_access_key, aws_region} + ) + + {:error, reason} -> + {:error, "Failed to resolve dynamic AWS credentials: #{inspect(reason)}"} + end + end + # The following code is based on the implmentation of SASL handshake implementation from kafka_protocol Erlang library # Ref: https://github.com/kafka4beam/kafka_protocol/blob/master/src/kpro_sasl.erl @impl true diff --git a/mix.exs b/mix.exs index 22b4f9d3c..b7a34fb0f 100644 --- a/mix.exs +++ b/mix.exs @@ -73,6 +73,9 @@ defmodule Sequin.MixProject do {:aws_credentials, "~> 1.0.0", runtime: false}, {:aws_rds_castore, "~> 1.2.0"}, {:aws_signature, "~> 0.3.2"}, + {:ex_aws, "~> 2.5"}, + {:ex_aws_sts, "~> 2.3"}, + {:sweet_xml, "~> 0.7"}, # Monitoring and Observability {:telemetry_metrics, "~> 1.0"}, diff --git a/mix.lock b/mix.lock index c96231eec..ed48a44fc 100644 --- a/mix.lock +++ b/mix.lock @@ -47,6 +47,7 @@ "esbuild": {:hex, :esbuild, "0.8.1", "0cbf919f0eccb136d2eeef0df49c4acf55336de864e63594adcea3814f3edf41", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "25fc876a67c13cb0a776e7b5d7974851556baeda2085296c14ab48555ea7560f"}, "ex_aws": {:hex, :ex_aws, "2.5.9", "8e2455172f0e5cbe2f56dd68de514f0dae6bb26d6b6e2f435a06434cf9dbb412", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbdb6ffb0e6c6368de05ed8641fe1376298ba23354674428e5b153a541f23359"}, "ex_aws_sqs": {:hex, :ex_aws_sqs, "3.4.0", "f7c4d0177c1c954776363d3dc05e5dfd37ddf0e2c65ec3f047e5c9c7dd1b71ac", [:mix], [{:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:saxy, "~> 1.1", [hex: :saxy, repo: "hexpm", optional: true]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "b504482206ccaf767b714888e9d41a1cfcdcb241577985517114191c812f155a"}, + "ex_aws_sts": {:hex, :ex_aws_sts, "2.3.0", "ce48c4cba7f1595a7d544458d0202ca313124026dba7b1a0021bbb1baa3d66d0", [:mix], [{:ex_aws, "~> 2.2", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm", "f14e4c7da3454514bf253b331e9422d25825485c211896ab3b81d2a4bdbf62f5"}, "expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"}, "ezstd": {:hex, :ezstd, "1.2.3", "98748f4099e6e2a067f77ace43041ebaa53c13194b08ce22370e4c93079e9e16", [:rebar3], [], "hexpm", "de32e0b41ba36a9ed46db8215da74777d2f141bb75f67bfc05dbb4b7c3386dee"}, "faker": {:hex, :faker, "0.18.0", "943e479319a22ea4e8e39e8e076b81c02827d9302f3d32726c5bf82f430e6e14", [:mix], [], "hexpm", "bfbdd83958d78e2788e99ec9317c4816e651ad05e24cfd1196ce5db5b3e81797"}, @@ -122,6 +123,7 @@ "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "stillir": {:hex, :stillir, "1.0.0", "9e77eaadd2418a61ec7398c01e29dea26d14f51c42e0b309084493e3ed33337a", [:rebar3], [], "hexpm", "04afdee2e5123b6da11fcc28c38d581f74db0cbe1faa1c36ed4f364797b588c0"}, "styler": {:hex, :styler, "1.4.2", "420da8a9d10324625b75690ca9f2468bc00ee6eb78dead827e562368f9feabbb", [:mix], [], "hexpm", "ca22538b203b2424eef99a227e081143b9a9a4b26da75f26d920537fcd778832"}, + "sweet_xml": {:hex, :sweet_xml, "0.7.5", "803a563113981aaac202a1dbd39771562d0ad31004ddbfc9b5090bdcd5605277", [:mix], [], "hexpm", "193b28a9b12891cae351d81a0cead165ffe67df1b73fe5866d10629f4faefb12"}, "swoosh": {:hex, :swoosh, "1.16.9", "20c6a32ea49136a4c19f538e27739bb5070558c0fa76b8a95f4d5d5ca7d319a1", [:mix], [{:bandit, ">= 1.0.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mua, "~> 0.2.0", [hex: :mua, repo: "hexpm", optional: true]}, {:multipart, "~> 0.4", [hex: :multipart, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:req, "~> 0.5 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "878b1a7a6c10ebbf725a3349363f48f79c5e3d792eb621643b0d276a38acc0a6"}, "syn": {:hex, :syn, "3.3.0", "4684a909efdfea35ce75a9662fc523e4a8a4e8169a3df275e4de4fa63f99c486", [:rebar3], [], "hexpm", "e58ee447bc1094bdd21bf0acc102b1fbf99541a508cd48060bf783c245eaf7d6"}, "tailwind": {:hex, :tailwind, "0.2.3", "277f08145d407de49650d0a4685dc062174bdd1ae7731c5f1da86163a24dfcdb", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "8e45e7a34a676a7747d04f7913a96c770c85e6be810a1d7f91e713d3a3655b5d"}, diff --git a/priv/repo/migrations/20260310000000_add_use_iam_auth_to_postgres_databases.exs b/priv/repo/migrations/20260310000000_add_use_iam_auth_to_postgres_databases.exs new file mode 100644 index 000000000..ef22bceb7 --- /dev/null +++ b/priv/repo/migrations/20260310000000_add_use_iam_auth_to_postgres_databases.exs @@ -0,0 +1,10 @@ +defmodule Sequin.Repo.Migrations.AddUseIamAuthToPostgresDatabases do + use Ecto.Migration + + def change do + alter table(:postgres_databases) do + add :use_iam_auth, :boolean, default: false, null: false + add :iam_region, :string + end + end +end diff --git a/test/sequin/aws/irsa_credentials_test.exs b/test/sequin/aws/irsa_credentials_test.exs new file mode 100644 index 000000000..9d9fb1543 --- /dev/null +++ b/test/sequin/aws/irsa_credentials_test.exs @@ -0,0 +1,53 @@ +defmodule Sequin.Aws.IrsaCredentialsTest do + use Sequin.Case, async: true + + alias Sequin.Aws.IrsaCredentials + + @role_arn "arn:aws:iam::123456789012:role/test-role" + + describe "init/1" do + test "returns :ignore when env vars are not set" do + assert :ignore = IrsaCredentials.init([]) + end + + test "returns :ignore when only role_arn is set" do + assert :ignore = IrsaCredentials.init(role_arn: @role_arn) + end + + test "returns :ignore when only token_file is set" do + assert :ignore = IrsaCredentials.init(token_file: "/tmp/token") + end + end + + describe "init/1 with token file" do + setup do + token_file = Path.join(System.tmp_dir!(), "irsa_test_token_#{System.unique_integer([:positive])}") + File.write!(token_file, "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test-token") + on_exit(fn -> File.rm(token_file) end) + %{token_file: token_file} + end + + @tag capture_log: true + test "starts and schedules retry when STS call fails", %{token_file: token_file} do + # The init will fail the STS call but should start with a retry timer + {:ok, state} = IrsaCredentials.init(role_arn: @role_arn, token_file: token_file, session_name: "test") + + # Credentials should be nil since STS call failed + assert state.credentials == nil + # But a retry timer should be scheduled + assert state.refresh_timer + end + end + + describe "available?/0" do + test "returns false when GenServer is not running" do + refute IrsaCredentials.available?() + end + end + + describe "get_credentials/0" do + test "returns error when GenServer is not running" do + assert {:error, _} = IrsaCredentials.get_credentials() + end + end +end diff --git a/test/sequin/aws/rds_token_test.exs b/test/sequin/aws/rds_token_test.exs new file mode 100644 index 000000000..71249e348 --- /dev/null +++ b/test/sequin/aws/rds_token_test.exs @@ -0,0 +1,53 @@ +defmodule Sequin.Aws.RdsTokenTest do + use Sequin.Case, async: true + + alias Sequin.Aws.RdsToken + + describe "generate/5" do + test "generates a token string" do + credentials = %{ + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + token: nil + } + + assert {:ok, token} = + RdsToken.generate("mydb.abc123.us-east-1.rds.amazonaws.com", 5432, "admin", credentials, "us-east-1") + + # Token should contain the hostname and query params + assert token =~ "mydb.abc123.us-east-1.rds.amazonaws.com" + assert token =~ "Action=connect" + assert token =~ "DBUser=admin" + # SigV4 signature components + assert token =~ "X-Amz-Algorithm" + assert token =~ "X-Amz-Credential" + assert token =~ "X-Amz-Signature" + end + + test "generates a token with session token" do + credentials = %{ + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + token: "FwoGZXIvYXdzEBYaDHqa0AP" + } + + assert {:ok, token} = + RdsToken.generate("mydb.abc123.us-east-1.rds.amazonaws.com", 5432, "admin", credentials, "us-east-1") + + assert token =~ "X-Amz-Security-Token" + end + + test "URL-encodes special characters in username" do + credentials = %{ + access_key_id: "AKIAIOSFODNN7EXAMPLE", + secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + token: nil + } + + assert {:ok, token} = + RdsToken.generate("mydb.abc123.us-east-1.rds.amazonaws.com", 5432, "user@domain", credentials, "us-east-1") + + assert token =~ "DBUser=user%40domain" + end + end +end diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs index 72a2de17c..c134e547b 100644 --- a/test/sequin/kafka_sink_test.exs +++ b/test/sequin/kafka_sink_test.exs @@ -1,7 +1,8 @@ defmodule Sequin.Consumers.KafkaSinkTest do - use ExUnit.Case, async: true + use Sequin.Case, async: true alias Sequin.Consumers.KafkaSink + alias Sequin.Sinks.Kafka.AwsMskIam.Auth describe "kafka_url/2" do test "generates basic kafka URL without authentication" do @@ -215,14 +216,104 @@ defmodule Sequin.Consumers.KafkaSinkTest do refute :topic in changeset.changes end + + test "validates AWS MSK IAM with use_task_role requires only region" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + aws_region: "us-east-1", + routing_mode: :static + }) + + assert changeset.valid? + assert Ecto.Changeset.get_field(changeset, :aws_access_key_id) == nil + assert Ecto.Changeset.get_field(changeset, :aws_secret_access_key) == nil + end + + test "validates AWS MSK IAM with use_task_role still requires region" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + routing_mode: :static + }) + + refute changeset.valid? + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_region + end + + test "validates AWS MSK IAM with use_task_role still requires TLS" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + aws_region: "us-east-1", + routing_mode: :static + }) + + refute changeset.valid? + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).tls + end + + test "validates AWS MSK IAM without use_task_role requires explicit credentials" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: false, + aws_region: "us-east-1", + routing_mode: :static + }) + + refute changeset.valid? + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_access_key_id + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_secret_access_key + end end - # Helper function to extract error messages - defp errors_on(changeset) do - Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} -> - Regex.replace(~r"%{(\w+)}", msg, fn _, key -> - opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string() - end) - end) + describe "to_brod_config/1" do + test "uses dynamic SASL when use_task_role is true" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + aws_region: "us-east-1" + } + + config = KafkaSink.to_brod_config(sink) + assert {:callback, Auth, {:AWS_MSK_IAM, :dynamic, "us-east-1"}} = config[:sasl] + end + + test "uses static SASL credentials when use_task_role is false" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: false, + aws_access_key_id: "AKIATEST", + aws_secret_access_key: "secret", + aws_region: "us-east-1" + } + + config = KafkaSink.to_brod_config(sink) + + assert {:callback, Auth, {:AWS_MSK_IAM, "AKIATEST", "secret", "us-east-1"}} = + config[:sasl] + end end end