Skip to main content

Як реалізувати обробку черги з Bull у NestJS?

Bull - це бібліотека для обробки черг (queue processing) у NestJS на базі Redis. Вона переміщує повільні задачі (листи, ресайз зображень, генерація PDF) з HTTP-циклу у фонові воркери з персистентністю та повторними спробами.

Теорія

TL;DR

  • Bull зберігає завдання в Redis: queue.add() кладе завдання, воркер забирає і обробляє, потім позначає як виконане або провалене
  • Аналогія: квитанції на кухні ресторану. Офіціант (HTTP-хендлер) вішає замовлення на рейку (Redis), кухар (processor) бере його і готує без того, щоб стіл чекав
  • Використовуй Bull для задач довше 500мс або тих, що потребують повторних спроб: листи, обробка медіа, звіти, аналітика
  • Для швидких синхронних операцій або разових затримок без повторних спроб Bull зайвий
  • @nestjs/bull додає декоратори @Processor(), @Process(), @InjectQueue() поверх Bull

Швидкий приклад

typescript
// 1. Глобальні налаштування Redis BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }) // 2. Реєстрація черги в модулі BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, removeOnComplete: true } }) // 3. Додаємо завдання з сервісу await this.emailQueue.add('welcome', { to: 'user@example.com', subject: 'Ласкаво просимо' }); // 4. Обробляємо в окремому класі @Processor('email') export class EmailProcessor { @Process('welcome') async handle(job: Job<{ to: string; subject: string }>) { await this.mailer.send(job.data); // HTTP вже відповів до цього моменту } }

HTTP-хендлер додає завдання і повертає відповідь одразу. Processor виконує роботу коли є вільний воркер.

Як Bull інтегрується в NestJS

BullModule.forRoot() налаштовує Redis один раз для всього застосунку. BullModule.registerQueue() створює іменовану чергу і реєструє її як провайдер. @InjectQueue('name') дає доступ до черги в будь-якому сервісі. @Processor('name') позначає клас як консьюмер цієї черги.

Всередині Bull використовує Lua-скрипти Redis для атомарних операцій. queue.add() кладе дані завдання в Redis list з метаданими: кількість спроб, затримка, налаштування backoff. Воркери опитують чергу через BRPOPLPUSH - blocking команду Redis, яка чекає нових елементів без активного spinning. Коли воркер підхоплює завдання, воно переходить в "active" set. Після обробки - в "completed" або "failed". Redis зберігає все навіть якщо Node-процес впаде під час роботи.

Бачив команди, які пропускали removeOnComplete: true, і через кілька тижнів Redis забивався гігабайтами виконаних завдань. Краще ставити це в defaultJobOptions з першого дня.

Налаштування

bash
npm install @nestjs/bull bull npm install --save-dev @types/bull
typescript
// app.module.ts @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 }, }), ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {}

Обидва виклики обов'язкові. forRoot() без registerQueue() у фічовому модулі - processor не запуститься і @InjectQueue впаде в рантаймі.

Додавання та обробка завдань

typescript
// email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } async scheduleDigest(email: string) { await this.emailQueue.add('digest', { email }, { delay: 86400000 }); // 24год } async sendBulk(users: { email: string }[]) { const jobs = users.map((u) => ({ name: 'bulk', data: u })); await this.emailQueue.addBulk(jobs); } } // email.processor.ts @Processor('email') export class EmailProcessor { constructor(private readonly mailer: MailerService) {} @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { await this.mailer.sendMail({ to: job.data.email, subject: 'Ласкаво просимо!' }); return { sent: true }; } @OnQueueFailed() onFailed(job: Job, error: Error) { console.error(`Завдання ${job.id} провалилось після ${job.attemptsMade} спроб:`, error.message); } }

@OnQueueFailed() - місце для алертів і dead-letter логіки. Не покладайся тільки на автоматичні повторні спроби Bull у продакшені.

Розширені опції завдань

typescript
// Cron-стиль: щодня опівночі await this.queue.add('daily-cleanup', {}, { repeat: { cron: '0 0 * * *' }, }); // Пріоритет: менше число - вища черговість await this.queue.add('urgent-notification', data, { priority: 1 }); await this.queue.add('batch-export', data, { priority: 10 }); // Rate limiting для зовнішніх API await this.queue.add('api-sync', data, { limiter: { max: 10, duration: 1000 }, // 10 завдань на секунду });

