Merged

26 commits
2bb334e - Start implementing auto-repair (albe, Sep 28, 2019)
10bbc59 - Merge branch 'main' into auto-repair (albe, Mar 20, 2026)
26945c3 - Initial plan (Copilot, Mar 20, 2026)
75ddaf9 - Fix checkUnfinishedCommits - remove broken code causing infinite loop (Copilot, Mar 20, 2026)
b56a267 - Merge pull request #251 from albe/copilot/sub-pr-107 (albe, Mar 20, 2026)
079cfa0 - Initial plan (Copilot, Mar 20, 2026)
0a6183f - Add tests verifying repair does not lose previously committed data (Copilot, Mar 20, 2026)
e60f448 - Repair secondary indexes when opened after primary index truncation (Copilot, Mar 20, 2026)
b79e1e8 - Merge pull request #252 from albe/copilot/sub-pr-107 (albe, Mar 20, 2026)
209ea30 - Initial plan (Copilot, Mar 20, 2026)
2ea58fe - Improve test coverage: fix off-by-one bug in checkUnfinishedCommits, … (Copilot, Mar 20, 2026)
c83ae3b - Merge pull request #253 from albe/copilot/sub-pr-107 (albe, Mar 20, 2026)
efee33e - Initial plan (Copilot, Mar 21, 2026)
ebc6385 - Fix: guarantee branchOff runs before truncation, even with corrupted … (Copilot, Mar 21, 2026)
53a59d5 - Merge pull request #259 from albe/copilot/sub-pr-107 (albe, Mar 21, 2026)
6c02882 - Initial plan (Copilot, Mar 21, 2026)
af1107d - Fix bench-index.js: use unique filenames per call to avoid consistenc… (Copilot, Mar 21, 2026)
2608a02 - Merge pull request #260 from albe/copilot/sub-pr-107 (albe, Mar 21, 2026)
03002e3 - Initial plan (Copilot, Mar 21, 2026)
26150b5 - Fix: checkUnfinishedCommits gracefully handles preRead hook errors (Copilot, Mar 21, 2026)
2c94d36 - Merge pull request #261 from albe/copilot/sub-pr-107 (albe, Mar 21, 2026)
4fa67b6 - Initial plan (Copilot, Mar 22, 2026)
436b007 - Detect primary-index-lagging: add getLastSequenceNumber() and emit ev… (Copilot, Mar 22, 2026)
6d1b2a1 - Merge getLastSequenceNumber() into checkTornWrite() with signed retur… (Copilot, Mar 22, 2026)
f6b934c - Deduplicate findDocumentPositionBefore/prepareReadBuffer/readDocument… (Copilot, Mar 22, 2026)
967ba2d - Merge pull request #264 from albe/copilot/sub-pr-107 (albe, Mar 22, 2026)
22 changes: 15 additions & 7 deletions README.md
@@ -425,13 +425,21 @@ The storage engine is not strictly designed to follow ACID semantics. However, i

#### Atomicity

-A single document write is guaranteed to be atomic. Unless specifically configured, atomicity spreads to all subsequent
-writes until the write buffer is flushed, which happens either if the current document doesn't fully fit into the write
-buffer or on the next node event loop.
-This can be (ab)used to create a reduced form of transactional behaviour: All writes that happen within a single event loop
-and still fit into the write buffer will all happen together or not at all.
-If strict atomicity for single documents is required, you can configure the option `maxWriteBufferDocuments` to 1, which
-leads to every single document being flushed directly.
+A single document write is guaranteed to be atomic through the deserializer: incomplete writes cannot be deserialized, due
+to the nature of JSON. If you use a custom serialization format, e.g. msgpack or protobuf, you should make use of a checksum.
+If deserialization of the last document fails on startup, the storage will be truncated and hence repair itself. This covers
+cases of "torn writes", where not all blocks were written by the disk, e.g. due to a power failure.
+
+Multi-document writes (a commit of multiple events) are guaranteed to be atomic by checking during startup that the last
+commit was fully finished. If the last `commitId` does not match the `commitSize`, the last commit was incomplete and will
+be rolled back by truncating the storage to the position before the commit.
+
+Due to the write buffering applied, writes typically also happen in batches of multiple documents, so logical atomicity spreads
+over multiple documents. This can be controlled through the option `maxWriteBufferDocuments`, which defines how many documents
+may at maximum sit inside a single write buffer before it is flushed, and the option `writeBufferSize`, which gives a size limit
+for the write buffer. For optimal performance, a write buffer of 16 KB has turned out to be good, at least on the SSD I use, but YMMV.
+Generally, not limiting the write buffer fill through `maxWriteBufferDocuments` is recommended, since flushing only part of
+a full page size (typically 4 KB) will increase write amplification.
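The buffer options above can be pictured as a plain config object. This is a sketch only: the option names come from this section of the README, but where exactly they land in the store constructor's config is an assumption, so check the options documentation.

```javascript
// Sketch only: option names are taken from the text above; the exact place
// these land in the store/storage constructor config is an assumption.
const storageOptions = {
    maxWriteBufferDocuments: 1, // flush after every single document => strict per-document atomicity
    writeBufferSize: 16 * 1024  // 16 KB write buffer, the size reported to work well on SSDs
};
```

Leaving `maxWriteBufferDocuments` unset keeps full buffers coalesced into page-aligned flushes, which is the recommended default per the paragraph above.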

#### Consistency

6 changes: 4 additions & 2 deletions bench/bench-index.js
@@ -12,6 +12,8 @@ const Stable = require('event-storage');
const Latest = require('../index');

const WRITES = 1000;
+let stableCallCount = 0;
+let latestCallCount = 0;

function bench(index) {
    index.open();
@@ -30,11 +32,11 @@ function bench(index) {
}

Suite.add('index [stable]', function() {
-    bench(new Stable.Index(this.cycles + '.index', { dataDirectory: 'data/stable' }));
+    bench(new Stable.Index((stableCallCount++) + '.index', { dataDirectory: 'data/stable' }));
});

Suite.add('index [latest]', function() {
-    bench(new Latest.Index(this.cycles + '.index', { dataDirectory: 'data/latest' }));
+    bench(new Latest.Index((latestCallCount++) + '.index', { dataDirectory: 'data/latest' }));
});

Suite.run();
33 changes: 33 additions & 0 deletions src/EventStore.js
@@ -68,10 +68,43 @@ class EventStore extends events.EventEmitter {
                this.storage.close();
                throw err;
            }
+            this.checkUnfinishedCommits();
            this.emit('ready');
        });
    }

+    /**
+     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
+     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
+     * @private
+     */
+    checkUnfinishedCommits() {
+        let position = this.storage.length;
+        let lastEvent;
+        let truncateIndex = false;
+        while (position > 0) {
+            try {
+                lastEvent = this.storage.read(position);
+            } catch (e) {
+                // A preRead hook may throw (e.g. access control). Stop the repair check.
+                return;
+            }
+            if (lastEvent !== false) break;
+            truncateIndex = true;
+            position--;
+        }
+
+        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
+            this.emit('unfinished-commit', lastEvent);
+            // commitId = global sequence number at which the commit started
+            this.storage.truncate(lastEvent.metadata.commitId);
+        } else if (truncateIndex) {
+            // The index contained items that are not in the storage file; truncate everything
+            // after `position`, the last sequence number that was successfully read.
+            this.storage.truncate(position);
+        }
+    }

/**
* Scan the streams directory for existing streams so they are ready for `getEventStream()`.
*
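The commit-completeness test inside `checkUnfinishedCommits()` reduces to a single predicate over the last event's metadata. Restated as a standalone sketch (the helper name is invented for illustration, not part of the PR):

```javascript
// Standalone restatement of the unfinished-commit predicate used above.
// An event carries metadata.commitSize (number of events in its commit) and
// metadata.commitVersion (its zero-based position within the commit);
// the commit is complete only if the last stored event is the commit's final one.
function isUnfinishedCommit(lastEvent) {
    return Boolean(
        lastEvent
        && lastEvent.metadata.commitSize
        && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1
    );
}
```

When the predicate holds, the store rolls back to `metadata.commitId`, the global sequence number at which the commit started.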
5 changes: 5 additions & 0 deletions src/Index/WritableIndex.js
@@ -187,6 +187,11 @@ class WritableIndex extends ReadableIndex {
        assertEqual(entry.constructor.name, this.EntryClass.name, `Wrong entry object.`);
        assertEqual(entry.constructor.size, this.EntryClass.size, `Invalid entry size.`);

+        const lastEntry = this.lastEntry;
+        if (lastEntry !== false && lastEntry.number >= entry.number) {
+            throw new Error('Consistency error. Tried to add an index entry that should come before the existing last entry.');
+        }
+
        if (this.readUntil === this.data.length - 1) {
            this.readUntil++;
        }
28 changes: 21 additions & 7 deletions src/Partition/ReadablePartition.js
@@ -124,18 +124,32 @@ class ReadablePartition extends events.EventEmitter {
}

    /**
-     * @returns {number} -1 if the partition is ok and the sequence number of the broken document if a torn write was detected.
+     * Check if the last document in this partition is a torn write, and return the sequence
+     * number of the relevant document, encoded by sign:
+     * - Returns a positive value `(lastCompleteSeqnum + 1)` when no torn write was found and the
+     *   partition is non-empty; the last complete document's sequence number is `result - 1`.
+     * - Returns a negative value `-(tornSeqnum + 1)` when a torn write was detected; the torn
+     *   document's sequence number is `-(result) - 1`, and the last *complete* document's
+     *   sequence number (if any) is `-(result) - 2`.
+     * - Returns `0` when the partition is empty (no documents at all).
+     *
+     * @returns {number}
     */
    checkTornWrite() {
        if (this.size === 0) {
            return 0;
        }
        const reader = this.prepareReadBufferBackwards(this.size);
        const separator = reader.buffer.toString('ascii', reader.cursor - DOCUMENT_SEPARATOR.length, reader.cursor);
-        if (separator !== DOCUMENT_SEPARATOR) {
-            const position = this.findDocumentPositionBefore(this.size);
-            const reader = this.prepareReadBuffer(position);
-            const { sequenceNumber } = this.readDocumentHeader(reader.buffer, reader.cursor, position);
-            return sequenceNumber;
-        }
-        return -1;
+        const torn = separator !== DOCUMENT_SEPARATOR;
+        const position = this.findDocumentPositionBefore(this.size);
+        /* istanbul ignore if */
+        if (position === false || position < 0) {
+            return 0;
+        }
+        const lastReader = this.prepareReadBuffer(position);
+        const { sequenceNumber } = this.readDocumentHeader(lastReader.buffer, lastReader.cursor, position);
+        return torn ? -(sequenceNumber + 1) : sequenceNumber + 1;
    }

/**
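The sign-encoded return value of `checkTornWrite()` can be unpacked mechanically. A small decoding helper (hypothetical, for illustration only, not code from this PR) makes the encoding concrete:

```javascript
// Decodes the sign-encoded result documented above (illustrative helper only).
function decodeTornWriteResult(result) {
    if (result === 0) {
        return { empty: true };                                 // no documents at all
    }
    if (result > 0) {
        return { torn: false, lastCompleteSeqnum: result - 1 }; // result = lastCompleteSeqnum + 1
    }
    const tornSeqnum = -result - 1;                             // result = -(tornSeqnum + 1)
    return {
        torn: true,
        tornSeqnum,
        // the last complete document, if any, sits right before the torn one
        lastCompleteSeqnum: tornSeqnum > 0 ? tornSeqnum - 1 : null
    };
}
```

Folding the torn/complete distinction into the sign lets a single partition scan answer both "is there a torn write?" and "how far did this partition get?", which is exactly what `checkTornWrites()` in WritableStorage needs.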
9 changes: 4 additions & 5 deletions src/Partition/WritablePartition.js
@@ -314,13 +314,16 @@ class WritablePartition extends ReadablePartition {
     * @param {number} after The file position after which to truncate the partition.
     */
    truncate(after) {
-        if (after > this.size) {
+        if (after >= this.size) {
            return;
        }
        this.open();
        after = Math.max(0, after);
        this.flush();

+        // Always save the truncated part for manual recovery, even if it contains corrupted data
+        this.branchOff('truncated-' + Date.now(), after);
+
        try {
            this.readFrom(after);
        } catch (e) {
@@ -329,10 +332,6 @@
        }
        }

-        // copy all truncated documents to some delete log
-        const backupName = (new Date()).toISOString().substring(0,10);
-        this.branchOff(backupName, after);
-
        fs.truncateSync(this.fileName, this.headerSize + after);
        this.truncateReadBuffer(after);
        this.size = after;
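The backup-naming change in this hunk is easy to see in isolation: the removed code derived one name per calendar day, so a second truncation on the same day would reuse the same branch file, while the added code uses a millisecond timestamp that is effectively unique per call. A standalone sketch of the two expressions:

```javascript
// Old name: date only, e.g. '2026-03-21' -> collides when truncating twice in one day.
const oldName = (new Date()).toISOString().substring(0, 10);
// New name: millisecond timestamp -> effectively unique per truncation.
const newName = 'truncated-' + Date.now();
```

The PR also moves the `branchOff` call to before `readFrom`, so the backup is written even when the tail of the partition is too corrupted to read.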
3 changes: 2 additions & 1 deletion src/Storage/ReadableStorage.js
@@ -66,8 +66,8 @@ class ReadableStorage extends events.EventEmitter {

        this.dataDirectory = path.resolve(config.dataDirectory);

-        this.initializeIndexes(config);
        this.scanPartitions(config);
+        this.initializeIndexes(config);
    }

/**
@@ -392,3 +392,4 @@

module.exports = ReadableStorage;
module.exports.matches = matches;
+module.exports.CorruptFileError = Partition.CorruptFileError;
49 changes: 45 additions & 4 deletions src/Storage/WritableStorage.js
@@ -82,18 +82,40 @@ class WritableStorage extends ReadableStorage {
     * Check all partitions for torn writes and truncate the storage to the position before the first torn write.
     * This might delete correctly written events in partitions, if their sequence number is higher than the
     * torn write in another partition.
+     * Also detects when the primary index is lagging behind the actual partition data and emits a
+     * 'primary-index-lagging' event in that case.
*/
    checkTornWrites() {
        let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
+        let maxPartitionSequenceNumber = -1;
        this.forEachPartition(partition => {
            partition.open();
-            const tornSequenceNumber = partition.checkTornWrite();
-            if (tornSequenceNumber >= 0) {
-                lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber);
-            }
+            const result = partition.checkTornWrite();
+            if (result < 0) {
+                // Torn write: result encodes -(tornSeqnum + 1), so torn seqnum = -result - 1.
+                const tornSeqnum = -result - 1;
+                lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSeqnum);
+                // Any complete documents before the torn one contribute to the lagging check.
+                // Their last seqnum is tornSeqnum - 1 (if > 0; otherwise no complete docs).
+                if (tornSeqnum > 0) {
+                    maxPartitionSequenceNumber = Math.max(maxPartitionSequenceNumber, tornSeqnum - 1);
+                }
+            } else if (result > 0) {
+                // No torn write: result encodes (lastCompleteSeqnum + 1), so seqnum = result - 1.
+                maxPartitionSequenceNumber = Math.max(maxPartitionSequenceNumber, result - 1);
+            }
+            // result === 0: empty partition, no action needed.
        });
        if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
            this.truncate(lastValidSequenceNumber);
+            // After truncation, account for documents beyond the truncation point being removed.
+            // truncate(N) keeps index entries 1..N, so the last kept partition seqnum is N-1.
+            maxPartitionSequenceNumber = Math.min(maxPartitionSequenceNumber, lastValidSequenceNumber - 1);
        }
+        // A partition seqnum of N means the document was written when the index had N entries,
+        // so the index should contain at least N+1 entries to be consistent.
+        if (maxPartitionSequenceNumber >= 0 && maxPartitionSequenceNumber + 1 > this.index.length) {
+            this.emit('primary-index-lagging', maxPartitionSequenceNumber + 1, this.index.length);
+        }
        this.forEachPartition(partition => partition.close());
    }
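The lagging condition at the end of `checkTornWrites()` can be isolated into a predicate. This is a restatement for illustration (the function name is invented), not code from the PR:

```javascript
// A partition seqnum of N means the document was written when the primary
// index already held N entries, so a consistent index needs >= N + 1 entries.
function isPrimaryIndexLagging(maxPartitionSeqnum, indexLength) {
    return maxPartitionSeqnum >= 0 && maxPartitionSeqnum + 1 > indexLength;
}
```

A negative `maxPartitionSeqnum` means no documents were found in any partition, in which case the index cannot be lagging and no event is emitted.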
@@ -343,6 +365,9 @@
        if (!this.index.isOpen()) {
            this.index.open();
        }
+        if (after < 0) {
+            after += this.index.length;
+        }

this.truncatePartitions(after);

@@ -364,6 +389,21 @@
});
}

+    /**
+     * @inheritDoc
+     * Open an existing secondary index and repair any stale entries beyond the current primary
+     * index length. Stale entries can be present when checkTornWrites() truncated the primary
+     * index before this secondary index was loaded into memory.
+     */
+    openIndex(name, matcher) {
+        const index = super.openIndex(name, matcher);
+        const lastEntry = index.lastEntry;
+        if (lastEntry !== false && lastEntry.number > this.index.length) {
+            index.truncate(index.find(this.index.length));
+        }
+        return index;
+    }

/**
* @protected
* @param {string} name
@@ -402,5 +442,6 @@

module.exports = WritableStorage;
module.exports.StorageLockedError = StorageLockedError;
+module.exports.CorruptFileError = ReadableStorage.CorruptFileError;
module.exports.LOCK_THROW = LOCK_THROW;
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;