diff --git a/lib/internal/blob.js b/lib/internal/blob.js index f16e520e197cdd..e852f1fc9bcbf5 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -453,30 +453,40 @@ function createBlobReaderStream(reader) { // There really should only be one read at a time so using an // array here is purely defensive. this.pendingPulls = []; - // Register a wakeup callback that the C++ side can invoke + // Lazily register a wakeup callback that the C++ side can invoke // when new data is available after a STATUS_BLOCK. - reader.setWakeup(() => { + this.wakeup = () => { if (this.pendingPulls.length > 0) { this.readNext(c); } - }); + }; }, pull(c) { const { promise, resolve, reject } = PromiseWithResolvers(); + if (this.pendingPulls.length === 0) { + reader.setWakeup(this.wakeup); + } this.pendingPulls.push({ resolve, reject }); this.readNext(c); return promise; }, + clearWakeupIfIdle() { + if (this.pendingPulls.length === 0) { + reader.setWakeup(undefined); + } + }, readNext(c) { reader.pull((status, buffer) => { // If pendingPulls is empty here, the stream had to have // been canceled, and we don't really care about the result. // We can simply exit. if (this.pendingPulls.length === 0) { + reader.setWakeup(undefined); return; } if (status === 0) { // EOS + reader.setWakeup(undefined); c.close(); // This is to signal the end for byob readers // see https://streams.spec.whatwg.org/#example-rbs-pull @@ -488,6 +498,7 @@ function createBlobReaderStream(reader) { // The read could fail for many different reasons when reading // from a non-memory resident blob part (e.g. file-backed blob). // The error details the system error code. + reader.setWakeup(undefined); const error = lazyDOMException('The blob could not be read', 'NotReadableError'); @@ -497,7 +508,7 @@ function createBlobReaderStream(reader) { return; } else if (status === 2) { // STATUS_BLOCK: No data available yet. The wakeup callback - // registered in start() will re-invoke readNext when data + // registered in pull() will re-invoke readNext when data // arrives. return; } @@ -517,6 +528,7 @@ function createBlobReaderStream(reader) { if (this.pendingPulls.length !== 0) { const pending = this.pendingPulls.shift(); pending.resolve(); + this.clearWakeupIfIdle(); } return; } @@ -525,6 +537,7 @@ function createBlobReaderStream(reader) { }); }, cancel(reason) { + reader.setWakeup(undefined); // Reject any currently pending pulls here. for (const pending of this.pendingPulls) { pending.reject(reason); diff --git a/test/parallel/test-blob-stream-gc.js b/test/parallel/test-blob-stream-gc.js new file mode 100644 index 00000000000000..24b2f15efc1265 --- /dev/null +++ b/test/parallel/test-blob-stream-gc.js @@ -0,0 +1,56 @@ +// Flags: --expose-gc --no-concurrent-array-buffer-sweeping +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { setImmediate: setImmediatePromise } = require('timers/promises'); + +const MiB = 1024 * 1024; +const iterations = 64; +const maxRetained = 16 * MiB; + +async function collectArrayBuffers() { + for (let i = 0; i < 3; i++) { + global.gc(); + await setImmediatePromise(); + } +} + +async function assertNoBlobStreamRetention(name, fn) { + const buffer = Buffer.alloc(MiB); + + await collectArrayBuffers(); + const before = process.memoryUsage().arrayBuffers; + + for (let i = 0; i < iterations; i++) { + await fn(buffer); + } + + await collectArrayBuffers(); + const retained = process.memoryUsage().arrayBuffers - before; + + assert( + retained < maxRetained, + `${name} retained ${retained} bytes in arrayBuffers`, + ); +} + +(async () => { + await assertNoBlobStreamRetention('unused Blob streams', + common.mustCall(async (buffer) => { + new Blob([buffer]).stream(); + }, iterations)); + + await assertNoBlobStreamRetention('cancelled Blob streams', + common.mustCall(async (buffer) => { + await new Blob([buffer]).stream() + .cancel(); + }, iterations)); + + await assertNoBlobStreamRetention('drained Blob streams', + common.mustCall(async (buffer) => { + await new Response( + new Blob([buffer]).stream(), + ).arrayBuffer(); + }, iterations)); +})().then(common.mustCall());