How to implement queue processing with Bull in NestJS?
Queue Processing in NestJS with Bull
Queues handle background jobs — tasks that are too slow or resource-intensive for the request-response cycle: sending emails, processing images, generating reports, etc.
NestJS integrates with Bull (backed by Redis) through @nestjs/bull.
Setup
bash
npm install @nestjs/bull bull
npm install --save-dev @types/bulltypescript
// app.module.ts
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}Defining a Queue
typescript
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}Adding Jobs to Queue
typescript
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcomeEmail(userId: string, email: string) {
await this.emailQueue.add('welcome', {
userId,
email,
template: 'welcome',
});
}
async sendBulkEmails(users: { email: string; data: any }[]) {
const jobs = users.map((user) => ({
name: 'bulk',
data: user,
}));
await this.emailQueue.addBulk(jobs);
}
async scheduleEmail(email: string, data: any, delay: number) {
await this.emailQueue.add('scheduled', { email, data }, {
delay, // milliseconds
});
}
}Processing Jobs
typescript
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('email')
export class EmailProcessor {
constructor(private readonly mailer: MailerService) {}
@Process('welcome')
async handleWelcome(job: Job<{ userId: string; email: string }>) {
const { email, userId } = job.data;
await this.mailer.sendMail({
to: email,
subject: 'Welcome!',
template: 'welcome',
context: { userId },
});
return { sent: true };
}
@Process('bulk')
async handleBulk(job: Job) {
await this.mailer.sendMail(job.data);
}
@OnQueueActive()
onActive(job: Job) {
console.log(`Processing job ${job.id} of type ${job.name}`);
}
@OnQueueCompleted()
onComplete(job: Job, result: any) {
console.log(`Job ${job.id} completed`, result);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`Job ${job.id} failed:`, error.message);
}
}Advanced: Job Options
typescript
// Delayed job
await this.queue.add('report', data, { delay: 60000 });
// Repeated job (cron)
await this.queue.add('cleanup', {}, {
repeat: { cron: '0 0 * * *' }, // Every day at midnight
});
// Priority (lower = higher priority)
await this.queue.add('urgent', data, { priority: 1 });
await this.queue.add('normal', data, { priority: 10 });
// Rate limiting
await this.queue.add('api-call', data, {
limiter: { max: 10, duration: 1000 }, // 10 per second
});Monitoring with Bull Board
bash
npm install @bull-board/express @bull-board/apitypescript
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());Common Use Cases
| Use Case | Queue Name | Priority |
|---|---|---|
| Email sending | email | Normal |
| Image processing | media | Low |
| PDF generation | reports | Low |
| Notifications | notifications | High |
| Data import/export | data | Low |
| Scheduled tasks | cron | Normal |
Best practice: Use queues for anything that takes more than a few hundred milliseconds. Configure retry policies with exponential backoff. Monitor queue health in production with Bull Board or similar dashboards.
Short Answer
Interview readyPremium
A concise answer to help you respond confidently on this topic during an interview.