What is RxJS and how is it integrated in Angular?
RxJS is a JavaScript library for reactive programming that lets you compose asynchronous data streams using Observables, operators, and subscribers.
Theory
TL;DR
- RxJS works like a conveyor belt: data flows through, you tap in with subscribe(), and pipe it through operators (filters, transforms) before it reaches you.
- Main difference from Promises: a Promise resolves once; an Observable emits zero, one, or many values over time, and nothing runs until you subscribe.
- Decision rule: multiple async sources or cancelable operations → RxJS. One-shot fetch with no transformation → Promise is fine.
- Angular lists RxJS as a peer dependency (projects generated by the Angular 16+ CLI install RxJS 7.8) and uses Observables in HttpClient, Router, and Forms by default.
- Forgetting to unsubscribe from long-lived streams is one of the most common sources of memory leaks in Angular apps.
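The Promise versus Observable difference from the list above can be seen in a few lines. This is a minimal sketch that assumes only the rxjs package; the log array and the variable names are illustrative.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A Promise executor runs immediately, even before anyone calls .then().
const promise = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(1);
});
promise.then(v => console.log('promise resolved with', v));

// An Observable producer runs only when subscribe() is called.
const numbers$ = new Observable<number>(subscriber => {
  log.push('observable producer ran');
  subscriber.next(1);
  subscriber.next(2); // a second value, which a Promise cannot deliver
  subscriber.complete();
});

log.push('before subscribe');
numbers$.subscribe(v => log.push(`value ${v}`));

console.log(log);
// ['promise executor ran', 'before subscribe', 'observable producer ran', 'value 1', 'value 2']
```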
Quick example
import { of } from 'rxjs';
import { map, delay } from 'rxjs/operators';
const user$ = of('Alice').pipe(
map(name => `Hello, ${name}`), // transform the value
delay(1000) // simulate async
);
user$.subscribe(msg => console.log(msg)); // logs "Hello, Alice" after 1s
of('Alice') creates a stream that emits once and completes. pipe() chains operators without mutating the original stream. Nothing runs until subscribe() is called. That is the lazy execution model.
Key difference
RxJS replaces callback hell (nested callbacks and long .then() chains) with declarative pipelines. You describe what should happen to each emission: map it, filter it, cancel it, combine it with another stream. The library handles timing and state. Code reads top-to-bottom without manually tracking whether a previous request has finished.
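As a small illustration of such a pipeline, the sketch below filters and maps a list of users. The User shape and the sample data are made up for the example.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical data shape, used only for this illustration.
interface User {
  name: string;
  active: boolean;
}

const users: User[] = [
  { name: 'Alice', active: true },
  { name: 'Bob', active: false },
  { name: 'Carol', active: true },
];

const greetings: string[] = [];

from(users).pipe(
  filter(user => user.active),         // drop inactive users
  map(user => `Hello, ${user.name}`)   // transform each remaining user
).subscribe(greeting => greetings.push(greeting));

console.log(greetings); // ['Hello, Alice', 'Hello, Carol']
```

Each step states what happens to an emission; there is no manual loop and no flag tracking which item is being processed.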
How Angular uses RxJS
Angular does not bundle RxJS: @angular/core declares it as a peer dependency, and a new Angular 16+ CLI project installs RxJS 7.8. Nearly every async API in Angular returns an Observable:
| Angular feature | RxJS role | Common operator |
|---|---|---|
| HttpClient | All requests return Observable<T> | switchMap for chained calls |
| Router | router.events is Observable<RouterEvent> | filter(e => e instanceof NavigationEnd) |
| Reactive Forms | form.valueChanges is Observable<FormValue> | debounceTime(300) for validation |
| Async pipe | Auto-subscribes and unsubscribes in templates | {{ data$ | async }} |
The async pipe deserves a separate mention. It handles subscription cleanup when the component is destroyed, which removes the most common source of leaks in Angular templates.
How it works internally
Observables are plain objects with a subscribe method. No special runtime, no magic. Calling subscribe() invokes the producer function, which then emits synchronously or asynchronously depending on the source. Operators return new Observables that proxy emissions through a chain of subscriber functions. Cleanup goes through the teardown logic that a producer returns (typed as TeardownLogic): when a stream completes, errors, or is unsubscribed, the teardown callbacks run automatically.
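To make the mechanics concrete, here is a deliberately tiny, dependency-free sketch of the idea. TinyObservable and its map helper are hypothetical names invented for this illustration; this is not the RxJS source and it omits error handling.

```typescript
// Hypothetical minimal types for illustration only.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Teardown = () => void;

class TinyObservable<T> {
  constructor(private producer: (observer: Observer<T>) => Teardown) {}

  subscribe(observer: Observer<T>): { unsubscribe: Teardown } {
    let closed = false;
    let tornDown = false;
    let teardown: Teardown | undefined;

    const runTeardown = () => {
      if (!tornDown && teardown) {
        tornDown = true;
        teardown();
      }
    };

    // Calling subscribe() is what invokes the producer.
    teardown = this.producer({
      next: value => { if (!closed) observer.next(value); },
      complete: () => {
        if (closed) return;
        closed = true;
        observer.complete();
        runTeardown();
      },
    });
    if (closed) runTeardown(); // the producer completed synchronously

    return { unsubscribe: () => { closed = true; runTeardown(); } };
  }
}

// An operator returns a new observable that proxies emissions from the source.
function map<T, R>(source: TinyObservable<T>, project: (value: T) => R): TinyObservable<R> {
  return new TinyObservable<R>(observer =>
    source.subscribe({
      next: value => observer.next(project(value)),
      complete: () => observer.complete(),
    }).unsubscribe
  );
}

const events: string[] = [];
const source = new TinyObservable<number>(observer => {
  events.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => { events.push('teardown ran'); };
});

const doubled = map(source, value => value * 2);
events.push('nothing has run yet');
doubled.subscribe({
  next: value => events.push(`next ${value}`),
  complete: () => events.push('complete'),
});

console.log(events);
// ['nothing has run yet', 'producer started', 'next 2', 'next 4', 'complete', 'teardown ran']
```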
Cold vs hot matters here. A cold Observable starts a new producer per subscriber. HttpClient is cold: each subscribe() fires a new HTTP request. A hot Observable shares one producer across all subscribers. Subject is hot: late subscribers miss past emissions. Wrapping a cold Observable with shareReplay(1) makes it behave hot: subscribers share one execution, and late subscribers receive the most recent value.
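The three behaviours in this paragraph can be checked with a short sketch. A counter stands in for an HTTP call; all names are illustrative.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Cold: the producer runs once per subscriber.
let producerRuns = 0;
const cold$ = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(producerRuns);
  subscriber.complete();
});
cold$.subscribe();
cold$.subscribe();
console.log(producerRuns); // 2

// Hot: a Subject has one producer, and late subscribers miss earlier values.
const subject = new Subject<string>();
const early: string[] = [];
const late: string[] = [];
subject.subscribe(v => early.push(v));
subject.next('a');
subject.subscribe(v => late.push(v));
subject.next('b');
console.log(early, late); // ['a', 'b'] ['b']

// shareReplay(1): one shared execution, last value replayed to late subscribers.
let sharedRuns = 0;
const replayed: number[] = [];
const shared$ = new Observable<number>(subscriber => {
  sharedRuns++;
  subscriber.next(42);
  subscriber.complete();
}).pipe(shareReplay(1));
shared$.subscribe(v => replayed.push(v));
shared$.subscribe(v => replayed.push(v));
console.log(sharedRuns, replayed); // 1 [42, 42]
```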
When to use
- Multiple async inputs that need combining (HTTP + user clicks) → combineLatest or merge.
- Cancelable operations like search-as-you-type → switchMap cancels the previous inner Observable.
- Real-time streams (WebSocket, intervals) → pair with takeUntilDestroyed() to clean up.
- Simple one-shot async with no transformation → native Promise works.
- No async at all → plain functions.
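For the first case in the list, the difference between combineLatest and merge is easiest to see side by side. In this sketch two Subjects stand in for real inputs such as a filter box and a pager; the names are illustrative.

```typescript
import { Subject, combineLatest, merge } from 'rxjs';

const filter$ = new Subject<string>(); // e.g. text typed by the user
const page$ = new Subject<number>();   // e.g. current page number

const combined: string[] = [];
const merged: Array<string | number> = [];

// combineLatest waits until every input has emitted, then emits the latest of each.
combineLatest([filter$, page$]).subscribe(([term, page]) => combined.push(`${term}:${page}`));

// merge forwards every value from every input as it arrives.
merge(filter$, page$).subscribe(value => merged.push(value));

filter$.next('a');
page$.next(1);
filter$.next('b');

console.log(combined); // ['a:1', 'b:1']
console.log(merged);   // ['a', 1, 'b']
```

combineLatest fits when the consumer needs the current state of all inputs together; merge fits when each event can be handled on its own.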
Common mistakes
1. Forgetting to unsubscribe
// wrong: interval keeps firing after the component is destroyed
ngOnInit() {
interval(1000).subscribe(v => console.log(v));
}
// fix: takeUntilDestroyed (Angular 16+)
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
readonly count$ = interval(1000).pipe(
takeUntilDestroyed() // auto-unsubscribes on component destroy
);
// template: {{ count$ | async }}
Observables do not auto-clean. The interval keeps running after the component is destroyed. Use async pipe or takeUntilDestroyed().
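The cleanup idea can be tried outside Angular with plain takeUntil and a Subject. This is a stand-in for illustration, not how takeUntilDestroyed() is implemented: destroy$ plays the role of the component's destroy notification, and ticks$ replaces interval(1000) so the example runs synchronously.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>(); // hypothetical "component destroyed" signal
const ticks$ = new Subject<number>(); // stands in for interval(1000)

const seen: string[] = [];

ticks$.pipe(
  takeUntil(destroy$) // completes the stream as soon as destroy$ emits
).subscribe({
  next: tick => seen.push(`tick ${tick}`),
  complete: () => seen.push('complete'),
});

ticks$.next(1);
ticks$.next(2);
destroy$.next(); // simulate the component being destroyed
ticks$.next(3);  // ignored: the subscription is already closed

console.log(seen); // ['tick 1', 'tick 2', 'complete']
```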
2. Treating an Observable like a Promise
// wrong: TypeError - .then is not a function
this.http.get('/api/users').then(data => console.log(data));
// fix: subscribe or convert explicitly
this.http.get('/api/users').subscribe(data => console.log(data));
const data = await firstValueFrom(this.http.get('/api/users'));
3. Subscribing multiple times to a cold Observable
// wrong: fires 2 separate HTTP requests
const users$ = this.http.get('/api/users');
users$.subscribe(u => this.list = u);
users$.subscribe(u => this.count = u.length);
// fix: share one execution
const users$ = this.http.get('/api/users').pipe(shareReplay(1));
4. Ignoring errors
// wrong: stream dies with no trace
obs$.subscribe(v => console.log(v));
// fix: always handle errors
obs$.subscribe({
next: v => console.log(v),
error: e => console.error('Stream failed:', e)
});
Real-world usage
- Angular HttpClient → switchMap chains token refresh before the actual API call.
- Angular Router → router.events.pipe(filter(...)) drives route guards and analytics.
- NgRx Effects → actions$.pipe(ofType(loadUsers), switchMap(() => this.service.get())).
- Angular Material autocomplete → debounceTime(300) + distinctUntilChanged() + switchMap on input events.
- Angular 18 → toSignal() (introduced in Angular 16) converts an Observable to a signal for fine-grained reactivity.
Follow-up questions
Q: What is the difference between cold and hot Observables?
A: Cold Observables start a new producer per subscriber. Each subscription to an HttpClient call fires a new HTTP request. Hot Observables share one producer: Subject and fromEvent on a DOM node are hot. Wrapping a cold Observable with shareReplay(1) makes it behave hot.
Q: How does Angular prevent RxJS memory leaks?
A: The async pipe auto-unsubscribes when the component is destroyed. Since Angular 16, takeUntilDestroyed() does the same for manual subscriptions inside constructors or injection contexts.
Q: What is the difference between switchMap and mergeMap?
A: switchMap cancels the previous inner Observable when a new emission arrives. Good for search inputs where stale results are useless. mergeMap runs all inner Observables in parallel. Good for independent requests that should all complete.
Q: How do you convert an Observable to a Promise?
A: firstValueFrom(obs$) resolves with the first emission. lastValueFrom(obs$) waits for the stream to complete and resolves with the last value.
Q: Design an NgRx effect that retries a failed HTTP call when the device comes back online.
A: Use retry({ delay: () => fromEvent(window, 'online') }) in RxJS 7+ on the inner Observable inside a switchMap, so a failed request does not terminate the effect itself. The effect waits for network reconnection before retrying instead of immediately hammering the server. Bonus: set count in the retry config to cap the number of attempts and avoid infinite retry loops.
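The retry behaviour from the last answer can be sketched without a browser or NgRx. Here online$ is a Subject standing in for fromEvent(window, 'online'), request$ is a fake request that fails twice, and count: 3 is an arbitrary cap chosen for the example.

```typescript
import { Observable, Subject } from 'rxjs';
import { retry } from 'rxjs/operators';

const online$ = new Subject<void>(); // stand-in for fromEvent(window, 'online')

// Fake request: fails on the first two attempts, succeeds on the third.
let attempts = 0;
const request$ = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error('offline'));
  } else {
    subscriber.next('users loaded');
    subscriber.complete();
  }
});

const outcome: string[] = [];

request$.pipe(
  // After each failure, wait for the next online$ emission before resubscribing.
  retry({ count: 3, delay: () => online$ })
).subscribe({
  next: value => outcome.push(value),
  error: err => outcome.push(`failed: ${err.message}`),
  complete: () => outcome.push('complete'),
});

outcome.push(`attempts before reconnect: ${attempts}`);
online$.next(); // second attempt, fails again
online$.next(); // third attempt, succeeds

console.log(outcome);
// ['attempts before reconnect: 1', 'users loaded', 'complete']
```

In an effect, the same retry(...) would sit in the pipe of the HTTP call returned from switchMap rather than on the outer actions$ stream.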
Examples
Basic: cold Observable emissions
import { Observable } from 'rxjs';
const data$ = new Observable(sub => {
console.log('Producer runs'); // logs once per subscribe call
sub.next(42);
sub.complete();
});
data$.subscribe(v => console.log('Sub 1:', v)); // Producer runs | Sub 1: 42
data$.subscribe(v => console.log('Sub 2:', v)); // Producer runs again | Sub 2: 42
Each subscribe() creates an independent execution. For HTTP, two subscriptions mean two network requests. Wrap with shareReplay(1) if you need to share one result across multiple consumers.
Intermediate: search input with debounce and cancel
import { fromEvent } from 'rxjs';
import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
const input = document.querySelector<HTMLInputElement>('#search')!;
fromEvent(input, 'input').pipe(
debounceTime(300), // wait 300ms after last keystroke
map(e => (e.target as HTMLInputElement).value),
distinctUntilChanged(), // skip if value did not change
// this.http is assumed to be an injected HttpClient; encode the term so special characters do not break the query
switchMap(term => this.http.get(`/api/users?q=${encodeURIComponent(term)}`)) // cancel previous request
).subscribe(results => console.log(results));
switchMap is the operator that makes this pattern work. When the user types a new character before the previous request completes, switchMap cancels the old request and starts a fresh one. Without it, responses can arrive out of order and overwrite each other.
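The out-of-order problem can be reproduced without a server. In this sketch two Subjects play the role of pending HTTP responses, and the same query stream is run through switchMap and, for contrast, mergeMap. All names are illustrative.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Stand-ins for two in-flight HTTP responses.
const responseA = new Subject<string>();
const responseB = new Subject<string>();
const fakeRequest = (query: string) => (query === 'a' ? responseA : responseB);

const queries$ = new Subject<string>();
const switched: string[] = [];
const merged: string[] = [];

queries$.pipe(switchMap(fakeRequest)).subscribe(result => switched.push(result));
queries$.pipe(mergeMap(fakeRequest)).subscribe(result => merged.push(result));

queries$.next('a'); // first query starts
queries$.next('b'); // second query starts; switchMap drops the first one

responseB.next('results for b'); // the newer response arrives first
responseA.next('results for a'); // the stale response arrives last

console.log(switched); // ['results for b']
console.log(merged);   // ['results for b', 'results for a']
```

With mergeMap the stale result lands last and would overwrite the fresh one in a UI; switchMap never delivers it.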
Advanced: memory leak and the fix with takeUntilDestroyed
import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
// wrong: leaks after component is destroyed
@Component({ selector: 'app-broken', template: '{{ count }}' })
class BrokenComponent implements OnInit {
count = 0;
ngOnInit() {
interval(1000).subscribe(v => {
this.count = v; // keeps updating a destroyed component
});
}
}
// fix: auto-unsubscribe using Angular 16+ API
@Component({ selector: 'app-fixed', template: '{{ count$ | async }}' })
class FixedComponent {
readonly count$ = interval(1000).pipe(
takeUntilDestroyed() // unsubscribes when the component is destroyed
);
}
I have seen the broken pattern in almost every Angular codebase that skips code review on lifecycle methods. The async pipe plus takeUntilDestroyed() combination is now the standard, and interviewers specifically ask about it when probing Angular 16+ knowledge.