Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions scylla-server/src/controllers/rule_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
};

#[derive(Deserialize)]
pub struct SubscribeRulesRequest {
pub struct RuleSubscriptionRequest {
rule_ids: Vec<String>,
client_id: String,
}
Expand Down Expand Up @@ -106,10 +106,34 @@ pub async fn edit_rule(
.map_err(|e| ScyllaError::RuleError(e))
}

#[debug_handler]
pub async fn unsubscribe_rules(
Extension(rules_manager): Extension<Arc<RuleManager>>,
Json(request): Json<RuleSubscriptionRequest>,
) -> Result<Json<String>, ScyllaError> {
debug!(
"Unsubscribing client {} from {} rules",
request.client_id,
request.rule_ids.len()
);

let rule_ids: Vec<RuleId> = request.rule_ids.into_iter().map(RuleId).collect();

match rules_manager
.unsubscribe_rules(ClientId(request.client_id), rule_ids)
.await
{
Ok(_) => Ok(Json::from(
"Successfully unsubscribed from rules".to_owned(),
)),
Err(err) => Err(ScyllaError::RuleError(err)),
}
}

#[debug_handler]
pub async fn subscribe_rules(
Extension(rules_manager): Extension<Arc<RuleManager>>,
Json(request): Json<SubscribeRulesRequest>,
Json(request): Json<RuleSubscriptionRequest>,
) -> Result<Json<String>, ScyllaError> {
debug!(
"Subscribing client {} to {} rules",
Expand Down
3 changes: 2 additions & 1 deletion scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use scylla_server::{
data_type_controller, file_insertion_controller,
rule_controller::{
add_rule, delete_rule, edit_rule, get_all_rules, get_all_rules_with_client_info,
subscribe_rules,
subscribe_rules, unsubscribe_rules,
},
run_controller, scylla_config_controller,
video_streamer_controller::{self},
Expand Down Expand Up @@ -411,6 +411,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.route("/rules/delete/{rule_id}", post(delete_rule))
.route("/rules", get(get_all_rules))
.route("/rules/{client_id}", get(get_all_rules_with_client_info))
.route("/rules/unsubscribe", post(unsubscribe_rules))
.route("/rules/edit/{rule_id}", put(edit_rule))
.route("/rules/subscribe", post(subscribe_rules))
//.route("/rules/delete/{rule_id}", post()).route("/rules/poll")
Expand Down
36 changes: 28 additions & 8 deletions scylla-server/src/rule_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,41 @@ impl RuleManager {
self.subscriptions.read().await.lefts()
}

/// Helper function to verify all rules exist
async fn verify_rules_exist(&self, rule_ids: &[RuleId]) -> Result<(), RuleManagerError> {
let rules_guard = self.rules.read().await;
for rule_id in rule_ids {
if !rules_guard.contains_key(rule_id) {
return Err(RuleManagerError::NoMatchingRule);
}
}
Ok(())
}

/// Unsubscribe a client from multiple rules
pub async fn unsubscribe_rules(
&self,
client_id: ClientId,
rule_ids: Vec<RuleId>,
) -> Result<(), RuleManagerError> {
let mut subscriptions = self.subscriptions.write().await;

// Remove subscriptions (rules remain even if no subscribers)
for rule_id in rule_ids {
subscriptions.remove_right_from_left(&client_id, &rule_id);
}

Ok(())
}

/// Subscribe a client to multiple existing rules
pub async fn subscribe_rules(
&self,
client_id: ClientId,
rule_ids: Vec<RuleId>,
) -> Result<(), RuleManagerError> {
let rules_guard = self.rules.read().await;

// First, verify all rules exist
for rule_id in &rule_ids {
if !rules_guard.contains_key(rule_id) {
return Err(RuleManagerError::NoMatchingRule);
}
}
drop(rules_guard);
self.verify_rules_exist(&rule_ids).await?;

// Now subscribe to all rules
let mut subscriptions = self.subscriptions.write().await;
Expand Down
169 changes: 169 additions & 0 deletions scylla-server/tests/rule_structs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ async fn test_edit_rule_preserves_subscriptions_concurrent() -> Result<(), RuleM
Ok(())
}

#[tokio::test]
async fn test_subscribe_rules_success() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client1 = ClientId("client1".to_string());
Expand Down Expand Up @@ -684,6 +685,174 @@ async fn test_subscribe_rules_success() -> Result<(), RuleManagerError> {
Ok(())
}

#[tokio::test]
async fn test_unsubscribe_rules_success() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client1 = ClientId("client1".to_string());
let client2 = ClientId("client2".to_string());

// Create rules via client1
let rule1 = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

let rule2 = Rule::new(
RuleId("rule_2".to_string()),
Topic("topic/2".to_string()),
core::time::Duration::from_secs(30),
"b < 5".to_owned(),
);

rule_manager.add_rule(client1.clone(), rule1).await?;
rule_manager.add_rule(client1.clone(), rule2).await?;
rule_manager
.add_rule(
client2.clone(),
Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
),
)
.await?;

// Verify initial state: 2 rules, 2 clients
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
assert_eq!(rule_manager.get_all_clients().await.len(), 2);

// Client1 unsubscribes from rule_1
rule_manager
.unsubscribe_rules(client1.clone(), vec![RuleId("rule_1".to_string())])
.await?;

// Rule 1 should still exist (client2 subscribed), 2 clients remain
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
assert_eq!(rule_manager.get_all_clients().await.len(), 2);

Ok(())
}

#[tokio::test]
async fn test_unsubscribe_rules_keeps_orphaned() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

// Create rules
let rule1 = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

let rule2 = Rule::new(
RuleId("rule_2".to_string()),
Topic("topic/2".to_string()),
core::time::Duration::from_secs(30),
"b < 5".to_owned(),
);

rule_manager.add_rule(client.clone(), rule1).await?;
rule_manager.add_rule(client.clone(), rule2).await?;

// Verify initial state
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
assert_eq!(rule_manager.get_all_clients().await.len(), 1);

// Unsubscribe from both rules
rule_manager
.unsubscribe_rules(
client.clone(),
vec![RuleId("rule_1".to_string()), RuleId("rule_2".to_string())],
)
.await?;

// Rules should still exist (not deleted), client removed
assert_eq!(rule_manager.get_all_rules().await.len(), 2);
assert_eq!(rule_manager.get_all_clients().await.len(), 0);

Ok(())
}

#[tokio::test]
async fn test_unsubscribe_rules_nonexistent() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

// Try to unsubscribe from rules that don't exist - should succeed (no-op)
rule_manager
.unsubscribe_rules(
client,
vec![
RuleId("nonexistent_1".to_string()),
RuleId("nonexistent_2".to_string()),
],
)
.await?;

Ok(())
}

