Як реалізувати обробку черги з Bull у NestJS?
Bull - це бібліотека для обробки черг (queue processing) у NestJS на базі Redis. Вона переміщує повільні задачі (листи, ресайз зображень, генерація PDF) з HTTP-циклу у фонові воркери з персистентністю та повторними спробами.
Теорія
TL;DR
- Bull зберігає завдання в Redis:
queue.add()кладе завдання, воркер забирає і обробляє, потім позначає як виконане або провалене - Аналогія: квитанції на кухні ресторану. Офіціант (HTTP-хендлер) вішає замовлення на рейку (Redis), кухар (processor) бере його і готує без того, щоб стіл чекав
- Використовуй Bull для задач довше 500мс або тих, що потребують повторних спроб: листи, обробка медіа, звіти, аналітика
- Для швидких синхронних операцій або разових затримок без повторних спроб Bull зайвий
@nestjs/bullдодає декоратори@Processor(),@Process(),@InjectQueue()поверх Bull
Швидкий приклад
// 1. Глобальні налаштування Redis
BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } })
// 2. Реєстрація черги в модулі
BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, removeOnComplete: true } })
// 3. Додаємо завдання з сервісу
await this.emailQueue.add('welcome', { to: 'user@example.com', subject: 'Ласкаво просимо' });
// 4. Обробляємо в окремому класі
@Processor('email')
export class EmailProcessor {
@Process('welcome')
async handle(job: Job<{ to: string; subject: string }>) {
await this.mailer.send(job.data); // HTTP вже відповів до цього моменту
}
}HTTP-хендлер додає завдання і повертає відповідь одразу. Processor виконує роботу коли є вільний воркер.
Як Bull інтегрується в NestJS
BullModule.forRoot() налаштовує Redis один раз для всього застосунку. BullModule.registerQueue() створює іменовану чергу і реєструє її як провайдер. @InjectQueue('name') дає доступ до черги в будь-якому сервісі. @Processor('name') позначає клас як консьюмер цієї черги.
Всередині Bull використовує Lua-скрипти Redis для атомарних операцій. queue.add() кладе дані завдання в Redis list з метаданими: кількість спроб, затримка, налаштування backoff. Воркери опитують чергу через BRPOPLPUSH - blocking команду Redis, яка чекає нових елементів без активного spinning. Коли воркер підхоплює завдання, воно переходить в "active" set. Після обробки - в "completed" або "failed". Redis зберігає все навіть якщо Node-процес впаде під час роботи.
Бачив команди, які пропускали removeOnComplete: true, і через кілька тижнів Redis забивався гігабайтами виконаних завдань. Краще ставити це в defaultJobOptions з першого дня.
Налаштування
npm install @nestjs/bull bull
npm install --save-dev @types/bull// app.module.ts
@Module({
imports: [
BullModule.forRoot({
redis: { host: 'localhost', port: 6379 },
}),
],
})
export class AppModule {}
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}Обидва виклики обов'язкові. forRoot() без registerQueue() у фічовому модулі - processor не запуститься і @InjectQueue впаде в рантаймі.
Додавання та обробка завдань
// email.service.ts
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcome(email: string) {
await this.emailQueue.add('welcome', { email });
}
async scheduleDigest(email: string) {
await this.emailQueue.add('digest', { email }, { delay: 86400000 }); // 24год
}
async sendBulk(users: { email: string }[]) {
const jobs = users.map((u) => ({ name: 'bulk', data: u }));
await this.emailQueue.addBulk(jobs);
}
}
// email.processor.ts
@Processor('email')
export class EmailProcessor {
constructor(private readonly mailer: MailerService) {}
@Process('welcome')
async handleWelcome(job: Job<{ email: string }>) {
await this.mailer.sendMail({ to: job.data.email, subject: 'Ласкаво просимо!' });
return { sent: true };
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`Завдання ${job.id} провалилось після ${job.attemptsMade} спроб:`, error.message);
}
}@OnQueueFailed() - місце для алертів і dead-letter логіки. Не покладайся тільки на автоматичні повторні спроби Bull у продакшені.
Розширені опції завдань
// Cron-стиль: щодня опівночі
await this.queue.add('daily-cleanup', {}, {
repeat: { cron: '0 0 * * *' },
});
// Пріоритет: менше число - вища черговість
await this.queue.add('urgent-notification', data, { priority: 1 });
await this.queue.add('batch-export', data, { priority: 10 });
// Rate limiting для зовнішніх API
await this.queue.add('api-sync', data, {
limiter: { max: 10, duration: 1000 }, // 10 завдань на секунду
});Пріоритети важливі коли в одній черзі змішані термінові і пакетні задачі. Сповіщення не повинно чекати за масовим розсиланням на 10 тисяч адресатів.
Типові помилки
Забутий registerQueue() у фічовому модулі. BullModule.forRoot() тільки підключає Redis. Кожна черга потребує свого registerQueue() у модулі, що її використовує. Без цього @InjectQueue падає в рантаймі, а @Processor не стартує.
// Неправильно: тільки forRoot в AppModule, нічого в EmailModule
// Правильно:
@Module({ imports: [BullModule.registerQueue({ name: 'email' })] })
export class EmailModule {}Відсутній removeOnComplete у продакшені. Виконані завдання залишаються в Redis назавжди. При навантаженні це заповнює пам'ять за кілька тижнів. Встановлюй removeOnComplete: true у defaultJobOptions.
Велика кількість attempts без backoff. Воркери повторюють спроби без затримки, що перевантажує Redis при постійних збоях.
// Неправильно
{ attempts: 50 }
// Правильно
{ attempts: 5, backoff: { type: 'exponential', delay: 5000 } }CPU-важка синхронна робота всередині processor. Processor працює в event loop Node.js. Блокуюча генерація PDF або важкий image resize зупиняє обробку інших завдань. Для таких задач використовуй concurrency: 1 або виноси в дочірній процес.
Моніторинг у продакшені
Bull Board - UI для всіх черг: провалені завдання, кількість спроб, глибина черги:
npm install @bull-board/express @bull-board/apiimport { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());Для метрик bullmq-prometheus експортує глибину черги і частоту збоїв у Prometheus. Алерти на зависання завдань понад 5 хвилин або частоту збоїв вище 1%.
Коли використовувати Bull і коли ні
| Сценарій | Інструмент |
|---|---|
| Задача довше 500мс або потрібні повторні спроби | Bull |
| Розподілені воркери на кількох інстансах | Bull |
| Вже є MongoDB, Redis не потрібен | Agenda.js |
| Проста затримка без персистентності і повторних спроб | setTimeout |
| Потокова обробка подій з великою пропускною здатністю | Kafka |
| Відповідь має бути синхронною | Черга не потрібна |
Де зустрічається в реальних проектах
- Онбординг користувачів: ресайз аватара + вітальний лист + аналітика, все в черзі після реєстрації
- E-commerce: генерація інвойсу і оновлення складу після оформлення замовлення
- SaaS-дашборди: планові звіти через cron-чергу
- Мікросервіси: Kafka-подія прийшла, Bull обробляє повільний запис у БД асинхронно
Follow-up питання
Q: Як уникнути подвійної обробки якщо воркер впав посеред завдання?
A: Bull переводить активні завдання в стан "stalled" якщо воркер падає. При перепідключенні вони автоматично повертаються в чергу. Для ідемпотентності перевіряй прапор у БД перед обробкою: if (await this.repo.isProcessed(job.data.id)) return;.
Q: Розкажи про модель конкурентності Bull.
A: @Process({ name: 'send', concurrency: 5 }) запускає до 5 завдань паралельно на одному воркер-процесі. Для горизонтального масштабування запускай кілька NestJS-інстансів - кожен окремо опитує одну Redis-чергу.
Q: Що відбувається якщо Redis впаде під час обробки?
A: Воркери зупиняють polling. Завдання в "active" залишаються там до відновлення Redis. Після підключення Bull продовжує з персистентного стану. Завдання не втрачаються якщо Redis налаштований з RDB або AOF.
Q: Яка різниця між Bull і BullMQ?
A: BullMQ - новіша бібліотека зі швидшими Lua-скриптами, підтримкою батьківсько-дочірніх потоків (flows) і кращими TypeScript-типами. @nestjs/bullmq обгортає її. Для нових NestJS-проектів краще брати @nestjs/bullmq. Старий @nestjs/bull підтримується але нових фіч не отримує.
Q: Як масштабувати до 1 мільйона завдань на день?
A: Шардуй черги за типом: окремо для листів, зображень, звітів. Запускай кілька воркер-інстансів для кожної черги. Redis Cluster для надійності. Auto-scale воркер-поди на основі глибини черги по кожному шарду, і призначай конкретні черги конкретним воркерам щоб уникнути змагання за ресурси.
Приклади
Базовий: email-черга від початку до кінця
Повне налаштування: реєстрація модуля, сервіс, processor.
// app.module.ts
import { BullModule } from '@nestjs/bull';
import { EmailModule } from './email/email.module';
@Module({
imports: [
BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }),
EmailModule,
],
})
export class AppModule {}
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}
// email.service.ts
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcome(email: string) {
await this.emailQueue.add('welcome', { email });
}
}
// email.processor.ts
@Processor('email')
export class EmailProcessor {
@Process('welcome')
async handleWelcome(job: Job<{ email: string }>) {
console.log(`Надсилаємо лист на ${job.data.email}`);
return { sent: true };
}
}Контролер викликає emailService.sendWelcome() і відповідає користувачу за мілісекунди. Processor обробляє лист у фоні. При збої Bull повторить до 3 разів із затримками 2с, 4с, 8с.
Середній: онбординг з кількома типами завдань
Реєстрація запускає три фонові задачі паралельно: ресайз аватара, вітальний лист, аналітика.
// user.service.ts
@Injectable()
export class UserService {
constructor(
@InjectQueue('onboarding') private onboardingQueue: Queue,
private userRepo: UserRepository,
) {}
async createUser(dto: CreateUserDto) {
const user = await this.userRepo.save(dto);
await Promise.all([
this.onboardingQueue.add('resize-avatar', { userId: user.id }),
this.onboardingQueue.add('send-welcome', { userId: user.id, email: user.email }),
this.onboardingQueue.add('track-signup', { userId: user.id }),
]);
return user;
}
}
// onboarding.processor.ts
@Processor('onboarding')
export class OnboardingProcessor {
@Process('resize-avatar')
async resizeAvatar(job: Job<{ userId: string }>) {
console.log(`Аватар оброблено для користувача ${job.data.userId}`);
}
@Process('send-welcome')
async sendWelcome(job: Job<{ userId: string; email: string }>) {
await this.mailer.send({ to: job.data.email, subject: 'Ласкаво просимо!' });
}
@Process('track-signup')
async trackSignup(job: Job<{ userId: string }>) {
await this.analytics.track('signup', job.data);
}
@OnQueueFailed()
onFailed(job: Job, err: Error) {
console.error(`Крок онбордингу ${job.name} провалився:`, err.message);
}
}Кожне завдання незалежне. Якщо send-welcome провалиться, воно ретраїться без блокування track-signup. Відповідь API повертається ще до початку будь-якого з трьох завдань.
Розширений: дедублікований відкладений звіт з відстеженням прогресу
Щоденна генерація звіту з concurrency: 1, дедублікацією і відстеженням прогресу.
// reports.service.ts
@Injectable()
export class ReportsService {
constructor(@InjectQueue('reports') private reportsQueue: Queue) {}
async scheduleDaily(reportId: string) {
await this.reportsQueue.add(
'generate-daily',
{ reportId },
{
delay: 86400000, // Запустити через 24год
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
jobId: `daily-${reportId}`, // Той самий jobId = один екземпляр
},
);
}
}
// reports.processor.ts
@Processor('reports')
export class ReportProcessor {
@Process({ name: 'generate-daily', concurrency: 1 }) // По одному
async generate(job: Job<{ reportId: string }>) {
await job.progress(10);
const pdf = await this.pdfService.generate(job.data.reportId);
await job.progress(80);
await this.storage.upload(pdf, `report-${job.data.reportId}.pdf`);
await job.progress(100);
return { path: `report-${job.data.reportId}.pdf` };
}
@OnQueueCompleted()
async onComplete(job: Job, result: { path: string }) {
await this.notifyQueue.add('email-report', {
reportId: job.data.reportId,
path: result.path,
});
}
}concurrency: 1 запобігає стрибкам пам'яті від паралельної генерації PDF на одному воркері. Опція jobId дедублікує: виклик scheduleDaily() двічі з тим самим reportId додасть лише одне завдання. job.progress() показує відсоток виконання в Bull Board.
Коротка відповідь
Для співбесідиКоротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.