Запропонувати правкуПокращити цю статтюДопрацюйте відповідь до «Як реалізувати обробку черги з Bull у NestJS?». Ваші зміни проходять модерацію перед публікацією.Потрібне підтвердженняКонтентЩо ви змінюєте🇺🇸EN🇺🇦UAПереглядЗаголовок (UA)Коротка відповідь (UA)**Bull** - це бібліотека для обробки черг (queue processing) у NestJS на базі Redis. Вона запускає повільні задачі поза HTTP-циклом з підтримкою повторних спроб і персистентності. Встанови `@nestjs/bull bull`, налаштуй `BullModule.forRoot()` з Redis, потім `BullModule.registerQueue()` у кожному модулі. ```typescript // Додаємо завдання await this.emailQueue.add('welcome', { to: 'user@example.com' }); // Обробляємо @Processor('email') export class EmailProcessor { @Process('welcome') async handle(job: Job<{ to: string }>) { /* send mail */ } } ``` **Головне:** `forRoot()` підключає Redis глобально; `registerQueue()` потрібен у кожному модулі, який використовує чергу.Показується над повною відповіддю для швидкого нагадування.Відповідь (UA)Зображення**Bull** - це бібліотека для обробки черг (queue processing) у NestJS на базі Redis. Вона переміщує повільні задачі (листи, ресайз зображень, генерація PDF) з HTTP-циклу у фонові воркери з персистентністю та повторними спробами. ## Теорія ### TL;DR - Bull зберігає завдання в Redis: `queue.add()` кладе завдання, воркер забирає і обробляє, потім позначає як виконане або провалене - Аналогія: квитанції на кухні ресторану. Офіціант (HTTP-хендлер) вішає замовлення на рейку (Redis), кухар (processor) бере його і готує без того, щоб стіл чекав - Використовуй Bull для задач довше 500мс або тих, що потребують повторних спроб: листи, обробка медіа, звіти, аналітика - Для швидких синхронних операцій або разових затримок без повторних спроб Bull зайвий - `@nestjs/bull` додає декоратори `@Processor()`, `@Process()`, `@InjectQueue()` поверх Bull ### Швидкий приклад ```typescript // 1. Глобальні налаштування Redis BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }) // 2. Реєстрація черги в модулі BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, removeOnComplete: true } }) // 3. Додаємо завдання з сервісу await this.emailQueue.add('welcome', { to: 'user@example.com', subject: 'Ласкаво просимо' }); // 4. Обробляємо в окремому класі @Processor('email') export class EmailProcessor { @Process('welcome') async handle(job: Job<{ to: string; subject: string }>) { await this.mailer.send(job.data); // HTTP вже відповів до цього моменту } } ``` HTTP-хендлер додає завдання і повертає відповідь одразу. Processor виконує роботу коли є вільний воркер. ### Як Bull інтегрується в NestJS `BullModule.forRoot()` налаштовує Redis один раз для всього застосунку. `BullModule.registerQueue()` створює іменовану чергу і реєструє її як провайдер. `@InjectQueue('name')` дає доступ до черги в будь-якому сервісі. `@Processor('name')` позначає клас як консьюмер цієї черги. Всередині Bull використовує Lua-скрипти Redis для атомарних операцій. `queue.add()` кладе дані завдання в Redis list з метаданими: кількість спроб, затримка, налаштування backoff. Воркери опитують чергу через `BRPOPLPUSH` - blocking команду Redis, яка чекає нових елементів без активного spinning. Коли воркер підхоплює завдання, воно переходить в "active" set. Після обробки - в "completed" або "failed". Redis зберігає все навіть якщо Node-процес впаде під час роботи. Бачив команди, які пропускали `removeOnComplete: true`, і через кілька тижнів Redis забивався гігабайтами виконаних завдань. Краще ставити це в `defaultJobOptions` з першого дня. ### Налаштування ```bash npm install @nestjs/bull bull npm install --save-dev @types/bull ``` ```typescript // app.module.ts @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 }, }), ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {} ``` Обидва виклики обов'язкові. `forRoot()` без `registerQueue()` у фічовому модулі - processor не запуститься і `@InjectQueue` впаде в рантаймі. ### Додавання та обробка завдань ```typescript // email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } async scheduleDigest(email: string) { await this.emailQueue.add('digest', { email }, { delay: 86400000 }); // 24год } async sendBulk(users: { email: string }[]) { const jobs = users.map((u) => ({ name: 'bulk', data: u })); await this.emailQueue.addBulk(jobs); } } // email.processor.ts @Processor('email') export class EmailProcessor { constructor(private readonly mailer: MailerService) {} @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { await this.mailer.sendMail({ to: job.data.email, subject: 'Ласкаво просимо!' }); return { sent: true }; } @OnQueueFailed() onFailed(job: Job, error: Error) { console.error(`Завдання ${job.id} провалилось після ${job.attemptsMade} спроб:`, error.message); } } ``` `@OnQueueFailed()` - місце для алертів і dead-letter логіки. Не покладайся тільки на автоматичні повторні спроби Bull у продакшені. ### Розширені опції завдань ```typescript // Cron-стиль: щодня опівночі await this.queue.add('daily-cleanup', {}, { repeat: { cron: '0 0 * * *' }, }); // Пріоритет: менше число - вища черговість await this.queue.add('urgent-notification', data, { priority: 1 }); await this.queue.add('batch-export', data, { priority: 10 }); // Rate limiting для зовнішніх API await this.queue.add('api-sync', data, { limiter: { max: 10, duration: 1000 }, // 10 завдань на секунду }); ``` Пріоритети важливі коли в одній черзі змішані термінові і пакетні задачі. Сповіщення не повинно чекати за масовим розсиланням на 10 тисяч адресатів. ### Типові помилки **Забутий `registerQueue()` у фічовому модулі.** `BullModule.forRoot()` тільки підключає Redis. Кожна черга потребує свого `registerQueue()` у модулі, що її використовує. Без цього `@InjectQueue` падає в рантаймі, а `@Processor` не стартує. ```typescript // Неправильно: тільки forRoot в AppModule, нічого в EmailModule // Правильно: @Module({ imports: [BullModule.registerQueue({ name: 'email' })] }) export class EmailModule {} ``` **Відсутній `removeOnComplete` у продакшені.** Виконані завдання залишаються в Redis назавжди. При навантаженні це заповнює пам'ять за кілька тижнів. Встановлюй `removeOnComplete: true` у `defaultJobOptions`. **Велика кількість `attempts` без `backoff`.** Воркери повторюють спроби без затримки, що перевантажує Redis при постійних збоях. ```typescript // Неправильно { attempts: 50 } // Правильно { attempts: 5, backoff: { type: 'exponential', delay: 5000 } } ``` **CPU-важка синхронна робота всередині processor.** Processor працює в event loop Node.js. Блокуюча генерація PDF або важкий image resize зупиняє обробку інших завдань. Для таких задач використовуй `concurrency: 1` або виноси в дочірній процес. ### Моніторинг у продакшені Bull Board - UI для всіх черг: провалені завдання, кількість спроб, глибина черги: ```bash npm install @bull-board/express @bull-board/api ``` ```typescript import { BullAdapter } from '@bull-board/api/bullAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; const serverAdapter = new ExpressAdapter(); createBullBoard({ queues: [new BullAdapter(emailQueue)], serverAdapter, }); app.use('/admin/queues', serverAdapter.getRouter()); ``` Для метрик `bullmq-prometheus` експортує глибину черги і частоту збоїв у Prometheus. Алерти на зависання завдань понад 5 хвилин або частоту збоїв вище 1%. ### Коли використовувати Bull і коли ні | Сценарій | Інструмент | |---|---| | Задача довше 500мс або потрібні повторні спроби | Bull | | Розподілені воркери на кількох інстансах | Bull | | Вже є MongoDB, Redis не потрібен | Agenda.js | | Проста затримка без персистентності і повторних спроб | `setTimeout` | | Потокова обробка подій з великою пропускною здатністю | Kafka | | Відповідь має бути синхронною | Черга не потрібна | ### Де зустрічається в реальних проектах - Онбординг користувачів: ресайз аватара + вітальний лист + аналітика, все в черзі після реєстрації - E-commerce: генерація інвойсу і оновлення складу після оформлення замовлення - SaaS-дашборди: планові звіти через cron-чергу - Мікросервіси: Kafka-подія прийшла, Bull обробляє повільний запис у БД асинхронно ### Follow-up питання **Q:** Як уникнути подвійної обробки якщо воркер впав посеред завдання? **A:** Bull переводить активні завдання в стан "stalled" якщо воркер падає. При перепідключенні вони автоматично повертаються в чергу. Для ідемпотентності перевіряй прапор у БД перед обробкою: `if (await this.repo.isProcessed(job.data.id)) return;`. **Q:** Розкажи про модель конкурентності Bull. **A:** `@Process({ name: 'send', concurrency: 5 })` запускає до 5 завдань паралельно на одному воркер-процесі. Для горизонтального масштабування запускай кілька NestJS-інстансів - кожен окремо опитує одну Redis-чергу. **Q:** Що відбувається якщо Redis впаде під час обробки? **A:** Воркери зупиняють polling. Завдання в "active" залишаються там до відновлення Redis. Після підключення Bull продовжує з персистентного стану. Завдання не втрачаються якщо Redis налаштований з RDB або AOF. **Q:** Яка різниця між Bull і BullMQ? **A:** BullMQ - новіша бібліотека зі швидшими Lua-скриптами, підтримкою батьківсько-дочірніх потоків (flows) і кращими TypeScript-типами. `@nestjs/bullmq` обгортає її. Для нових NestJS-проектів краще брати `@nestjs/bullmq`. Старий `@nestjs/bull` підтримується але нових фіч не отримує. **Q:** Як масштабувати до 1 мільйона завдань на день? **A:** Шардуй черги за типом: окремо для листів, зображень, звітів. Запускай кілька воркер-інстансів для кожної черги. Redis Cluster для надійності. Auto-scale воркер-поди на основі глибини черги по кожному шарду, і призначай конкретні черги конкретним воркерам щоб уникнути змагання за ресурси. ## Приклади ### Базовий: email-черга від початку до кінця Повне налаштування: реєстрація модуля, сервіс, processor. ```typescript // app.module.ts import { BullModule } from '@nestjs/bull'; import { EmailModule } from './email/email.module'; @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }), EmailModule, ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {} // email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } } // email.processor.ts @Processor('email') export class EmailProcessor { @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { console.log(`Надсилаємо лист на ${job.data.email}`); return { sent: true }; } } ``` Контролер викликає `emailService.sendWelcome()` і відповідає користувачу за мілісекунди. Processor обробляє лист у фоні. При збої Bull повторить до 3 разів із затримками 2с, 4с, 8с. ### Середній: онбординг з кількома типами завдань Реєстрація запускає три фонові задачі паралельно: ресайз аватара, вітальний лист, аналітика. ```typescript // user.service.ts @Injectable() export class UserService { constructor( @InjectQueue('onboarding') private onboardingQueue: Queue, private userRepo: UserRepository, ) {} async createUser(dto: CreateUserDto) { const user = await this.userRepo.save(dto); await Promise.all([ this.onboardingQueue.add('resize-avatar', { userId: user.id }), this.onboardingQueue.add('send-welcome', { userId: user.id, email: user.email }), this.onboardingQueue.add('track-signup', { userId: user.id }), ]); return user; } } // onboarding.processor.ts @Processor('onboarding') export class OnboardingProcessor { @Process('resize-avatar') async resizeAvatar(job: Job<{ userId: string }>) { console.log(`Аватар оброблено для користувача ${job.data.userId}`); } @Process('send-welcome') async sendWelcome(job: Job<{ userId: string; email: string }>) { await this.mailer.send({ to: job.data.email, subject: 'Ласкаво просимо!' }); } @Process('track-signup') async trackSignup(job: Job<{ userId: string }>) { await this.analytics.track('signup', job.data); } @OnQueueFailed() onFailed(job: Job, err: Error) { console.error(`Крок онбордингу ${job.name} провалився:`, err.message); } } ``` Кожне завдання незалежне. Якщо `send-welcome` провалиться, воно ретраїться без блокування `track-signup`. Відповідь API повертається ще до початку будь-якого з трьох завдань. ### Розширений: дедублікований відкладений звіт з відстеженням прогресу Щоденна генерація звіту з `concurrency: 1`, дедублікацією і відстеженням прогресу. ```typescript // reports.service.ts @Injectable() export class ReportsService { constructor(@InjectQueue('reports') private reportsQueue: Queue) {} async scheduleDaily(reportId: string) { await this.reportsQueue.add( 'generate-daily', { reportId }, { delay: 86400000, // Запустити через 24год attempts: 3, backoff: { type: 'exponential', delay: 5000 }, jobId: `daily-${reportId}`, // Той самий jobId = один екземпляр }, ); } } // reports.processor.ts @Processor('reports') export class ReportProcessor { @Process({ name: 'generate-daily', concurrency: 1 }) // По одному async generate(job: Job<{ reportId: string }>) { await job.progress(10); const pdf = await this.pdfService.generate(job.data.reportId); await job.progress(80); await this.storage.upload(pdf, `report-${job.data.reportId}.pdf`); await job.progress(100); return { path: `report-${job.data.reportId}.pdf` }; } @OnQueueCompleted() async onComplete(job: Job, result: { path: string }) { await this.notifyQueue.add('email-report', { reportId: job.data.reportId, path: result.path, }); } } ``` `concurrency: 1` запобігає стрибкам пам'яті від паралельної генерації PDF на одному воркері. Опція `jobId` дедублікує: виклик `scheduleDaily()` двічі з тим самим `reportId` додасть лише одне завдання. `job.progress()` показує відсоток виконання в Bull Board.Для рев’юераПримітка для модератора (необов’язково)Бачить лише модератор. Прискорює рев’ю.