Запропонувати правкуПокращити цю статтюДопрацюйте відповідь до «Що таке streams у Node.js?». Ваші зміни проходять модерацію перед публікацією.Потрібне підтвердженняКонтентЩо ви змінюєте🇺🇸EN🇺🇦UAПереглядЗаголовок (UA)Коротка відповідь (UA)**Streams (потоки)** у Node.js - це об'єкти, які читають або записують дані частинами, не завантажуючи все у пам'ять одразу. ```js // Файл 4GB, пам'ять ~1-2MB fs.createReadStream('video.mp4').pipe(res); ``` **Ключове:** `pipe()` з'єднує потоки і автоматично обробляє backpressure. У продакшені обирай `pipeline()` (Node 10+) - він закриває всі потоки при будь-якій помилці в ланцюжку.Показується над повною відповіддю для швидкого нагадування.Відповідь (UA)Зображення**Streams (потоки)** у Node.js - це об'єкти, які читають або записують дані частинами, не завантажуючи весь обсяг у пам'ять перед початком роботи. ## Теорія ### TL;DR - Аналогія: потік як конвеєр на заводі. Кожен шматок проходить через нього по черзі і ніколи не накопичується в одному місці. - `fs.readFileSync('4gb.mp4')` завантажує всі 4GB у RAM. `fs.createReadStream()` читає по 64KB, обробляє і відкидає, потім бере наступні 64KB. - Чотири типи: Readable (джерело), Writable (приймач), Duplex (обидва напрямки), Transform (змінює дані в процесі передачі). - `pipe()` з'єднує потоки. У продакшені краще `pipeline()` - він автоматично закриває всі потоки при помилці. - Правило вибору: файл більше 50MB або будь-яке real-time джерело - потоки. Менше 1MB без конкурентних запитів - синхронні методи цілком підходять. ### Швидкий приклад ```js // Без потоків - весь файл у пам'яті const data = fs.readFileSync('4gb-video.mp4'); // Падає, якщо файл > RAM res.end(data); // З потоками - по 64KB, пам'ять ~1-2MB незалежно від розміру fs.createReadStream('4gb-video.mp4').pipe(res); // Читає 64KB, відправляє, читає наступні 64KB ``` Другий варіант читає один шматок, відправляє його і бере наступний. Файл може бути 50GB - рівень пам'яті не змінюється. ### Як працює backpressure Коли Readable stream видає дані швидше, ніж Writable встигає їх приймати, Node.js автоматично ставить джерело на паузу. Це і є backpressure (зворотний тиск). Без нього дані накопичувались би в буфері до краху процесу. `pipe()` обробляє все це сам. Якщо підключати потоки вручну через `.on('data')`, backpressure треба реалізовувати самостійно - і більшість коду, який це робить, робить неправильно. Одного разу я дебажив крах у продакшені: Readable stream писав у повільний TCP-сокет, backpressure не оброблявся, буфер ріс до OOM. Після того перестав писати `.on('data')` взагалі і перейшов на `pipe()` або `pipeline()` скрізь. ### Чотири типи потоків | Тип | Напрямок | Приклади | |-----|----------|---------| | Readable | Дані виходять | `fs.createReadStream()`, `http.IncomingMessage` | | Writable | Дані входять | `fs.createWriteStream()`, `http.ServerResponse` | | Duplex | Обидва напрямки | `net.Socket` | | Transform | Читає, змінює, виводить | `zlib.createGzip()`, `crypto.createCipheriv()` | Transform потоки найкорисніші на практиці. Вони стоять посередині ланцюжка і змінюють кожен шматок даних на льоту. ### Коли використовувати потоки - Великі файли (50MB+): запобігають збоям через брак пам'яті - HTTP-запити та відповіді: `req` в Express - це вже Readable stream - Real-time дані: WebSocket, курсори бази даних, stdout дочірнього процесу - Трансформація на льоту: стиснення, шифрування, парсинг під час читання - Передача між джерелами: файл до файлу, мережа до файлу, база даних до HTTP-відповіді Для конфігів і маленького JSON до 1MB `readFileSync` простіший і достатньо швидкий. Налаштовувати потік там не варто. ### Як це працює всередині Node.js використовує libuv для I/O. Коли створюється Readable stream, libuv читає дані з диска шматками. Розмір шматка за замовчуванням - 64KB, налаштовується через `highWaterMark`. Кожен шматок генерує подію `'data'`. Якщо приймач не встигає, внутрішній буфер заповнюється і потік сам викликає `pause()`. Коли приймач наздоганяє і звільняє буфер - `resume()`. Це повний цикл backpressure. ### Типові помилки **Помилка 1: Вважати `pipe()` синхронним** ```js // Неправильно - "Готово!" з'явиться до завершення передачі fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')); console.log('Готово!'); // Виконується одразу // Правильно - слухати подію finish fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')) .on('finish', () => console.log('Готово!')); ``` `pipe()` повертається одразу. Сама передача даних відбувається асинхронно. **Помилка 2: Не обробляти помилки потоків** ```js // Неправильно - помилки зникають без будь-якого сигналу fs.createReadStream('file.txt') .pipe(fs.createWriteStream('output.txt')); // Правильно - pipeline закриває всі потоки при помилці const { pipeline } = require('stream'); pipeline( fs.createReadStream('file.txt'), fs.createWriteStream('output.txt'), (err) => { if (err) console.error('Помилка:', err); else console.log('Готово'); } ); ``` Якщо файл зник посеред передачі або диск заповнився, спрацьовує подія `error`. Без обробника отримаєш unhandled error. `pipeline()` ще й закриває всі потоки в ланцюжку автоматично. **Помилка 3: Занадто великий `highWaterMark`** ```js // Неправильно - накопичує 10MB до паузи const readable = fs.createReadStream('file.txt', { highWaterMark: 10 * 1024 * 1024 }); // Правильно - 64KB за замовчуванням підходить у більшості випадків const readable = fs.createReadStream('file.txt'); // Виняток - для повільних мережевих з'єднань можна збільшити const readable = fs.createReadStream('file.txt', { highWaterMark: 256 * 1024 // 256KB }); ``` `highWaterMark` - це поріг буфера, після якого спрацьовує backpressure. Значення 10MB означає, що Node.js накопичує 10MB перед паузою. Це зводить нанівець переваги потоків. **Помилка 4: Два `pipe()` на один Readable stream** ```js // Неправильно - другий pipe не отримає нічого const readable = fs.createReadStream('file.txt'); readable.pipe(fs.createWriteStream('copy1.txt')); readable.pipe(fs.createWriteStream('copy2.txt')); // Порожньо // Правильно - два окремих потоки читання fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy1.txt')); fs.createReadStream('file.txt').pipe(fs.createWriteStream('copy2.txt')); ``` Readable stream можна прочитати тільки один раз. Після першого `pipe()` дані вже передані. **Помилка 5: Передавати об'єкти без `objectMode`** ```js // Неправильно - об'єкт перетвориться на "[object Object]" const transform = new Transform({ transform(chunk, encoding, callback) { callback(null, { processed: true }); } }); // Правильно const transform = new Transform({ objectMode: true, transform(chunk, encoding, callback) { callback(null, { processed: true }); // Передається як є } }); ``` За замовчуванням потоки працюють з Buffer і рядками. Для JavaScript-об'єктів потрібен `objectMode: true`. ### Де зустрічається у реальному коді - Express: `res` - це Writable stream; файли стримяться через `fs.createReadStream().pipe(res)` - `zlib`: `createGzip()` стискає дані на льоту під час передачі - Драйвери баз даних: Mongoose `.cursor()`, MongoDB `.find().stream()` для великих вибірок - `csv-parser`: читає CSV рядок за рядком, генеруючи один об'єкт на рядок - `child_process`: `child.stdout` - Readable stream, `child.stdin` - Writable - HTTP/2 і WebSocket використовують потоки всередині ### Питання на співбесіді **Q:** У чому різниця між `pipe()` і `pipeline()`? **A:** `pipe()` з'єднує потоки, але не закриває їх при помилці. Інші потоки в ланцюжку продовжують працювати і витікають у пам'ять. `pipeline()` (Node.js 10+) автоматично закриває всі потоки при будь-якому збої. У продакшені використовуй `pipeline()`. **Q:** Як дізнатись, чи спрацьовує backpressure? **A:** `writable.write(chunk)` повертає `false`, коли внутрішній буфер повний. При `true` - приймач готовий до нових даних. Саме це значення перевіряє `pipe()` на кожному записі. **Q:** Навіщо існує `fs.readFileSync()`, якщо потоки краще справляються з великими файлами? **A:** Для файлів до 1MB, де потрібен вміст одразу і немає конкурентних запитів, синхронний метод простіший. Немає подій, немає колбеків - просто значення. При малих розмірах overhead від налаштування потоку не виправдовується. **Q:** (Senior) Як реалізувати backpressure у кастомному Transform stream, який викликає async API для кожного шматка? **A:** Викликати `callback()` тільки після того, як async-виклик завершився. Черга потоку заповнюється природно, якщо API повільний, і backpressure спрацьовує автоматично. Структура: `async transform(chunk, encoding, callback) { const result = await apiCall(chunk); callback(null, result); }`. Головне - не викликати `callback` до завершення асинхронної роботи, інакше черга не заповниться і backpressure не спрацює. ## Приклади ### Стримінг великого файлу як HTTP-відповідь ```js const http = require('http'); const fs = require('fs'); const server = http.createServer((req, res) => { // Файл 2GB - пам'ять залишається ~1-2MB fs.createReadStream('./large-video.mp4') .on('error', (err) => { res.writeHead(500); res.end('Файл не знайдено'); }) .pipe(res); }); server.listen(3000); ``` Без потоку `fs.readFile()` завантажив би весь файл у пам'ять перед тим, як відправити перший байт. З потоком перший шматок доходить до клієнта за мілісекунди. ### Обробка великого CSV через Transform stream ```js const fs = require('fs'); const { Transform } = require('stream'); const csv = require('csv-parser'); // 500MB CSV без завантаження в пам'ять fs.createReadStream('users-500mb.csv') .pipe(csv()) .pipe(new Transform({ objectMode: true, transform(row, encoding, callback) { // Змінюємо кожен рядок по черзі row.email = row.email.toUpperCase(); callback(null, JSON.stringify(row) + '\n'); } })) .pipe(fs.createWriteStream('users-processed.txt')) .on('finish', () => console.log('Готово')); // Пам'ять ~5-10MB протягом усього процесу ``` Transform stream отримує один розпарсений рядок, змінює його і передає далі. Файл ніколи не опиняється в пам'яті повністю. ### pipeline з обробкою помилок ```js const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // Стиснення файлу з автоматичним очищенням при збої pipeline( fs.createReadStream('access.log'), zlib.createGzip(), fs.createWriteStream('access.log.gz'), (err) => { if (err) { console.error('Стиснення не вдалося:', err); } else { console.log('Стиснено успішно'); } } ); ``` Якщо будь-який потік у ланцюжку дав збій (диск заповнився, файл зник, мережа впала), `pipeline()` закриває всі потоки і передає помилку в колбек. З `pipe()` потрібно було б вручну додавати обробник `error` до кожного потоку окремо.Для рев’юераПримітка для модератора (необов’язково)Бачить лише модератор. Прискорює рев’ю.