Пріоритети важливі коли в одній черзі змішані термінові і пакетні задачі. Сповіщення не повинно чекати за масовим розсиланням на 10 тисяч адресатів.

Типові помилки

Забутий registerQueue() у фічовому модулі. BullModule.forRoot() тільки підключає Redis. Кожна черга потребує свого registerQueue() у модулі, що її використовує. Без цього @InjectQueue падає в рантаймі, а @Processor не стартує.

typescript
// Неправильно: тільки forRoot в AppModule, нічого в EmailModule // Правильно: @Module({ imports: [BullModule.registerQueue({ name: 'email' })] }) export class EmailModule {}

Відсутній removeOnComplete у продакшені. Виконані завдання залишаються в Redis назавжди. При навантаженні це заповнює пам'ять за кілька тижнів. Встановлюй removeOnComplete: true у defaultJobOptions.

Велика кількість attempts без backoff. Воркери повторюють спроби без затримки, що перевантажує Redis при постійних збоях.

typescript
// Неправильно { attempts: 50 } // Правильно { attempts: 5, backoff: { type: 'exponential', delay: 5000 } }

CPU-важка синхронна робота всередині processor. Processor працює в event loop Node.js. Блокуюча генерація PDF або важкий image resize зупиняє обробку інших завдань. Для таких задач використовуй concurrency: 1 або виноси в дочірній процес.

Моніторинг у продакшені

Bull Board - UI для всіх черг: провалені завдання, кількість спроб, глибина черги:

bash
npm install @bull-board/express @bull-board/api
typescript
import { BullAdapter } from '@bull-board/api/bullAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; const serverAdapter = new ExpressAdapter(); createBullBoard({ queues: [new BullAdapter(emailQueue)], serverAdapter, }); app.use('/admin/queues', serverAdapter.getRouter());

Для метрик bullmq-prometheus експортує глибину черги і частоту збоїв у Prometheus. Алерти на зависання завдань понад 5 хвилин або частоту збоїв вище 1%.

Коли використовувати Bull і коли ні

СценарійІнструмент
Задача довше 500мс або потрібні повторні спробиBull
Розподілені воркери на кількох інстансахBull
Вже є MongoDB, Redis не потрібенAgenda.js
Проста затримка без персистентності і повторних спробsetTimeout
Потокова обробка подій з великою пропускною здатністюKafka
Відповідь має бути синхронноюЧерга не потрібна

Де зустрічається в реальних проектах

  • Онбординг користувачів: ресайз аватара + вітальний лист + аналітика, все в черзі після реєстрації
  • E-commerce: генерація інвойсу і оновлення складу після оформлення замовлення
  • SaaS-дашборди: планові звіти через cron-чергу
  • Мікросервіси: Kafka-подія прийшла, Bull обробляє повільний запис у БД асинхронно

Follow-up питання

Q: Як уникнути подвійної обробки якщо воркер впав посеред завдання?
A: Bull переводить активні завдання в стан "stalled" якщо воркер падає. При перепідключенні вони автоматично повертаються в чергу. Для ідемпотентності перевіряй прапор у БД перед обробкою: if (await this.repo.isProcessed(job.data.id)) return;.

Q: Розкажи про модель конкурентності Bull.
A: @Process({ name: 'send', concurrency: 5 }) запускає до 5 завдань паралельно на одному воркер-процесі. Для горизонтального масштабування запускай кілька NestJS-інстансів - кожен окремо опитує одну Redis-чергу.

Q: Що відбувається якщо Redis впаде під час обробки?
A: Воркери зупиняють polling. Завдання в "active" залишаються там до відновлення Redis. Після підключення Bull продовжує з персистентного стану. Завдання не втрачаються якщо Redis налаштований з RDB або AOF.

Q: Яка різниця між Bull і BullMQ?
A: BullMQ - новіша бібліотека зі швидшими Lua-скриптами, підтримкою батьківсько-дочірніх потоків (flows) і кращими TypeScript-типами. @nestjs/bullmq обгортає її. Для нових NestJS-проектів краще брати @nestjs/bullmq. Старий @nestjs/bull підтримується але нових фіч не отримує.

