Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions .github/workflows/async-retry-remote-registry-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
name: Async Retry RemoteRegistryClient CI

# Triggered on every push / PR that touches the remote registry client sources,
# the LLM timeout manager (RetryPolicy fix), or the focused test file.
# Targets v1.3.0, Issue #3986.
on:
push:
branches:
- main
- develop
paths:
- 'include/themis/base/remote_registry_client.h'
- 'src/base/remote_registry_client.cpp'
- 'include/aql/llm_timeout_manager.h'
- 'tests/test_remote_registry_client.cpp'
- 'cmake/ModularBuild.cmake'
- 'tests/CMakeLists.txt'
- '.github/workflows/async-retry-remote-registry-ci.yml'
pull_request:
types: [opened, synchronize, reopened]
paths:
- 'include/themis/base/remote_registry_client.h'
- 'src/base/remote_registry_client.cpp'
- 'include/aql/llm_timeout_manager.h'
- 'tests/test_remote_registry_client.cpp'
- 'cmake/ModularBuild.cmake'
- 'tests/CMakeLists.txt'
- '.github/workflows/async-retry-remote-registry-ci.yml'

concurrency:
group: async-retry-remote-registry-${{ github.ref }}
cancel-in-progress: true

jobs:
ci-scope-classifier:
permissions:
contents: read
uses: ./.github/workflows/ci-scope-classifier.yml

async-retry-tests:
needs: ci-scope-classifier
if: needs.ci-scope-classifier.outputs.has_code_changes == 'true'
name: Async Retry RemoteRegistryClient (${{ matrix.os }} / ${{ matrix.compiler }})
runs-on: ${{ matrix.os }}
timeout-minutes: 60

permissions:
contents: read

strategy:
fail-fast: false
matrix:
include:
- os: ubuntu-22.04
compiler: gcc-12
cc: gcc-12
cxx: g++-12
- os: ubuntu-24.04
compiler: gcc-14
cc: gcc-14
cxx: g++-14

steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up C++ build environment
uses: ./.github/actions/setup-cpp-build
with:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
extra-packages: libssl-dev libcurl4-openssl-dev libboost-all-dev librocksdb-dev libfmt-dev libtbb-dev libspdlog-dev nlohmann-json3-dev

- name: Configure and build
uses: ./.github/actions/configure-themis
with:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}

# ── Focused test target ────────────────────────────────────────────────
# Runs only the CTest entries whose names match
# RemoteRegistryClientFocusedTests, with a 120-second per-test timeout.
# Output is streamed to the job log and copied by tee into
# build/async_retry_results.txt. `set -o pipefail` makes the pipeline
# return ctest's failure status instead of tee's, so a failing test still
# fails this step.
- name: Run RemoteRegistryClientFocusedTests
run: |
set -o pipefail
cd build
ctest -R RemoteRegistryClientFocusedTests \
--output-on-failure \
--timeout 120 \
2>&1 | tee async_retry_results.txt

- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: async-retry-results-${{ matrix.os }}-${{ matrix.compiler }}
path: |
build/async_retry_results.txt
retention-days: 30
135 changes: 135 additions & 0 deletions .github/workflows/blob-redundancy-event-listener-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
name: BlobRedundancyManager – RocksDB Event Listener CI

# Triggered on every push/PR that touches the BlobRedundancyManager sources,
# the RocksDBWrapper, the RAID/redundancy test file, or this workflow itself.
on:
push:
branches:
- main
- develop
paths:
- 'src/storage/blob_redundancy_manager.cpp'
- 'include/storage/blob_redundancy_manager.h'
- 'src/storage/rocksdb_wrapper.cpp'
- 'include/storage/rocksdb_wrapper.h'
- 'tests/test_raid_redundancy.cpp'
- 'tests/CMakeLists.txt'
- '.github/workflows/blob-redundancy-event-listener-ci.yml'
pull_request:
types: [opened, synchronize, reopened]
paths:
- 'src/storage/blob_redundancy_manager.cpp'
- 'include/storage/blob_redundancy_manager.h'
- 'src/storage/rocksdb_wrapper.cpp'
- 'include/storage/rocksdb_wrapper.h'
- 'tests/test_raid_redundancy.cpp'
- 'tests/CMakeLists.txt'
- '.github/workflows/blob-redundancy-event-listener-ci.yml'
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
ci-scope-classifier:
permissions:
contents: read
uses: ./.github/workflows/ci-scope-classifier.yml

# ---------------------------------------------------------------------------
# Build and run the BlobRedundancyEventListener focused test suite.
# Tests cover:
# 1. createRocksDBListener() returns a valid, non-null listener
# 2. notifySSTFileDeleted() marks matching location as unhealthy
# 3. notifySSTFileDeleted() with an unknown path is a no-op
# 4. notifySSTFileDeleted() only affects blobs that match the deleted path
# 5. RocksDBBlobListener::OnTableFileDeleted() delegates correctly and
# marks the backing location unhealthy
# ---------------------------------------------------------------------------
blob-redundancy-listener-unit-tests:
needs: ci-scope-classifier
if: needs.ci-scope-classifier.outputs.has_code_changes == 'true'
name: BlobRedundancy EventListener tests (${{ matrix.os }} / ${{ matrix.compiler }})
runs-on: ${{ matrix.os }}
permissions:
contents: read

