diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index ee496f30e5934e..92508613e8883e 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -2480,21 +2480,26 @@ function readableStreamDefaultControllerClose(controller) { } function readableStreamDefaultControllerEnqueue(controller, chunk) { - if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + // Equivalent to readableStreamDefaultControllerCanCloseOrEnqueue() + // followed by isReadableStreamLocked() and + // readableStreamGetNumReadRequests(), but with the state loaded once: + // this runs for every enqueued chunk. + const controllerState = controller[kState]; + const stream = controllerState.stream; + if (controllerState.closeRequested || stream[kState].state !== 'readable') return; - const { - stream, - } = controller[kState]; - - if (isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream)) { + const reader = stream[kState].reader; + if (reader !== undefined && + reader[kState] !== undefined && + reader[kType] === 'ReadableStreamDefaultReader' && + reader[kState].readRequests.length) { readableStreamFulfillReadRequest(stream, chunk, false); } else { try { const chunkSize = FunctionPrototypeCall( - controller[kState].sizeAlgorithm, + controllerState.sizeAlgorithm, undefined, chunk); enqueueValueWithSize(controller, chunk, chunkSize); @@ -2533,22 +2538,27 @@ function readableStreamDefaultControllerGetDesiredSize(controller) { } function readableStreamDefaultControllerShouldCallPull(controller) { - const { - stream, - } = controller[kState]; - if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller) || - !controller[kState].started) + // Single-pass version of the spec's predicate chain (CanCloseOrEnqueue, + // IsLocked, HasDefaultReader, GetNumReadRequests, GetDesiredSize): this + // runs at least once per chunk on every default-stream path. The + // desired-size computation is inlined because the stream state is + // already known to be 'readable' here. + const controllerState = controller[kState]; + const stream = controllerState.stream; + if (controllerState.closeRequested || + stream[kState].state !== 'readable' || + !controllerState.started) return false; - if (isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream)) { + const reader = stream[kState].reader; + if (reader !== undefined && + reader[kState] !== undefined && + reader[kType] === 'ReadableStreamDefaultReader' && + reader[kState].readRequests.length) { return true; } - const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); - assert(desiredSize !== null); - - return desiredSize > 0; + return controllerState.highWaterMark - controllerState.queueTotalSize > 0; } function readableStreamDefaultControllerCallPullIfNeeded(controller) { @@ -2798,28 +2808,29 @@ function readableByteStreamControllerGetDesiredSize(controller) { } function readableByteStreamControllerShouldCallPull(controller) { - const { - stream, - } = controller[kState]; + // Single-pass version of the spec's predicate chain (HasDefaultReader, + // GetNumReadRequests, HasBYOBReader, GetNumReadIntoRequests, + // GetDesiredSize): this runs at least once per chunk on every byte + // stream path. The desired-size computation is inlined because the + // stream state is already known to be 'readable' here. + const controllerState = controller[kState]; + const stream = controllerState.stream; if (stream[kState].state !== 'readable' || - controller[kState].closeRequested || - !controller[kState].started) { + controllerState.closeRequested || + !controllerState.started) { return false; } - if (readableStreamHasDefaultReader(stream) && - readableStreamGetNumReadRequests(stream) > 0) { - return true; - } - - if (readableStreamHasBYOBReader(stream) && - readableStreamGetNumReadIntoRequests(stream) > 0) { - return true; + const reader = stream[kState].reader; + if (reader !== undefined && reader[kState] !== undefined) { + const type = reader[kType]; + if (type === 'ReadableStreamDefaultReader') { + if (reader[kState].readRequests.length) return true; + } else if (type === 'ReadableStreamBYOBReader') { + if (reader[kState].readIntoRequests.length) return true; + } } - const desiredSize = readableByteStreamControllerGetDesiredSize(controller); - assert(desiredSize !== null); - - return desiredSize > 0; + return controllerState.highWaterMark - controllerState.queueTotalSize > 0; } function readableByteStreamControllerHandleQueueDrain(controller) { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 5b1be9e3aa7a65..535c783a3a31c3 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -258,12 +258,7 @@ function InternalTransferredTransformStream() { readable: undefined, writable: undefined, backpressure: undefined, - backpressureChange: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, + backpressureChange: undefined, controller: undefined, }; } @@ -390,12 +385,7 @@ function initializeTransformStream( writable, controller: undefined, backpressure: undefined, - backpressureChange: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, + backpressureChange: undefined, }; transformStreamSetBackpressure(stream, true); @@ -429,12 +419,27 @@ function transformStreamUnblockWrite(stream) { transformStreamSetBackpressure(stream, false); } +// The spec's [[backpressureChangePromise]] is only ever observed by the +// source pull algorithm (settles when backpressure next becomes true) and +// by a sink write arriving while backpressure is set (settles when +// backpressure next becomes false). Instead of allocating a fresh promise +// record on every flip, the record is materialized lazily on first +// observation and dropped once settled; flips nobody is waiting on +// allocate nothing. +function transformStreamBackpressureChangePromise(stream) { + const state = stream[kState]; + return (state.backpressureChange ??= PromiseWithResolvers()).promise; +} + function transformStreamSetBackpressure(stream, backpressure) { - assert(stream[kState].backpressure !== backpressure); - if (stream[kState].backpressureChange.promise !== undefined) - stream[kState].backpressureChange.resolve?.(); - stream[kState].backpressureChange = PromiseWithResolvers(); - stream[kState].backpressure = backpressure; + const state = stream[kState]; + assert(state.backpressure !== backpressure); + const backpressureChange = state.backpressureChange; + if (backpressureChange !== undefined) { + state.backpressureChange = undefined; + backpressureChange.resolve(); + } + state.backpressure = backpressure; } function setupTransformStreamDefaultController( @@ -554,7 +559,7 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } = stream[kState]; assert(writable[kState].state === 'writable'); if (stream[kState].backpressure) { - const backpressureChange = stream[kState].backpressureChange.promise; + const backpressureChange = transformStreamBackpressureChangePromise(stream); return PromisePrototypeThen( backpressureChange, () => { @@ -638,9 +643,8 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) { function transformStreamDefaultSourcePullAlgorithm(stream) { assert(stream[kState].backpressure); - assert(stream[kState].backpressureChange.promise !== undefined); transformStreamSetBackpressure(stream, false); - return stream[kState].backpressureChange.promise; + return transformStreamBackpressureChangePromise(stream); } function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 836e1de130505b..cd203b77c2d22a 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -134,45 +134,44 @@ function isBrandCheck(brand) { }; } +// The queue helpers below run once per chunk on the hot paths of every +// default readable/writable stream, so they load the controller state a +// single time and don't assert the existence of the queue fields (both +// are unconditionally initialized during controller setup and only ever +// replaced wholesale). function dequeueValue(controller) { - assert(controller[kState].queue !== undefined); - assert(controller[kState].queueTotalSize !== undefined); - assert(controller[kState].queue.length); + const state = controller[kState]; + assert(state.queue.length); const { value, size, - } = ArrayPrototypeShift(controller[kState].queue); - controller[kState].queueTotalSize = - MathMax(0, controller[kState].queueTotalSize - size); + } = ArrayPrototypeShift(state.queue); + state.queueTotalSize = MathMax(0, state.queueTotalSize - size); return value; } function resetQueue(controller) { - assert(controller[kState].queue !== undefined); - assert(controller[kState].queueTotalSize !== undefined); - controller[kState].queue = []; - controller[kState].queueTotalSize = 0; + const state = controller[kState]; + state.queue = []; + state.queueTotalSize = 0; } function peekQueueValue(controller) { - assert(controller[kState].queue !== undefined); - assert(controller[kState].queueTotalSize !== undefined); - assert(controller[kState].queue.length); - return controller[kState].queue[0].value; + const state = controller[kState]; + assert(state.queue.length); + return state.queue[0].value; } function enqueueValueWithSize(controller, value, size) { - assert(controller[kState].queue !== undefined); - assert(controller[kState].queueTotalSize !== undefined); + const state = controller[kState]; const coercedSize = +size; if (NumberIsNaN(coercedSize) || coercedSize < 0 || coercedSize === Infinity) { throw new ERR_INVALID_ARG_VALUE.RangeError('size', size); } - size = coercedSize; - ArrayPrototypePush(controller[kState].queue, { value, size }); - controller[kState].queueTotalSize += size; + ArrayPrototypePush(state.queue, { value, size: coercedSize }); + state.queueTotalSize += coercedSize; } // Arity-specialized variants of the promise-callback wrapper. The generic diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index bc7f58a05fc2b9..37ef513a4e4166 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -601,6 +601,10 @@ function createWritableStreamState() { __proto__: null, closedPromise: undefined, closeRequest: kNilRequest, + // Mirrors "closeRequest or inFlightCloseRequest is pending"; kept as a + // flag because the predicate runs several times per chunk on the write + // hot path. + closeQueuedOrInFlight: false, inFlightWriteRequest: kNilRequest, inFlightCloseRequest: kNilRequest, pendingAbortRequest: kNilPendingAbortRequest, @@ -742,6 +746,7 @@ function writableStreamClose(stream) { assert(state === 'writable' || state === 'erroring'); assert(!writableStreamCloseQueuedOrInFlight(stream)); stream[kState].closeRequest = PromiseWithResolvers(); + stream[kState].closeQueuedOrInFlight = true; const { promise } = stream[kState].closeRequest; if (writer !== undefined && backpressure && state === 'writable') writer[kState].ready?.resolve?.(); @@ -750,12 +755,13 @@ function writableStreamClose(stream) { } function writableStreamUpdateBackpressure(stream, backpressure) { - assert(stream[kState].state === 'writable'); - assert(!writableStreamCloseQueuedOrInFlight(stream)); + const streamState = stream[kState]; + assert(streamState.state === 'writable'); + assert(!streamState.closeQueuedOrInFlight); const { writer, - } = stream[kState]; - if (writer !== undefined && stream[kState].backpressure !== backpressure) { + } = streamState; + if (writer !== undefined && streamState.backpressure !== backpressure) { if (backpressure) { // The spec replaces [[readyPromise]] with a fresh pending promise; // dropping the cache lets the next observation derive it. @@ -764,7 +770,7 @@ function writableStreamUpdateBackpressure(stream, backpressure) { writer[kState].ready?.resolve?.(); } } - stream[kState].backpressure = backpressure; + streamState.backpressure = backpressure; } function writableStreamStartErroring(stream, reason) { @@ -792,6 +798,7 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { assert(stream[kState].inFlightCloseRequest.promise === undefined); stream[kState].closeRequest.reject?.(stream[kState].storedError); stream[kState].closeRequest = kNilRequest; + stream[kState].closeQueuedOrInFlight = false; } const closedPromiseCache = stream[kState].closedPromise; @@ -857,6 +864,7 @@ function writableStreamFinishInFlightCloseWithError(stream, error) { assert(stream[kState].inFlightCloseRequest.promise !== undefined); stream[kState].inFlightCloseRequest.reject?.(error); stream[kState].inFlightCloseRequest = kNilRequest; + stream[kState].closeQueuedOrInFlight = false; assert(stream[kState].state === 'writable' || stream[kState].state === 'erroring'); if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { @@ -870,6 +878,7 @@ function writableStreamFinishInFlightClose(stream) { assert(stream[kState].inFlightCloseRequest.promise !== undefined); stream[kState].inFlightCloseRequest.resolve?.(); stream[kState].inFlightCloseRequest = kNilRequest; + stream[kState].closeQueuedOrInFlight = false; if (stream[kState].state === 'erroring') { stream[kState].storedError = undefined; if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { @@ -933,11 +942,7 @@ function writableStreamDealWithRejection(stream, error) { } function writableStreamCloseQueuedOrInFlight(stream) { - if (stream[kState].closeRequest.promise === undefined && - stream[kState].inFlightCloseRequest.promise === undefined) { - return false; - } - return true; + return stream[kState].closeQueuedOrInFlight; } function writableStreamAddWriteRequest(stream) { @@ -951,34 +956,34 @@ function writableStreamAddWriteRequest(stream) { } function writableStreamDefaultWriterWrite(writer, chunk) { - const { - stream, - } = writer[kState]; + const writerState = writer[kState]; + const stream = writerState.stream; assert(stream !== undefined); + const streamState = stream[kState]; const { controller, - } = stream[kState]; + } = streamState; const chunkSize = writableStreamDefaultControllerGetChunkSize( controller, chunk); - if (stream !== writer[kState].stream) { + if (stream !== writerState.stream) { return PromiseReject( new ERR_INVALID_STATE.TypeError('Mismatched WritableStreams')); } const { state, - } = stream[kState]; + } = streamState; if (state === 'errored') - return PromiseReject(stream[kState].storedError); + return PromiseReject(streamState.storedError); - if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') { + if (streamState.closeQueuedOrInFlight || state === 'closed') { return PromiseReject( new ERR_INVALID_STATE.TypeError('WritableStream is closed')); } if (state === 'erroring') - return PromiseReject(stream[kState].storedError); + return PromiseReject(streamState.storedError); assert(state === 'writable'); @@ -1085,8 +1090,9 @@ function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { const { stream, } = controller[kState]; - if (!writableStreamCloseQueuedOrInFlight(stream) && - stream[kState].state === 'writable') { + const streamState = stream[kState]; + if (!streamState.closeQueuedOrInFlight && + streamState.state === 'writable') { writableStreamUpdateBackpressure( stream, writableStreamDefaultControllerGetBackpressure(controller)); @@ -1107,12 +1113,13 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { // subsequent write instead of allocating two fresh closures per chunk. controller[kState].writeFulfilled = () => { writableStreamFinishInFlightWrite(stream); + const streamState = stream[kState]; const { state, - } = stream[kState]; + } = streamState; assert(state === 'writable' || state === 'erroring'); dequeueValue(controller); - if (!writableStreamCloseQueuedOrInFlight(stream) && + if (!streamState.closeQueuedOrInFlight && state === 'writable') { writableStreamUpdateBackpressure( stream,