Common RxJS operators in Angular

RxJS operators are pure functions that build Observable chains to transform, filter, and combine streams of async data in Angular apps.

Theory

TL;DR

  • Operators are like conveyor belt tools: each one shapes or sorts items passing through without stopping the line
  • Four categories: transformation (map, switchMap), filtering (filter, debounceTime), combination (combineLatest, forkJoin), error handling (catchError, retry)
  • Decision rule: switchMap for search (latest wins), mergeMap for parallel fetches (all run), concatMap for ordered saves (queue)
  • Operators never mutate the source Observable. Each returns a new one.

Quick example

typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4).pipe(
  filter(n => n % 2 === 0), // keeps: 2, 4
  map(n => n * 2)           // transforms to: 4, 8
).subscribe(console.log);   // logs: 4, 8

filter runs first, map runs on what survives. The source stays untouched.

The flattening operators: switchMap, mergeMap, concatMap

These three appear in almost every Angular interview. All of them map source emissions to inner Observables, but handle concurrency differently.

switchMap cancels the previous inner Observable when a new emission arrives. User types "an" then "ang"? The "an" request gets cancelled. Perfect for search inputs.

mergeMap lets all inner Observables run in parallel. Three emissions trigger three simultaneous requests. Good for independent data fetches where order does not matter.

concatMap queues inner Observables and processes them one by one in order. The next one starts only after the previous completes. Use it when sequence matters, like saving items in a specific order.
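To make the difference concrete, here is a minimal sketch that feeds the same two queries through each operator. It assumes a hypothetical slow request "a" and a fast request "b" (the latencies and the `run` helper are made up for illustration), and it uses RxJS's `VirtualTimeScheduler` so the delays run on a virtual clock instead of real time.

```typescript
import { Observable, OperatorFunction, Subject, VirtualTimeScheduler, of } from 'rxjs';
import { concatMap, delay, mergeMap, switchMap } from 'rxjs/operators';

type Query = 'a' | 'b';
type Flatten = (
  project: (query: Query) => Observable<string>
) => OperatorFunction<Query, string>;

// Hypothetical latencies in virtual milliseconds: "a" is slow, "b" is fast.
const latency: Record<Query, number> = { a: 30, b: 10 };

function run(flatten: Flatten): string[] {
  const scheduler = new VirtualTimeScheduler(); // virtual clock, no real waiting
  const query$ = new Subject<Query>();
  const received: string[] = [];

  // Stand-in for an HTTP call: emits one result after the query's latency.
  const request = (query: Query) =>
    of(`result-${query}`).pipe(delay(latency[query], scheduler));

  query$.pipe(flatten(request)).subscribe(value => received.push(value));

  query$.next('a'); // slow request starts
  query$.next('b'); // fast request arrives while "a" is still pending
  scheduler.flush(); // run all scheduled virtual-time work
  return received;
}

const withSwitchMap = run(project => switchMap(project));
const withMergeMap = run(project => mergeMap(project));
const withConcatMap = run(project => concatMap(project));

console.log(withSwitchMap); // ['result-b']              "a" was cancelled
console.log(withMergeMap);  // ['result-b', 'result-a']  both ran, fast one first
console.log(withConcatMap); // ['result-a', 'result-b']  "b" waited for "a"
```

Note how mergeMap delivers the slow result last, which is exactly the stale-data problem in a search box.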

When to use

  • Type-ahead search: debounceTime(300) + switchMap - debounce cuts redundant keystrokes, switchMap cancels stale requests
  • Parallel API calls: mergeMap or forkJoin - all requests run simultaneously
  • Ordered saves: concatMap - request 1 finishes before request 2 starts
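  • Multi-source form: combineLatest - once every stream has emitted at least once, reacts whenever any stream emits
  • One-time read from a long-lived stream: take(1) - completes automatically after the first emission (HttpClient requests already complete on their own after one response)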
  • Component cleanup: takeUntil(destroy$) - stops the stream when the component destroys
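The two completion operators from the list can be tried outside Angular. The sketch below is only an illustration: `ticks$`, `destroy$`, and `state$` are hypothetical stand-ins for a long-lived stream, a component teardown notifier, and a piece of state.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';

// takeUntil: the piped stream completes as soon as the notifier emits.
const ticks$ = new Subject<number>();  // hypothetical long-lived stream
const destroy$ = new Subject<void>();  // hypothetical teardown notifier
const seen: Array<number | string> = [];

ticks$.pipe(takeUntil(destroy$)).subscribe({
  next: n => seen.push(n),
  complete: () => seen.push('complete'),
});

ticks$.next(1);
ticks$.next(2);
destroy$.next(); // ends the subscription above
ticks$.next(3);  // no longer delivered

// take(1): read the current value once, then complete.
const state$ = new BehaviorSubject(10);
const first: number[] = [];
state$.pipe(take(1)).subscribe(n => first.push(n));
state$.next(20); // not delivered, the subscription already completed

console.log(seen);  // [1, 2, 'complete']
console.log(first); // [10]
```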

Comparison table

| Operator | Behavior | Cancels prior? | Parallel? | Use case |
|---|---|---|---|---|
| map | Transforms each emission | No | No | Extract API response data |
| switchMap | Switches to inner, cancels previous | Yes | No | Search input |
| mergeMap | Runs all inners in parallel | No | Yes | Batch user fetches |
| concatMap | Queues inners sequentially | No | No | Ordered bulk saves |
| filter | Emits only if condition is true | N/A | N/A | Null guards, conditions |
| debounceTime | Waits for a pause before emitting | N/A | N/A | Rate-limiting keystrokes |
| combineLatest | Emits when any source emits, once all sources have emitted (with all latest values) | No | Yes | Multiple data sources |
| forkJoin | Waits for all sources to complete, then emits their last values | No | Yes | Parallel HTTP calls |
| catchError | Intercepts errors, returns fallback | N/A | N/A | Error recovery |
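The two combination operators in the table are easy to confuse, so here is a small sketch of how each one emits. The stream names and values are hypothetical.

```typescript
import { Subject, combineLatest, forkJoin, of } from 'rxjs';

// combineLatest: stays silent until every source has emitted once,
// then emits on each change with the latest value from every source.
const name$ = new Subject<string>();
const age$ = new Subject<number>();
const combined: string[] = [];

combineLatest([name$, age$]).subscribe(([name, age]) =>
  combined.push(`${name}:${age}`)
);

name$.next('Ann'); // nothing yet: age$ has not emitted
age$.next(30);     // first combined emission
name$.next('Bob'); // emits again, reusing the latest age

// forkJoin: waits for every source to complete, then emits once
// with the last value of each.
const joined: number[][] = [];
forkJoin([of(1, 2), of(3)]).subscribe(values => joined.push(values));

console.log(combined); // ['Ann:30', 'Bob:30']
console.log(joined);   // [[2, 3]]
```

A practical consequence: forkJoin suits sources that complete, such as HTTP calls, while combineLatest suits sources that keep emitting.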

How operators work internally

Each operator returns a new Observable that subscribes to the source and applies its logic through Subscriber objects in the pipe chain. Nothing mutates. In zone-based Angular apps, Zone.js patches async APIs (XHR, fetch, timers, browser events) so that change detection runs after they fire. The async pipe also marks its component for check on every emission, which is why you do not need to call detectChanges() after an HTTP response when using the async pipe.
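A hand-written operator shows the "new Observable that subscribes to the source" idea in a few lines. This is a simplified sketch, not how RxJS implements map internally; `double` is a hypothetical operator made up for the illustration.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator: behaves like map(n => n * 2).
function double() {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>(subscriber => {
      // Subscribing to the result is what subscribes to the source.
      const subscription = source.subscribe({
        next: value => subscriber.next(value * 2),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: unsubscribing from the result unsubscribes from the source.
      return () => subscription.unsubscribe();
    });
}

const source$ = of(1, 2, 3);
const doubled$ = source$.pipe(double()); // a new Observable, source$ is unchanged

const fromDoubled: number[] = [];
const fromSource: number[] = [];
doubled$.subscribe(n => fromDoubled.push(n));
source$.subscribe(n => fromSource.push(n));

console.log(fromDoubled); // [2, 4, 6]
console.log(fromSource);  // [1, 2, 3]
```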

Common mistakes

Forgetting .pipe():

typescript
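// Wrong - .map() is not a direct method on Observable in RxJS 6+
this.http.get('/api/users').map(res => res.data);

// Fix - import map from 'rxjs/operators' and apply it through .pipe()
// (typing the response lets res.data compile)
this.http.get<{ data: User[] }>('/api/users').pipe(map(res => res.data));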

switchMap without debounceTime floods the server:

typescript
// Wrong - every keystroke fires a request
this.searchCtrl.valueChanges.pipe(
  switchMap(q => this.http.get(`/api/search?q=${q}`))
);

// Fix
this.searchCtrl.valueChanges.pipe(
  debounceTime(300),
  switchMap(q => this.http.get(`/api/search?q=${q}`))
);

Nested subscribes instead of flattening:

typescript
// Wrong - no cleanup, callback hell
this.http.get('/api/user').subscribe(user => {
  this.http.get(`/api/orders/${user.id}`).subscribe(orders => { /* ... */ });
});

// Fix
this.http.get<User>('/api/user').pipe(
  switchMap(user => this.http.get(`/api/orders/${user.id}`))
).subscribe(orders => { /* ... */ });

catchError on the outer stream kills everything on first error:

typescript
// Wrong - one failed request ends the whole stream
input$.pipe(
  switchMap(val => this.http.get(`/api/${val}`)),
  catchError(() => of([])) // stream ends here after first error
).subscribe();

// Fix - handle errors inside the inner Observable
input$.pipe(
  switchMap(val =>
    this.http.get(`/api/${val}`).pipe(
      catchError(() => of([])) // outer stream survives
    )
  )
).subscribe();

Not unsubscribing from long-lived streams:

typescript
// Wrong - memory leak when component destroys
ngOnInit() {
  this.router.events.subscribe(event => { /* ... */ });
}

// Fix
private destroy$ = new Subject<void>();

ngOnInit() {
  this.router.events.pipe(
    takeUntil(this.destroy$)
  ).subscribe(event => { /* ... */ });
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Real-world usage

  • Angular Material autocomplete: debounceTime + switchMap for typeahead
  • NgRx Effects: switchMap or mergeMap for action-triggered HTTP calls
  • AngularFire (Firestore): combineLatest for syncing multiple real-time collections
  • Route guards: switchMap to fetch data before navigation completes
  • Component teardown: takeUntil is the standard pattern for preventing memory leaks

Follow-up questions

Q: What is the difference between switchMap, mergeMap, and concatMap?
A: switchMap cancels the previous inner Observable when a new emission arrives, so only the latest wins. mergeMap runs all inners in parallel. concatMap queues them and processes one at a time, preserving order.

Q: When would you use exhaustMap?
A: When you want to ignore new emissions while the current inner Observable is still active. Submit buttons are the classic case: the first click is processed, and clicks during the in-flight request are ignored. switchMap would cancel the in-flight request; exhaustMap lets it finish.
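The submit-button case can be sketched with plain Subjects. Everything here is a stand-in: `click$` plays the button, and `save` is a hypothetical request whose response is delivered by hand through the `pending` array.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const click$ = new Subject<string>();
const started: string[] = [];
const saved: string[] = [];
const pending: Subject<string>[] = []; // one entry per request that actually started

// Hypothetical save request: stays in flight until we complete it by hand.
const save = (clickId: string) => {
  started.push(clickId);
  const response$ = new Subject<string>();
  pending.push(response$);
  return response$;
};

click$.pipe(exhaustMap(save)).subscribe(result => saved.push(result));

click$.next('click-1'); // starts a request
click$.next('click-2'); // ignored: the first request is still in flight

pending[0].next('saved by click-1');
pending[0].complete();  // request finished, exhaustMap accepts clicks again

click$.next('click-3'); // starts a second request
pending[1].next('saved by click-3');
pending[1].complete();

console.log(started); // ['click-1', 'click-3']
console.log(saved);   // ['saved by click-1', 'saved by click-3']
```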

Q: How do you handle errors without killing the outer stream?
A: Wrap catchError inside the inner Observable. If catchError sits on the outer stream, a single error terminates everything. Moving it inside the inner pipe keeps the outer stream alive for the next emission.

Q: Why combine debounceTime with distinctUntilChanged?
A: debounceTime waits for a pause after typing. distinctUntilChanged skips the emission if the value did not change. Together they stop the redundant API call when a user pauses, deletes a character, then types the same one back before the next pause: the debounced value equals the last one sent, so it is dropped.
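The scenario can be replayed on a virtual clock. This is a sketch under assumed timings: the keystroke times and the `simulateTyping` helper are hypothetical, and `VirtualTimeScheduler` is used so the 300ms debounce does not need real waiting.

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

// Hypothetical typing session: [virtual time in ms, input value].
const keystrokes: Array<[number, string]> = [
  [0, 'a'], [100, 'an'], [200, 'ang'], // types "ang", then pauses
  [1000, 'an'], [1100, 'ang'],         // deletes "g", types it back
];

function simulateTyping(dedupe: boolean): string[] {
  const scheduler = new VirtualTimeScheduler();
  const input$ = new Subject<string>();
  const requests: string[] = []; // each entry stands for one API call

  const debounced$ = input$.pipe(debounceTime(300, scheduler));
  const query$ = dedupe ? debounced$.pipe(distinctUntilChanged()) : debounced$;
  query$.subscribe(q => requests.push(q));

  // Schedule every keystroke at its virtual time, then run the clock.
  for (const [at, value] of keystrokes) {
    scheduler.schedule(() => input$.next(value), at);
  }
  scheduler.flush();
  return requests;
}

const debounceOnly = simulateTyping(false);
const debounceAndDistinct = simulateTyping(true);

console.log(debounceOnly);        // ['ang', 'ang']  second call is redundant
console.log(debounceAndDistinct); // ['ang']
```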

Q: How do you debug a pipe chain?
A: Add tap(console.log) between operators to see what each step emits. Add a label for clarity: tap(val => console.log('after filter:', val)). The RxJS devtools browser extension can also visualize the stream timeline.

Q: Senior: walk through a race condition with mergeMap in a search and explain the fix.
A: With mergeMap, request A (slow) and request B (fast) both run. B resolves first and shows the correct result. Then A resolves and overwrites B with stale data. The UI shows the wrong result. switchMap cancels A the moment B starts, so only B ever reaches the subscriber. You can prove the cancellation is real by adding finalize() inside the inner pipe and watching it fire on each switchMap cancel.
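The finalize() check from the answer can be sketched without a server. `slowRequest` is a hypothetical request that never responds (built on RxJS's `NEVER`), so the only way it can end is by being unsubscribed.

```typescript
import { NEVER, Subject } from 'rxjs';
import { finalize, switchMap } from 'rxjs/operators';

const query$ = new Subject<string>();
const log: string[] = [];

// Hypothetical request that never responds; finalize records its teardown.
const slowRequest = (query: string) =>
  NEVER.pipe(finalize(() => log.push(`teardown: ${query}`)));

const subscription = query$.pipe(switchMap(slowRequest)).subscribe();

query$.next('an');  // request "an" starts
query$.next('ang'); // switchMap unsubscribes "an" before starting "ang"
const afterSecondQuery = [...log];

subscription.unsubscribe(); // tearing down the outer stream ends "ang" too

console.log(afterSecondQuery); // ['teardown: an']
console.log(log);              // ['teardown: an', 'teardown: ang']
```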

Examples

Basic: map and filter

typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

// Keep only even numbers, then double them
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0), // keeps: 2, 4
  map(n => n * 2)           // doubles to: 4, 8
).subscribe(console.log);
// Output: 4, 8

filter reduces what flows downstream. map changes the shape of each item that passes through. The source Observable is never modified.

Intermediate: type-ahead search with switchMap

typescript
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';

searchCtrl = new FormControl('');

users$ = this.searchCtrl.valueChanges.pipe(
  debounceTime(300),      // wait 300ms after last keystroke
  distinctUntilChanged(), // skip if value did not change
  switchMap(query =>
    // valueChanges can emit null (for example after reset()), so fall back to ''
    this.http.get<{ users: User[] }>('/api/users', { params: { q: query ?? '' } }).pipe(
      map(res => res.users) // extract array from response wrapper
    )
  )
);

// Template: *ngFor="let user of users$ | async"

Each new value that makes it through the debounce window cancels the previous HTTP request if it is still in flight. No stale results reach the template.

Advanced: error isolation inside switchMap

typescript
import { Subject, of } from 'rxjs';
import { switchMap, catchError, finalize } from 'rxjs/operators';

const input$ = new Subject<string>();

input$.pipe(
  switchMap(val =>
    this.http.get(`/api/data/${val}`).pipe(
      catchError(err => {
        console.warn('request failed, using fallback', err);
        return of([]); // keeps the outer stream alive
      }),
      finalize(() => console.log('inner completed or was cancelled by switchMap'))
    )
  )
).subscribe(console.log);

// A failed request returns [] instead of killing the stream.
// finalize fires on both normal completion and cancellation.
// The next input$ emission works regardless of what happened before.

I have seen teams spend an hour debugging why their stream stops after the first 404. The answer is always catchError sitting outside the inner pipe. Moving it inside is the fix.
