Skip to content
Merged
18 changes: 17 additions & 1 deletion perf/StressTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ public static async Task<ulong> RunAsync(TimeSpan duration, ILogger log) {
return bytesPerSecond;
}

/// <summary>
/// Runs a correctness stress test against <paramref name="cache"/> for the specified
/// <paramref name="duration"/> using <see cref="Environment.ProcessorCount"/> * 2
/// concurrent read/write tasks. Each task writes random blocks and reads them back,
/// recomputing the hash to verify data integrity. Throws
/// <see cref="HashMismatchException"/> on corruption.
/// </summary>
public static async Task RunCorrectnessAsync(IContentCache cache, TimeSpan duration) {
var timeIsUp = duration.ToCancellation();
var tasks = new List<Task>();
for (int i = 0; i < Environment.ProcessorCount * 2; i++)
tasks.Add(AbuseAsync(cache, timeIsUp));

await Task.WhenAll(tasks);
}

static async Task<long> AbuseAsync(IContentCache cache, CancellationToken cancel) {
byte[] data = new byte[cache.MaxBlockSize];
var random = new Random();
Expand Down Expand Up @@ -84,7 +100,7 @@ static async Task<long> AbuseAsync(IContentCache cache, CancellationToken cancel
throw new HashMismatchException();
transmitted += read;
}
} catch (OperationCanceledException e) {
} catch (OperationCanceledException) {
var opTime = accessStart.Elapsed;
if (!cancel.IsCancellationRequested) {
await Console.Error.WriteLineAsync($"last access: {opTime.TotalMilliseconds:N0}ms");
Expand Down
46 changes: 36 additions & 10 deletions src/BlockCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ public async ValueTask<TimeSpan> WriteAsync(ContentHash hash, ReadOnlyMemory<byt
if (reportPerf)
this.log.LogDebug("write lock wait: {Microseconds:F0}us",
start.Elapsed.TotalMicroseconds);

// Hoisted outside the try so they survive the finally (indexLock.Release) and are
// accessible in the continuation: index is used in CommitWrite and blockWriteLock
// in the await-using block, both of which execute after the index lock is dropped.
int index = -1;
AsyncReaderWriterLock.Releaser blockWriteLock = default;
try {
int index;
var writeTime = start = StopwatchTimestamp.Now;
if (this.evictionStrategy.Access(hash, out var evicted)) {
this.Evicted?.Invoke(this, evicted);
Expand All @@ -61,13 +66,20 @@ public async ValueTask<TimeSpan> WriteAsync(ContentHash hash, ReadOnlyMemory<byt

start = StopwatchTimestamp.Now;
var entryLock = this.blockLocks[index % this.blockLocks.Length];
await using (await entryLock.WriteLockAsync(cancel))
await this.storage.WriteAsync(index, content, hash, CancellationToken.None)
.ConfigureAwait(false);

this.log.Log(
perfLevel, "Set content[{Length}] for {Hash} at {Index} in {Microseconds:F0}us",
content.Length, hash, index, start.Elapsed.TotalMicroseconds);
// Acquire the block write lock while still holding the index lock.
// This prevents another writer from claiming the same block between the
// dictionary update below and the actual data write after the index lock drops.
blockWriteLock = await entryLock.WriteLockAsync(cancel);
try {
// Update the in-memory hash->index dictionary under the index lock so
// concurrent readers and writers see a consistent view immediately.
this.storage.UpdateIndex(index, newHash: hash, oldHash: evicted);
await this.storage.MarkDirtyAsync().ConfigureAwait(false);
} catch {
await blockWriteLock.DisposeAsync().ConfigureAwait(false);
throw;
}
} else {
#if DEBUG
index = this.storage.BlockIndex(hash);
Expand All @@ -84,12 +96,26 @@ await this.storage.WriteAsync(index, content, hash, CancellationToken.None)
return TimeSpan.Zero;
#endif
}

this.Available?.Invoke(this, hash, content.Span);
return start.Elapsed;
} finally {
// Release the index lock before writing block data so other reads and writes
// can proceed concurrently while the (potentially large) copy is in progress.
this.indexLock.Release();
}

// Write the persisted index entry and block data under only the block write lock.
// The index lock has already been released, allowing other operations to proceed.
// index is always valid here: the early-return branches never reach this point.
Debug.Assert(index >= 0);
await using (blockWriteLock) {
this.storage.CommitWrite(index, content, hash);
}

this.log.Log(
perfLevel, "Set content[{Length}] for {Hash} at {Index} in {Microseconds:F0}us",
content.Length, hash, index, start.Elapsed.TotalMicroseconds);

this.Available?.Invoke(this, hash, content.Span);
return start.Elapsed;
}

public async ValueTask<int?> ReadAsync(ContentHash hash, long offset, Memory<byte> buffer,
Expand Down
4 changes: 2 additions & 2 deletions src/BlockIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Hash;
using System.Runtime.InteropServices;

sealed class BlockIndex: IAsyncDisposable {
readonly Dictionary<ContentHash, int> positions = new();
internal readonly Dictionary<ContentHash, int> positions = new();
readonly IBlockReader reader;
readonly IBlockWriter writer;

Expand Down Expand Up @@ -78,7 +78,7 @@ public bool TrySet(int index, Entry value) {
return true;
}

void SetUnchecked(int index, Entry value) {
internal void SetUnchecked(int index, Entry value) {
var span = MemoryMarshal.CreateSpan(ref value, 1);
var bytes = MemoryMarshal.Cast<Entry, byte>(span);
this.writer.Write(bytes, index, offset: 0);
Expand Down
24 changes: 23 additions & 1 deletion src/BlockStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public async ValueTask FlushAsync(CancellationToken cancel = default) {
blocksTime, indexTime);
}

async ValueTask MarkDirtyAsync() {
internal async ValueTask MarkDirtyAsync() {
if (this.dirty)
return;

Expand All @@ -125,6 +125,28 @@ async ValueTask MarkDirtyAsync() {
this.dirty = true;
}

/// <summary>
/// Updates only the in-memory hash→index dictionary for the given block.
/// Must be called under the index lock, with the block write lock already held.
/// </summary>
internal void UpdateIndex(int blockIndex, ContentHash newHash, ContentHash oldHash) {
if (!this.index.positions.TryAdd(newHash, blockIndex))
throw new InvalidOperationException(
$"Internal error: hash {newHash} is already mapped to a block (attempted to map to block {blockIndex})");
if (!this.index.positions.Remove(oldHash))
throw new InvalidOperationException(
$"Internal error: evicted hash {oldHash} was not found in the index (block {blockIndex})");
}

/// <summary>
/// Writes the persisted index entry and block data.
/// Must be called under the block write lock (not the index lock).
/// </summary>
internal void CommitWrite(int blockIndex, ReadOnlyMemory<byte> block, ContentHash hash) {
this.index.SetUnchecked(blockIndex, new(hash, block.Length));
this.writer.Write(block.Span, blockIndex, offset: 0);
}

public static async Task<BlockStorage> CreateAsync(string indexPath, string blocksPath,
int blockSize,
ILogger log,
Expand Down
26 changes: 26 additions & 0 deletions test/CorrectnessTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Hash;

using Microsoft.Extensions.Logging.Abstractions;

/// <summary>
/// Correctness stress tests for <see cref="BlockCache"/>.
/// Verifies that concurrent reads and writes never produce data corruption,
/// exercising the locking logic directly without a TCP layer.
/// </summary>
public class CorrectnessTests {
// Small block count so evictions happen frequently — maximises the chance of
// exposing races between eviction, index update, and the data copy.
const int BlockCount = 64;

[Fact]
public async Task ConcurrentReadWriteProducesNoDataCorruption() {
var cache = new BlockCache(BlockCache.DEFAULT_BLOCK_SIZE, BlockCount,
NullLogger<BlockCache>.Instance);
await using var _ = cache;

// RunCorrectnessAsync drives ProcessorCount*2 concurrent tasks, each writing
// random blocks and reading them back, recomputing the hash to detect corruption.
// A HashMismatchException is thrown (and propagated) if any mismatch is found.
await StressTest.RunCorrectnessAsync(cache, TimeSpan.FromSeconds(15));
}
}