Що таке streams у Node.js?
Streams (потоки) у Node.js - це об'єкти, які читають або записують дані частинами, не завантажуючи весь обсяг у пам'ять перед початком роботи.
Теорія
TL;DR
- Аналогія: потік як конвеєр на заводі. Кожен шматок проходить через нього по черзі і ніколи не накопичується в одному місці.
fs.readFileSync('4gb.mp4')завантажує всі 4GB у RAM.fs.createReadStream()читає по 64KB, обробляє і відкидає, потім бере наступні 64KB.- Чотири типи: Readable (джерело), Writable (приймач), Duplex (обидва напрямки), Transform (змінює дані в процесі передачі).
pipe()з'єднує потоки. У продакшені кращеpipeline()- він автоматично закриває всі потоки при помилці.- Правило вибору: файл більше 50MB або будь-яке real-time джерело - потоки. Менше 1MB без конкурентних запитів - синхронні методи цілком підходять.
Швидкий приклад
// Без потоків - весь файл у пам'яті
const data = fs.readFileSync('4gb-video.mp4'); // Падає, якщо файл > RAM
res.end(data);
// З потоками - по 64KB, пам'ять ~1-2MB незалежно від розміру
fs.createReadStream('4gb-video.mp4').pipe(res);
// Читає 64KB, відправляє, читає наступні 64KBДругий варіант читає один шматок, відправляє його і бере наступний. Файл може бути 50GB - рівень пам'яті не змінюється.
Як працює backpressure
Коли Readable stream видає дані швидше, ніж Writable встигає їх приймати, Node.js автоматично ставить джерело на паузу. Це і є backpressure (зворотний тиск). Без нього дані накопичувались би в буфері до краху процесу. pipe() обробляє все це сам. Якщо підключати потоки вручну через .on('data'), backpressure треба реалізовувати самостійно - і більшість коду, який це робить, робить неправильно.
Одного разу я дебажив крах у продакшені: Readable stream писав у повільний TCP-сокет, backpressure не оброблявся, буфер ріс до OOM. Після того перестав писати .on('data') взагалі і перейшов на pipe() або pipeline() скрізь.
Чотири типи потоків
| Тип | Напрямок | Приклади |
|---|---|---|
| Readable | Дані виходять | fs.createReadStream(), http.IncomingMessage |
| Writable | Дані входять | fs.createWriteStream(), http.ServerResponse |
| Duplex | Обидва напрямки | net.Socket |
| Transform | Читає, змінює, виводить | zlib.createGzip(), crypto.createCipheriv() |
Transform потоки найкорисніші на практиці. Вони стоять посередині ланцюжка і змінюють кожен шматок даних на льоту.
Коли використовувати потоки
- Великі файли (50MB+): запобігають збоям через брак пам'яті
- HTTP-запити та відповіді:
reqв Express - це вже Readable stream - Real-time дані: WebSocket, курсори бази даних, stdout дочірнього процесу
- Трансформація на льоту: стиснення, шифрування, парсинг під час читання
- Передача між джерелами: файл до файлу, мережа до файлу, база даних до HTTP-відповіді
Для конфігів і маленького JSON до 1MB readFileSync простіший і достатньо швидкий. Налаштовувати потік там не варто.
Як це працює всередині
Node.js використовує libuv для I/O. Коли створюється Readable stream, libuv читає дані з диска шматками. Розмір шматка за замовчуванням - 64KB, налаштовується через highWaterMark. Кожен шматок генерує подію 'data'. Якщо приймач не встигає, внутрішній буфер заповнюється і потік сам викликає pause(). Коли приймач наздоганяє і звільняє буфер - resume(). Це повний цикл backpressure.
Типові помилки
Помилка 1: Вважати pipe() синхронним
// Неправильно - "Готово!" з'явиться до завершення передачі
fs.createReadStream('file.txt')
.pipe(fs.createWriteStream('output.txt'));
console.log('Готово!'); // Виконується одразу
// Правильно - слухати подію finish
fs.createReadStream('file.txt')
.pipe(fs.createWriteStream('output.txt'))
.on('finish', () => console.log('Готово!'));pipe() повертається одразу. Сама передача даних відбувається асинхронно.
Помилка 2: Не обробляти помилки потоків
// Неправильно - помилки зникають без будь-якого сигналу
fs.createReadStream('file.txt')
.pipe(fs.createWriteStream('output.txt'));
// Правильно - pipeline закриває всі потоки при помилці
const { pipeline } = require('stream');
pipeline(
fs.createReadStream('file.txt'),
fs.createWriteStream('output.txt'),
(err) => {
if (err) console.error('Помилка:', err);
else console.log('Готово');
}
);Якщо файл зник посеред передачі або диск заповнився, спрацьовує подія error. Без обробника отримаєш unhandled error. pipeline() ще й закриває всі потоки в ланцюжку автоматично.
Помилка 3: Занадто великий highWaterMark
// Неправильно - накопичує 10MB до паузи
const readable = fs.createReadStream('file.txt', {
highWaterMark: 10 * 1024 * 1024
});
// Правильно - 64KB за замовчуванням підходить у більшості випадків
const readable = fs.createReadStream('file.txt');
// Виняток - для повільних мережевих з'єднань можна збільшити
const readable = fs.createReadStream('file.txt', {
highWaterMark: 256 * 1024 // 256KB
});highWaterMark - це поріг буфера, після якого спрацьовує backpressure. Значення 10MB означає, що Node.js накопичує 10MB перед паузою. Це зводить нанівець переваги потоків.
Помилка 4: Два pipe() на один Readable stream
// Неправильно - другий pipe не отримає нічого
const readable = fs.createReadStream('file.txt');
readable.pipe(fs.createWriteStream('copy1.txt'));
readable.pipe(fs.createWriteStream('copy2.txt')); // Порожньо
// Правильно - два окремих потоки читання
fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy1.txt'));
fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy2.txt'));Readable stream можна прочитати тільки один раз. Після першого pipe() дані вже передані.
Помилка 5: Передавати об'єкти без objectMode
// Неправильно - об'єкт перетвориться на "[object Object]"
const transform = new Transform({
transform(chunk, encoding, callback) {
callback(null, { processed: true });
}
});
// Правильно
const transform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
callback(null, { processed: true }); // Передається як є
}
});За замовчуванням потоки працюють з Buffer і рядками. Для JavaScript-об'єктів потрібен objectMode: true.
Де зустрічається у реальному коді
- Express:
res- це Writable stream; файли стримяться черезfs.createReadStream().pipe(res) zlib:createGzip()стискає дані на льоту під час передачі- Драйвери баз даних: Mongoose
.cursor(), MongoDB.find().stream()для великих вибірок csv-parser: читає CSV рядок за рядком, генеруючи один об'єкт на рядокchild_process:child.stdout- Readable stream,child.stdin- Writable- HTTP/2 і WebSocket використовують потоки всередині
Питання на співбесіді
Q: У чому різниця між pipe() і pipeline()?
A: pipe() з'єднує потоки, але не закриває їх при помилці. Інші потоки в ланцюжку продовжують працювати і витікають у пам'ять. pipeline() (Node.js 10+) автоматично закриває всі потоки при будь-якому збої. У продакшені використовуй pipeline().
Q: Як дізнатись, чи спрацьовує backpressure?
A: writable.write(chunk) повертає false, коли внутрішній буфер повний. При true - приймач готовий до нових даних. Саме це значення перевіряє pipe() на кожному записі.
Q: Навіщо існує fs.readFileSync(), якщо потоки краще справляються з великими файлами?
A: Для файлів до 1MB, де потрібен вміст одразу і немає конкурентних запитів, синхронний метод простіший. Немає подій, немає колбеків - просто значення. При малих розмірах overhead від налаштування потоку не виправдовується.
Q: (Senior) Як реалізувати backpressure у кастомному Transform stream, який викликає async API для кожного шматка?
A: Викликати callback() тільки після того, як async-виклик завершився. Черга потоку заповнюється природно, якщо API повільний, і backpressure спрацьовує автоматично. Структура: async transform(chunk, encoding, callback) { const result = await apiCall(chunk); callback(null, result); }. Головне - не викликати callback до завершення асинхронної роботи, інакше черга не заповниться і backpressure не спрацює.
Приклади
Стримінг великого файлу як HTTP-відповідь
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
// Файл 2GB - пам'ять залишається ~1-2MB
fs.createReadStream('./large-video.mp4')
.on('error', (err) => {
res.writeHead(500);
res.end('Файл не знайдено');
})
.pipe(res);
});
server.listen(3000);Без потоку fs.readFile() завантажив би весь файл у пам'ять перед тим, як відправити перший байт. З потоком перший шматок доходить до клієнта за мілісекунди.
Обробка великого CSV через Transform stream
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser');
// 500MB CSV без завантаження в пам'ять
fs.createReadStream('users-500mb.csv')
.pipe(csv())
.pipe(new Transform({
objectMode: true,
transform(row, encoding, callback) {
// Змінюємо кожен рядок по черзі
row.email = row.email.toUpperCase();
callback(null, JSON.stringify(row) + '\n');
}
}))
.pipe(fs.createWriteStream('users-processed.txt'))
.on('finish', () => console.log('Готово'));
// Пам'ять ~5-10MB протягом усього процесуTransform stream отримує один розпарсений рядок, змінює його і передає далі. Файл ніколи не опиняється в пам'яті повністю.
pipeline з обробкою помилок
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Стиснення файлу з автоматичним очищенням при збої
pipeline(
fs.createReadStream('access.log'),
zlib.createGzip(),
fs.createWriteStream('access.log.gz'),
(err) => {
if (err) {
console.error('Стиснення не вдалося:', err);
} else {
console.log('Стиснено успішно');
}
}
);Якщо будь-який потік у ланцюжку дав збій (диск заповнився, файл зник, мережа впала), pipeline() закриває всі потоки і передає помилку в колбек. З pipe() потрібно було б вручну додавати обробник error до кожного потоку окремо.
Коротка відповідь
Для співбесідиКоротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.