How to implement queue processing with Bull in NestJS?
Bull queue processing in NestJS moves slow tasks out of the HTTP request cycle and into Redis-backed background workers, with retries and persistence built in.
Theory
TL;DR
- Bull uses Redis as a persistent job store:
queue.add()pushes a job, a worker picks it up, processes it, marks it done or failed - Think of it like a restaurant kitchen ticket system: the waiter (HTTP handler) hangs a ticket on the rail (Redis), the cook (processor) grabs it without making the table wait
- Use Bull for tasks that take over 500ms or need reliability: emails, image resize, PDF generation, analytics pings
- Skip Bull for fast synchronous work or simple one-off delays with no retry needs
@nestjs/bullwraps the Bull Node.js library with NestJS decorators:@Processor(),@Process(),@InjectQueue()
Quick example
// 1. App-level Redis config
BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } })
// 2. Queue registration in your module
BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, removeOnComplete: true } })
// 3. Add a job from any service
await this.emailQueue.add('welcome', { to: 'user@example.com', subject: 'Welcome' });
// 4. Process it
@Processor('email')
export class EmailProcessor {
@Process('welcome')
async handle(job: Job<{ to: string; subject: string }>) {
await this.mailer.send(job.data); // HTTP already responded before this runs
}
}The HTTP handler adds a job and returns immediately. The processor runs it whenever a worker is free.
How Bull fits into NestJS
@nestjs/bull maps Bull onto NestJS's module system. BullModule.forRoot() configures Redis once for the whole app. BullModule.registerQueue() creates a named queue and registers it as a provider. @InjectQueue('name') gives you the queue instance in any service. @Processor('name') marks a class as the consumer for that queue.
Internally, Bull uses Redis Lua scripts for atomic operations. When you call queue.add(), the job data lands in a Redis list with metadata: attempt count, delay, backoff config. Workers poll via BRPOPLPUSH, a blocking Redis command that waits efficiently without spinning. On pickup, the job moves to an "active" set. After processing, it goes to "completed" or "failed". Redis keeps everything durable even if the Node process crashes mid-poll.
I have seen teams skip removeOnComplete: true and end up with gigabytes of completed jobs filling Redis after a few weeks of production traffic. Set it in defaultJobOptions from day one.
Setup
npm install @nestjs/bull bull
npm install --save-dev @types/bull// app.module.ts
@Module({
imports: [
BullModule.forRoot({
redis: { host: 'localhost', port: 6379 },
}),
],
})
export class AppModule {}
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}Both calls are required. forRoot() without registerQueue() in the feature module means the processor never starts and @InjectQueue throws at runtime.
Adding jobs and processing them
// email.service.ts
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcome(email: string) {
await this.emailQueue.add('welcome', { email });
}
async scheduleDigest(email: string) {
await this.emailQueue.add('digest', { email }, { delay: 86400000 }); // 24h
}
async sendBulk(users: { email: string }[]) {
const jobs = users.map((u) => ({ name: 'bulk', data: u }));
await this.emailQueue.addBulk(jobs);
}
}
// email.processor.ts
@Processor('email')
export class EmailProcessor {
constructor(private readonly mailer: MailerService) {}
@Process('welcome')
async handleWelcome(job: Job<{ email: string }>) {
await this.mailer.sendMail({ to: job.data.email, subject: 'Welcome!' });
return { sent: true };
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`Job ${job.id} failed after ${job.attemptsMade} attempts:`, error.message);
}
}The @OnQueueFailed() hook is where you add alerting or dead-letter logic. Do not rely only on Bull's silent retry behavior in production.
Advanced job options
// Cron-style recurring job
await this.queue.add('daily-cleanup', {}, {
repeat: { cron: '0 0 * * *' },
});
// Priority: lower number = picked up first
await this.queue.add('urgent-notification', data, { priority: 1 });
await this.queue.add('batch-export', data, { priority: 10 });
// Rate limiting for third-party API calls
await this.queue.add('api-sync', data, {
limiter: { max: 10, duration: 1000 }, // 10 jobs per second
});Priority queues matter when you mix user-facing and batch jobs. An urgent notification should not sit behind a 10k email batch.
Common mistakes
Forgetting registerQueue() in the feature module. BullModule.forRoot() only sets up the Redis connection. Each queue needs its own registerQueue() in the module that uses it. Skip this and @InjectQueue throws at runtime.
// Wrong: only forRoot in AppModule, nothing in EmailModule
// Right:
@Module({ imports: [BullModule.registerQueue({ name: 'email' })] })
export class EmailModule {}No removeOnComplete in production. Completed jobs stay in Redis forever by default. Under load, this fills memory within weeks. Set removeOnComplete: true in defaultJobOptions.
High attempts without backoff. Workers retry with no delay, saturating Redis on permanent failures.
// Wrong
{ attempts: 50 }
// Right
{ attempts: 5, backoff: { type: 'exponential', delay: 5000 } }CPU-heavy synchronous work inside a processor. Bull processors run in the Node.js event loop. A blocking PDF generation or image resize starves other jobs. Use concurrency: 1 for heavy processors, or offload to a child process.
Monitoring in production
Bull Board provides a UI over all queues showing failed jobs, retry counts, and queue depths:
npm install @bull-board/express @bull-board/apiimport { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());For metrics, bullmq-prometheus exports queue depth and failure rate to Prometheus. Alert when stalled jobs exceed 5 minutes or failure rate climbs above 1%.
When to use Bull vs alternatives
| Scenario | Tool |
|---|---|
| Task takes over 500ms or needs retries | Bull |
| Distributed workers across multiple instances | Bull |
| Already using MongoDB, no Redis available | Agenda.js |
| Simple delay, no persistence or retries needed | setTimeout |
| High-throughput event streaming | Kafka |
| Response must be synchronous | Skip queues |
Real-world usage
- User onboarding: avatar resize + welcome email + analytics ping, all queued on signup
- E-commerce: invoice PDF and stock update queued after order placement
- SaaS dashboards: scheduled report exports triggered via cron queue
- Microservices: Kafka event received, Bull handles the slow DB write asynchronously
Follow-up questions
Q: How do you prevent double-processing if a worker crashes mid-job?
A: Bull moves in-flight jobs to a "stalled" state when a worker dies. On reconnect, stalled jobs are re-queued automatically. For idempotency, check a flag in your DB before processing: if (await this.repo.isProcessed(job.data.id)) return;.
Q: Explain Bull's concurrency model.
A: @Process({ name: 'send', concurrency: 5 }) runs up to 5 jobs in parallel per worker process. To scale beyond that, run multiple NestJS instances: each worker polls the same Redis queue independently.
Q: What happens if Redis goes down mid-processing?
A: Workers stop polling. Jobs in the "active" set stay there until Redis recovers. On reconnect, Bull resumes from the persisted state. No jobs are lost if Redis has RDB or AOF persistence enabled.
Q: What is the difference between Bull and BullMQ?
A: BullMQ is the newer library with faster Lua scripts, parent-child job flows, and better TypeScript types. @nestjs/bullmq wraps it. For new NestJS projects, @nestjs/bullmq is the better pick. The older @nestjs/bull is still maintained but not gaining new features.
Q: How would you scale to 1 million jobs per day?
A: Shard by job type: separate queues for emails, images, reports. Run multiple worker instances per queue. Use Redis Cluster for durability. Auto-scale worker pods based on queue depth per shard, and assign specific queues to specific worker groups to prevent resource contention.
Examples
Basic: email queue end-to-end
Complete working setup: module registration, service, processor.
// app.module.ts
import { BullModule } from '@nestjs/bull';
import { EmailModule } from './email/email.module';
@Module({
imports: [
BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }),
EmailModule,
],
})
export class AppModule {}
// email.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
},
}),
],
providers: [EmailProcessor, EmailService],
exports: [EmailService],
})
export class EmailModule {}
// email.service.ts
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendWelcome(email: string) {
await this.emailQueue.add('welcome', { email });
}
}
// email.processor.ts
@Processor('email')
export class EmailProcessor {
@Process('welcome')
async handleWelcome(job: Job<{ email: string }>) {
console.log(`Sending to ${job.data.email}`);
return { sent: true };
}
}The controller calls emailService.sendWelcome() and responds in milliseconds. The processor runs the actual send logic in the background. If it throws, Bull retries up to 3 times with delays of 2s, 4s, 8s.
Intermediate: user onboarding with multiple job types
Signup triggers three background tasks in parallel: avatar resize, welcome email, analytics.
// user.service.ts
@Injectable()
export class UserService {
constructor(
@InjectQueue('onboarding') private onboardingQueue: Queue,
private userRepo: UserRepository,
) {}
async createUser(dto: CreateUserDto) {
const user = await this.userRepo.save(dto);
await Promise.all([
this.onboardingQueue.add('resize-avatar', { userId: user.id }),
this.onboardingQueue.add('send-welcome', { userId: user.id, email: user.email }),
this.onboardingQueue.add('track-signup', { userId: user.id }),
]);
return user;
}
}
// onboarding.processor.ts
@Processor('onboarding')
export class OnboardingProcessor {
@Process('resize-avatar')
async resizeAvatar(job: Job<{ userId: string }>) {
// Sharp resize logic
console.log(`Avatar resized for user ${job.data.userId}`);
}
@Process('send-welcome')
async sendWelcome(job: Job<{ userId: string; email: string }>) {
await this.mailer.send({ to: job.data.email, subject: 'Welcome aboard' });
}
@Process('track-signup')
async trackSignup(job: Job<{ userId: string }>) {
await this.analytics.track('signup', job.data);
}
@OnQueueFailed()
onFailed(job: Job, err: Error) {
console.error(`Onboarding step ${job.name} failed:`, err.message);
}
}Each job is independent. If send-welcome fails, it retries without blocking track-signup. The user creation API responds before any of the three tasks begin.
Advanced: deduplicated delayed report with progress tracking
Daily report generation with exactly-one-at-a-time concurrency, deduplication, and progress reporting.
// reports.service.ts
@Injectable()
export class ReportsService {
constructor(@InjectQueue('reports') private reportsQueue: Queue) {}
async scheduleDaily(reportId: string) {
await this.reportsQueue.add(
'generate-daily',
{ reportId },
{
delay: 86400000, // Run after 24h
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
jobId: `daily-${reportId}`, // Same jobId = no duplicate queued
},
);
}
}
// reports.processor.ts
@Processor('reports')
export class ReportProcessor {
@Process({ name: 'generate-daily', concurrency: 1 }) // One at a time
async generate(job: Job<{ reportId: string }>) {
await job.progress(10);
const pdf = await this.pdfService.generate(job.data.reportId);
await job.progress(80);
await this.storage.upload(pdf, `report-${job.data.reportId}.pdf`);
await job.progress(100);
return { path: `report-${job.data.reportId}.pdf` };
}
@OnQueueCompleted()
async onComplete(job: Job, result: { path: string }) {
await this.notifyQueue.add('email-report', {
reportId: job.data.reportId,
path: result.path,
});
}
}concurrency: 1 prevents memory spikes from parallel PDF generation on the same worker. The jobId option deduplicates: calling scheduleDaily() twice with the same reportId adds only one job. job.progress() exposes completion percentage in Bull Board UI.
Short Answer
Interview readyA concise answer to help you respond confidently on this topic during an interview.