#[tokio::test]
async fn test_unsubscribe_rules_empty_list() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

// Unsubscribe from empty list - should succeed
rule_manager.unsubscribe_rules(client, vec![]).await?;

Ok(())
}

#[tokio::test]
async fn test_orphaned_rule_resubscription() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client1 = ClientId("client1".to_string());
let client2 = ClientId("client2".to_string());

// Client1 creates a rule
let rule = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(client1.clone(), rule).await?;

// Verify initial state
assert_eq!(rule_manager.get_all_rules().await.len(), 1);
assert_eq!(rule_manager.get_all_clients().await.len(), 1);

// Client1 unsubscribes - rule becomes orphaned but still exists
rule_manager
.unsubscribe_rules(client1.clone(), vec![RuleId("rule_1".to_string())])
.await?;

assert_eq!(rule_manager.get_all_rules().await.len(), 1); // Rule still exists
assert_eq!(rule_manager.get_all_clients().await.len(), 0); // No clients

// Client2 subscribes to the orphaned rule (re-adding it)
let rule_reuse = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(client2.clone(), rule_reuse).await?;

// Verify rule is now subscribed to by client2
assert_eq!(rule_manager.get_all_rules().await.len(), 1);
assert_eq!(rule_manager.get_all_clients().await.len(), 1);

Ok(())
}

#[tokio::test]
async fn test_subscribe_rules_nonexistent_rule() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
Expand Down