Skip to main content

Що таке streams у Node.js?

Streams (потоки) у Node.js - це об'єкти, які читають або записують дані частинами, не завантажуючи весь обсяг у пам'ять перед початком роботи.

Теорія

TL;DR

  • Аналогія: потік як конвеєр на заводі. Кожен шматок проходить через нього по черзі і ніколи не накопичується в одному місці.
  • fs.readFileSync('4gb.mp4') завантажує всі 4GB у RAM. fs.createReadStream() читає по 64KB, обробляє і відкидає, потім бере наступні 64KB.
  • Чотири типи: Readable (джерело), Writable (приймач), Duplex (обидва напрямки), Transform (змінює дані в процесі передачі).
  • pipe() з'єднує потоки. У продакшені краще pipeline() - він автоматично закриває всі потоки при помилці.
  • Правило вибору: файл більше 50MB або будь-яке real-time джерело - потоки. Менше 1MB без конкурентних запитів - синхронні методи цілком підходять.

Швидкий приклад

js
// Без потоків - весь файл у пам'яті const data = fs.readFileSync('4gb-video.mp4'); // Падає, якщо файл > RAM res.end(data); // З потоками - по 64KB, пам'ять ~1-2MB незалежно від розміру fs.createReadStream('4gb-video.mp4').pipe(res); // Читає 64KB, відправляє, читає наступні 64KB

Другий варіант читає один шматок, відправляє його і бере наступний. Файл може бути 50GB - рівень пам'яті не змінюється.

Як працює backpressure

Коли Readable stream видає дані швидше, ніж Writable встигає їх приймати, Node.js автоматично ставить джерело на паузу. Це і є backpressure (зворотний тиск). Без нього дані накопичувались би в буфері до краху процесу. pipe() обробляє все це сам. Якщо підключати потоки вручну через .on('data'), backpressure треба реалізовувати самостійно - і більшість коду, який це робить, робить неправильно.

Одного разу я дебажив крах у продакшені: Readable stream писав у повільний TCP-сокет, backpressure не оброблявся, буфер ріс до OOM. Після того перестав писати .on('data') взагалі і перейшов на pipe() або pipeline() скрізь.

Чотири типи потоків

ТипНапрямокПриклади
ReadableДані виходятьfs.createReadStream(), http.IncomingMessage
WritableДані входятьfs.createWriteStream(), http.ServerResponse
DuplexОбидва напрямкиnet.Socket
TransformЧитає, змінює, виводитьzlib.createGzip(), crypto.createCipheriv()

Transform потоки найкорисніші на практиці. Вони стоять посередині ланцюжка і змінюють кожен шматок даних на льоту.

Коли використовувати потоки

  • Великі файли (50MB+): запобігають збоям через брак пам'яті
  • HTTP-запити та відповіді: req в Express - це вже Readable stream
  • Real-time дані: WebSocket, курсори бази даних, stdout дочірнього процесу
  • Трансформація на льоту: стиснення, шифрування, парсинг під час читання
  • Передача між джерелами: файл до файлу, мережа до файлу, база даних до HTTP-відповіді

Для конфігів і маленького JSON до 1MB readFileSync простіший і достатньо швидкий. Налаштовувати потік там не варто.

Як це працює всередині

Node.js використовує libuv для I/O. Коли створюється Readable stream, libuv читає дані з диска шматками. Розмір шматка за замовчуванням - 64KB, налаштовується через highWaterMark. Кожен шматок генерує подію 'data'. Якщо приймач не встигає, внутрішній буфер заповнюється і потік сам викликає pause(). Коли приймач наздоганяє і звільняє буфер - resume(). Це повний цикл backpressure.

Типові помилки

Помилка 1: Вважати pipe() синхронним

js
// Неправильно - "Готово!" з'явиться до завершення передачі fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')); console.log('Готово!'); // Виконується одразу // Правильно - слухати подію finish fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')) .on('finish', () => console.log('Готово!'));

pipe() повертається одразу. Сама передача даних відбувається асинхронно.

Помилка 2: Не обробляти помилки потоків

js
// Неправильно - помилки зникають без будь-якого сигналу fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')); // Правильно - pipeline закриває всі потоки при помилці const { pipeline } = require('stream'); pipeline( fs.createReadStream('file.txt'), fs.createWriteStream('output.txt'), (err) => { if (err) console.error('Помилка:', err); else console.log('Готово'); } );

Якщо файл зник посеред передачі або диск заповнився, спрацьовує подія error. Без обробника отримаєш unhandled error. pipeline() ще й закриває всі потоки в ланцюжку автоматично.

Помилка 3: Занадто великий highWaterMark

js
// Неправильно - накопичує 10MB до паузи const readable = fs.createReadStream('file.txt', { highWaterMark: 10 * 1024 * 1024 }); // Правильно - 64KB за замовчуванням підходить у більшості випадків const readable = fs.createReadStream('file.txt'); // Виняток - для повільних мережевих з'єднань можна збільшити const readable = fs.createReadStream('file.txt', { highWaterMark: 256 * 1024 // 256KB });

