From 3cccf3cf0a354a202bdee5dafde7c0ef42b6fe05 Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 25 Feb 2026 15:53:03 +0300 Subject: [PATCH] lib: keep event loop alive for pending Atomics.waitAsync --- lib/internal/atomics/wait_async.js | 36 ++++++ lib/internal/bootstrap/node.js | 24 ++++ lib/internal/worker/messaging.js | 7 +- .../test-atomics-waitasync-event-loop.mjs | 121 ++++++++++++++++++ .../test-worker-messaging-event-loop-ref.mjs | 20 +++ 5 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 lib/internal/atomics/wait_async.js create mode 100644 test/parallel/test-atomics-waitasync-event-loop.mjs create mode 100644 test/parallel/test-worker-messaging-event-loop-ref.mjs diff --git a/lib/internal/atomics/wait_async.js b/lib/internal/atomics/wait_async.js new file mode 100644 index 00000000000000..9ba5086973d993 --- /dev/null +++ b/lib/internal/atomics/wait_async.js @@ -0,0 +1,36 @@ +'use strict'; + +const { + PromisePrototypeThen +} = primordials; + +const timers = require('timers'); + +const keepAliveInterval = 2 ** 31 - 1; + +let pendingWaiters = 0; +let keepAliveHandle = null; + +function maybeStopKeepAlive() { + if (--pendingWaiters === 0) { + timers.clearInterval(keepAliveHandle); + keepAliveHandle = null; + } +} + +function trackWaitAsyncResult(result) { + if (!result.async) { + return result; + } + + if (++pendingWaiters === 1) { + keepAliveHandle = timers.setInterval(() => {}, keepAliveInterval); + } + + PromisePrototypeThen(result.value, maybeStopKeepAlive, maybeStopKeepAlive); + return result; +} + +module.exports = { + trackWaitAsyncResult, +}; diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index 2fafdb92a78e04..de6d5e7cfa45db 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -54,6 +54,7 @@ /* global process, require, internalBinding, primordials */ const { + AtomicsWaitAsync, FunctionPrototypeCall, JSONParse, Number, @@ -62,11 +63,14 @@ const { ObjectFreeze, ObjectGetPrototypeOf, ObjectSetPrototypeOf, + Proxy, + ReflectApply, SymbolToStringTag, globalThis, } = primordials; const config = internalBinding('config'); const internalTimers = require('internal/timers'); +const { trackWaitAsyncResult } = require('internal/atomics/wait_async'); const { defineOperation } = require('internal/util'); const { validateInteger, @@ -414,6 +418,26 @@ internalBinding('process_methods').setEmitWarningSync(emitWarningSync); process.getBuiltinModule = getBuiltinModule; } +// Patch Atomics.waitAsync to hold an event-loop reference while async waiters +// are pending. Without this, Node.js exits if Atomics.waitAsync() is the only +// outstanding operation and no other ref'd handle (timer, socket, worker) is +// active, because the underlying libuv handles are unreferenced. +{ + const wrappedAtomicsWaitAsync = new Proxy(AtomicsWaitAsync, { + __proto__: null, + apply(target, thisArg, args) { + return trackWaitAsyncResult(ReflectApply(target, thisArg, args)); + } + }); + + ObjectDefineProperty(globalThis.Atomics, 'waitAsync', { + __proto__: null, + value: wrappedAtomicsWaitAsync, + writable: true, + configurable: true, + }); +} + function setupProcessObject() { const EventEmitter = require('events'); const origProcProto = ObjectGetPrototypeOf(process); diff --git a/lib/internal/worker/messaging.js b/lib/internal/worker/messaging.js index d1573b870ef113..d87211fbfd2e61 100644 --- a/lib/internal/worker/messaging.js +++ b/lib/internal/worker/messaging.js @@ -16,6 +16,9 @@ const { const { constructSharedArrayBuffer, } = require('internal/util'); +const { + trackWaitAsyncResult, +} = require('internal/atomics/wait_async'); const { codes: { @@ -204,7 +207,9 @@ async function postMessageToThread(threadId, value, transferList, timeout) { const memory = constructSharedArrayBuffer(WORKER_MESSAGING_SHARED_DATA); const status = new Int32Array(memory); - const promise = AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0, timeout).value; + const promise = + trackWaitAsyncResult( + AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0, timeout)).value; const message = { type: messageTypes.SEND_MESSAGE_TO_WORKER, diff --git a/test/parallel/test-atomics-waitasync-event-loop.mjs b/test/parallel/test-atomics-waitasync-event-loop.mjs new file mode 100644 index 00000000000000..256a719747958f --- /dev/null +++ b/test/parallel/test-atomics-waitasync-event-loop.mjs @@ -0,0 +1,121 @@ +import { Worker } from 'node:worker_threads'; +import assert from 'node:assert'; + +function makeInt32(initialValue) { + const sab = new SharedArrayBuffer(4); + const view = new Int32Array(sab, 0, 1); + Atomics.store(view, 0, initialValue); + return view; +} + +{ + const view = makeInt32(0); + assert.strictEqual( + Object.prototype.hasOwnProperty.call(Atomics.waitAsync, 'prototype'), + false); + assert.throws(() => new Atomics.waitAsync(view, 0, 0), { + name: 'TypeError', + }); +} + +// No timeout, notified by a worker thread +// Must not exit early and the promise must resolve with 'ok' +{ + const view = makeInt32(0); + + const workerCode = ` + import { workerData, parentPort } from 'node:worker_threads'; + const view = new Int32Array(workerData, 0, 1); + setTimeout(() => { + Atomics.store(view, 0, 1); + Atomics.notify(view, 0, 1); + }, 50); + parentPort.postMessage('ready'); + `; + + const worker = new Worker(workerCode, { eval: true, workerData: view.buffer }); + await new Promise((resolve) => worker.once('message', resolve)); + worker.unref(); // only the waitAsync should keep the loop alive + + const result = Atomics.waitAsync(view, 0, 0); + assert.strictEqual(result.async, true, 'should be async'); + const value = await result.value; + assert.strictEqual(value, 'ok', `expected 'ok', got "${value}"`); +} + +// With timeout, resolved by notify +// A timeout is specified but the notify arrives first. Must resolve to "ok". +{ + const view = makeInt32(0); + + const workerCode = ` + import { workerData, parentPort } from 'node:worker_threads'; + const view = new Int32Array(workerData, 0, 1); + setTimeout(() => { + Atomics.store(view, 0, 1); + Atomics.notify(view, 0, 1); + }, 50); + parentPort.postMessage('ready'); + `; + + const worker = new Worker(workerCode, { eval: true, workerData: view.buffer }); + await new Promise((resolve) => worker.once('message', resolve)); + worker.unref(); + + const result = Atomics.waitAsync(view, 0, 0, 5_000 /* 5 s – won't fire */); + assert.strictEqual(result.async, true); + const value = await result.value; + assert.strictEqual(value, 'ok'); +} + +// With timeout, resolved by timeout itself +// No notify ever fires; the promise must resolve to "timed-out" via the timer. +{ + const view = makeInt32(0); + + const result = Atomics.waitAsync(view, 0, 0, 30); + assert.strictEqual(result.async, true); + const value = await result.value; + assert.strictEqual(value, 'timed-out'); +} + +// Multiple concurrent waiters +// All promises must resolve and the loop must not exit until the last one does. +{ + const view = makeInt32(0); + + const workerCode = ` + import { workerData, parentPort } from 'node:worker_threads'; + const view = new Int32Array(workerData, 0, 1); + setTimeout(() => { + Atomics.store(view, 0, 1); + Atomics.notify(view, 0, 3); // wake up to 3 waiters + }, 50); + parentPort.postMessage('ready'); + `; + + const worker = new Worker(workerCode, { eval: true, workerData: view.buffer }); + await new Promise((resolve) => worker.once('message', resolve)); + worker.unref(); + + const [r1, r2, r3] = [ + Atomics.waitAsync(view, 0, 0), + Atomics.waitAsync(view, 0, 0), + Atomics.waitAsync(view, 0, 0), + ]; + assert.strictEqual(r1.async, true); + assert.strictEqual(r2.async, true); + assert.strictEqual(r3.async, true); + const values = await Promise.all([r1.value, r2.value, r3.value]); + assert.deepStrictEqual(values, ['ok', 'ok', 'ok']); +} + +// Immediate synchronous resolution (value mismatch) +// When the current value does not equal the expected value, waitAsync must +// return { async: false, value: "not-equal" } and not ref any handle. +{ + const view = makeInt32(99); // already != 0 + const result = Atomics.waitAsync(view, 0, 0); + assert.strictEqual(result.async, false); + assert.strictEqual(result.value, 'not-equal'); +} diff --git a/test/parallel/test-worker-messaging-event-loop-ref.mjs b/test/parallel/test-worker-messaging-event-loop-ref.mjs new file mode 100644 index 00000000000000..ff64c2da2a8619 --- /dev/null +++ b/test/parallel/test-worker-messaging-event-loop-ref.mjs @@ -0,0 +1,20 @@ +import '../common/index.mjs'; +import assert from 'node:assert'; +import { + Worker, + postMessageToThread, +} from 'node:worker_threads'; + +// postMessageToThread() uses an internal Atomics.waitAsync path, which should keep the event loop alive while awaiting the response. +const workerCode = ` +const { parentPort } = require('node:worker_threads'); +process.on('workerMessage', () => {}); +parentPort.postMessage('ready'); +`; + +const worker = new Worker(workerCode, { eval: true }); +await new Promise((resolve) => worker.once('message', resolve)); +worker.unref(); + +await assert.doesNotReject(postMessageToThread(worker.threadId, { hello: 1 })); +await worker.terminate();