Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions migrations/0005_add_bucket.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds the `bucket` column to inflight_taskactivations.
-- The column is NOT NULL, so rows that already exist take the default of 0.
ALTER TABLE inflight_taskactivations ADD COLUMN bucket INTEGER NOT NULL DEFAULT 0;
6 changes: 4 additions & 2 deletions pg_migrations/0001_create_inflight_activations.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
-- PostgreSQL equivalent of the inflight_taskactivations table
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
id TEXT NOT NULL PRIMARY KEY,
activation BYTEA NOT NULL,
Expand All @@ -16,5 +15,8 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations (
application TEXT NOT NULL,
namespace TEXT NOT NULL,
taskname TEXT NOT NULL,
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1,
bucket SMALLINT NOT NULL
);

-- Composite index over (status, bucket) for lookups that filter on status and a bucket range.
-- IF NOT EXISTS keeps this migration re-runnable, matching the
-- CREATE TABLE IF NOT EXISTS statement earlier in this file.
CREATE INDEX IF NOT EXISTS idx_activation_claim ON inflight_taskactivations (status, bucket);
1 change: 1 addition & 0 deletions src/fetch/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/// Number of buckets activation IDs are spread across: `bucket_from_id` reduces the
/// UUID value modulo this constant, so buckets lie in `0..MAX_FETCH_THREADS`.
/// It is a `u128`, which lets it be used directly as the modulus of `Uuid::as_u128()`.
/// NOTE(review): the name suggests this also caps the number of fetch threads — confirm.
pub const MAX_FETCH_THREADS: u128 = 256;
12 changes: 12 additions & 0 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use crate::config::Config;
use crate::fetch::MAX_FETCH_THREADS;
use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus};
use anyhow::{Error, anyhow};
use chrono::{DateTime, Utc};
use prost::Message as _;
use rdkafka::{Message, message::OwnedMessage};
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
use sentry_protos::taskbroker::v1::TaskActivation;
use uuid::Uuid;

pub struct DeserializeActivationConfig {
pub max_delayed_allowed: u64,
Expand All @@ -21,6 +23,13 @@ impl DeserializeActivationConfig {
}
}

/// Maps an activation ID onto a bucket.
///
/// The ID is parsed as a UUID, read as a 128-bit integer, and reduced modulo
/// [`MAX_FETCH_THREADS`], so the result lies in `0..MAX_FETCH_THREADS`.
/// IDs that are not valid UUIDs fall back to bucket `0`.
pub fn bucket_from_id(id: &str) -> i16 {
    match Uuid::parse_str(id) {
        Ok(uuid) => {
            // The remainder is strictly below MAX_FETCH_THREADS, so it fits in an i16.
            let remainder = uuid.as_u128() % MAX_FETCH_THREADS;
            remainder as i16
        }
        Err(_) => 0,
    }
}

