
What is a Subject, and what types of Subjects exist in RxJS?

A Subject in RxJS is an Observable that is also an Observer - each value passed to .next() is multicast to all active subscribers.

Theory

TL;DR

  • A Subject is like a live radio broadcast: tune in late and you miss what was already said
  • BehaviorSubject gives any new subscriber the latest value on arrival (good for state)
  • ReplaySubject(n) buffers the last n emissions and replays them to late subscribers
  • AsyncSubject holds back all values and emits only the last one after .complete()
  • Decision rule: no history needed (Subject), current state (BehaviorSubject), recent N events (ReplaySubject), final result only (AsyncSubject)

Quick example

typescript
import { Subject } from 'rxjs';

const messages$ = new Subject<string>();

// Both subscribers join before any emission
messages$.subscribe(val => console.log('A:', val));
messages$.subscribe(val => console.log('B:', val));

messages$.next('Hello');
// A: Hello
// B: Hello  <- same value, one push, two subscribers

One .next() call reached both subscribers, one after the other within the same synchronous call. That is multicasting. A regular new Observable() creates a separate execution per subscriber - a Subject shares one.

Hot vs cold: the core difference

A regular Observable is cold: it starts a new execution for each subscriber. A Subject is hot by default. It runs regardless of how many subscribers exist, and you push values explicitly with .next() instead of inside a factory function.
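The contrast can be made concrete with a small sketch. The names cold$, hot$, executions, coldLog and hotLog are illustrative only; the counter simply records how many times the Observable's factory function runs.

```typescript
import { Observable, Subject } from 'rxjs';

const coldLog: string[] = [];
let executions = 0;

// Cold: the factory function runs once per subscriber.
const cold$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

cold$.subscribe(v => coldLog.push(`A got ${v}`));
cold$.subscribe(v => coldLog.push(`B got ${v}`));
// executions is now 2: each subscriber triggered its own run.

const hotLog: string[] = [];
const hot$ = new Subject<number>();

hot$.subscribe(v => hotLog.push(`A got ${v}`));
hot$.next(1); // only A is listening
hot$.subscribe(v => hotLog.push(`B got ${v}`));
hot$.next(2); // A and B both receive the same push

console.log(coldLog); // [ 'A got 1', 'B got 2' ]
console.log(hotLog);  // [ 'A got 1', 'A got 2', 'B got 2' ]
```

Subscriber B of the cold Observable sees a different value because it got its own execution; subscriber B of the Subject simply missed the first push.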

Internally, RxJS Subjects extend Observable and implement the Observer interface. They maintain an array of Subscriber objects. When .next(value) fires, it loops through that array synchronously, calling each subscriber's next(value) in order. No lazy initialization, no per-subscriber setup.
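To illustrate that loop, here is a deliberately simplified stand-in. MiniSubject is a hypothetical class written for this explanation, not the RxJS source: it leaves out error handling, the Observable base class, and most of the real bookkeeping.

```typescript
// Simplified illustration only; not the actual RxJS implementation.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];
  private stopped = false;

  subscribe(listener: Listener<T>): () => void {
    if (this.stopped) return () => {};
    this.listeners.push(listener);
    // Return an unsubscribe function.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    if (this.stopped) return;
    // Iterate over a snapshot so subscriptions added or removed
    // mid-loop do not affect the current emission.
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }

  complete(): void {
    this.stopped = true;
    this.listeners = [];
  }
}

const order: string[] = [];
const mini = new MiniSubject<number>();
const unsubscribeA = mini.subscribe(v => order.push(`A:${v}`));
mini.subscribe(v => order.push(`B:${v}`));

mini.next(1);   // A then B, synchronously
unsubscribeA();
mini.next(2);   // only B
mini.complete();
mini.next(3);   // ignored after complete

console.log(order); // [ 'A:1', 'B:1', 'B:2' ]
```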

Subject types at a glance

Type            | Stores values? | New subscriber gets          | Typical use
Subject         | No             | Only future .next() calls    | Live events, button clicks
BehaviorSubject | Last value     | Current value immediately    | UI state, auth status
ReplaySubject   | Last N values  | Buffered values, then future | Chat history, undo stack
AsyncSubject    | Last value     | Only after complete()        | One-time API response
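The table can be checked with one small experiment: push 1, 2, 3 into each Subject type, attach a subscriber afterwards, and record what it receives. The helper lateSubscriberSees is a hypothetical name introduced for this sketch.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Push 1, 2, 3 into a subject, then attach a late subscriber and
// record what it receives. The subject is completed at the end so
// that AsyncSubject gets a chance to emit.
function lateSubscriberSees(subject: Subject<number>): number[] {
  const got: number[] = [];
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.subscribe(v => got.push(v)); // joins after the emissions
  subject.complete();
  return got;
}

const results = {
  subject: lateSubscriberSees(new Subject<number>()),
  behavior: lateSubscriberSees(new BehaviorSubject<number>(0)),
  replay: lateSubscriberSees(new ReplaySubject<number>(2)),
  async: lateSubscriberSees(new AsyncSubject<number>()),
};

console.log(results);
// subject: [], behavior: [3], replay: [2, 3], async: [3]
```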

BehaviorSubject

BehaviorSubject requires an initial value and always holds the most recent one. Any subscriber gets that value synchronously on subscribe, before any future emissions arrive.

typescript
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class UserService {
  private statusSubject = new BehaviorSubject<'online' | 'offline'>('offline');
  public status$ = this.statusSubject.asObservable(); // hides .next() from consumers

  setOnline() {
    this.statusSubject.next('online');
  }
}

// Component subscribes - gets 'offline' immediately (current value)
userService.status$.subscribe(s => console.log('Status:', s));
// → Status: offline (synchronous, on subscribe)

userService.setOnline();
// → Status: online (to all current subscribers)

The .asObservable() wraps the Subject so consumers only get a plain Observable. Only the service controls when the value changes. I've seen bugs appear in almost every codebase where the Subject was public and two different components both called .next() without coordination.

ReplaySubject

ReplaySubject(n) buffers the last n emissions. A late subscriber receives those buffered values immediately, then continues with future ones.

typescript
import { ReplaySubject } from 'rxjs';

const chat$ = new ReplaySubject<string>(2); // buffer last 2 messages

chat$.next('Hello');
chat$.next('How are you?');
chat$.next('Still there?');
// buffer is now ['How are you?', 'Still there?']

// Late subscriber (user opens chat panel)
chat$.subscribe(msg => console.log('Replayed:', msg));
// → Replayed: How are you?
// → Replayed: Still there?

Without a buffer size, new ReplaySubject() defaults to an unbounded buffer and keeps every emission in memory for as long as the subject itself is referenced. In long-lived apps that is effectively a memory leak. Use new ReplaySubject(100, 5000) to cap the buffer at 100 values, each no older than 5 seconds (the second argument is the window time in milliseconds).

AsyncSubject

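AsyncSubject emits nothing until .complete() is called, keeping only the most recent value in the meantime. On completion it emits that last value to all current subscribers, and any later subscriber receives the same value followed by completion. If it completes without ever receiving a value, subscribers get only the completion.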

typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<number>();

result$.subscribe(v => console.log('Result:', v));

result$.next(1);
result$.next(2);
result$.next(3);
result$.complete();
// → Result: 3

Not commonly used in Angular UIs. It fits cases where you wrap a one-time operation and only the final answer matters - similar to a Promise that resolves once.
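The Promise comparison is easiest to see with a subscriber that arrives after completion. A minimal sketch, with answer$ and seen as illustrative names:

```typescript
import { AsyncSubject } from 'rxjs';

const seen: string[] = [];
const answer$ = new AsyncSubject<number>();

answer$.subscribe(v => seen.push(`early:${v}`));
answer$.next(10);
answer$.next(20);
// Nothing has been delivered yet; only the latest value (20) is retained.
answer$.complete();

// A subscriber arriving after completion still receives the final value,
// much like calling .then() on an already-resolved Promise.
answer$.subscribe({
  next: v => seen.push(`late:${v}`),
  complete: () => seen.push('late:done'),
});

console.log(seen); // [ 'early:20', 'late:20', 'late:done' ]
```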

When to use

  • Button clicks, WebSocket messages, DOM events: plain Subject (no history needed)
  • Auth status, selected theme, current user: BehaviorSubject (every subscriber needs the current state)
  • Recent chat messages, undo history, last N sensor readings: ReplaySubject(n)
  • Wrapping a single HTTP request result: AsyncSubject (or just use a Promise)

Common mistakes

1. Expecting a plain Subject to deliver past values to late subscribers

typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();

subject.next('Missed'); // emitted before any subscriber joined
subject.subscribe(v => console.log(v)); // gets nothing

A plain Subject drops any value emitted before a subscriber joins. Use new BehaviorSubject('default') or new ReplaySubject(10) when late subscribers need to catch up.

2. Not cleaning up subscriptions

typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();
subject.subscribe(() => {}); // nothing unsubscribes this
// subject.complete() is never called
// subscriber stays in memory for the entire app lifetime

Call subject.complete() when the Subject's lifetime ends, or use takeUntil(this.destroy$) in Angular components. In templates, the async pipe cleans up automatically.
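A minimal sketch of the takeUntil pattern outside Angular, assuming a destroy$ notifier that would normally be triggered from ngOnDestroy(). The names source$, destroy$ and received are illustrative.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const received: string[] = [];
const source$ = new Subject<string>();
const destroy$ = new Subject<void>(); // hypothetical teardown notifier

source$
  .pipe(takeUntil(destroy$))
  .subscribe({
    next: v => received.push(v),
    complete: () => received.push('(completed)'),
  });

source$.next('first');

// In an Angular component these two calls would live in ngOnDestroy().
destroy$.next();
destroy$.complete();

source$.next('second'); // no longer delivered to the subscriber

console.log(received); // [ 'first', '(completed)' ]
```

When destroy$ emits, takeUntil completes the downstream subscription and detaches it from source$, so the source Subject no longer holds a reference to the subscriber.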

3. Pushing values after .complete()

typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();
subject.complete();
subject.next('Too late'); // ignored silently in RxJS v7+

A completed Subject is in a terminal state. Any .next() after that is a no-op. Create a new Subject if you need a fresh stream.

4. Exposing the Subject directly instead of asObservable()

typescript
// Avoid this pattern in Angular services
public userSubject = new BehaviorSubject<User | null>(null);
// Any consumer can call .next() from anywhere

Mark the Subject private and expose only .asObservable() on the public API. This is a convention rather than a hard error, but it prevents accidental state mutations from outside the service.

5. Unbounded ReplaySubject buffer

typescript
// Memory leak in a long-lived app
const history$ = new ReplaySubject<AppState>(); // no size limit
// grows with every emission, forever

Always pass a buffer size. Add a time window with the second constructor argument if the data has a natural expiry.

Real-world usage

  • Angular services: BehaviorSubject for user session, feature flags, active route state
  • NgRx: the action stream is backed by Subjects internally (ActionsSubject in ngrx/platform extends BehaviorSubject)
  • NestJS: WebSocket gateways broadcast messages to clients via Subjects
  • React with rxjs-hooks: reactive state without Redux boilerplate
  • socket.io patterns: wrapping event emitters in a Subject for a clean Observable API

Follow-up questions

Q: What is the difference between a Subject and using shareReplay() on a regular Observable?
A: shareReplay() takes a cold Observable and makes it hot with a replay buffer attached. A Subject gives you manual control over when values are pushed. Use shareReplay() when you have an existing Observable source; use a Subject when you need to emit values imperatively.

Q: Why is BehaviorSubject safe to use in Angular components with zone.js?
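A: Zone.js does not watch Observables. It patches async browser APIs (DOM events, timers, XHR), and Angular runs change detection after those tasks finish, so a .next() triggered from such a task inside Angular's zone is followed by a change detection pass. BehaviorSubject also emits its current value synchronously on subscribe, so the view has a value from the first render. The async pipe calls markForCheck() itself on every emission, which is why no manual markForCheck() is needed, even with OnPush components.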

Q: How does ReplaySubject buffer size affect memory?
A: A fixed size like ReplaySubject(10) evicts old values as new ones arrive, keeping memory stable. An unlimited ReplaySubject() grows without bound. Combine size and time window: new ReplaySubject(100, 5000) keeps at most 100 values from the last 5 seconds.

Q: Can a Subject be cold?
A: No. All RxJS Subjects are hot by design. They do not create a new execution per subscriber. For cold behavior use defer() or new Observable() directly.

Q: In Angular with SSR, what happens when a Subject emits during server-side rendering?
A: The emission runs on the server too. If a subscriber updates component state, the server-rendered HTML may differ from what the client renders, causing a hydration mismatch. Guard browser-only emissions with isPlatformBrowser(platformId), where platformId comes from inject(PLATFORM_ID).
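For the shareReplay() question above, a short sketch shows the difference in practice. config$ is a hypothetical cold source standing in for something like an HTTP call, and runs counts how often it executes.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let runs = 0;
const out: string[] = [];

// Hypothetical cold source standing in for an HTTP call.
const config$ = new Observable<string>(subscriber => {
  runs += 1;
  subscriber.next(`config v${runs}`);
  subscriber.complete();
});

// shareReplay(1) shares one execution and replays the last value.
const shared$ = config$.pipe(shareReplay(1));

shared$.subscribe(v => out.push(`A:${v}`));
shared$.subscribe(v => out.push(`B:${v}`)); // replayed, source is not re-run

console.log(runs); // 1
console.log(out);  // [ 'A:config v1', 'B:config v1' ]
```

Nothing here calls .next() by hand: the values come from the existing source. Reach for a Subject only when the code itself decides when to emit.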

Examples

Plain Subject: event bus for multiple handlers

typescript
import { Subject } from 'rxjs';

const click$ = new Subject<MouseEvent>();

// Single event listener feeds multiple handlers
document.getElementById('btn')!.addEventListener('click', e => click$.next(e));

click$.subscribe(e => console.log('Handler 1: clicked at', e.clientX));
click$.subscribe(e => analytics.track('button_click', { x: e.clientX }));

// Both handlers fire on every click
// No duplicate addEventListener calls, no manual coordination

One event source feeds multiple consumers without adding separate DOM listeners per handler. When the component is destroyed, call click$.complete() to release all subscribers at once, and remove the DOM listener as well, since completing the Subject does not detach it.

BehaviorSubject: Angular auth state with private/public pattern

typescript
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { User } from './user.model';

@Injectable({ providedIn: 'root' })
export class AuthService {
  private userSubject = new BehaviorSubject<User | null>(null);
  public user$ = this.userSubject.asObservable();

  login(user: User) {
    this.userSubject.next(user);
  }

  logout() {
    this.userSubject.next(null);
  }
}

// Header component - always has current auth state, no flicker
authService.user$.subscribe(user => {
  this.isLoggedIn = user !== null;
  // null on first render (before login), then User object after login
});

Because BehaviorSubject emits synchronously on subscribe, the header renders with the correct auth state from the first frame. The null initial value represents "not logged in" without any special loading state.

ReplaySubject: undo history with size and time constraints

typescript
import { ReplaySubject } from 'rxjs';

interface EditorState {
  content: string;
  cursor: number;
}

// Keep last 10 states, discard anything older than 30 seconds
const undoHistory$ = new ReplaySubject<EditorState>(10, 30000);

function saveState(state: EditorState) {
  undoHistory$.next(state);
}

function openUndoPanel() {
  // Subscriber gets up to 10 recent states replayed immediately
  undoHistory$.subscribe(state => renderUndoItem(state));
}

saveState({ content: 'Hello', cursor: 5 });
saveState({ content: 'Hello world', cursor: 11 });

openUndoPanel();
// → renderUndoItem called for each buffered state
// States older than 30s are automatically dropped

The time window (30 000 ms) means that states already older than 30 seconds when the undo panel subscribes are not replayed. That matches user expectations better than showing edits from an hour ago.
