Skip to main content
Practice Problems

How to implement queue processing with Bull in NestJS?

Queue Processing in NestJS with Bull

Queues handle background jobs — tasks that are too slow or resource-intensive for the request-response cycle: sending emails, processing images, generating reports, etc.

NestJS integrates with Bull (backed by Redis) through @nestjs/bull.


Setup

bash
npm install @nestjs/bull bull npm install --save-dev @types/bull
typescript
// app.module.ts import { BullModule } from '@nestjs/bull'; @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379, }, }), ], }) export class AppModule {}

Defining a Queue

typescript
// email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {}

Adding Jobs to Queue

typescript
import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcomeEmail(userId: string, email: string) { await this.emailQueue.add('welcome', { userId, email, template: 'welcome', }); } async sendBulkEmails(users: { email: string; data: any }[]) { const jobs = users.map((user) => ({ name: 'bulk', data: user, })); await this.emailQueue.addBulk(jobs); } async scheduleEmail(email: string, data: any, delay: number) { await this.emailQueue.add('scheduled', { email, data }, { delay, // milliseconds }); } }

Processing Jobs

typescript
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('email') export class EmailProcessor { constructor(private readonly mailer: MailerService) {} @Process('welcome') async handleWelcome(job: Job<{ userId: string; email: string }>) { const { email, userId } = job.data; await this.mailer.sendMail({ to: email, subject: 'Welcome!', template: 'welcome', context: { userId }, }); return { sent: true }; } @Process('bulk') async handleBulk(job: Job) { await this.mailer.sendMail(job.data); } @OnQueueActive() onActive(job: Job) { console.log(`Processing job ${job.id} of type ${job.name}`); } @OnQueueCompleted() onComplete(job: Job, result: any) { console.log(`Job ${job.id} completed`, result); } @OnQueueFailed() onFailed(job: Job, error: Error) { console.error(`Job ${job.id} failed:`, error.message); } }

Advanced: Job Options

typescript
// Delayed job await this.queue.add('report', data, { delay: 60000 }); // Repeated job (cron) await this.queue.add('cleanup', {}, { repeat: { cron: '0 0 * * *' }, // Every day at midnight }); // Priority (lower = higher priority) await this.queue.add('urgent', data, { priority: 1 }); await this.queue.add('normal', data, { priority: 10 }); // Rate limiting await this.queue.add('api-call', data, { limiter: { max: 10, duration: 1000 }, // 10 per second });

Monitoring with Bull Board

bash
npm install @bull-board/express @bull-board/api
typescript
import { BullAdapter } from '@bull-board/api/bullAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; const serverAdapter = new ExpressAdapter(); createBullBoard({ queues: [new BullAdapter(emailQueue)], serverAdapter, }); app.use('/admin/queues', serverAdapter.getRouter());

Common Use Cases

Use CaseQueue NamePriority
Email sendingemailNormal
Image processingmediaLow
PDF generationreportsLow
NotificationsnotificationsHigh
Data import/exportdataLow
Scheduled taskscronNormal

Best practice: Use queues for anything that takes more than a few hundred milliseconds. Configure retry policies with exponential backoff. Monitor queue health in production with Bull Board or similar dashboards.

Short Answer

Interview ready
Premium

A concise answer to help you respond confidently on this topic during an interview.

Finished reading?
Practice Problems