Skip to main content

What is RxJS and how is it integrated in Angular?

RxJS is a JavaScript library for reactive programming that lets you compose asynchronous data streams using Observables, operators, and subscribers.

Theory

TL;DR

  • RxJS works like a conveyor belt: data flows through, you tap in with subscribe(), and pipe it through operators (filters, transforms) before it reaches you.
  • Main difference from Promises: a Promise resolves once; an Observable emits zero, one, or many values over time, and nothing runs until you subscribe.
  • Decision rule: multiple async sources or cancelable operations → RxJS. One-shot fetch with no transformation → Promise is fine.
  • Angular 16+ ships with RxJS 7.8+ and uses Observables in HttpClient, Router, and Forms by default.
  • Forgetting to unsubscribe is the top memory leak source in Angular apps.

Quick example

typescript
import { of } from 'rxjs'; import { map, delay } from 'rxjs/operators'; const user$ = of('Alice').pipe( map(name => `Hello, ${name}`), // transform the value delay(1000) // simulate async ); user$.subscribe(msg => console.log(msg)); // logs "Hello, Alice" after 1s

of('Alice') creates a stream that emits once and completes. pipe() chains operators without mutating the original stream. Nothing runs until subscribe() is called. That is the lazy execution model.

Key difference

RxJS replaces callback hell (nested .then() chains) with declarative pipelines. You describe what should happen to each emission: map it, filter it, cancel it, combine it with another stream. The library handles timing and state. Code reads top-to-bottom without manually tracking whether a previous request has finished.

How Angular uses RxJS

Angular 16+ bundles RxJS 7.8+ via @angular/core. Nearly every async API in Angular returns an Observable:

Angular featureRxJS roleCommon operator
HttpClientAll requests return Observable<T>switchMap for chained calls
Routerrouter.events is Observable<RouterEvent>filter(e => e instanceof NavigationEnd)
Reactive Formsform.valueChanges is Observable<FormValue>debounceTime(300) for validation
Async pipeAuto-subscribes and unsubscribes in templates{{ data$ | async }}

The async pipe deserves a separate mention. It handles subscription cleanup when the component is destroyed, which removes the most common source of leaks in Angular templates.

How it works internally

Observables are plain objects with a subscribe method. No special runtime, no magic. When you call subscribe(), it invokes the producer function: synchronously or asynchronously depending on the source. Operators return new Observables that proxy emissions through a chain of subscriber functions. RxJS 7+ uses TeardownLogic for cleanup: when a stream completes or errors, teardown callbacks run automatically.

Cold vs hot matters here. A cold Observable starts a new producer per subscriber. HttpClient is cold: each subscribe() fires a new HTTP request. A hot Observable shares one producer across all subscribers. Subject is hot: late subscribers miss past emissions. Wrapping a cold Observable with shareReplay(1) makes it behave hot.

When to use

  • Multiple async inputs that need combining (HTTP + user clicks) → combineLatest or merge.
  • Cancelable operations like search-as-you-type → switchMap cancels the previous inner Observable.
  • Real-time streams (WebSocket, intervals) → pair with takeUntilDestroyed() to clean up.
  • Simple one-shot async with no transformation → native Promise works.
  • No async at all → plain functions.

Common mistakes

1. Forgetting to unsubscribe

typescript
// wrong: interval keeps firing after the component is destroyed ngOnInit() { interval(1000).subscribe(v => console.log(v)); } // fix: takeUntilDestroyed (Angular 16+) import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; readonly count$ = interval(1000).pipe( takeUntilDestroyed() // auto-unsubscribes on component destroy ); // template: {{ count$ | async }}

Observables do not auto-clean. The interval keeps running after the component is destroyed. Use async pipe or takeUntilDestroyed().

2. Treating an Observable like a Promise

typescript
// wrong: TypeError - .then is not a function this.http.get('/api/users').then(data => console.log(data)); // fix: subscribe or convert explicitly this.http.get('/api/users').subscribe(data => console.log(data)); const data = await firstValueFrom(this.http.get('/api/users'));

3. Subscribing multiple times to a cold Observable

typescript
// wrong: fires 2 separate HTTP requests const users$ = this.http.get('/api/users'); users$.subscribe(u => this.list = u); users$.subscribe(u => this.count = u.length); // fix: share one execution const users$ = this.http.get('/api/users').pipe(shareReplay(1));

4. Ignoring errors

typescript
// wrong: stream dies with no trace obs$.subscribe(v => console.log(v)); // fix: always handle errors obs$.subscribe({ next: v => console.log(v), error: e => console.error('Stream failed:', e) });

Real-world usage

  • Angular HttpClient → switchMap chains token refresh before the actual API call.
  • Angular Router → router.events.pipe(filter(...)) drives route guards and analytics.
  • NgRx Effects → actions$.pipe(ofType(loadUsers), switchMap(() => this.service.get())).
  • Angular Material autocomplete → debounceTime(300) + distinctUntilChanged() + switchMap on input events.
  • Angular 18 → toSignal() converts an Observable to a signal for fine-grained reactivity.

Follow-up questions

Q: What is the difference between cold and hot Observables?
A: Cold Observables start a new producer per subscriber. Each subscription to an HttpClient call fires a new HTTP request. Hot Observables share one producer: Subject and fromEvent on a DOM node are hot. Wrapping a cold Observable with shareReplay(1) makes it behave hot.

Q: How does Angular prevent RxJS memory leaks?
A: The async pipe auto-unsubscribes when the component is destroyed. Since Angular 16, takeUntilDestroyed() does the same for manual subscriptions inside constructors or injection contexts.

Q: What is the difference between switchMap and mergeMap?
A: switchMap cancels the previous inner Observable when a new emission arrives. Good for search inputs where stale results are useless. mergeMap runs all inner Observables in parallel. Good for independent requests that should all complete.

Q: How do you convert an Observable to a Promise?
A: firstValueFrom(obs$) resolves with the first emission. lastValueFrom(obs$) waits for the stream to complete and resolves with the last value.

Q: Design an NgRx effect that retries a failed HTTP call when the device comes back online.
A: Use retry({ delay: () => fromEvent(window, 'online') }) in RxJS 7+ inside a switchMap. The effect waits for network reconnection before retrying instead of immediately hammering the server. Bonus: add take(1) on the retry signal to avoid infinite retry loops.

Examples

Basic: cold Observable emissions

typescript
import { Observable } from 'rxjs'; const data$ = new Observable(sub => { console.log('Producer runs'); // logs once per subscribe call sub.next(42); sub.complete(); }); data$.subscribe(v => console.log('Sub 1:', v)); // Producer runs | Sub 1: 42 data$.subscribe(v => console.log('Sub 2:', v)); // Producer runs again | Sub 2: 42

Each subscribe() creates an independent execution. For HTTP, two subscriptions mean two network requests. Wrap with shareReplay(1) if you need to share one result across multiple consumers.

Intermediate: search input with debounce and cancel

typescript
import { fromEvent } from 'rxjs'; import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators'; import { HttpClient } from '@angular/common/http'; const input = document.querySelector<HTMLInputElement>('#search')!; fromEvent(input, 'input').pipe( debounceTime(300), // wait 300ms after last keystroke map(e => (e.target as HTMLInputElement).value), distinctUntilChanged(), // skip if value did not change switchMap(term => this.http.get(`/api/users?q=${term}`)) // cancel previous request ).subscribe(results => console.log(results));

switchMap is the operator that makes this pattern work. When the user types a new character before the previous request completes, switchMap cancels the old request and starts a fresh one. Without it, responses can arrive out of order and overwrite each other.

Advanced: memory leak and the fix with takeUntilDestroyed

typescript
import { Component, OnInit } from '@angular/core'; import { interval } from 'rxjs'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; // wrong: leaks after component is destroyed @Component({ selector: 'app-broken', template: '{{ count }}' }) class BrokenComponent implements OnInit { count = 0; ngOnInit() { interval(1000).subscribe(v => { this.count = v; // keeps updating a destroyed component }); } } // fix: auto-unsubscribe using Angular 16+ API @Component({ selector: 'app-fixed', template: '{{ count$ | async }}' }) class FixedComponent { readonly count$ = interval(1000).pipe( takeUntilDestroyed() // unsubscribes when the component is destroyed ); }

I have seen the broken pattern in almost every Angular codebase that skips code review on lifecycle methods. The async pipe plus takeUntilDestroyed() combination is now the standard, and interviewers specifically ask about it when probing Angular 16+ knowledge.

Short Answer

Interview ready
Premium

A concise answer to help you respond confidently on this topic during an interview.

Finished reading?