strategy:
fail-fast: false
matrix:
include:
- os: ubuntu-22.04
compiler: gcc-12
cc: gcc-12
cxx: g++-12
- os: ubuntu-22.04
compiler: clang-15
cc: clang-15
cxx: clang++-15
- os: ubuntu-24.04
compiler: gcc-13
cc: gcc-13
cxx: g++-13

steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive

- name: Set up C++ build environment
uses: ./.github/actions/setup-cpp-build
with:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
extra-packages: nlohmann-json3-dev libssl-dev libzstd-dev

- name: Configure (blob redundancy focused test target)
env:
CC: ${{ matrix.cc }}
CXX: ${{ matrix.cxx }}
run: |
cmake -B build -G Ninja \
-DCMAKE_BUILD_TYPE=Debug \
-DCMAKE_C_COMPILER=${{ matrix.cc }} \
-DCMAKE_CXX_COMPILER=${{ matrix.cxx }} \
-DTHEMIS_ENABLE_LLM=OFF \
-DTHEMIS_ENABLE_GPU=OFF

# Build only the focused test executable, using one build job per
# available CPU core.
- name: Build focused test binary
  run: >-
    cmake --build build
    --target test_blob_redundancy_event_listener_focused
    --parallel "$(nproc)"

# Runs the CTest entries matching BlobRedundancyEventListenerFocusedTests
# (60-second per-test timeout) and copies the output into
# build/blob_redundancy_event_listener_test_output.txt.
- name: Run BlobRedundancy EventListener unit tests
  run: |
    # The default `run` shell on Linux runners is `bash -e {0}`, which does
    # not enable pipefail. Without it the step would report tee's exit
    # status and a failing ctest run would be shown as a success.
    set -o pipefail
    cd build
    ctest --test-dir . \
      --tests-regex BlobRedundancyEventListenerFocusedTests \
      --output-on-failure \
      --timeout 60 \
      2>&1 | tee blob_redundancy_event_listener_test_output.txt

- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: blob-redundancy-listener-results-${{ matrix.os }}-${{ matrix.compiler }}
path: |
build/blob_redundancy_event_listener_test_output.txt
retention-days: 14

# Appends a Markdown table describing this matrix leg to the job summary.
# Runs even when earlier steps failed so every leg is represented.
- name: Write job summary
  if: always()
  # Context values are handed to the script through the environment rather
  # than being expanded into the script text, so a value containing shell
  # metacharacters (for example a branch name with `$(...)`) is printed
  # literally instead of being executed by the runner shell.
  env:
    SUMMARY_OS: ${{ matrix.os }}
    SUMMARY_COMPILER: ${{ matrix.compiler }}
    SUMMARY_EVENT: ${{ github.event_name }}
    SUMMARY_BRANCH: ${{ github.ref_name }}
    SUMMARY_COMMIT: ${{ github.sha }}
    SUMMARY_ACTOR: ${{ github.actor }}
  run: |
    {
      echo "## 🗄️ BlobRedundancyManager – RocksDB Event Listener – Unit Tests"
      echo ""
      echo "| Parameter | Value |"
      echo "|-----------|-------|"
      echo "| **OS** | \`${SUMMARY_OS}\` |"
      echo "| **Compiler** | \`${SUMMARY_COMPILER}\` |"
      echo "| **Event** | \`${SUMMARY_EVENT}\` |"
      echo "| **Branch** | \`${SUMMARY_BRANCH}\` |"
      echo "| **Commit** | \`${SUMMARY_COMMIT}\` |"
      echo "| **Triggered by** | ${SUMMARY_ACTOR} |"
      echo ""
      echo "BlobRedundancyEventListener: createRocksDBListener(), notifySSTFileDeleted(), OnTableFileDeleted() location health tracking."
    } >> "$GITHUB_STEP_SUMMARY"