Q: Як масштабувати до 1 мільйона завдань на день?
A: Шардуй черги за типом: окремо для листів, зображень, звітів. Запускай кілька воркер-інстансів для кожної черги. Redis Cluster для надійності. Auto-scale воркер-поди на основі глибини черги по кожному шарду, і призначай конкретні черги конкретним воркерам щоб уникнути змагання за ресурси.

Приклади

Базовий: email-черга від початку до кінця

Повне налаштування: реєстрація модуля, сервіс, processor.

typescript
// app.module.ts import { BullModule } from '@nestjs/bull'; import { EmailModule } from './email/email.module'; @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }), EmailModule, ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {} // email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } } // email.processor.ts @Processor('email') export class EmailProcessor { @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { console.log(`Надсилаємо лист на ${job.data.email}`); return { sent: true }; } }

Контролер викликає emailService.sendWelcome() і відповідає користувачу за мілісекунди. Processor обробляє лист у фоні. При збої Bull повторить до 3 разів із затримками 2с, 4с, 8с.

Середній: онбординг з кількома типами завдань

Реєстрація запускає три фонові задачі паралельно: ресайз аватара, вітальний лист, аналітика.

typescript
// user.service.ts @Injectable() export class UserService { constructor( @InjectQueue('onboarding') private onboardingQueue: Queue, private userRepo: UserRepository, ) {} async createUser(dto: CreateUserDto) { const user = await this.userRepo.save(dto); await Promise.all([ this.onboardingQueue.add('resize-avatar', { userId: user.id }), this.onboardingQueue.add('send-welcome', { userId: user.id, email: user.email }), this.onboardingQueue.add('track-signup', { userId: user.id }), ]); return user; } } // onboarding.processor.ts @Processor('onboarding') export class OnboardingProcessor { @Process('resize-avatar') async resizeAvatar(job: Job<{ userId: string }>) { console.log(`Аватар оброблено для користувача ${job.data.userId}`); } @Process('send-welcome') async sendWelcome(job: Job<{ userId: string; email: string }>) { await this.mailer.send({ to: job.data.email, subject: 'Ласкаво просимо!' }); } @Process('track-signup') async trackSignup(job: Job<{ userId: string }>) { await this.analytics.track('signup', job.data); } @OnQueueFailed() onFailed(job: Job, err: Error) { console.error(`Крок онбордингу ${job.name} провалився:`, err.message); } }

Кожне завдання незалежне. Якщо send-welcome провалиться, воно ретраїться без блокування track-signup. Відповідь API повертається ще до початку будь-якого з трьох завдань.

Розширений: дедублікований відкладений звіт з відстеженням прогресу

Щоденна генерація звіту з concurrency: 1, дедублікацією і відстеженням прогресу.

typescript
// reports.service.ts @Injectable() export class ReportsService { constructor(@InjectQueue('reports') private reportsQueue: Queue) {} async scheduleDaily(reportId: string) { await this.reportsQueue.add( 'generate-daily', { reportId }, { delay: 86400000, // Запустити через 24год attempts: 3, backoff: { type: 'exponential', delay: 5000 }, jobId: `daily-${reportId}`, // Той самий jobId = один екземпляр }, ); } } // reports.processor.ts @Processor('reports') export class ReportProcessor { @Process({ name: 'generate-daily', concurrency: 1 }) // По одному async generate(job: Job<{ reportId: string }>) { await job.progress(10); const pdf = await this.pdfService.generate(job.data.reportId); await job.progress(80); await this.storage.upload(pdf, `report-${job.data.reportId}.pdf`); await job.progress(100); return { path: `report-${job.data.reportId}.pdf` }; } @OnQueueCompleted() async onComplete(job: Job, result: { path: string }) { await this.notifyQueue.add('email-report', { reportId: job.data.reportId, path: result.path, }); } }

concurrency: 1 запобігає стрибкам пам'яті від паралельної генерації PDF на одному воркері. Опція jobId дедублікує: виклик scheduleDaily() двічі з тим самим reportId додасть лише одне завдання. job.progress() показує відсоток виконання в Bull Board.

Коротка відповідь

Для співбесіди
Premium

Коротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.

Дочитали статтю?