diff --git a/perf/StressTest.cs b/perf/StressTest.cs index 627ad74..1da38e8 100644 --- a/perf/StressTest.cs +++ b/perf/StressTest.cs @@ -46,6 +46,22 @@ public static async Task RunAsync(TimeSpan duration, ILogger log) { return bytesPerSecond; } + /// + /// Runs a correctness stress test against for the specified + /// using * 2 + /// concurrent read/write tasks. Each task writes random blocks and reads them back, + /// recomputing the hash to verify data integrity. Throws + /// on corruption. + /// + public static async Task RunCorrectnessAsync(IContentCache cache, TimeSpan duration) { + var timeIsUp = duration.ToCancellation(); + var tasks = new List(); + for (int i = 0; i < Environment.ProcessorCount * 2; i++) + tasks.Add(AbuseAsync(cache, timeIsUp)); + + await Task.WhenAll(tasks); + } + static async Task AbuseAsync(IContentCache cache, CancellationToken cancel) { byte[] data = new byte[cache.MaxBlockSize]; var random = new Random(); @@ -84,7 +100,7 @@ static async Task AbuseAsync(IContentCache cache, CancellationToken cancel throw new HashMismatchException(); transmitted += read; } - } catch (OperationCanceledException e) { + } catch (OperationCanceledException) { var opTime = accessStart.Elapsed; if (!cancel.IsCancellationRequested) { await Console.Error.WriteLineAsync($"last access: {opTime.TotalMilliseconds:N0}ms"); diff --git a/src/BlockCache.cs b/src/BlockCache.cs index fc3d819..0220b36 100644 --- a/src/BlockCache.cs +++ b/src/BlockCache.cs @@ -46,8 +46,13 @@ public async ValueTask WriteAsync(ContentHash hash, ReadOnlyMemory WriteAsync(ContentHash hash, ReadOnlyMemoryindex dictionary under the index lock so + // concurrent readers and writers see a consistent view immediately. + this.storage.UpdateIndex(index, newHash: hash, oldHash: evicted); + await this.storage.MarkDirtyAsync().ConfigureAwait(false); + } catch { + await blockWriteLock.DisposeAsync().ConfigureAwait(false); + throw; + } } else { #if DEBUG index = this.storage.BlockIndex(hash); @@ -84,12 +96,26 @@ await this.storage.WriteAsync(index, content, hash, CancellationToken.None) return TimeSpan.Zero; #endif } - - this.Available?.Invoke(this, hash, content.Span); - return start.Elapsed; } finally { + // Release the index lock before writing block data so other reads and writes + // can proceed concurrently while the (potentially large) copy is in progress. this.indexLock.Release(); } + + // Write the persisted index entry and block data under only the block write lock. + // The index lock has already been released, allowing other operations to proceed. + // index is always valid here: the early-return branches never reach this point. + Debug.Assert(index >= 0); + await using (blockWriteLock) { + this.storage.CommitWrite(index, content, hash); + } + + this.log.Log( + perfLevel, "Set content[{Length}] for {Hash} at {Index} in {Microseconds:F0}us", + content.Length, hash, index, start.Elapsed.TotalMicroseconds); + + this.Available?.Invoke(this, hash, content.Span); + return start.Elapsed; } public async ValueTask ReadAsync(ContentHash hash, long offset, Memory buffer, diff --git a/src/BlockIndex.cs b/src/BlockIndex.cs index f15894e..3587327 100644 --- a/src/BlockIndex.cs +++ b/src/BlockIndex.cs @@ -4,7 +4,7 @@ namespace Hash; using System.Runtime.InteropServices; sealed class BlockIndex: IAsyncDisposable { - readonly Dictionary positions = new(); + internal readonly Dictionary positions = new(); readonly IBlockReader reader; readonly IBlockWriter writer; @@ -78,7 +78,7 @@ public bool TrySet(int index, Entry value) { return true; } - void SetUnchecked(int index, Entry value) { + internal void SetUnchecked(int index, Entry value) { var span = MemoryMarshal.CreateSpan(ref value, 1); var bytes = MemoryMarshal.Cast(span); this.writer.Write(bytes, index, offset: 0); diff --git a/src/BlockStorage.cs b/src/BlockStorage.cs index a2d9e97..996047e 100644 --- a/src/BlockStorage.cs +++ b/src/BlockStorage.cs @@ -116,7 +116,7 @@ public async ValueTask FlushAsync(CancellationToken cancel = default) { blocksTime, indexTime); } - async ValueTask MarkDirtyAsync() { + internal async ValueTask MarkDirtyAsync() { if (this.dirty) return; @@ -125,6 +125,28 @@ async ValueTask MarkDirtyAsync() { this.dirty = true; } + /// + /// Updates only the in-memory hash→index dictionary for the given block. + /// Must be called under the index lock, with the block write lock already held. + /// + internal void UpdateIndex(int blockIndex, ContentHash newHash, ContentHash oldHash) { + if (!this.index.positions.TryAdd(newHash, blockIndex)) + throw new InvalidOperationException( + $"Internal error: hash {newHash} is already mapped to a block (attempted to map to block {blockIndex})"); + if (!this.index.positions.Remove(oldHash)) + throw new InvalidOperationException( + $"Internal error: evicted hash {oldHash} was not found in the index (block {blockIndex})"); + } + + /// + /// Writes the persisted index entry and block data. + /// Must be called under the block write lock (not the index lock). + /// + internal void CommitWrite(int blockIndex, ReadOnlyMemory block, ContentHash hash) { + this.index.SetUnchecked(blockIndex, new(hash, block.Length)); + this.writer.Write(block.Span, blockIndex, offset: 0); + } + public static async Task CreateAsync(string indexPath, string blocksPath, int blockSize, ILogger log, diff --git a/test/CorrectnessTests.cs b/test/CorrectnessTests.cs new file mode 100644 index 0000000..add90b9 --- /dev/null +++ b/test/CorrectnessTests.cs @@ -0,0 +1,26 @@ +namespace Hash; + +using Microsoft.Extensions.Logging.Abstractions; + +/// +/// Correctness stress tests for . +/// Verifies that concurrent reads and writes never produce data corruption, +/// exercising the locking logic directly without a TCP layer. +/// +public class CorrectnessTests { + // Small block count so evictions happen frequently — maximises the chance of + // exposing races between eviction, index update, and the data copy. + const int BlockCount = 64; + + [Fact] + public async Task ConcurrentReadWriteProducesNoDataCorruption() { + var cache = new BlockCache(BlockCache.DEFAULT_BLOCK_SIZE, BlockCount, + NullLogger.Instance); + await using var _ = cache; + + // RunCorrectnessAsync drives ProcessorCount*2 concurrent tasks, each writing + // random blocks and reading them back, recomputing the hash to detect corruption. + // A HashMismatchException is thrown (and propagated) if any mismatch is found. + await StressTest.RunCorrectnessAsync(cache, TimeSpan.FromSeconds(15)); + } +}