Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const kIsPerformingIO = Symbol('kIsPerformingIO');

const kFs = Symbol('kFs');
const kHandle = Symbol('kHandle');
const kHandleCleanup = Symbol('kHandleCleanup');

function _construct(callback) {
const stream = this;
Expand Down Expand Up @@ -152,14 +153,31 @@ function importFd(stream, options) {
stream[kHandle] = options.fd;
stream[kFs] = FileHandleOperations(stream[kHandle]);
stream[kHandle][kRef]();
options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
if (options.autoClose === false) {
stream[kHandleCleanup] = FunctionPrototypeBind(cleanupFileHandleRef, stream);
stream.once('end', stream[kHandleCleanup]);
stream.once('finish', stream[kHandleCleanup]);
stream.once('error', stream[kHandleCleanup]);
}
return options.fd.fd;
}

throw new ERR_INVALID_ARG_TYPE('options.fd',
['number', 'FileHandle'], options.fd);
}

function cleanupFileHandleRef() {
if (this[kHandleCleanup] === undefined) {
return;
}

this[kHandle][kUnref]();
this.removeListener('end', this[kHandleCleanup]);
this.removeListener('finish', this[kHandleCleanup]);
this.removeListener('error', this[kHandleCleanup]);
this[kHandleCleanup] = undefined;
}

function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
Expand Down
80 changes: 80 additions & 0 deletions test/parallel/test-fs-promises-file-handle-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,87 @@ async function validateRead() {
);
}

async function validateReadStreamReleasesFileHandleCloseListener() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-read-listener.txt');
const buf = Buffer.from('Hello world', 'utf8');

fs.writeFileSync(filePathForHandle, buf);

const fileHandle = await open(filePathForHandle);

for (let i = 0; i < buf.length; i++) {
await buffer(fileHandle.createReadStream({
start: i,
end: i,
autoClose: false,
}));

assert.strictEqual(fileHandle.listenerCount('close'), 0);
}

await fileHandle.close();
}

async function validateWriteStreamReleasesFileHandleCloseListener() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-write-listener.txt');
const buf = Buffer.from('Hello world', 'utf8');

const fileHandle = await open(filePathForHandle, 'w');

for (let i = 0; i < buf.length; i++) {
const stream = fileHandle.createWriteStream({
start: i,
autoClose: false,
});
stream.end(buf.subarray(i, i + 1));
await finished(stream);

assert.strictEqual(fileHandle.listenerCount('close'), 0);
}

await fileHandle.close();
assert.deepStrictEqual(fs.readFileSync(filePathForHandle), buf);
}

async function validateReadStreamAutoCloseClosesFileHandle() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-read-auto-close.txt');
const buf = Buffer.from('Hello world', 'utf8');

fs.writeFileSync(filePathForHandle, buf);

const fileHandle = await open(filePathForHandle);
const closed = new Promise((resolve) => {
fileHandle.once('close', common.mustCall(resolve));
});

assert.deepStrictEqual(await buffer(fileHandle.createReadStream()), buf);
await closed;
assert.strictEqual(fileHandle.listenerCount('close'), 0);
}

async function validateWriteStreamAutoCloseClosesFileHandle() {
const filePathForHandle = path.resolve(tmpDir, 'tmp-write-auto-close.txt');
const buf = Buffer.from('Hello world', 'utf8');

const fileHandle = await open(filePathForHandle, 'w');
const closed = new Promise((resolve) => {
fileHandle.once('close', common.mustCall(resolve));
});
const stream = fileHandle.createWriteStream();

stream.end(buf);
await finished(stream);
await closed;

assert.strictEqual(fileHandle.listenerCount('close'), 0);
assert.deepStrictEqual(fs.readFileSync(filePathForHandle), buf);
}

Promise.all([
validateWrite(),
validateRead(),
validateReadStreamReleasesFileHandleCloseListener(),
validateWriteStreamReleasesFileHandleCloseListener(),
validateReadStreamAutoCloseClosesFileHandle(),
validateWriteStreamAutoCloseClosesFileHandle(),
]).then(common.mustCall());