Як реалізувати обробку черги з Bull у NestJS?
Обробка черг у NestJS за допомогою Bull
Черги обробляють фонові завдання — задачі, які є занадто повільними або ресурсомісткими для циклу запит-відповідь: надсилання електронних листів, обробка зображень, генерація звітів тощо.
NestJS інтегрується з Bull (на базі Redis) через @nestjs/bull.
Налаштування
bash
npm install @nestjs/bull bull
npm install --save-dev @types/bulltypescript
// app.module.ts
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}Визначення черги
typescript
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}Додавання завдань до черги
typescript
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcomeEmail(userId: string, email: string) {
await this.emailQueue.add('welcome', {
userId,
email,
template: 'welcome',
});
}
async sendBulkEmails(users: { email: string; data: any }[]) {
const jobs = users.map((user) => ({
name: 'bulk',
data: user,
}));
await this.emailQueue.addBulk(jobs);
}
async scheduleEmail(email: string, data: any, delay: number) {
await this.emailQueue.add('scheduled', { email, data }, {
delay, // мілісекунди
});
}
}Обробка завдань
typescript
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('email')
export class EmailProcessor {
constructor(private readonly mailer: MailerService) {}
@Process('welcome')
async handleWelcome(job: Job<{ userId: string; email: string }>) {
const { email, userId } = job.data;
await this.mailer.sendMail({
to: email,
subject: 'Ласкаво просимо!',
template: 'welcome',
context: { userId },
});
return { sent: true };
}
@Process('bulk')
async handleBulk(job: Job) {
await this.mailer.sendMail(job.data);
}
@OnQueueActive()
onActive(job: Job) {
console.log(`Обробка завдання ${job.id} типу ${job.name}`);
}
@OnQueueCompleted()
onComplete(job: Job, result: any) {
console.log(`Завдання ${job.id} завершено`, result);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`Завдання ${job.id} не вдалося:`, error.message);
}
}Розширене: Опції завдань
typescript
// Завдання з затримкою
await this.queue.add('report', data, { delay: 60000 });
// Повторюване завдання (cron)
await this.queue.add('cleanup', {}, {
repeat: { cron: '0 0 * * *' }, // Щодня опівночі
});
// Пріоритет (менше = вищий пріоритет)
await this.queue.add('urgent', data, { priority: 1 });
await this.queue.add('normal', data, { priority: 10 });
// Обмеження швидкості
await this.queue.add('api-call', data, {
limiter: { max: 10, duration: 1000 }, // 10 за секунду
});Моніторинг за допомогою Bull Board
bash
npm install @bull-board/express @bull-board/apitypescript
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());Загальні випадки використання
| Випадок використання | Назва черги | Пріоритет |
|---|---|---|
| Надсилання електронних листів | email | Нормальний |
| Обробка зображень | media | Низький |
| Генерація PDF | reports | Низький |
| Сповіщення | notifications | Високий |
| Імпорт/експорт даних | data | Низький |
| Заплановані завдання | cron | Нормальний |
Найкраща практика: Використовуйте черги для всього, що займає більше кількох сотень мілісекунд. Налаштуйте політики повторних спроб з експоненційною затримкою. Моніторте стан черг у виробництві за допомогою Bull Board або подібних панелей моніторингу.
Коротка відповідь
Для співбесідиPremium
Коротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.