pub fn new(
config: DeserializeActivationConfig,
) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
Expand Down Expand Up @@ -78,6 +87,8 @@ pub fn new(
.try_into()
.unwrap();

let bucket = bucket_from_id(&activation.id);

Ok(InflightActivation {
id: activation.id.clone(),
activation: payload.to_vec(),
Expand All @@ -96,6 +107,7 @@ pub fn new(
namespace,
taskname,
on_attempts_exceeded,
bucket,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use std::fs;

pub mod config;
pub mod fetch;
pub mod grpc;
pub mod kafka;
pub mod logging;
Expand Down
28 changes: 24 additions & 4 deletions src/store/inflight_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use tracing::{instrument, warn};

use crate::config::Config;

/// Inclusive `(min, max)` pair of bucket values used to restrict which pending
/// activations a query returns (`bucket >= min AND bucket <= max`).
pub type BucketRange = (i16, i16);

/// The members of this enum should be synced with the members
/// of InflightActivationStatus in sentry_protos
#[derive(Clone, Copy, Debug, PartialEq, Eq, Type)]
Expand Down Expand Up @@ -173,6 +175,10 @@ pub struct InflightActivation {
/// are exceeded.
#[builder(default = false)]
pub at_most_once: bool,

/// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion.
#[builder(setter(skip), default = "0")]
pub bucket: i16,
}

impl InflightActivation {
Expand Down Expand Up @@ -235,6 +241,7 @@ pub struct TableRow {
pub taskname: String,
#[sqlx(try_from = "i32")]
pub on_attempts_exceeded: OnAttemptsExceeded,
pub bucket: i16,
}

impl TryFrom<InflightActivation> for TableRow {
Expand All @@ -259,6 +266,7 @@ impl TryFrom<InflightActivation> for TableRow {
namespace: value.namespace,
taskname: value.taskname,
on_attempts_exceeded: value.on_attempts_exceeded,
bucket: value.bucket,
})
}
}
Expand All @@ -283,6 +291,7 @@ impl From<TableRow> for InflightActivation {
namespace: value.namespace,
taskname: value.taskname,
on_attempts_exceeded: value.on_attempts_exceeded,
bucket: value.bucket,
}
}
}
Expand Down Expand Up @@ -369,7 +378,12 @@ pub trait InflightActivationStore: Send + Sync {
return Ok(None);
}
let result = self
.get_pending_activations_from_namespaces(application, namespaces.as_deref(), Some(1))
.get_pending_activations_from_namespaces(
application,
namespaces.as_deref(),
Some(1),
None,
)
.await?;
if result.is_empty() {
return Ok(None);
Expand All @@ -383,6 +397,7 @@ pub trait InflightActivationStore: Send + Sync {
application: Option<&str>,
namespaces: Option<&[String]>,
limit: Option<i32>,
bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error>;

/// Get the age of the oldest pending activation in seconds
Expand Down Expand Up @@ -698,7 +713,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE id = $1
",
Expand Down Expand Up @@ -739,7 +755,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
)
",
);
Expand Down Expand Up @@ -772,6 +789,7 @@ impl InflightActivationStore for SqliteActivationStore {
b.push_bind(row.namespace);
b.push_bind(row.taskname);
b.push_bind(row.on_attempts_exceeded as i32);
b.push_bind(row.bucket);
})
.push(" ON CONFLICT(id) DO NOTHING")
.build();
Expand Down Expand Up @@ -809,6 +827,7 @@ impl InflightActivationStore for SqliteActivationStore {
application: Option<&str>,
namespaces: Option<&[String]>,
limit: Option<i32>,
_bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error> {
let now = Utc::now();

Expand Down Expand Up @@ -989,7 +1008,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE status = $1
",
Expand Down
4 changes: 2 additions & 2 deletions src/store/inflight_activation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: &
// Get activation from multiple namespaces (should get oldest)
let namespaces = vec!["ns2".to_string(), "ns3".to_string()];
let result = store
.get_pending_activations_from_namespaces(None, Some(&namespaces), None)
.get_pending_activations_from_namespaces(None, Some(&namespaces), None, None)
.await
.unwrap();

Expand Down Expand Up @@ -329,7 +329,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case]
// We allow no application in this method because of usage in upkeep
let namespaces = vec!["other_namespace".to_string()];
let activations = store
.get_pending_activations_from_namespaces(None, Some(namespaces).as_deref(), Some(2))
.get_pending_activations_from_namespaces(None, Some(namespaces).as_deref(), Some(2), None)
.await
.unwrap();
assert_eq!(
Expand Down
26 changes: 20 additions & 6 deletions src/store/postgres_activation_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::store::inflight_activation::{
FailedTasksForwarder, InflightActivation, InflightActivationStatus, InflightActivationStore,
QueryResult, TableRow,
BucketRange, FailedTasksForwarder, InflightActivation, InflightActivationStatus,
InflightActivationStore, QueryResult, TableRow,
};
use anyhow::{Error, anyhow};
use async_trait::async_trait;
Expand Down Expand Up @@ -175,7 +175,8 @@ impl InflightActivationStore for PostgresActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE id = $1
",
Expand Down Expand Up @@ -216,7 +217,8 @@ impl InflightActivationStore for PostgresActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
)
",
);
Expand Down Expand Up @@ -248,6 +250,7 @@ impl InflightActivationStore for PostgresActivationStore {
b.push_bind(row.namespace);
b.push_bind(row.taskname);
b.push_bind(row.on_attempts_exceeded as i32);
b.push_bind(row.bucket);
})
.push(" ON CONFLICT(id) DO NOTHING")
.build();
Expand All @@ -264,11 +267,12 @@ impl InflightActivationStore for PostgresActivationStore {
application: Option<&str>,
namespaces: Option<&[String]>,
limit: Option<i32>,
bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error> {
let now = Utc::now();

let grace_period = self.config.processing_deadline_grace_sec;
let mut query_builder = QueryBuilder::new(
let mut query_builder = QueryBuilder::<Postgres>::new(
"WITH selected_activations AS (
SELECT id
FROM inflight_taskactivations
Expand All @@ -294,6 +298,15 @@ impl InflightActivationStore for PostgresActivationStore {
}
query_builder.push(")");
}

if let Some((min, max)) = bucket {
query_builder.push(" AND bucket >= ");
query_builder.push_bind(min);

query_builder.push(" AND bucket <= ");
query_builder.push_bind(max);
}

query_builder.push(" ORDER BY added_at");
if let Some(limit) = limit {
query_builder.push(" LIMIT ");
Expand Down Expand Up @@ -443,7 +456,8 @@ impl InflightActivationStore for PostgresActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE status = $1
",
Expand Down
2 changes: 1 addition & 1 deletion src/upkeep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ pub async fn do_upkeep(
.expect("Could not create kafka producer in upkeep"),
);
if let Ok(tasks) = store
.get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None)
.get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None, None)
.await
{
// Produce tasks to Kafka with updated namespace
Expand Down
Loading