Common RxJS operators in Angular
RxJS operators are pure functions that build Observable chains to transform, filter, and combine streams of async data in Angular apps.
Theory
TL;DR
- Operators are like conveyor belt tools: each one shapes or sorts items passing through without stopping the line
- Four categories: transformation (map, switchMap), filtering (filter, debounceTime), combination (combineLatest, forkJoin), error handling (catchError, retry)
- Decision rule: switchMap for search (latest wins), mergeMap for parallel fetches (all run), concatMap for ordered saves (queue)
- Operators never mutate the source Observable. Each returns a new one.
Quick example
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
  filter(n => n % 2 === 0), // keeps: 2, 4
  map(n => n * 2)           // transforms to: 4, 8
).subscribe(console.log);   // logs: 4, 8
filter runs first, map runs on what survives. The source stays untouched.
The flattening operators: switchMap, mergeMap, concatMap
These three appear in almost every Angular interview. All of them map source emissions to inner Observables, but handle concurrency differently.
switchMap cancels the previous inner Observable when a new emission arrives. User types "an" then "ang"? The "an" request gets cancelled. Perfect for search inputs.
mergeMap lets all inner Observables run in parallel. Three emissions trigger three simultaneous requests. Good for independent data fetches where order does not matter.
concatMap queues inner Observables and processes them one by one in order. The next one starts only after the previous completes. Use it when sequence matters, like saving items in a specific order.
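
The difference between the three can be sketched without a backend. This is a minimal illustration, not production code: the `run` helper, the `Key` and `Flatten` types, and the two fake "requests" are hypothetical names made up for this example. `ReplaySubject` stands in for an HTTP response so that a late subscriber (the `concatMap` case) still receives it.

```typescript
import { Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Key = 'A' | 'B';
type Flatten = (project: (key: Key) => Observable<string>) => OperatorFunction<Key, string>;

// Hypothetical helper: runs one flattening operator against two manually
// controlled "requests" and records what reaches the subscriber.
function run(flatten: Flatten): string[] {
  const log: string[] = [];
  const source = new Subject<Key>();
  const requests = {
    A: new ReplaySubject<string>(1),
    B: new ReplaySubject<string>(1),
  };

  source.pipe(flatten(key => requests[key])).subscribe(value => log.push(value));

  source.next('A');           // request A starts
  source.next('B');           // B arrives while A is still pending
  requests.B.next('B done');  // B responds first
  requests.B.complete();
  requests.A.next('A done');  // A responds late
  requests.A.complete();
  return log;
}

const switchResult = run(project => switchMap(project)); // ['B done']
const mergeResult = run(project => mergeMap(project));   // ['B done', 'A done']
const concatResult = run(project => concatMap(project)); // ['A done', 'B done']
```

switchMap drops A entirely, mergeMap delivers results in arrival order, and concatMap delivers them in source order even though B was ready first.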
When to use
- Type-ahead search: debounceTime(300) + switchMap - debounce cuts redundant keystrokes, switchMap cancels stale requests
- Parallel API calls: mergeMap or forkJoin - all requests run simultaneously
- Ordered saves: concatMap - request 1 finishes before request 2 starts
- Multi-source form: combineLatest - reacts whenever any stream emits, once every stream has emitted at least once
- One-time HTTP call: take(1) - completes automatically after the first emission
- Component cleanup: takeUntil(destroy$) - stops the stream when the component is destroyed
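
combineLatest and forkJoin are easy to mix up, so here is a small sketch of how they differ on the same inputs. The `name$` and `age$` Subjects are hypothetical stand-ins for two data sources; nothing here comes from a real API.

```typescript
import { Subject, combineLatest, forkJoin } from 'rxjs';

const name$ = new Subject<string>();
const age$ = new Subject<number>();

const combined: string[] = [];
const joined: string[] = [];

combineLatest([name$, age$]).subscribe(([name, age]) => combined.push(`${name}:${age}`));
forkJoin([name$, age$]).subscribe(([name, age]) => joined.push(`${name}:${age}`));

name$.next('Ann');  // nothing yet: age$ has not emitted
age$.next(30);      // combineLatest emits 'Ann:30'
age$.next(31);      // combineLatest emits 'Ann:31'
name$.complete();
age$.complete();    // forkJoin emits once, with the last values: 'Ann:31'

// combined: ['Ann:30', 'Ann:31']
// joined:   ['Ann:31']
```

forkJoin suits sources that complete (such as HTTP calls), while combineLatest suits sources that keep emitting (such as form controls).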
Comparison table
| Operator | Behavior | Cancels prior? | Parallel? | Use case |
|---|---|---|---|---|
| map | Transforms each emission | No | No | Extract API response data |
| switchMap | Switches to inner, cancels previous | Yes | No | Search input |
| mergeMap | Runs all inners in parallel | No | Yes | Batch user fetches |
| concatMap | Queues inners sequentially | No | No | Ordered bulk saves |
| filter | Emits only if condition is true | N/A | N/A | Null guards, conditions |
| debounceTime | Waits for a pause before emitting | N/A | N/A | Keystroke throttling |
| combineLatest | Emits when any source emits (with all latest values) | No | Yes | Multiple data sources |
| forkJoin | Waits for all sources to complete | No | Yes | Parallel HTTP calls |
| catchError | Intercepts errors, returns fallback | N/A | N/A | Error recovery |
How operators work internally
Each operator returns a new Observable that subscribes to the source and applies its logic through Subscriber objects in the pipe chain. Nothing mutates. In zone-based Angular apps, Zone.js patches async operations (fetch, timers, browser events) so that change detection runs after the emissions they trigger. The async pipe also marks its component for check on every new value, which is why you do not need to call detectChanges() after an HTTP response when using the async pipe.
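
To make the "returns a new Observable" idea concrete, here is a hand-written operator. It is a sketch of the mechanics only: `double` is a hypothetical name, and in real code the built-in map() does this job.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator, written by hand to show the mechanics.
function double() {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>(subscriber => {
      // Subscribing to the returned Observable subscribes to the source.
      const subscription = source.subscribe({
        next: value => subscriber.next(value * 2),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: unsubscribing downstream also unsubscribes upstream.
      return () => subscription.unsubscribe();
    });
}

const source$ = of(1, 2, 3);
const doubled$ = source$.pipe(double());

const fromDoubled: number[] = [];
const fromSource: number[] = [];
doubled$.subscribe(value => fromDoubled.push(value)); // 2, 4, 6
source$.subscribe(value => fromSource.push(value));   // 1, 2, 3 (source untouched)
```

The operator never changes `source$`. It wraps it, which is why the same source can feed several independent pipes.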
Common mistakes
Forgetting .pipe():
// Wrong - .map() is not a direct method on Observable in RxJS 6+
this.http.get('/api/users').map(res => res.data);
// Fix
this.http.get('/api/users').pipe(map(res => res.data));
switchMap without debounceTime floods the server:
// Wrong - every keystroke fires a request
this.searchCtrl.valueChanges.pipe(
  switchMap(q => this.http.get(`/api/search?q=${q}`))
);
// Fix
this.searchCtrl.valueChanges.pipe(
  debounceTime(300),
  switchMap(q => this.http.get(`/api/search?q=${q}`))
);
Nested subscribes instead of flattening:
// Wrong - no cleanup, callback hell
this.http.get('/api/user').subscribe(user => {
  this.http.get(`/api/orders/${user.id}`).subscribe(orders => { /* ... */ });
});
// Fix
this.http.get<User>('/api/user').pipe(
  switchMap(user => this.http.get(`/api/orders/${user.id}`))
).subscribe(orders => { /* ... */ });
catchError on the outer stream kills everything on first error:
// Wrong - one failed request ends the whole stream
input$.pipe(
  switchMap(val => this.http.get(`/api/${val}`)),
  catchError(() => of([])) // stream ends here after first error
).subscribe();
// Fix - handle errors inside the inner Observable
input$.pipe(
  switchMap(val =>
    this.http.get(`/api/${val}`).pipe(
      catchError(() => of([])) // outer stream survives
    )
  )
).subscribe();
Not unsubscribing from long-lived streams:
// Wrong - memory leak when component destroys
ngOnInit() {
  this.router.events.subscribe(event => { /* ... */ });
}
// Fix
private destroy$ = new Subject<void>();
ngOnInit() {
  this.router.events.pipe(
    takeUntil(this.destroy$)
  ).subscribe(event => { /* ... */ });
}
ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}
Real-world usage
- Angular Material autocomplete: debounceTime + switchMap for typeahead
- NgRx Effects: switchMap or mergeMap for action-triggered HTTP calls
- AngularFire (Firestore): combineLatest for syncing multiple real-time collections
- Route guards: switchMap to fetch data before navigation completes
- Component teardown: takeUntil is the standard pattern for preventing memory leaks
Follow-up questions
Q: What is the difference between switchMap, mergeMap, and concatMap?
A: switchMap cancels the previous inner Observable when a new emission arrives, so only the latest wins. mergeMap runs all inners in parallel. concatMap queues them and processes one at a time, preserving order.
Q: When would you use exhaustMap?
A: When you want to ignore new emissions while the current inner Observable is still active. Submit buttons are the classic case: the first click is processed, and clicks during the in-flight request are ignored. switchMap would cancel the in-flight request; exhaustMap lets it finish.
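
A sketch of the submit-button case, assuming a hypothetical `pending` Subject that plays the role of the in-flight save request. The click IDs are made up for illustration.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const clicks$ = new Subject<number>();
let pending = new Subject<string>(); // hypothetical in-flight save
const started: number[] = [];
const saved: string[] = [];

clicks$.pipe(
  exhaustMap(clickId => {
    started.push(clickId); // only runs for clicks that are accepted
    return pending;
  })
).subscribe(result => saved.push(result));

clicks$.next(1); // starts the save
clicks$.next(2); // ignored: save 1 is still in flight
clicks$.next(3); // ignored
pending.next('saved by click 1');
pending.complete(); // save finished
pending = new Subject<string>();
clicks$.next(4); // accepted: nothing is in flight any more

// started: [1, 4]
// saved:   ['saved by click 1']
```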
Q: How do you handle errors without killing the outer stream?
A: Wrap catchError inside the inner Observable. If catchError sits on the outer stream, a single error terminates everything. Moving it inside the inner pipe keeps the outer stream alive for the next emission.
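
The inner pipe is also a natural place for retry, which the TL;DR lists next to catchError. The sketch below assumes a hypothetical `flaky$` Observable that fails on its first two subscriptions. It only shows how the two operators might be ordered, not a recommended retry policy.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky request: fails on the first two subscriptions, then succeeds.
const flaky$ = new Observable<string>(subscriber => {
  attempts += 1;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next('ok');
    subscriber.complete();
  }
});

const results: string[] = [];
flaky$.pipe(
  retry(2),                        // resubscribe up to 2 times on error
  catchError(() => of('fallback')) // only reached if every retry fails
).subscribe(value => results.push(value));

// attempts: 3
// results:  ['ok']
```

Order matters: with catchError placed before retry, the error would be replaced by the fallback first and retry would never see it.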
Q: Why combine debounceTime with distinctUntilChanged?
A: debounceTime waits for a pause after typing. distinctUntilChanged skips the emission if the value did not change. Together they stop redundant API calls when a user pauses, deletes a character, then types the same one back.
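
The distinctUntilChanged half can be shown in isolation. The input values below are an assumption: they imitate what might come out of debounceTime after several typing bursts.

```typescript
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Values as they might look after debounceTime has collapsed each typing burst.
const settled$ = of('ang', 'ang', 'angu', 'angu', 'ang');

const queries: string[] = [];
settled$.pipe(distinctUntilChanged()).subscribe(query => queries.push(query));

// queries: ['ang', 'angu', 'ang']
```

Only consecutive duplicates are skipped. The final 'ang' goes through because the value before it was 'angu'.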
Q: How do you debug a pipe chain?
A: Add tap(console.log) between operators to see what each step emits. Label the output for clarity: tap(val => console.log('after filter:', val)). The RxJS devtools browser extension can also visualize the stream timeline.
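
A small sketch of labelled tap calls. It pushes to a `trace` array instead of calling console.log so the order is easy to inspect. The array and the labels are illustrative only.

```typescript
import { of } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

const trace: string[] = [];

of(1, 2, 3, 4).pipe(
  tap(value => trace.push(`source: ${value}`)),
  filter(n => n % 2 === 0),
  tap(value => trace.push(`after filter: ${value}`)),
  map(n => n * 2),
  tap(value => trace.push(`after map: ${value}`))
).subscribe();

// trace:
// source: 1
// source: 2, after filter: 2, after map: 4
// source: 3
// source: 4, after filter: 4, after map: 8
```

Each value travels through the whole chain before the next one enters, which is often the first surprise when reading tap output.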
Q: Senior: walk through a race condition with mergeMap in a search and explain the fix.
A: With mergeMap, request A (slow) and request B (fast) both run. B resolves first and shows the correct result. Then A resolves and overwrites B with stale data. The UI shows the wrong result. switchMap cancels A the moment B starts, so only B ever reaches the subscriber. You can prove the cancellation is real by adding finalize() inside the inner pipe and watching it fire on each switchMap cancel.
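
The race can be reproduced deterministically with Subjects in place of HTTP calls. Everything below is a hypothetical setup for illustration: the `search` helper, the `Query` type, and the two manually resolved responses.

```typescript
import { Subject } from 'rxjs';
import { finalize, mergeMap, switchMap } from 'rxjs/operators';

type Query = 'an' | 'ang';

// Hypothetical helper: replays the same slow-A / fast-B timeline with either operator.
function search(useSwitchMap: boolean): { shown: string; finalized: string[] } {
  const query$ = new Subject<Query>();
  const responses = { an: new Subject<string>(), ang: new Subject<string>() };
  const finalized: string[] = [];
  let shown = '';

  // finalize records when each inner request completes or is cancelled.
  const request = (q: Query) => responses[q].pipe(finalize(() => finalized.push(q)));

  query$
    .pipe(useSwitchMap ? switchMap(request) : mergeMap(request))
    .subscribe(result => { shown = result; });

  query$.next('an');                     // request A (slow)
  query$.next('ang');                    // request B (fast)
  responses.ang.next('results for ang'); // B resolves first
  responses.ang.complete();
  responses.an.next('results for an');   // A resolves late
  responses.an.complete();
  return { shown, finalized };
}

const withMerge = search(false);
// shown: 'results for an' (stale), finalized: ['ang', 'an']
const withSwitch = search(true);
// shown: 'results for ang', finalized: ['an', 'ang']
```

With switchMap, finalize fires for 'an' as soon as 'ang' is requested, which is the cancellation showing up before any response arrives.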
Examples
Basic: map and filter
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Keep only even numbers, then double them
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0), // keeps: 2, 4
  map(n => n * 2)           // doubles to: 4, 8
).subscribe(console.log);   // Output: 4, 8
filter reduces what flows downstream. map changes the shape of each item that passes through. The source Observable is never modified.
Intermediate: type-ahead search with switchMap
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';
searchCtrl = new FormControl('');
users$ = this.searchCtrl.valueChanges.pipe(
  debounceTime(300),        // wait 300ms after last keystroke
  distinctUntilChanged(),   // skip if value did not change
  switchMap(query =>
    this.http.get<{ users: User[] }>('/api/users', { params: { q: query } }).pipe(
      map(res => res.users) // extract array from response wrapper
    )
  )
);
// Template: *ngFor="let user of users$ | async"
Each new keystroke (after the debounce window) cancels the previous HTTP request. No stale results reach the template.
Advanced: error isolation inside switchMap
import { Subject, of } from 'rxjs';
import { switchMap, catchError, finalize } from 'rxjs/operators';
const input$ = new Subject<string>();
input$.pipe(
  switchMap(val =>
    this.http.get(`/api/data/${val}`).pipe(
      catchError(err => {
        console.warn('request failed, using fallback', err);
        return of([]); // keeps the outer stream alive
      }),
      finalize(() => console.log('inner completed or was cancelled by switchMap'))
    )
  )
).subscribe(console.log);
// A failed request returns [] instead of killing the stream.
// finalize fires on both normal completion and cancellation.
// The next input$ emission works regardless of what happened before.
I have seen teams spend an hour debugging why their stream stops after the first 404. The answer is always catchError sitting outside the inner pipe. Moving it inside is the fix.