Skip to main content

Що таке RxJS і як він інтегрується в Angular?

RxJS - це JavaScript-бібліотека для реактивного програмування, яка дозволяє описувати асинхронні потоки даних через Observable-и, оператори і підписки.

Теорія

TL;DR

  • RxJS - це як конвеєрна стрічка: дані течуть через неї, ти підключаєшся через subscribe(), і пропускаєш їх через оператори (фільтри, трансформації) перед отриманням.
  • Головна різниця від Promise: Promise вирішується один раз; Observable (спостережуваний потік) може видавати нуль, одне або багато значень з часом, і нічого не відбувається до виклику subscribe().
  • Правило вибору: кілька асинхронних джерел або операції з можливістю скасування → RxJS. Один HTTP-запит без трансформацій → Promise підходить.
  • Angular 16+ постачається з RxJS 7.8+ і використовує Observable-и в HttpClient, Router і Forms за замовчуванням.
  • Забути відписатися - це найпоширеніше джерело витоків пам'яті в Angular-проектах.

Швидкий приклад

typescript
import { of } from 'rxjs'; import { map, delay } from 'rxjs/operators'; const user$ = of('Alice').pipe( map(name => `Hello, ${name}`), // трансформуємо значення delay(1000) // імітуємо асинхронність ); user$.subscribe(msg => console.log(msg)); // виводить "Hello, Alice" через 1с

of('Alice') створює потік, який видає одне значення і завершується. pipe() з'єднує оператори без змін оригінального потоку. Нічого не запускається до виклику subscribe(). Це і є модель відкладеного виконання (lazy execution).

Ключова різниця

RxJS замінює callback-пекло (вкладені .then()) на декларативні пайплайни. Ти описуєш що робити з кожним значенням: трансформувати, відфільтрувати, скасувати, поєднати з іншим потоком. Бібліотека бере на себе тайминг і стан. Код читається зверху вниз, без ручного відстеження того, чи завершився попередній запит.

Як Angular використовує RxJS

Angular 16+ постачається з RxJS 7.8+ через @angular/core. Майже кожен асинхронний API в Angular повертає Observable:

Функція AngularРоль RxJSТиповий оператор
HttpClientВсі запити повертають Observable<T>switchMap для ланцюгових запитів
Routerrouter.events - це Observable<RouterEvent>filter(e => e instanceof NavigationEnd)
Reactive Formsform.valueChanges - це Observable<FormValue>debounceTime(300) для валідації
Async pipeАвтоматично підписується і відписується в шаблонах{{ data$ | async }}

Async pipe заслуговує окремої уваги. Він очищає підписку при знищенні компонента, що прибирає найпоширеніше джерело витоків у Angular-шаблонах.

Як це працює всередині

Observable - це звичайний об'єкт з методом subscribe. Жодного спеціального рантайму. Коли ти викликаєш subscribe(), він запускає функцію-продюсер: синхронно або асинхронно залежно від джерела. Оператори повертають нові Observable-и, які проксують емісії через ланцюг підписників. У RxJS 7+ використовується TeardownLogic для очищення: коли потік завершується або виникає помилка, автоматично виконуються teardown-колбеки.

Тут важлива різниця між cold і hot Observable. Cold Observable запускає новий продюсер для кожного підписника. HttpClient - cold: кожна підписка робить новий HTTP-запит. Hot Observable ділить один продюсер між усіма підписниками. Subject - hot: нові підписники пропускають попередні емісії. Якщо обгорнути cold Observable у shareReplay(1), він поводитиметься як hot.

Коли використовувати

  • Кілька асинхронних джерел, які треба поєднати (HTTP + кліки користувача) → combineLatest або merge.
  • Операції з можливістю скасування, як пошук під час введення → switchMap скасовує попередній inner Observable.
  • Потоки в реальному часі (WebSocket, інтервали) → разом з takeUntilDestroyed() для очищення.
  • Простий одиночний асинхронний запит без трансформацій → нативний Promise підходить.
  • Без асинхронності взагалі → звичайні функції.

Типові помилки

1. Забути відписатися

typescript
// неправильно: інтервал працює після знищення компонента ngOnInit() { interval(1000).subscribe(v => console.log(v)); } // виправлення через takeUntilDestroyed (Angular 16+) import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; readonly count$ = interval(1000).pipe( takeUntilDestroyed() // автоматично відписується при знищенні компонента ); // шаблон: {{ count$ | async }}

Observable-и не очищаються самостійно. Інтервал продовжує працювати після знищення компонента. Використовуй async pipe або takeUntilDestroyed().

2. Використовувати Observable як Promise

typescript
// неправильно: TypeError - .then is not a function this.http.get('/api/users').then(data => console.log(data)); // виправлення this.http.get('/api/users').subscribe(data => console.log(data)); // або явне перетворення const data = await firstValueFrom(this.http.get('/api/users'));

3. Кілька підписок на cold Observable

typescript
// неправильно: виконує 2 окремі HTTP-запити const users$ = this.http.get('/api/users'); users$.subscribe(u => this.list = u); users$.subscribe(u => this.count = u.length); // виправлення: ділимо одне виконання const users$ = this.http.get('/api/users').pipe(shareReplay(1));

4. Ігнорувати помилки

typescript
// неправильно: потік зупиняється без жодного сліду obs$.subscribe(v => console.log(v)); // виправлення: завжди обробляй помилки obs$.subscribe({ next: v => console.log(v), error: e => console.error('Потік впав:', e) });

Де зустрічається в реальних проектах

  • Angular HttpClient → switchMap ланцюгує оновлення токена перед основним API-запитом.
  • Angular Router → router.events.pipe(filter(...)) для route guards і аналітики.
  • NgRx Effects → actions$.pipe(ofType(loadUsers), switchMap(() => this.service.get())).
  • Angular Material autocomplete → debounceTime(300) + distinctUntilChanged() + switchMap на події введення.
  • Angular 18 → toSignal() конвертує Observable у сигнал для дрібнозернистої реактивності.

Питання на співбесіді

Q: Яка різниця між cold і hot Observable?
A: Cold Observable запускає новий продюсер для кожного підписника: кожна підписка на HttpClient = новий HTTP-запит. Hot Observable ділить один продюсер між підписниками: Subject і fromEvent на DOM-елементі - hot. Якщо обгорнути cold Observable у shareReplay(1), він поводитиметься як hot.

Q: Як Angular запобігає витокам пам'яті з RxJS?
A: async pipe автоматично відписується при знищенні компонента. З Angular 16 takeUntilDestroyed() робить те саме для ручних підписок у конструкторі або injection context.

Q: Яка різниця між switchMap і mergeMap?
A: switchMap скасовує попередній inner Observable при новій емісії - підходить для пошуку, де застарілі результати непотрібні. mergeMap виконує всі inner Observable паралельно - підходить для незалежних запитів, які мають завершитися всі.

Q: Як конвертувати Observable у Promise?
A: firstValueFrom(obs$) вирішується з першим значенням. lastValueFrom(obs$) чекає завершення потоку і вирішується з останнім значенням.

Q: Спроєктуй NgRx effect, який повторює HTTP-запит після відновлення з'єднання з мережею.
A: Використовуй retry({ delay: () => fromEvent(window, 'online') }) з RxJS 7+ всередині switchMap. Effect чекатиме підключення до мережі перед повтором, а не одразу надсилатиме нові запити. Додай take(1) на retry-сигнал, щоб уникнути нескінченних повторів.

Приклади

Базовий: емісії cold Observable

typescript
import { Observable } from 'rxjs'; const data$ = new Observable(sub => { console.log('Продюсер запущено'); // виводиться при кожній підписці sub.next(42); sub.complete(); }); data$.subscribe(v => console.log('Підписка 1:', v)); // Продюсер запущено | Підписка 1: 42 data$.subscribe(v => console.log('Підписка 2:', v)); // Продюсер запущено знову | Підписка 2: 42

Кожен виклик subscribe() створює незалежне виконання. Для HTTP це означає дві підписки = два мережевих запити. Обгорни shareReplay(1), якщо потрібно поділити один результат між кількома споживачами.

Середній: пошуковий input з debounce і скасуванням запитів

typescript
import { fromEvent } from 'rxjs'; import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators'; import { HttpClient } from '@angular/common/http'; const input = document.querySelector<HTMLInputElement>('#search')!; fromEvent(input, 'input').pipe( debounceTime(300), // чекаємо 300мс після останнього натискання map(e => (e.target as HTMLInputElement).value), distinctUntilChanged(), // пропускаємо, якщо значення не змінилось switchMap(term => this.http.get(`/api/users?q=${term}`)) // скасовуємо попередній запит ).subscribe(results => console.log(results));

switchMap - ключовий оператор у цьому патерні. Коли користувач вводить новий символ до завершення попереднього запиту, switchMap скасовує старий і починає новий. Без нього відповіді можуть прийти в неправильному порядку і перезаписати одна одну.

Просунутий: витік пам'яті і виправлення через takeUntilDestroyed

typescript
import { Component, OnInit } from '@angular/core'; import { interval } from 'rxjs'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; // неправильно: витік після знищення компонента @Component({ selector: 'app-broken', template: '{{ count }}' }) class BrokenComponent implements OnInit { count = 0; ngOnInit() { interval(1000).subscribe(v => { this.count = v; // продовжує оновлювати знищений компонент }); } } // виправлення: автоматична відписка через API Angular 16+ @Component({ selector: 'app-fixed', template: '{{ count$ | async }}' }) class FixedComponent { readonly count$ = interval(1000).pipe( takeUntilDestroyed() // відписується при знищенні компонента ); }

Я зустрічав такий патерн витоку майже в кожній Angular-кодовій базі, де не вистачало code review на lifecycle-методи. Комбінація async pipe і takeUntilDestroyed() зараз є стандартом, і інтерв'юери цілеспрямовано запитують про це, перевіряючи знання Angular 16+.

Коротка відповідь

Для співбесіди
Premium

Коротка відповідь допоможе вам впевнено відповідати на цю тему під час співбесіди.

Дочитали статтю?