1 change: 1 addition & 0 deletions cmake/ModularBuild.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ set(THEMIS_BASE_SOURCES
../src/base/module_sandbox.cpp
../src/base/hot_reload_manager.cpp
../src/base/ab_test_manager.cpp
../src/base/remote_registry_client.cpp
../src/base/wasm_plugin_sandbox.cpp
../src/base/wasm_runtime_injector.cpp
../src/base/plugin_dependency_graph.cpp
Expand Down
3 changes: 3 additions & 0 deletions include/aql/llm_timeout_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ class RetryPolicy {
throw; // Non-retryable error
}

// Check if we've exhausted retries. max_retries counts the retries made
// after the initial call (it does not include the initial call itself),
// so we are exhausted only once attempt exceeds it.
if (attempt > config_.max_retries) {
Expand Down
5 changes: 5 additions & 0 deletions include/storage/blob_redundancy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ class BlobRedundancyManager {
// RocksDB Integration
Result<std::shared_ptr<rocksdb::EventListener>> createRocksDBListener();

// Called by RocksDBBlobListener when an SST file is deleted by RocksDB.
// Marks all blob locations backed by the deleted file as unhealthy and
// queues the affected blobs for re-replication.
void notifySSTFileDeleted(const std::string& file_path);

private:
Config config_;
std::atomic<bool> running_{false};
Expand Down
5 changes: 5 additions & 0 deletions include/storage/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace rocksdb {
class Snapshot;
class DB;
class ColumnFamilyHandle;
class EventListener;
}

namespace themis {
Expand Down Expand Up @@ -248,6 +249,10 @@ class RocksDBWrapper {
/// Check if database is open
bool isOpen() const;

/// Register a RocksDB EventListener that will receive compaction/flush/deletion
/// events once the database is opened. Must be called before open().
void addEventListener(std::shared_ptr<rocksdb::EventListener> listener);

// ===== CRUD Operations =====

/// Get value by key
Expand Down
52 changes: 35 additions & 17 deletions include/themis/base/remote_registry_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,23 @@ struct RequestStats {
*
* Thread safety: all public methods are safe to call from multiple threads.
*
* Async API: listPluginsAsync(), fetchPluginAsync(), and downloadPluginAsync()
* dispatch work to a background thread via std::async so the calling thread is
* released immediately (no blocking during retry backoffs). These methods
* require that the client is owned by a std::shared_ptr; calling them on a
* stack-allocated instance throws std::bad_weak_ptr.
* Async usage: the *Async variants (listPluginsAsync, fetchPluginAsync,
* downloadPluginAsync) return a std::future and run the entire operation —
* including retry back-off sleeps — on a detached worker thread, so the
* calling thread is never blocked. The client must be owned by a
* std::shared_ptr for those methods to work; they call shared_from_this()
* internally and throw std::bad_weak_ptr if the client is stack-allocated.
*
* Typical usage:
* @code
* RegistryConfig cfg;
* cfg.registry_url = "https://registry.example.com/api/v1";
* cfg.auth_token = "my-secret-token";
* cfg.download_dir = "/opt/themis/plugins";
* auto client = std::make_shared<RemoteRegistryClient>(cfg);
*
* // Synchronous
* auto plugins = client->listPlugins();
*
* // Asynchronous (calling thread released immediately)
* auto future = client->listPluginsAsync();
* auto plugins = future.get();
* // Synchronous usage (calling thread blocks during retries):
* RemoteRegistryClient client(cfg);
* auto plugins = client.listPlugins();
Expand Down Expand Up @@ -235,16 +239,14 @@ class RemoteRegistryClient : public std::enable_shared_from_this<RemoteRegistryC
std::vector<RegistryPluginEntry> listPlugins();

/**
* @brief Asynchronous version of listPlugins().
* @brief Asynchronous variant of listPlugins().
*
* Dispatches the entire operation (HTTP request + retry loop) to a
* worker thread via std::async, releasing the calling thread during
* back-off sleeps. Multiple concurrent async calls on the same client
* are safe because all shared state is protected by internal mutexes.
* Launches the operation on a worker thread and returns immediately.
* The calling thread is never blocked by retry back-off sleeps.
*
* @return std::future that resolves to the plugin list.
* @throws std::bad_weak_ptr if the client was not constructed via
* std::make_shared (i.e. has never been owned by a shared_ptr).
* @note The client must be owned by a std::shared_ptr. Calling this
* method on a stack-allocated instance throws std::bad_weak_ptr.
* @return Future that resolves to the plugin list (may be empty on error).
*/
std::future<std::vector<RegistryPluginEntry>> listPluginsAsync();

Expand All @@ -259,6 +261,14 @@ class RemoteRegistryClient : public std::enable_shared_from_this<RemoteRegistryC
std::optional<RegistryPluginEntry> fetchPlugin(const std::string& name);

/**
* @brief Asynchronous variant of fetchPlugin().
*
* @note The client must be owned by a std::shared_ptr.
* @param name Plugin name to look up.
* @return Future that resolves to the plugin entry, or std::nullopt.
*/
std::future<std::optional<RegistryPluginEntry>> fetchPluginAsync(
const std::string& name);
* @brief Asynchronous version of fetchPlugin().
*
* Dispatches the entire operation (HTTP request + retry loop) to a
Expand Down Expand Up @@ -290,6 +300,14 @@ class RemoteRegistryClient : public std::enable_shared_from_this<RemoteRegistryC
PluginDownloadResult downloadPlugin(const RegistryPluginEntry& entry);

/**
* @brief Asynchronous variant of downloadPlugin().
*
* @note The client must be owned by a std::shared_ptr.
* @param entry Plugin entry to download.
* @return Future that resolves to the download result.
*/
std::future<PluginDownloadResult> downloadPluginAsync(
const RegistryPluginEntry& entry);
* @brief Asynchronous version of downloadPlugin().
*
* Dispatches the entire operation (HTTP request + retry loop) to a
Expand Down
Loading
Loading