highWaterMark - це поріг буфера, після якого спрацьовує backpressure. Значення 10MB означає, що Node.js накопичує 10MB перед паузою. Це зводить нанівець переваги потоків.

Помилка 4: Два pipe() на один Readable stream

js
// Неправильно - другий pipe не отримає нічого const readable = fs.createReadStream('file.txt'); readable.pipe(fs.createWriteStream('copy1.txt')); readable.pipe(fs.createWriteStream('copy2.txt')); // Порожньо // Правильно - два окремих потоки читання fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy1.txt')); fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy2.txt'));

Readable stream можна прочитати тільки один раз. Після першого pipe() дані вже передані.

Помилка 5: Передавати об'єкти без objectMode

js
// Неправильно - об'єкт перетвориться на "[object Object]" const transform = new Transform({ transform(chunk, encoding, callback) { callback(null, { processed: true }); } }); // Правильно const transform = new Transform({ objectMode: true, transform(chunk, encoding, callback) { callback(null, { processed: true }); // Передається як є } });

За замовчуванням потоки працюють з Buffer і рядками. Для JavaScript-об'єктів потрібен objectMode: true.

Де зустрічається у реальному коді

  • Express: res - це Writable stream; файли стримяться через fs.createReadStream().pipe(res)
  • zlib: createGzip() стискає дані на льоту під час передачі
  • Драйвери баз даних: Mongoose .cursor(), MongoDB .find().stream() для великих вибірок
  • csv-parser: читає CSV рядок за рядком, генеруючи один об'єкт на рядок
  • child_process: child.stdout - Readable stream, child.stdin - Writable
  • HTTP/2 і WebSocket використовують потоки всередині

Питання на співбесіді

Q: У чому різниця між pipe() і pipeline()?
A: pipe() з'єднує потоки, але не закриває їх при помилці. Інші потоки в ланцюжку продовжують працювати і витікають у пам'ять. pipeline() (Node.js 10+) автоматично закриває всі потоки при будь-якому збої. У продакшені використовуй pipeline().

Q: Як дізнатись, чи спрацьовує backpressure?
A: writable.write(chunk) повертає false, коли внутрішній буфер повний. При true - приймач готовий до нових даних. Саме це значення перевіряє pipe() на кожному записі.

Q: Навіщо існує fs.readFileSync(), якщо потоки краще справляються з великими файлами?
A: Для файлів до 1MB, де потрібен вміст одразу і немає конкурентних запитів, синхронний метод простіший. Немає подій, немає колбеків - просто значення. При малих розмірах overhead від налаштування потоку не виправдовується.

Q: (Senior) Як реалізувати backpressure у кастомному Transform stream, який викликає async API для кожного шматка?
A: Викликати callback() тільки після того, як async-виклик завершився. Черга потоку заповнюється природно, якщо API повільний, і backpressure спрацьовує автоматично. Структура: async transform(chunk, encoding, callback) { const result = await apiCall(chunk); callback(null, result); }. Головне - не викликати callback до завершення асинхронної роботи, інакше черга не заповниться і backpressure не спрацює.

Приклади

Стримінг великого файлу як HTTP-відповідь

js
const http = require('http'); const fs = require('fs'); const server = http.createServer((req, res) => { // Файл 2GB - пам'ять залишається ~1-2MB fs.createReadStream('./large-video.mp4') .on('error', (err) => { res.writeHead(500); res.end('Файл не знайдено'); }) .pipe(res); }); server.listen(3000);

Без потоку fs.readFile() завантажив би весь файл у пам'ять перед тим, як відправити перший байт. З потоком перший шматок доходить до клієнта за мілісекунди.

Обробка великого CSV через Transform stream

js
const fs = require('fs'); const { Transform } = require('stream'); const csv = require('csv-parser'); // 500MB CSV без завантаження в пам'ять fs.createReadStream('users-500mb.csv') .pipe(csv()) .pipe(new Transform({ objectMode: true, transform(row, encoding, callback) { // Змінюємо кожен рядок по черзі row.email = row.email.toUpperCase(); callback(null, JSON.stringify(row) + '\n'); } })) .pipe(fs.createWriteStream('users-processed.txt')) .on('finish', () => console.log('Готово')); // Пам'ять ~5-10MB протягом усього процесу

Transform stream отримує один розпарсений рядок, змінює його і передає далі. Файл ніколи не опиняється в пам'яті повністю.

pipeline з обробкою помилок

js
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // Стиснення файлу з автоматичним очищенням при збої pipeline( fs.createReadStream('access.log'), zlib.createGzip(), fs.createWriteStream('access.log.gz'), (err) => { if (err) { console.error('Стиснення не вдалося:', err); } else { console.log('Стиснено успішно'); } } );

Якщо будь-який потік у ланцюжку дав збій (диск заповнився, файл зник, мережа впала), pipeline() закриває всі потоки і передає помилку в колбек. З pipe() потрібно було б вручну додавати обробник error до кожного потоку окремо.

Коротка відповідь

Для співбесіди
Premium

Коротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.

Дочитали статтю?