Suggest an editImprove this articleRefine the answer for “How to implement queue processing with Bull in NestJS?”. Your changes go to moderation before they’re published.Approval requiredContentWhat you’re changing🇺🇸EN🇺🇦UAPreviewTitle (EN)Short answer (EN)**Bull queue processing in NestJS** routes slow background tasks through Redis so they run outside the HTTP request cycle, with retries and persistence. Install `@nestjs/bull bull`, call `BullModule.forRoot()` with Redis config, then `BullModule.registerQueue()` in each feature module. ```typescript // Add a job await this.emailQueue.add('welcome', { to: 'user@example.com' }); // Process it @Processor('email') export class EmailProcessor { @Process('welcome') async handle(job: Job<{ to: string }>) { /* send mail */ } } ``` **Key:** `forRoot()` connects Redis globally; `registerQueue()` must be called in every module that uses the queue.Shown above the full answer for quick recall.Answer (EN)Image**Bull queue processing in NestJS** moves slow tasks out of the HTTP request cycle and into Redis-backed background workers, with retries and persistence built in. ## Theory ### TL;DR - Bull uses Redis as a persistent job store: `queue.add()` pushes a job, a worker picks it up, processes it, marks it done or failed - Think of it like a restaurant kitchen ticket system: the waiter (HTTP handler) hangs a ticket on the rail (Redis), the cook (processor) grabs it without making the table wait - Use Bull for tasks that take over 500ms or need reliability: emails, image resize, PDF generation, analytics pings - Skip Bull for fast synchronous work or simple one-off delays with no retry needs - `@nestjs/bull` wraps the Bull Node.js library with NestJS decorators: `@Processor()`, `@Process()`, `@InjectQueue()` ### Quick example ```typescript // 1. App-level Redis config BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }) // 2. Queue registration in your module BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, removeOnComplete: true } }) // 3. Add a job from any service await this.emailQueue.add('welcome', { to: 'user@example.com', subject: 'Welcome' }); // 4. Process it @Processor('email') export class EmailProcessor { @Process('welcome') async handle(job: Job<{ to: string; subject: string }>) { await this.mailer.send(job.data); // HTTP already responded before this runs } } ``` The HTTP handler adds a job and returns immediately. The processor runs it whenever a worker is free. ### How Bull fits into NestJS `@nestjs/bull` maps Bull onto NestJS's module system. `BullModule.forRoot()` configures Redis once for the whole app. `BullModule.registerQueue()` creates a named queue and registers it as a provider. `@InjectQueue('name')` gives you the queue instance in any service. `@Processor('name')` marks a class as the consumer for that queue. Internally, Bull uses Redis Lua scripts for atomic operations. When you call `queue.add()`, the job data lands in a Redis list with metadata: attempt count, delay, backoff config. Workers poll via `BRPOPLPUSH`, a blocking Redis command that waits efficiently without spinning. On pickup, the job moves to an "active" set. After processing, it goes to "completed" or "failed". Redis keeps everything durable even if the Node process crashes mid-poll. I have seen teams skip `removeOnComplete: true` and end up with gigabytes of completed jobs filling Redis after a few weeks of production traffic. Set it in `defaultJobOptions` from day one. ### Setup ```bash npm install @nestjs/bull bull npm install --save-dev @types/bull ``` ```typescript // app.module.ts @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 }, }), ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {} ``` Both calls are required. `forRoot()` without `registerQueue()` in the feature module means the processor never starts and `@InjectQueue` throws at runtime. ### Adding jobs and processing them ```typescript // email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } async scheduleDigest(email: string) { await this.emailQueue.add('digest', { email }, { delay: 86400000 }); // 24h } async sendBulk(users: { email: string }[]) { const jobs = users.map((u) => ({ name: 'bulk', data: u })); await this.emailQueue.addBulk(jobs); } } // email.processor.ts @Processor('email') export class EmailProcessor { constructor(private readonly mailer: MailerService) {} @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { await this.mailer.sendMail({ to: job.data.email, subject: 'Welcome!' }); return { sent: true }; } @OnQueueFailed() onFailed(job: Job, error: Error) { console.error(`Job ${job.id} failed after ${job.attemptsMade} attempts:`, error.message); } } ``` The `@OnQueueFailed()` hook is where you add alerting or dead-letter logic. Do not rely only on Bull's silent retry behavior in production. ### Advanced job options ```typescript // Cron-style recurring job await this.queue.add('daily-cleanup', {}, { repeat: { cron: '0 0 * * *' }, }); // Priority: lower number = picked up first await this.queue.add('urgent-notification', data, { priority: 1 }); await this.queue.add('batch-export', data, { priority: 10 }); // Rate limiting for third-party API calls await this.queue.add('api-sync', data, { limiter: { max: 10, duration: 1000 }, // 10 jobs per second }); ``` Priority queues matter when you mix user-facing and batch jobs. An urgent notification should not sit behind a 10k email batch. ### Common mistakes **Forgetting `registerQueue()` in the feature module.** `BullModule.forRoot()` only sets up the Redis connection. Each queue needs its own `registerQueue()` in the module that uses it. Skip this and `@InjectQueue` throws at runtime. ```typescript // Wrong: only forRoot in AppModule, nothing in EmailModule // Right: @Module({ imports: [BullModule.registerQueue({ name: 'email' })] }) export class EmailModule {} ``` **No `removeOnComplete` in production.** Completed jobs stay in Redis forever by default. Under load, this fills memory within weeks. Set `removeOnComplete: true` in `defaultJobOptions`. **High `attempts` without `backoff`.** Workers retry with no delay, saturating Redis on permanent failures. ```typescript // Wrong { attempts: 50 } // Right { attempts: 5, backoff: { type: 'exponential', delay: 5000 } } ``` **CPU-heavy synchronous work inside a processor.** Bull processors run in the Node.js event loop. A blocking PDF generation or image resize starves other jobs. Use `concurrency: 1` for heavy processors, or offload to a child process. ### Monitoring in production Bull Board provides a UI over all queues showing failed jobs, retry counts, and queue depths: ```bash npm install @bull-board/express @bull-board/api ``` ```typescript import { BullAdapter } from '@bull-board/api/bullAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; const serverAdapter = new ExpressAdapter(); createBullBoard({ queues: [new BullAdapter(emailQueue)], serverAdapter, }); app.use('/admin/queues', serverAdapter.getRouter()); ``` For metrics, `bullmq-prometheus` exports queue depth and failure rate to Prometheus. Alert when stalled jobs exceed 5 minutes or failure rate climbs above 1%. ### When to use Bull vs alternatives | Scenario | Tool | |---|---| | Task takes over 500ms or needs retries | Bull | | Distributed workers across multiple instances | Bull | | Already using MongoDB, no Redis available | Agenda.js | | Simple delay, no persistence or retries needed | `setTimeout` | | High-throughput event streaming | Kafka | | Response must be synchronous | Skip queues | ### Real-world usage - User onboarding: avatar resize + welcome email + analytics ping, all queued on signup - E-commerce: invoice PDF and stock update queued after order placement - SaaS dashboards: scheduled report exports triggered via cron queue - Microservices: Kafka event received, Bull handles the slow DB write asynchronously ### Follow-up questions **Q:** How do you prevent double-processing if a worker crashes mid-job? **A:** Bull moves in-flight jobs to a "stalled" state when a worker dies. On reconnect, stalled jobs are re-queued automatically. For idempotency, check a flag in your DB before processing: `if (await this.repo.isProcessed(job.data.id)) return;`. **Q:** Explain Bull's concurrency model. **A:** `@Process({ name: 'send', concurrency: 5 })` runs up to 5 jobs in parallel per worker process. To scale beyond that, run multiple NestJS instances: each worker polls the same Redis queue independently. **Q:** What happens if Redis goes down mid-processing? **A:** Workers stop polling. Jobs in the "active" set stay there until Redis recovers. On reconnect, Bull resumes from the persisted state. No jobs are lost if Redis has RDB or AOF persistence enabled. **Q:** What is the difference between Bull and BullMQ? **A:** BullMQ is the newer library with faster Lua scripts, parent-child job flows, and better TypeScript types. `@nestjs/bullmq` wraps it. For new NestJS projects, `@nestjs/bullmq` is the better pick. The older `@nestjs/bull` is still maintained but not gaining new features. **Q:** How would you scale to 1 million jobs per day? **A:** Shard by job type: separate queues for emails, images, reports. Run multiple worker instances per queue. Use Redis Cluster for durability. Auto-scale worker pods based on queue depth per shard, and assign specific queues to specific worker groups to prevent resource contention. ## Examples ### Basic: email queue end-to-end Complete working setup: module registration, service, processor. ```typescript // app.module.ts import { BullModule } from '@nestjs/bull'; import { EmailModule } from './email/email.module'; @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }), EmailModule, ], }) export class AppModule {} // email.module.ts @Module({ imports: [ BullModule.registerQueue({ name: 'email', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, removeOnComplete: true, }, }), ], providers: [EmailProcessor, EmailService], exports: [EmailService], }) export class EmailModule {} // email.service.ts @Injectable() export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(email: string) { await this.emailQueue.add('welcome', { email }); } } // email.processor.ts @Processor('email') export class EmailProcessor { @Process('welcome') async handleWelcome(job: Job<{ email: string }>) { console.log(`Sending to ${job.data.email}`); return { sent: true }; } } ``` The controller calls `emailService.sendWelcome()` and responds in milliseconds. The processor runs the actual send logic in the background. If it throws, Bull retries up to 3 times with delays of 2s, 4s, 8s. ### Intermediate: user onboarding with multiple job types Signup triggers three background tasks in parallel: avatar resize, welcome email, analytics. ```typescript // user.service.ts @Injectable() export class UserService { constructor( @InjectQueue('onboarding') private onboardingQueue: Queue, private userRepo: UserRepository, ) {} async createUser(dto: CreateUserDto) { const user = await this.userRepo.save(dto); await Promise.all([ this.onboardingQueue.add('resize-avatar', { userId: user.id }), this.onboardingQueue.add('send-welcome', { userId: user.id, email: user.email }), this.onboardingQueue.add('track-signup', { userId: user.id }), ]); return user; } } // onboarding.processor.ts @Processor('onboarding') export class OnboardingProcessor { @Process('resize-avatar') async resizeAvatar(job: Job<{ userId: string }>) { // Sharp resize logic console.log(`Avatar resized for user ${job.data.userId}`); } @Process('send-welcome') async sendWelcome(job: Job<{ userId: string; email: string }>) { await this.mailer.send({ to: job.data.email, subject: 'Welcome aboard' }); } @Process('track-signup') async trackSignup(job: Job<{ userId: string }>) { await this.analytics.track('signup', job.data); } @OnQueueFailed() onFailed(job: Job, err: Error) { console.error(`Onboarding step ${job.name} failed:`, err.message); } } ``` Each job is independent. If `send-welcome` fails, it retries without blocking `track-signup`. The user creation API responds before any of the three tasks begin. ### Advanced: deduplicated delayed report with progress tracking Daily report generation with exactly-one-at-a-time concurrency, deduplication, and progress reporting. ```typescript // reports.service.ts @Injectable() export class ReportsService { constructor(@InjectQueue('reports') private reportsQueue: Queue) {} async scheduleDaily(reportId: string) { await this.reportsQueue.add( 'generate-daily', { reportId }, { delay: 86400000, // Run after 24h attempts: 3, backoff: { type: 'exponential', delay: 5000 }, jobId: `daily-${reportId}`, // Same jobId = no duplicate queued }, ); } } // reports.processor.ts @Processor('reports') export class ReportProcessor { @Process({ name: 'generate-daily', concurrency: 1 }) // One at a time async generate(job: Job<{ reportId: string }>) { await job.progress(10); const pdf = await this.pdfService.generate(job.data.reportId); await job.progress(80); await this.storage.upload(pdf, `report-${job.data.reportId}.pdf`); await job.progress(100); return { path: `report-${job.data.reportId}.pdf` }; } @OnQueueCompleted() async onComplete(job: Job, result: { path: string }) { await this.notifyQueue.add('email-report', { reportId: job.data.reportId, path: result.path, }); } } ``` `concurrency: 1` prevents memory spikes from parallel PDF generation on the same worker. The `jobId` option deduplicates: calling `scheduleDaily()` twice with the same `reportId` adds only one job. `job.progress()` exposes completion percentage in Bull Board UI.For the reviewerNote to the moderator (optional)Visible only to the moderator. Helps review go faster.