Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,097 changes: 544 additions & 553 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ build-bin *ARGS:
[group('build')]
build-python *ARGS:
#!/usr/bin/env bash
set -euxo pipefail
set -euo pipefail
cd libs/opsqueue_python
source "./.setup_local_venv.sh"

Expand All @@ -43,7 +43,7 @@ test-unit *TEST_ARGS:
[group('test')]
test-integration *TEST_ARGS: build-bin build-python
#!/usr/bin/env bash
set -euxo pipefail
set -euo pipefail
cd libs/opsqueue_python
source "./.setup_local_venv.sh"

Expand All @@ -53,7 +53,7 @@ test-integration *TEST_ARGS: build-bin build-python
[group('nix')]
nix-test-integration *TEST_ARGS: nix-build-bin
#!/usr/bin/env bash
set -euxo pipefail
set -euo pipefail
nix_build_python_library_dir=$(just nix-build-python)

cd libs/opsqueue_python/tests
Expand Down
40 changes: 20 additions & 20 deletions opsqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ required-features = ["server-logic"]
[dependencies]
# Datatypes and concurrency:
itertools = "0.14.0"
arc-swap = {version = "1.7.1", optional = true}
moka = { version = "0.12.8", features = ["sync"], optional = true }
arc-swap = {version = "1.8.0", optional = true}
moka = { version = "0.12.12", features = ["sync"], optional = true }
chrono = { version = "0.4.38", features = ["serde"]}
futures = "0.3.30"
tokio = { version = "1.38.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.11.0", features = [
tokio = { version = "1.49.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.19.0", features = [
"v7", # Timestamp-sortable UUIDs with random component
"fast-rng", # Use a faster (but still sufficiently random) RNG
"serde",
Expand All @@ -36,23 +36,23 @@ anyhow = "1.0.86"
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"], optional = true }
# Serialization:
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.124"
serde_json = "1.0.149"
ciborium = "0.2.2"
# Webservers/clients
http = "1.2.0"
object_store = {version = "0.11.1", features = ["gcp", "http"]}
http = "1.4.0"
object_store = {version = "0.13.1", features = ["gcp", "http"]}
snowflaked = {version = "1.0.3", features = ["sync"] }
tokio-tungstenite = {version = "0.24.0", optional = true}
axum = { version = "0.7.5", features = ["ws", "macros"], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"], optional = true }
tokio-tungstenite = {version = "0.28.0", optional = true}
axum = { version = "0.8.8", features = ["ws", "macros"], optional = true }
reqwest = { version = "0.13.1", default-features = false, features = ["json"], optional = true }
url = {version = "2.5.2"}
tokio-util = { version = "0.7.11", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.1", features = ["trace", "catch-panic"], optional = true }
tokio-util = { version = "0.7.18", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.8", features = ["trace", "catch-panic"], optional = true }
# Logging and tracing:
tracing = {version = "0.1", features = ["log"] }
tracing-subscriber = {version = "0.3", features = ["std", "env-filter"] }
sentry = {version = "0.35", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.35", optional = true}
sentry = {version = "0.46", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.46", optional = true}
# Exporting traces to Opentelemetry:
axum-tracing-opentelemetry = {version = "0.24.0", optional = true }
opentelemetry = { version = "0.26", default-features = false, features = ["trace"] }
Expand All @@ -62,16 +62,16 @@ opentelemetry-otlp = { version = "0.26", optional = true }
tracing-opentelemetry = {version = "0.27.0" }
opentelemetry-semantic-conventions = {version = "0.26.0", features = ["semconv_experimental"], optional = true}
moro-local = "0.4.0"
thiserror = "1.0.65"
thiserror = "2.0.18"
either = "1.13.0"
serde-error = "0.1.3"
backon = { version = "1.3.0", features = ["tokio-sleep"] }
rand = "0.8.5"
rand = "0.9.2"
rustc-hash = "2.0.0"
axum-prometheus = {version = "0.7.0", optional = true}
axum-prometheus = {version = "0.10.0", optional = true}

# Configuration:
clap = { version = "4.5.21", features = ["derive"] }
clap = { version = "4.5.54", features = ["derive"] }
humantime = "2.1.0"

dashmap = "6.1.0"
Expand All @@ -80,8 +80,8 @@ crossbeam-skiplist = "0.1.3"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dev-dependencies]
criterion = {version = "0.3", features = ["async_tokio"]}
insta = { version = "1.41.1" }
criterion = {version = "0.8", features = ["async_tokio"]}
insta = { version = "1.46.0" }
assert_matches = { version = "1.5.0" }

# [[bench]]
Expand Down
8 changes: 4 additions & 4 deletions opsqueue/src/consumer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl From<ServerToClientMessage> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand All @@ -114,7 +114,7 @@ impl From<Envelope<ClientToServerMessage>> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl From<ServerToClientMessage> for tokio_tungstenite::tungstenite::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}

Expand All @@ -157,6 +157,6 @@ impl From<Envelope<ClientToServerMessage>> for tokio_tungstenite::tungstenite::M
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}
4 changes: 2 additions & 2 deletions opsqueue/src/consumer/dispatcher/metastate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mod tests {
.collect();

// Increment in one order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.increment(key, val);
}
Expand All @@ -153,7 +153,7 @@ mod tests {
assert_eq!(too_highs.len(), n_groups);

// Decrement in a different order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.decrement(key, val);
}
Expand Down
3 changes: 2 additions & 1 deletion opsqueue/src/consumer/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use axum::extract::ws::{Message, WebSocket};
use futures::SinkExt;
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -134,7 +135,7 @@ impl ConsumerConn {
}
}

async fn graceful_shutdown(self) {
async fn graceful_shutdown(mut self) {
const GRACEFUL_WEBSOCKET_CLOSE_TIMEOUT: Duration = Duration::from_millis(100);
select! {
_ = self.ws_stream.close() => {},
Expand Down
1 change: 1 addition & 0 deletions opsqueue/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::common::chunk;
use futures::stream::{self, TryStreamExt};
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::ObjectStoreExt;
use reqwest::Url;
use ux::u63;

Expand Down
4 changes: 2 additions & 2 deletions opsqueue/src/producer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ impl ServerState {
)
.route("/submissions/count", get(submissions_count))
.route(
"/submissions/lookup_id_by_prefix/:prefix",
"/submissions/lookup_id_by_prefix/{prefix}",
get(lookup_submission_id_by_prefix),
)
.route("/submissions/:submission_id", get(submission_status))
.route("/submissions/{submission_id}", get(submission_status))
.route("/version", get(crate::server::version_endpoint)) // We're also exposing it here so the producer client can view it
.with_state(self)
}
Expand Down
Loading